Spaces:
Running
on
Zero
Running
on
Zero
import json | |
import xgboost as xgb | |
import pandas as pd | |
import numpy as np | |
import torch | |
import zlib | |
from transformers import AutoTokenizer, AutoModelForCausalLM | |
from scipy.stats import skew, kurtosis, entropy | |
from tqdm import tqdm | |
from torch.nn import CrossEntropyLoss | |
from pathlib import Path | |
import spaces | |
import os | |
class Diversity:
    """Compute surprisal statistics of a text under a causal language model.

    The model is called as ``model(tokens, labels=tokens)`` and only the
    ``.logits`` attribute of its output is read. The tokenizer must provide
    ``encode(text, return_tensors="pt", truncation=True, max_length=...)``.
    """

    def __init__(self, model, tokenizer, device):
        self.tokenizer = tokenizer
        self.model = model
        self.device = device

    def compute_log_likelihoods(self, text):
        """Return the log-likelihood of every token after the first.

        The text is truncated to at most 1024 tokens. Entry ``i`` of the
        result is the log-probability the model assigns to token ``i + 1``
        given the tokens before it.

        Returns:
            1-D numpy array with one entry fewer than the number of tokens.
        """
        tokens = self.tokenizer.encode(
            text, return_tensors="pt", truncation=True, max_length=1024
        ).to(self.device)
        with torch.no_grad():
            # The loss produced by ``labels=`` is not used; only logits are read.
            logits = self.model(tokens, labels=tokens).logits

        # Position i predicts token i + 1, so drop the last logit row and the
        # first token to align predictions with their targets.
        shift_logits = logits[:, :-1, :].squeeze(0)
        shift_labels = tokens[:, 1:].squeeze(0)

        # Cast to float32 before the softmax so half-precision models do not
        # lose precision in the log-probabilities.
        log_probs = torch.log_softmax(shift_logits.float(), dim=-1)
        picked = log_probs.gather(-1, shift_labels.unsqueeze(-1)).squeeze(-1)
        return picked.cpu().numpy()

    def compute_surprisal(self, text):
        """Return per-token surprisal, i.e. the negated log-likelihoods."""
        return -self.compute_log_likelihoods(text)

    def compute_features(self, text):
        """Return 11 summary features of the text, or ``None`` if too short.

        The model is run exactly once; surprisals are derived from the same
        log-likelihoods instead of triggering a second forward pass.

        Returns:
            ``None`` when fewer than 10 tokens could be scored. Otherwise a
            list, in order: mean, std, variance, skewness and kurtosis of the
            surprisals; mean and std of their first differences; variance,
            histogram entropy (20 bins) and lag-1 autocorrelation of the
            second differences of the log-likelihoods; and the zlib
            compression ratio of the UTF-8 encoded text.
        """
        log_likelihoods = self.compute_log_likelihoods(text)
        if len(log_likelihoods) < 10:
            return None

        s = -log_likelihoods
        mean_s, std_s, var_s = np.mean(s), np.std(s), np.var(s)
        skew_s, kurt_s = skew(s), kurtosis(s)

        diff_s = np.diff(s)
        mean_diff, std_diff = np.mean(diff_s), np.std(diff_s)

        second_order_diff = np.diff(np.diff(log_likelihoods))
        var_2nd = np.var(second_order_diff)
        histogram = np.histogram(second_order_diff, bins=20, density=True)[0]
        entropy_2nd = entropy(histogram)
        if len(second_order_diff) > 1:
            autocorr_2nd = np.corrcoef(
                second_order_diff[:-1], second_order_diff[1:]
            )[0, 1]
        else:
            autocorr_2nd = 0

        raw = text.encode('utf-8')
        comp_ratio = len(zlib.compress(raw)) / len(raw)

        return [
            mean_s, std_s, var_s, skew_s, kurt_s,
            mean_diff, std_diff,
            var_2nd, entropy_2nd, autocorr_2nd,
            comp_ratio,
        ]
