Spaces:
Running
on
Zero
Running
on
Zero
File size: 7,217 Bytes
7cd2476 3d31a0f 7f33f97 7cd2476 7f33f97 10078f3 7f33f97 10078f3 1c1e204 7cd2476 10078f3 7cd2476 3d31a0f 7cd2476 10078f3 1c1e204 7cd2476 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 |
import json
import xgboost as xgb
import pandas as pd
import numpy as np
import torch
import zlib
from transformers import AutoTokenizer, AutoModelForCausalLM
from scipy.stats import skew, kurtosis, entropy
from tqdm import tqdm
from torch.nn import CrossEntropyLoss
from pathlib import Path
import spaces
import os
class Diversity:
    """Surprisal-based feature extractor built on a causal language model.

    Each token of a text is scored by the model given the tokens before it.
    Summary statistics of those scores, together with a zlib compression
    ratio of the raw text, form an 11-value feature vector.
    """

    def __init__(self, model, tokenizer, device):
        """Keep the tokenizer and move ``model`` onto ``device``.

        Args:
            model: Causal LM; called with a token-id tensor and expected to
                return an object exposing ``.logits``.
            tokenizer: Object whose ``encode(..., return_tensors="pt")``
                returns a token-id tensor with a leading batch dimension.
            device: Torch device the model and the inputs are placed on.
        """
        self.tokenizer = tokenizer
        self.model = model.to(device)
        self.device = device

    def compute_log_likelihoods(self, text):
        """Return the log-likelihood of every token of ``text`` but the first.

        The text is truncated to at most 1024 tokens. The logits at position
        ``i`` are scored against the token at position ``i + 1``, so the
        result has one entry fewer than the number of tokens.

        Args:
            text: Input string.

        Returns:
            1-D NumPy array of per-token log-likelihoods.
        """
        tokens = self.tokenizer.encode(
            text, return_tensors="pt", truncation=True, max_length=1024
        ).to(self.device)
        with torch.no_grad():
            # Only the logits are used, so no ``labels`` are passed: that
            # would make the model compute a loss that is never read.
            logits = self.model(tokens).logits
        shift_logits = logits[:, :-1, :].squeeze(0)
        shift_labels = tokens[:, 1:].squeeze(0)
        # Upcast before the softmax so half-precision logits do not lose
        # accuracy in the normalisation.
        log_probs = torch.log_softmax(shift_logits.float(), dim=-1)
        # Pick, for every position, the log-probability of the actual token.
        picked = log_probs.gather(1, shift_labels.unsqueeze(1)).squeeze(1)
        return picked.cpu().numpy()

    def compute_surprisal(self, text):
        """Return per-token surprisal, i.e. the negated log-likelihoods."""
        return -self.compute_log_likelihoods(text)

    def compute_features(self, text):
        """Compute the 11 diversity features of ``text``.

        The model is run exactly once; surprisals are derived from the
        log-likelihoods by negation.

        Args:
            text: Input string.

        Returns:
            ``None`` when fewer than 10 scored tokens are available,
            otherwise a list in this order: mean, std, variance, skewness and
            kurtosis of the surprisals; mean and std of their first
            differences; variance, histogram entropy and lag-1
            autocorrelation of the second differences of the
            log-likelihoods; zlib compression ratio of the UTF-8 text.
        """
        log_likelihoods = self.compute_log_likelihoods(text)
        # Surprisals and log-likelihoods have the same length, so a single
        # length check covers both.
        if len(log_likelihoods) < 10:
            return None

        s = -log_likelihoods
        mean_s, std_s, var_s = np.mean(s), np.std(s), np.var(s)
        skew_s, kurt_s = skew(s), kurtosis(s)

        diff_s = np.diff(s)
        mean_diff, std_diff = np.mean(diff_s), np.std(diff_s)

        second_order_diff = np.diff(np.diff(log_likelihoods))
        var_2nd = np.var(second_order_diff)
        # Shannon entropy of a 20-bin density histogram of the second
        # differences (scipy normalises the bin heights internally).
        entropy_2nd = entropy(
            np.histogram(second_order_diff, bins=20, density=True)[0]
        )
        if len(second_order_diff) > 1:
            autocorr_2nd = np.corrcoef(
                second_order_diff[:-1], second_order_diff[1:]
            )[0, 1]
        else:
            autocorr_2nd = 0

        raw = text.encode('utf-8')
        comp_ratio = len(zlib.compress(raw)) / len(raw)

        return [
            mean_s, std_s, var_s, skew_s, kurt_s,
            mean_diff, std_diff,
            var_2nd, entropy_2nd, autocorr_2nd,
            comp_ratio,
        ]
