from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, validator
import re
import torch
from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM
from collections import Counter
import logging
import numpy as np
# Configure logging with more detail
logging.basicConfig(filename="predictions.log", level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s")
app = FastAPI(title="Improved AI Text Detector")
# Enable GPU if available, else use CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
torch.manual_seed(42)
# Load classifier models (the pipeline API takes a GPU index, or -1 for CPU)
pipeline_device = 0 if device.type == "cuda" else -1
english_detectors = [
    pipeline("text-classification", model="Hello-SimpleAI/chatgpt-detector-roberta", device=pipeline_device, truncation=True, max_length=512),
    pipeline("text-classification", model="openai-community/roberta-large-openai-detector", device=pipeline_device, truncation=True, max_length=512),
]
arabic_detector = pipeline("text-classification", model="sabaridsnfuji/arabic-ai-text-detector", device=pipeline_device, truncation=True, max_length=512)
# Load perplexity models
ppl_english = {
    "tokenizer": AutoTokenizer.from_pretrained("gpt2"),
    "model": AutoModelForCausalLM.from_pretrained("gpt2").to(device)
}
ppl_arabic = {
    "tokenizer": AutoTokenizer.from_pretrained("aubmindlab/aragpt2-base"),
    "model": AutoModelForCausalLM.from_pretrained("aubmindlab/aragpt2-base").to(device)
}
def detect_language(text: str) -> str:
    """Detect if text is Arabic or English based on Unicode character ranges."""
    arabic_chars = len(re.findall(r'[\u0600-\u06FF]', text))
    latin_chars = len(re.findall(r'[A-Za-z]', text))
    total_chars = arabic_chars + latin_chars
    if total_chars == 0:
        return 'en'
    arabic_ratio = arabic_chars / total_chars
    return 'ar' if arabic_ratio > 0.5 else 'en'
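# Quick sanity check (illustrative inputs, not part of the API):
#   detect_language("The quick brown fox")       -> 'en'
#   detect_language("مرحبا بالعالم")              -> 'ar'
#   detect_language("hello مرحبا بالعالم كله")    -> 'ar' (Arabic majority: 15 of 20 letters)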
def calculate_burstiness(text: str) -> float:
    """Calculate burstiness (std/mean of sentence lengths) to bias toward human text."""
    sentences = re.split(r'[.!?]', text)
    lengths = [len(s.split()) for s in sentences if s.strip()]  # ignore empty/whitespace-only fragments
    return np.std(lengths) / (np.mean(lengths) + 1e-6) if lengths else 0
def calculate_ttr(text: str) -> float:
    """Calculate type-token ratio (lexical diversity) to bias toward human text."""
    words = text.split()
    if not words:
        return 0
    unique_words = len(set(words))
    total_words = len(words)
    return unique_words / total_words
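# Worked example (illustrative): for "the cat sat. the cat ran. the dog slept here today."
# the sentence lengths are [3, 3, 5], so burstiness = std/mean ≈ 0.94 / 3.67 ≈ 0.26, and
# TTR = unique tokens / total tokens = 8 / 11 ≈ 0.73 (punctuation stays attached to words).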
def clean_text(text: str, language: str) -> str:
    """Clean text by removing special characters and normalizing spaces. Skip lowercasing for Arabic."""
    text = re.sub(r'\s+', ' ', text)
    text = re.sub(r'[^\w\s.,!?]', '', text)
    text = text.strip()
    if language == 'en':
        text = text.lower()
    return text
def get_classifier_score(text: str, detector) -> float:
    """Get classifier probability for the AI label (label names vary per model)."""
    result = detector(text, truncation=True, max_length=512)[0]
    score = result['score']
    # Label names differ across detectors: Hello-SimpleAI emits "ChatGPT" and the OpenAI
    # detector emits "Fake" for machine text; anything else is treated as the human label.
    return score if result['label'] in ['AI', 'Fake', 'ChatGPT'] else 1 - score
def get_perplexity(text: str, tokenizer, model) -> float:
    """Calculate perplexity using a language model with a sliding window."""
    encodings = tokenizer(text, return_tensors="pt", truncation=True, max_length=512).to(device)
    stride = 512  # window and stride match the 512-token truncation, so typically one pass
    seq_len = encodings.input_ids.size(1)
    nlls = []
    prev_end_loc = 0
    end_loc = 0
    for begin_loc in range(0, seq_len, stride):
        end_loc = min(begin_loc + stride, seq_len)
        trg_len = end_loc - prev_end_loc  # tokens that are new in this window
        input_ids = encodings.input_ids[:, begin_loc:end_loc]
        target_ids = input_ids.clone()
        target_ids[:, :-trg_len] = -100  # mask the overlap so each token is scored once
        with torch.no_grad():
            outputs = model(input_ids, labels=target_ids)
            neg_log_likelihood = outputs.loss * trg_len
        nlls.append(neg_log_likelihood)
        prev_end_loc = end_loc
        if end_loc == seq_len:
            break
    if not nlls:  # no tokens to score
        return 0.0
    return torch.exp(torch.stack(nlls).sum() / end_loc).item()
def calculate_weighted_score(clf_score: float, ppl: float, burstiness: float, ttr: float, detected_lang: str) -> float:
    """Calculate a weighted score combining classifier and stylometric features."""
    ppl_norm = min(ppl / 200, 1.0)  # Normalize perplexity (cap at 200)
    burstiness_norm = min(burstiness / (2.0 if detected_lang == 'en' else 1.5), 1.0)  # Normalize burstiness
    ttr_norm = min(0.1 / max(ttr, 0.01), 1.0)  # Inverse-normalized TTR, capped at 1.0 like the other features
    feature_score = (ppl_norm + burstiness_norm + ttr_norm) / 3  # Average feature score
    return 0.6 * clf_score + 0.4 * feature_score  # Weight classifier higher
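# Worked example (illustrative numbers) for English text with clf_score=0.80,
# ppl=100, burstiness=1.0, ttr=0.5:
#   ppl_norm        = min(100/200, 1.0) = 0.50
#   burstiness_norm = min(1.0/2.0, 1.0) = 0.50
#   ttr_norm        = min(0.1/0.5, 1.0) = 0.20
#   feature_score   = (0.50 + 0.50 + 0.20) / 3 = 0.40
#   weighted        = 0.6*0.80 + 0.4*0.40 = 0.64 -> "Uncertain" under the 0.4/0.7 cutoffs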
def split_text(text: str, max_chars: int = 5000) -> list:
    """Split text into chunks of max_chars, preserving sentence boundaries."""
    sentences = re.split(r'(?<=[.!?])\s+', text)
    chunks = []
    current_chunk = ""
    for sentence in sentences:
        if len(current_chunk) + len(sentence) <= max_chars:
            current_chunk += sentence + " "
        else:
            if current_chunk:
                chunks.append(current_chunk.strip())
            current_chunk = sentence + " "
    if current_chunk:
        chunks.append(current_chunk.strip())
    return chunks
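# Example (illustrative): split_text("One two three. Four five six. Seven.", max_chars=20)
# returns ["One two three.", "Four five six.", "Seven."] -- each sentence would push the
# running chunk past 20 characters, so every sentence lands in its own chunk.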
class TextInput(BaseModel):
    text: str

    @validator('text')
    def validate_text(cls, value):
        """Validate input text for minimum length and content."""
        word_count = len(value.split())
        if word_count < 50:
            raise ValueError(f"Text too short ({word_count} words). Minimum 50 words required.")
        if not re.search(r'[A-Za-z\u0600-\u06FF]', value):
            raise ValueError("Text must contain alphabetic characters.")
        return value
@app.post("/detect")  # endpoint path is an assumption; adjust to match your deployment
def detect(input_text: TextInput):
    detected_lang = detect_language(input_text.text)
    note_lang = f"Detected language: {'Arabic' if detected_lang == 'ar' else 'English'}"
    cleaned_text = clean_text(input_text.text, detected_lang)
    burstiness = calculate_burstiness(cleaned_text)
    ttr = calculate_ttr(cleaned_text)
    ppl_model = ppl_english if detected_lang == 'en' else ppl_arabic
    ppl = get_perplexity(cleaned_text, ppl_model["tokenizer"], ppl_model["model"])
    note_features = f"Burstiness: {burstiness:.2f} (high suggests human), TTR: {ttr:.2f} (low suggests human), Perplexity: {ppl:.2f} (high suggests human)"
    # Select appropriate models
    detectors = english_detectors if detected_lang == 'en' else [arabic_detector]
    is_ensemble = detected_lang == 'en'
    # Thresholds for human classification
    ppl_threshold = 150  # Increased from 100
    burstiness_threshold = 1.7 if detected_lang == 'en' else 1.2  # Increased from 1.5/1.0
    ttr_threshold = 0.08  # Decreased from 0.10
    if len(cleaned_text) > 10000:
        chunks = split_text(cleaned_text, max_chars=5000)
        labels = []
        clf_scores = []
        ppls = []
        for chunk_idx, chunk in enumerate(chunks):
            chunk_labels = []
            chunk_clf_scores = []
            for det_idx, detector in enumerate(detectors):
                clf_score = get_classifier_score(chunk, detector)
                label = "AI" if clf_score >= 0.90 else "Human" if clf_score < 0.60 else "Uncertain"  # Adjusted from 0.95
                chunk_labels.append(label)
                chunk_clf_scores.append(clf_score)
                logging.debug(f"Chunk {chunk_idx}, Model {det_idx}: Label={label}, Classifier Score={clf_score:.4f}")
            chunk_ppl = get_perplexity(chunk, ppl_model["tokenizer"], ppl_model["model"])
            avg_clf_score = np.mean(chunk_clf_scores)
            # Count human-like features (burstiness and TTR are document-level)
            human_features = sum([
                chunk_ppl > ppl_threshold,
                burstiness > burstiness_threshold,
                ttr < ttr_threshold
            ])
            feature_note = f"Human-like features: {human_features}/3 (PPL={chunk_ppl:.2f}, Burstiness={burstiness:.2f}, TTR={ttr:.2f})"
            # The weighted score, not a per-model majority vote, decides the chunk label
            weighted_score = calculate_weighted_score(avg_clf_score, chunk_ppl, burstiness, ttr, detected_lang)
            chunk_final_label = "AI" if weighted_score >= 0.7 else "Human" if weighted_score < 0.4 else "Uncertain"
            # Require all 3 features to override to Human
            if chunk_final_label == "Uncertain" or any(l == "Human" for l in chunk_labels):
                if human_features == 3:
                    chunk_final_label = "Human"
            elif chunk_final_label == "AI" and avg_clf_score < 0.90 and human_features == 3:
                chunk_final_label = "Human"
            labels.append(chunk_final_label)
            clf_scores.append(avg_clf_score)
            ppls.append(chunk_ppl)
            logging.debug(f"Chunk {chunk_idx} Final: Label={chunk_final_label}, Avg Classifier Score={avg_clf_score:.4f}, Weighted Score={weighted_score:.4f}, Perplexity={chunk_ppl:.2f}, {feature_note}")
        avg_weighted_score = sum(
            calculate_weighted_score(clf_scores[i], ppls[i], burstiness, ttr, detected_lang)
            for i in range(len(clf_scores))
        ) / len(clf_scores) if clf_scores else 0.0
        final_label = "AI" if avg_weighted_score >= 0.7 else "Human" if avg_weighted_score < 0.4 else "Uncertain"
        if final_label == "Uncertain" or any(l == "Human" for l in labels):
            human_features = sum([
                any(p > ppl_threshold for p in ppls),
                burstiness > burstiness_threshold,
                ttr < ttr_threshold
            ])
            if human_features == 3:
                final_label = "Human"
        avg_clf_score = sum(clf_scores) / len(clf_scores) if clf_scores else 0.0
        avg_ppl = sum(ppls) / len(ppls) if ppls else 0.0
        logging.info(f"Language: {detected_lang} | Text Length: {len(cleaned_text)} | Chunks: {len(chunks)} | Prediction: {final_label} | Avg Classifier Score: {avg_clf_score:.4f} | Avg Perplexity: {avg_ppl:.2f} | {note_features}")
        return {
            "prediction": final_label,
            "classifier_score": round(avg_clf_score, 4),
            "perplexity": round(avg_ppl, 2),
            "note": f"{note_lang}. Text was split into {len(chunks)} chunks due to length > 10,000 characters. {note_features}. Weighted Score={avg_weighted_score:.4f}.",
            "chunk_results": [
                {"chunk": chunk[:50] + "...", "label": labels[i], "classifier_score": clf_scores[i], "perplexity": ppls[i], "burstiness": burstiness, "ttr": ttr}
                for i, chunk in enumerate(chunks)
            ]
        }
    else:
        if is_ensemble:
            clf_scores = []
            labels = []
            for det_idx, detector in enumerate(detectors):
                clf_score = get_classifier_score(cleaned_text, detector)
                label = "AI" if clf_score >= 0.90 else "Human" if clf_score < 0.60 else "Uncertain"  # Adjusted from 0.95
                labels.append(label)
                clf_scores.append(clf_score)
                logging.debug(f"Model {det_idx}: Label={label}, Classifier Score={clf_score:.4f}")
            avg_clf_score = sum(clf_scores) / len(clf_scores) if clf_scores else 0.0
            # Count human-like features
            human_features = sum([
                ppl > ppl_threshold,
                burstiness > burstiness_threshold,
                ttr < ttr_threshold
            ])
            feature_note = f"Human-like features: {human_features}/3 (PPL={ppl:.2f}, Burstiness={burstiness:.2f}, TTR={ttr:.2f})"
            # The weighted score, not a per-model majority vote, decides the label
            weighted_score = calculate_weighted_score(avg_clf_score, ppl, burstiness, ttr, detected_lang)
            final_label = "AI" if weighted_score >= 0.7 else "Human" if weighted_score < 0.4 else "Uncertain"
            # Require all 3 features to override to Human
            if final_label == "Uncertain" or any(l == "Human" for l in labels):
                if human_features == 3:
                    final_label = "Human"
            elif final_label == "AI" and avg_clf_score < 0.90 and human_features == 3:
                final_label = "Human"
            note = f"{note_lang}. Ensemble used: {len(detectors)} models. {note_features}. {feature_note}. Weighted Score={weighted_score:.4f}."
            if 0.60 <= avg_clf_score < 0.90:
                note += " Warning: Close to threshold, result may be uncertain."
            logging.info(f"Language: {detected_lang} | Text Length: {len(cleaned_text)} | Prediction: {final_label} | Avg Classifier Score: {avg_clf_score:.4f} | Perplexity: {ppl:.2f} | {note_features} | {feature_note}")
        else:
            clf_score = get_classifier_score(cleaned_text, arabic_detector)
            avg_clf_score = clf_score  # single model; keeps the response schema consistent
            # Count human-like features
            human_features = sum([
                ppl > ppl_threshold,
                burstiness > burstiness_threshold,
                ttr < ttr_threshold
            ])
            feature_note = f"Human-like features: {human_features}/3 (PPL={ppl:.2f}, Burstiness={burstiness:.2f}, TTR={ttr:.2f})"
            # The weighted score decides the label
            weighted_score = calculate_weighted_score(clf_score, ppl, burstiness, ttr, detected_lang)
            final_label = "AI" if weighted_score >= 0.7 else "Human" if weighted_score < 0.4 else "Uncertain"
            # Require all 3 features to override to Human
            if final_label in ("Uncertain", "Human"):
                if human_features == 3:
                    final_label = "Human"
            elif final_label == "AI" and clf_score < 0.90 and human_features == 3:
                final_label = "Human"
            note = f"{note_lang}. {note_features}. {feature_note}. Weighted Score={weighted_score:.4f}."
            if 0.60 <= clf_score < 0.90:
                note += " Warning: Close to threshold, result may be uncertain."
            logging.info(f"Language: {detected_lang} | Text Length: {len(cleaned_text)} | Prediction: {final_label} | Classifier Score: {clf_score:.4f} | Perplexity: {ppl:.2f} | {note_features} | {feature_note}")
        return {
            "prediction": final_label,
            "classifier_score": round(avg_clf_score, 4),
            "perplexity": round(ppl, 2),
            "note": note
        }