class BiScope:
    """Extract cross-entropy statistics of a text placed after a fixed prompt.

    The model is called as ``model(input_ids=...)`` and only the ``.logits``
    attribute of its output is read. The tokenizer is called directly and must
    return an object exposing ``.input_ids``.
    """

    def __init__(self, model, tokenizer, device):
        self.COMPLETION_PROMPT_ONLY = "Complete the following text: "
        self.tokenizer = tokenizer
        self.model = model
        self.device = device

    def compute_fce_loss(self, logits, targets, text_slice):
        """Per-token cross-entropy of the logits one position before each target.

        Logits at position ``i - 1`` are scored against the token at position
        ``i``, i.e. the usual next-token prediction alignment.

        Returns:
            1-D numpy array with one loss per target token.
        """
        shifted = logits[0, text_slice.start - 1:text_slice.stop - 1, :]
        loss = CrossEntropyLoss(reduction='none')(shifted, targets)
        return loss.detach().cpu().numpy()

    def compute_bce_loss(self, logits, targets, text_slice):
        """Per-token cross-entropy of the logits at each target's own position.

        Logits at position ``i`` are scored against the token at position
        ``i`` itself (no shift).

        Returns:
            1-D numpy array with one loss per target token.
        """
        aligned = logits[0, text_slice, :]
        loss = CrossEntropyLoss(reduction='none')(aligned, targets)
        return loss.detach().cpu().numpy()

    def detect_single_sample(self, sample):
        """Return 72 loss statistics for ``sample``.

        The sample is truncated to 2000 tokens and appended to the tokenized
        completion prompt. For each of nine start points (10%, 20%, ..., 90%
        of the sample's tokens) the mean, max, min and std of the remaining
        ``fce`` and ``bce`` losses are emitted, giving 9 * 8 values.
        """
        prompt_ids = self.tokenizer(
            self.COMPLETION_PROMPT_ONLY, return_tensors='pt'
        ).input_ids.to(self.device)
        text_ids = self.tokenizer(
            sample, return_tensors='pt', max_length=2000, truncation=True
        ).input_ids.to(self.device)

        combined_ids = torch.cat([prompt_ids, text_ids], dim=1)
        text_slice = slice(prompt_ids.shape[1], combined_ids.shape[1])
        targets = combined_ids[0][text_slice]

        # Inference only: without no_grad the forward pass records an autograd
        # graph that is never used and needlessly holds activation memory.
        with torch.no_grad():
            logits = self.model(input_ids=combined_ids).logits
            fce_loss = self.compute_fce_loss(logits, targets, text_slice)
            bce_loss = self.compute_bce_loss(logits, targets, text_slice)

        features = []
        for p in range(1, 10):
            # Keep only the losses from p * 10% of the sample onwards.
            split = len(fce_loss) * p // 10
            fce_clipped = np.nan_to_num(
                np.clip(fce_loss[split:], -1e6, 1e6),
                nan=0.0, posinf=1e6, neginf=-1e6,
            )
            bce_clipped = np.nan_to_num(
                np.clip(bce_loss[split:], -1e6, 1e6),
                nan=0.0, posinf=1e6, neginf=-1e6,
            )
            features.extend([
                np.mean(fce_clipped), np.max(fce_clipped),
                np.min(fce_clipped), np.std(fce_clipped),
                np.mean(bce_clipped), np.max(bce_clipped),
                np.min(bce_clipped), np.std(bce_clipped),
            ])
        return features
class Software:
    """Score a text with an XGBoost classifier over Diversity and BiScope features.

    The classifier is loaded from ``model.json`` located next to this file.
    """

    def __init__(self, div_model, div_tokenizer, bi_model, bi_tokenizer, device_div="cuda", device_bi="cuda"):
        self.div_model = div_model
        self.div_tokenizer = div_tokenizer
        self.bi_model = bi_model
        self.bi_tokenizer = bi_tokenizer
        self.device_div = device_div
        self.device_bi = device_bi
        self.model_path = Path(__file__).parent / "model.json"
        self.model = xgb.XGBClassifier()
        self.model.load_model(self.model_path)

    def evaluate(self, text):
        """Return the classifier's probability for class index 1 on ``text``.

        The feature vector is the Diversity features followed by the BiScope
        features, in that order.

        Raises:
            ValueError: if the text is too short for ``Diversity`` to produce
                features (it returns ``None`` in that case).
        """
        diveye = Diversity(self.div_model, self.div_tokenizer, self.device_div)
        diveye_features = diveye.compute_features(text)
        if diveye_features is None:
            # Previously this fell through to ``None.append`` and surfaced as
            # an opaque AttributeError; fail early with an actionable message
            # and skip the second model's forward pass.
            raise ValueError(
                "Text is too short to evaluate: not enough tokens to compute "
                "diversity features."
            )

        biscope = BiScope(self.bi_model, self.bi_tokenizer, self.device_bi)
        biscope_features = biscope.detect_single_sample(text)

        features = [*diveye_features, *biscope_features]
        return self.model.predict_proba([features])[:, 1][0].item()