# Advik
# initial commit
# f1b0dfd
# raw
# history blame
# 6.23 kB
import json
import xgboost as xgb
import pandas as pd
import numpy as np
import torch
import zlib
from transformers import AutoTokenizer, AutoModelForCausalLM
from scipy.stats import skew, kurtosis, entropy
from tqdm import tqdm
from torch.nn import CrossEntropyLoss
from pathlib import Path
class Diversity:
    """Turn a text into 11 statistics of its per-token surprisal.

    The text is scored by a causal language model; each token's
    log-likelihood under the model (given the preceding tokens) is
    collected and summarised by ``compute_features``.
    """

    # ``compute_features`` returns ``None`` when fewer than this many tokens
    # could be scored (the first token has no preceding context, so a text of
    # T tokens yields T - 1 scores).
    _MIN_SCORED_TOKENS = 10

    def __init__(self, model, tokenizer, device):
        """Store the tokenizer and move ``model`` onto ``device``."""
        self.tokenizer = tokenizer
        self.model = model.to(device)
        self.device = device

    def compute_log_likelihoods(self, text):
        """Return the log-likelihood of every token of ``text`` but the first.

        The text is truncated to at most 1024 tokens. The result is a 1-D
        NumPy array with one entry per scored token; it is empty when the
        text tokenizes to fewer than two tokens (nothing can be scored, and
        the model is not called).
        """
        tokens = self.tokenizer.encode(
            text, return_tensors="pt", truncation=True, max_length=1024
        ).to(self.device)

        # Each score needs a context token and a target token.
        if tokens.shape[-1] < 2:
            return np.empty(0, dtype=np.float32)

        # Only the logits are used, so no labels are passed: that would make
        # the model compute a loss that is immediately discarded.
        with torch.no_grad():
            logits = self.model(tokens).logits

        # Position t of the logits predicts token t + 1.
        shift_logits = logits[:, :-1, :].squeeze(0)
        shift_labels = tokens[:, 1:].squeeze(0)

        # Normalise in float32 even if the model ran in reduced precision.
        log_probs = torch.log_softmax(shift_logits.float(), dim=-1)
        token_log_likelihoods = log_probs.gather(
            -1, shift_labels.unsqueeze(-1)
        ).squeeze(-1)
        return token_log_likelihoods.cpu().numpy()

    def compute_surprisal(self, text):
        """Return per-token surprisal, i.e. the negated log-likelihoods."""
        return -self.compute_log_likelihoods(text)

    def compute_features(self, text):
        """Return the 11-element feature list for ``text``, or ``None``.

        ``None`` is returned when fewer than 10 tokens could be scored.
        Otherwise the list holds, in order: mean, standard deviation,
        variance, skewness and kurtosis of the surprisals; mean and standard
        deviation of their first differences; variance, histogram entropy
        (20 bins) and lag-1 autocorrelation of the second differences of the
        log-likelihoods; and the zlib compression ratio of the UTF-8 text.
        """
        # Run the language model once; surprisal is just the negation.
        log_likelihoods = self.compute_log_likelihoods(text)
        if len(log_likelihoods) < self._MIN_SCORED_TOKENS:
            return None

        s = -log_likelihoods
        mean_s, std_s, var_s = np.mean(s), np.std(s), np.var(s)
        skew_s, kurt_s = skew(s), kurtosis(s)

        diff_s = np.diff(s)
        mean_diff, std_diff = np.mean(diff_s), np.std(diff_s)

        second_order_diff = np.diff(np.diff(log_likelihoods))
        var_2nd = np.var(second_order_diff)
        entropy_2nd = entropy(
            np.histogram(second_order_diff, bins=20, density=True)[0]
        )
        if len(second_order_diff) > 1:
            autocorr_2nd = np.corrcoef(
                second_order_diff[:-1], second_order_diff[1:]
            )[0, 1]
        else:
            autocorr_2nd = 0

        raw = text.encode('utf-8')
        comp_ratio = len(zlib.compress(raw)) / len(raw)

        return [
            mean_s, std_s, var_s, skew_s, kurt_s,
            mean_diff, std_diff,
            var_2nd, entropy_2nd, autocorr_2nd,
            comp_ratio,
        ]