class BiScope:
    """Cross-entropy feature extractor driven by a causal language model.

    A fixed completion prompt is prepended to the sample, the model is run
    once over the combined sequence, and two per-token loss profiles over the
    sample's tokens are summarised into a 72-value feature vector.
    """

    def __init__(self, model, tokenizer, device):
        """Keep the tokenizer and move ``model`` onto ``device``.

        Args:
            model: Causal LM; called with ``input_ids=`` and expected to
                return an object exposing ``.logits``.
            tokenizer: Callable returning an object with ``.input_ids``
                when invoked with ``return_tensors='pt'``.
            device: Torch device the model and the inputs are placed on.
        """
        self.COMPLETION_PROMPT_ONLY = "Complete the following text: "
        self.tokenizer = tokenizer
        self.model = model.to(device)
        self.device = device

    def compute_fce_loss(self, logits, targets, text_slice):
        """Per-token cross-entropy of the logits one step *before* each target.

        Args:
            logits: Model output indexed as ``[batch, position, vocab]``;
                only batch entry 0 is used.
            targets: 1-D tensor of token ids for the positions in
                ``text_slice``.
            text_slice: Slice of sequence positions occupied by the sample.
                Its ``start`` must be at least 1, because the window is
                shifted back by one position.

        Returns:
            1-D NumPy array with one loss value per target token.
        """
        shifted = slice(text_slice.start - 1, text_slice.stop - 1)
        loss = CrossEntropyLoss(reduction='none')(logits[0, shifted, :], targets)
        return loss.detach().cpu().numpy()

    def compute_bce_loss(self, logits, targets, text_slice):
        """Per-token cross-entropy of the logits *at* each target's position.

        Unlike :meth:`compute_fce_loss` the logits are not shifted, so each
        position's output is scored against the token it has just read.

        Args:
            logits: Model output indexed as ``[batch, position, vocab]``;
                only batch entry 0 is used.
            targets: 1-D tensor of token ids for the positions in
                ``text_slice``.
            text_slice: Slice of sequence positions occupied by the sample.

        Returns:
            1-D NumPy array with one loss value per target token.
        """
        loss = CrossEntropyLoss(reduction='none')(logits[0, text_slice, :], targets)
        return loss.detach().cpu().numpy()

    def detect_single_sample(self, sample):
        """Return the 72 loss-profile features for ``sample``.

        For each of the nine split points ``p/10`` (``p = 1..9``) the tail of
        both loss profiles from that point onward is reduced to mean, max,
        min and std: four values for the shifted loss followed by four for
        the unshifted loss.

        Args:
            sample: Text to analyse; truncated to 2000 tokens.

        Returns:
            Flat list of 72 values (9 split points x 8 statistics).
        """
        prompt_ids = self.tokenizer(
            self.COMPLETION_PROMPT_ONLY, return_tensors='pt'
        ).input_ids.to(self.device)
        text_ids = self.tokenizer(
            sample, return_tensors='pt', max_length=2000, truncation=True
        ).input_ids.to(self.device)
        combined_ids = torch.cat([prompt_ids, text_ids], dim=1)
        text_slice = slice(prompt_ids.shape[1], combined_ids.shape[1])

        # Inference only: without no_grad the autograd graph of the whole
        # forward pass would be kept alive for nothing.
        with torch.no_grad():
            logits = self.model(input_ids=combined_ids).logits

        targets = combined_ids[0][text_slice]
        fce_loss = self.compute_fce_loss(logits, targets, text_slice)
        bce_loss = self.compute_bce_loss(logits, targets, text_slice)

        # Sanitising is element-wise, so do it once on the full arrays
        # instead of once per suffix.
        fce_clean = np.nan_to_num(
            np.clip(fce_loss, -1e6, 1e6), nan=0.0, posinf=1e6, neginf=-1e6
        )
        bce_clean = np.nan_to_num(
            np.clip(bce_loss, -1e6, 1e6), nan=0.0, posinf=1e6, neginf=-1e6
        )

        features = []
        n_tokens = len(fce_loss)
        for p in range(1, 10):
            split = n_tokens * p // 10
            for tail in (fce_clean[split:], bce_clean[split:]):
                features.extend(
                    [np.mean(tail), np.max(tail), np.min(tail), np.std(tail)]
                )
        return features
