Liketropy-LLM-Detector / detector.py
tararad's picture
Update detector.py
cb972c3 verified
import torch
import torch.nn.functional as F
from transformers import AutoTokenizer, AutoModelForCausalLM
import os
import spaces
os.environ["TOKENIZERS_PARALLELISM"] = "false"
class CustomDetector:
def __init__(self, model_name="tiiuae/falcon-rw-1b", max_length=512):
self.device = "cuda" if torch.cuda.is_available() else "cpu"
self.model_name = model_name
self.max_length = max_length
self.tokenizer = None
self.model = None
@spaces.GPU
def load_model(self):
"""Load model and tokenizer on GPU when called."""
try:
if self.tokenizer is None:
self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
if self.model is None:
self.model = AutoModelForCausalLM.from_pretrained(self.model_name, torch_dtype=torch.float16)
self.model.to(self.device)
self.model.eval()
if self.tokenizer.pad_token is None:
self.tokenizer.pad_token = self.tokenizer.eos_token
except Exception as e:
raise RuntimeError(f"Failed to load model {self.model_name}: {str(e)}")
@spaces.GPU
def my_detector(self, texts: list[str]) -> list[float]:
if self.model is None or self.tokenizer is None:
self.load_model()
try:
with torch.no_grad():
tokenized = self.tokenizer(
texts,
truncation=True,
padding=True,
max_length=self.max_length,
return_tensors="pt",
)
tokenized = {k: v.to(self.device) for k, v in tokenized.items()}
input_ids = tokenized["input_ids"]
attention_mask = tokenized["attention_mask"]
outputs = self.model(**tokenized)
logits = outputs.logits[:, :-1, :]
labels = tokenized["input_ids"][:, 1:]
log_probs = F.log_softmax(logits, dim=-1)
ll_per_token = log_probs.gather(2, labels.unsqueeze(-1)).squeeze(-1)
attention_mask = tokenized["attention_mask"][:, 1:]
ll_per_sample = (ll_per_token * attention_mask).sum(dim=-1) / attention_mask.sum(dim=1).clamp(min=1)
neg_entropy = (log_probs.exp() * log_probs)
entropy_per_sample = -(neg_entropy.sum(dim=-1) * attention_mask).sum(-1) / attention_mask.sum(dim=1).clamp(min=1)
scores = (abs(entropy_per_sample + ll_per_sample)).cpu().tolist()
return scores
except Exception as e:
raise RuntimeError(f"Error computing score: {str(e)}")