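"""Hugging Face Space (ZeroGPU): machine-generated-text detector.

Two feature extractors run over the input text:
  * Diversity ("DivEye"): statistics of token surprisal under tiiuae/falcon-7b.
  * BiScope: forward/backward cross-entropy losses under google/gemma-1.1-2b-it,
    conditioned on a generic completion prompt.
Their concatenated features are scored by a pretrained XGBoost classifier
(model.json); evaluate() returns the probability of the positive
(machine-generated) class.
"""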
import json
import os
import zlib
from pathlib import Path

import numpy as np
import pandas as pd
import spaces
import torch
import xgboost as xgb
from scipy.stats import entropy, kurtosis, skew
from torch.nn import CrossEntropyLoss
from tqdm import tqdm
from transformers import AutoModelForCausalLM, AutoTokenizer
class Diversity:
    """DivEye-style features: statistics of token-level surprisal under a causal LM."""

    def __init__(self, model, tokenizer, device):
        self.tokenizer = tokenizer
        self.model = model.to(device)
        self.device = device

    def compute_log_likelihoods(self, text):
        # Per-token log-likelihoods under the model (next-token prediction).
        tokens = self.tokenizer.encode(text, return_tensors="pt", truncation=True, max_length=1024).to(self.device)
        with torch.no_grad():
            outputs = self.model(tokens, labels=tokens)
        logits = outputs.logits
        # Shift so the logits at position i score the token at position i + 1.
        shift_logits = logits[:, :-1, :].squeeze(0)
        shift_labels = tokens[:, 1:].squeeze(0)
        log_probs = torch.log_softmax(shift_logits.float(), dim=-1)
        token_log_likelihoods = log_probs[range(shift_labels.shape[0]), shift_labels].cpu().numpy()
        return token_log_likelihoods

    def compute_surprisal(self, text):
        # Surprisal is the negative log-likelihood of each token.
        return -self.compute_log_likelihoods(text)

    def compute_features(self, text):
        # One forward pass suffices: surprisal is just the negated log-likelihoods.
        log_likelihoods = self.compute_log_likelihoods(text)
        surprisals = -log_likelihoods
        if len(surprisals) < 10 or len(log_likelihoods) < 3:
            return None
        s = np.array(surprisals)
        # Moments of the surprisal distribution.
        mean_s, std_s, var_s, skew_s, kurt_s = np.mean(s), np.std(s), np.var(s), skew(s), kurtosis(s)
        # First-order surprisal dynamics.
        diff_s = np.diff(s)
        mean_diff, std_diff = np.mean(diff_s), np.std(diff_s)
        # Second-order dynamics of the log-likelihood sequence.
        first_order_diff = np.diff(log_likelihoods)
        second_order_diff = np.diff(first_order_diff)
        var_2nd = np.var(second_order_diff)
        entropy_2nd = entropy(np.histogram(second_order_diff, bins=20, density=True)[0])
        autocorr_2nd = np.corrcoef(second_order_diff[:-1], second_order_diff[1:])[0, 1] if len(second_order_diff) > 1 else 0
        # zlib compression ratio as a crude redundancy measure of the raw text.
        comp_ratio = len(zlib.compress(text.encode("utf-8"))) / len(text.encode("utf-8"))
        return [mean_s, std_s, var_s, skew_s, kurt_s, mean_diff, std_diff,
                var_2nd, entropy_2nd, autocorr_2nd, comp_ratio]
class BiScope:
    """BiScope-style features: forward (FCE) and backward (BCE) cross-entropy
    losses over the text when conditioned on a generic completion prompt."""

    def __init__(self, model, tokenizer, device):
        self.COMPLETION_PROMPT_ONLY = "Complete the following text: "
        self.tokenizer = tokenizer
        self.model = model.to(device)
        self.device = device

    def compute_fce_loss(self, logits, targets, text_slice):
        # Forward cross-entropy: logits at position i - 1 predict token i.
        return CrossEntropyLoss(reduction="none")(
            logits[0, text_slice.start - 1:text_slice.stop - 1, :],
            targets,
        ).detach().cpu().numpy()

    def compute_bce_loss(self, logits, targets, text_slice):
        # Backward cross-entropy: logits at position i scored against token i.
        return CrossEntropyLoss(reduction="none")(
            logits[0, text_slice, :],
            targets,
        ).detach().cpu().numpy()

    def detect_single_sample(self, sample):
        prompt_ids = self.tokenizer(self.COMPLETION_PROMPT_ONLY, return_tensors="pt").input_ids.to(self.device)
        text_ids = self.tokenizer(sample, return_tensors="pt", max_length=2000, truncation=True).input_ids.to(self.device)
        combined_ids = torch.cat([prompt_ids, text_ids], dim=1)
        text_slice = slice(prompt_ids.shape[1], combined_ids.shape[1])
        with torch.no_grad():
            outputs = self.model(input_ids=combined_ids)
        logits = outputs.logits
        targets = combined_ids[0][text_slice]
        fce_loss = self.compute_fce_loss(logits, targets, text_slice)
        bce_loss = self.compute_bce_loss(logits, targets, text_slice)
        # Summary statistics over the trailing (10 - p)/10 of the text, p = 1..9.
        features = []
        for p in range(1, 10):
            split = len(fce_loss) * p // 10
            fce_clipped = np.nan_to_num(np.clip(fce_loss[split:], -1e6, 1e6), nan=0.0, posinf=1e6, neginf=-1e6)
            bce_clipped = np.nan_to_num(np.clip(bce_loss[split:], -1e6, 1e6), nan=0.0, posinf=1e6, neginf=-1e6)
            features.extend([
                np.mean(fce_clipped), np.max(fce_clipped), np.min(fce_clipped), np.std(fce_clipped),
                np.mean(bce_clipped), np.max(bce_clipped), np.min(bce_clipped), np.std(bce_clipped),
            ])
        return features
class Software:
    def __init__(self):
        self.token = os.getenv("HF_TOKEN")
        self.device_div = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        self.device_bi = self.device_div
        # The two feature-extraction LMs are loaded lazily on first use.
        self.div_model = None
        self.div_tokenizer = None
        self.bi_model = None
        self.bi_tokenizer = None
        # Pretrained XGBoost head over the concatenated feature vector.
        self.model_path = Path(__file__).parent / "model.json"
        self.model = xgb.XGBClassifier()
        self.model.load_model(self.model_path)

    def _load_div_models(self):
        if self.div_model is None or self.div_tokenizer is None:
            self.div_tokenizer = AutoTokenizer.from_pretrained(
                "tiiuae/falcon-7b", use_fast=False, trust_remote_code=True, token=self.token
            )
            # Placement is handled by the explicit .to() below, so no device_map here.
            self.div_model = AutoModelForCausalLM.from_pretrained(
                "tiiuae/falcon-7b",
                torch_dtype=torch.float16,
                trust_remote_code=True,
                token=self.token,
            )
            self.div_model.to(self.device_div)

    def _load_bi_models(self):
        if self.bi_model is None or self.bi_tokenizer is None:
            self.bi_tokenizer = AutoTokenizer.from_pretrained(
                "google/gemma-1.1-2b-it", use_fast=False, trust_remote_code=True, token=self.token
            )
            self.bi_model = AutoModelForCausalLM.from_pretrained(
                "google/gemma-1.1-2b-it",
                torch_dtype=torch.float16,
                trust_remote_code=True,
                token=self.token,
            )
            self.bi_model.to(self.device_bi)

    def load_data(self, jsonl_path):
        # Each line of the JSONL file is an object with "id" and "text" fields.
        ids, texts = [], []
        with open(jsonl_path, "r") as f:
            for line in f:
                obj = json.loads(line)
                ids.append(obj["id"])
                texts.append(obj["text"])
        return ids, texts
    # ZeroGPU Spaces allocate a GPU only inside functions decorated with spaces.GPU.
    @spaces.GPU
    def evaluate(self, text):
        self._load_div_models()
        self._load_bi_models()
        # Place the two LMs, using separate GPUs when two are available.
        device_div = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        device_bi = device_div
        if torch.cuda.device_count() > 1:
            device_bi = torch.device("cuda:1")
        if not next(self.div_model.parameters()).is_cuda:
            self.div_model = self.div_model.to(device_div)
        if not next(self.bi_model.parameters()).is_cuda:
            self.bi_model = self.bi_model.to(device_bi)
        diveye = Diversity(self.div_model, self.div_tokenizer, device_div)
        biscope = BiScope(self.bi_model, self.bi_tokenizer, device_bi)
        diveye_features = diveye.compute_features(text)
        if diveye_features is None:
            # Too few tokens to featurize reliably.
            return None
        biscope_features = biscope.detect_single_sample(text)
        features = diveye_features + biscope_features
        # Probability that the text is machine-generated (positive class).
        return self.model.predict_proba([features])[:, 1][0].item()
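

# Minimal usage sketch; an illustrative assumption, not part of the Space's
# serving code. It presumes model.json sits next to this file, HF_TOKEN grants
# access to the gated models, and a CUDA device (or ZeroGPU) is available.
if __name__ == "__main__":
    detector = Software()
    sample_text = "The quick brown fox jumps over the lazy dog. " * 20  # hypothetical input
    score = detector.evaluate(sample_text)
    if score is None:
        print("Text too short to score.")
    else:
        print(f"P(machine-generated) = {score:.3f}")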