class BiScope:
    """Extract 72 cross-entropy statistics for a text from a causal LM.

    The sample is appended to a fixed completion prompt and scored in one
    forward pass. Two per-token losses are taken over the sample's tokens
    (``fce``/``bce`` -- presumably "forward"/"backward" cross-entropy, TODO
    confirm naming) and summarised over nine progressively shorter suffixes.
    """

    # Prefix placed in front of every sample before scoring.
    COMPLETION_PROMPT_ONLY = "Complete the following text: "

    def __init__(self, model, tokenizer, device):
        """Store the tokenizer and move ``model`` onto ``device``."""
        self.tokenizer = tokenizer
        self.model = model.to(device)
        self.device = device

    def compute_fce_loss(self, logits, targets, text_slice):
        """Per-token loss of each target against the logits one step earlier.

        Uses ``logits[0, start-1:stop-1]``, i.e. the prediction made just
        before each target token. Returns a NumPy array, one loss per target.
        """
        return CrossEntropyLoss(reduction='none')(
            logits[0, text_slice.start - 1:text_slice.stop - 1, :],
            targets
        ).detach().cpu().numpy()

    def compute_bce_loss(self, logits, targets, text_slice):
        """Per-token loss of each target against the logits at its own position.

        Uses ``logits[0, text_slice]`` unshifted. Returns a NumPy array, one
        loss per target.
        """
        return CrossEntropyLoss(reduction='none')(
            logits[0, text_slice, :],
            targets
        ).detach().cpu().numpy()

    @staticmethod
    def _sanitize(losses):
        """Clip losses to [-1e6, 1e6] and replace NaN/inf with finite values."""
        return np.nan_to_num(
            np.clip(losses, -1e6, 1e6), nan=0.0, posinf=1e6, neginf=-1e6
        )

    def detect_single_sample(self, sample):
        """Return the 72-element feature list for ``sample``.

        For each p in 1..9 the first ``p/10`` of the sample's tokens is
        skipped and mean, max, min and standard deviation are taken over the
        rest, first for the fce loss and then for the bce loss
        (9 splits x 8 statistics). The sample is truncated to 2000 tokens.

        Raises:
            ValueError: if ``sample`` tokenizes to zero tokens, since there
                is then nothing to compute statistics over.
        """
        prompt_ids = self.tokenizer(
            self.COMPLETION_PROMPT_ONLY, return_tensors='pt'
        ).input_ids.to(self.device)
        text_ids = self.tokenizer(
            sample, return_tensors='pt', max_length=2000, truncation=True
        ).input_ids.to(self.device)

        if text_ids.shape[1] == 0:
            raise ValueError("sample produced no tokens; cannot compute features")

        combined_ids = torch.cat([prompt_ids, text_ids], dim=1)
        text_slice = slice(prompt_ids.shape[1], combined_ids.shape[1])
        targets = combined_ids[0][text_slice]

        # Inference only: skip autograd bookkeeping so activations are not
        # kept alive for a backward pass that never happens.
        with torch.no_grad():
            logits = self.model(input_ids=combined_ids).logits
            fce_loss = self.compute_fce_loss(logits, targets, text_slice)
            bce_loss = self.compute_bce_loss(logits, targets, text_slice)

        # Sanitising is element-wise, so do it once instead of per split.
        fce_loss = self._sanitize(fce_loss)
        bce_loss = self._sanitize(bce_loss)

        features = []
        n_tokens = len(fce_loss)
        for p in range(1, 10):
            split = n_tokens * p // 10
            fce_tail = fce_loss[split:]
            bce_tail = bce_loss[split:]
            features.extend([
                np.mean(fce_tail), np.max(fce_tail), np.min(fce_tail), np.std(fce_tail),
                np.mean(bce_tail), np.max(bce_tail), np.min(bce_tail), np.std(bce_tail),
            ])
        return features
class Software:
    """Score a text with an XGBoost classifier over Diversity + BiScope features.

    Construction loads two causal language models (one per extractor) and the
    classifier stored in ``model.json`` next to this file.
    """

    def __init__(self):
        """Load both language models and the classifier.

        With two or more CUDA devices the extractors are placed on ``cuda:0``
        and ``cuda:1``; with one they share ``cuda:0``; otherwise both run on
        the CPU.
        """
        self.device_div = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        if torch.cuda.device_count() > 1:
            self.device_bi = torch.device("cuda:1")
        else:
            self.device_bi = self.device_div

        # NOTE: trust_remote_code=True lets the checkpoint's own Python code
        # run; local_files_only=True means only already-downloaded files are
        # used, so make sure those local copies are trusted.
        self.div_tokenizer = AutoTokenizer.from_pretrained(
            "tiiuae/falcon-7b", use_fast=False, trust_remote_code=True, local_files_only=True
        )
        self.div_model = AutoModelForCausalLM.from_pretrained(
            "tiiuae/falcon-7b", device_map=self.device_div, torch_dtype=torch.float16,
            trust_remote_code=True, local_files_only=True
        )
        self.bi_tokenizer = AutoTokenizer.from_pretrained(
            "google/gemma-1.1-2b-it", use_fast=False, trust_remote_code=True, local_files_only=True
        )
        self.bi_model = AutoModelForCausalLM.from_pretrained(
            "google/gemma-1.1-2b-it", device_map=self.device_bi, torch_dtype=torch.float16,
            trust_remote_code=True, local_files_only=True
        )

        self.diveye = Diversity(self.div_model, self.div_tokenizer, self.device_div)
        self.biscope = BiScope(self.bi_model, self.bi_tokenizer, self.device_bi)

        self.model_path = Path(__file__).parent / "model.json"
        self.model = xgb.XGBClassifier()
        self.model.load_model(self.model_path)

    def load_data(self, jsonl_path):
        """Read a JSON Lines file and return ``(ids, texts)``.

        Every non-blank line must be a JSON object with ``"id"`` and
        ``"text"`` keys. The file is read as UTF-8 so the result does not
        depend on the platform's default encoding; blank lines are skipped.
        """
        ids, texts = [], []
        with open(jsonl_path, 'r', encoding='utf-8') as f:
            for line in f:
                if not line.strip():
                    continue
                obj = json.loads(line)
                ids.append(obj["id"])
                texts.append(obj["text"])
        return ids, texts

    def evaluate(self, text):
        """Return the classifier's probability for class index 1 as a float.

        The feature vector is the Diversity features followed by the BiScope
        features.

        Raises:
            ValueError: if ``text`` is too short for the Diversity extractor
                (it returned ``None``). Checked before the BiScope pass so no
                model time is spent on an input that cannot be scored.
        """
        diveye_features = self.diveye.compute_features(text)
        if diveye_features is None:
            raise ValueError("text is too short to extract diversity features")

        biscope_features = self.biscope.detect_single_sample(text)
        features = list(diveye_features) + list(biscope_features)
        return self.model.predict_proba([features])[:, 1][0].item()