class Software:
    """Scores a text with an XGBoost classifier over LM-derived features.

    Features come from two extractors: ``Diversity`` (run on
    ``tiiuae/falcon-7b``) and ``BiScope`` (run on ``google/gemma-1.1-2b-it``).
    The language models are loaded lazily on the first call to
    :meth:`evaluate`; the classifier is loaded from ``model.json`` next to
    this file at construction time.
    """

    def __init__(self):
        """Read ``HF_TOKEN``, pick the device and load the XGBoost model."""
        self.token = os.getenv("HF_TOKEN")
        self.device_div = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        self.device_bi = self.device_div
        # Language models and tokenizers are populated on first use.
        self.div_model = None
        self.div_tokenizer = None
        self.bi_model = None
        self.bi_tokenizer = None
        self.model_path = Path(__file__).parent / "model.json"
        self.model = xgb.XGBClassifier()
        self.model.load_model(self.model_path)

    def _load_pretrained(self, repo_id, device):
        """Load the slow tokenizer and the fp16 causal LM for ``repo_id``."""
        # NOTE(review): newer transformers releases prefer ``token=`` over
        # ``use_auth_token=``; kept as-is because the installed version is
        # not visible from here.
        tokenizer = AutoTokenizer.from_pretrained(
            repo_id,
            use_fast=False,
            trust_remote_code=True,
            use_auth_token=self.token,
        )
        model = AutoModelForCausalLM.from_pretrained(
            repo_id,
            device_map="cuda",
            torch_dtype=torch.float16,
            trust_remote_code=True,
            use_auth_token=self.token,
        )
        model.to(device)
        return tokenizer, model

    def _load_div_models(self):
        """Load the Falcon tokenizer/model pair unless both are already set."""
        if self.div_model is None or self.div_tokenizer is None:
            self.div_tokenizer, self.div_model = self._load_pretrained(
                "tiiuae/falcon-7b", self.device_div
            )

    def _load_bi_models(self):
        """Load the Gemma tokenizer/model pair unless both are already set."""
        if self.bi_model is None or self.bi_tokenizer is None:
            self.bi_tokenizer, self.bi_model = self._load_pretrained(
                "google/gemma-1.1-2b-it", self.device_bi
            )

    def load_data(self, jsonl_path):
        """Read a JSON-lines file into parallel lists of ids and texts.

        Args:
            jsonl_path: Path to a file with one JSON object per line, each
                holding an ``"id"`` and a ``"text"`` key.

        Returns:
            Tuple ``(ids, texts)`` in file order. Blank lines are skipped.

        Raises:
            json.JSONDecodeError: If a non-blank line is not valid JSON.
            KeyError: If an object lacks ``"id"`` or ``"text"``.
        """
        ids, texts = [], []
        # JSON text is UTF-8 by specification; do not rely on the platform
        # default encoding.
        with open(jsonl_path, 'r', encoding='utf-8') as f:
            for line in f:
                if not line.strip():
                    continue
                obj = json.loads(line)
                ids.append(obj["id"])
                texts.append(obj["text"])
        return ids, texts

    @spaces.GPU
    def evaluate(self, text):
        """Return the classifier's probability for class index 1 on ``text``.

        Args:
            text: The text to score.

        Returns:
            Python float taken from column 1 of ``predict_proba``.
            NOTE(review): which label class 1 stands for is defined by the
            trained ``model.json``, not by this code.

        Raises:
            ValueError: If ``text`` yields too few tokens for the diversity
                features to be computed.
        """
        self._load_div_models()
        self._load_bi_models()

        # The device is re-evaluated here because this method runs inside
        # the GPU-decorated context. Each extractor moves its model onto the
        # device it is given, so no separate relocation step is needed.
        device_div = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        diveye = Diversity(self.div_model, self.div_tokenizer, device_div)
        biscope = BiScope(self.bi_model, self.bi_tokenizer, self.device_bi)

        # Pure inference: make sure no autograd graph is recorded.
        with torch.no_grad():
            diveye_features = diveye.compute_features(text)
            if diveye_features is None:
                raise ValueError(
                    "Text is too short to compute diversity features."
                )
            biscope_features = biscope.detect_single_sample(text)

        features = list(diveye_features) + list(biscope_features)
        return self.model.predict_proba([features])[:, 1][0].item()