|
|
|
import os, re, functools, numpy as np, pandas as pd |
|
import gradio as gr |
|
from datasets import load_dataset |
|
from sklearn.metrics.pairwise import cosine_similarity |
|
|
|
|
|
# Number of corpus tweets to load from the dataset (override via SAMPLE_SIZE env var).
SAMPLE_SIZE = int(os.getenv("SAMPLE_SIZE", "3000"))

# Seed shared by dataset shuffling and text generation for reproducibility.
RANDOM_STATE = 42

# Example text pre-filled in the UI input box.
DEFAULT_INPUT = "I am so happy with this product"
|
|
|
|
|
def clean_text(text: str) -> str:
    """Normalize tweet text for embedding.

    Lowercases, then strips URLs, @mentions, #hashtags and punctuation,
    and collapses runs of whitespace. None is treated as an empty string.
    """
    normalized = (text or "").lower()
    # Apply each removal pattern in turn: URLs, mentions, hashtags, punctuation.
    for pattern in (r"http\S+", r"@\w+", r"#\w+", r"[^\w\s]"):
        normalized = re.sub(pattern, "", normalized)
    return re.sub(r"\s+", " ", normalized).strip()
|
|
|
def _to_numpy(x): |
|
try: |
|
import torch |
|
if hasattr(torch, "Tensor") and isinstance(x, torch.Tensor): |
|
return x.detach().cpu().numpy() |
|
except Exception: |
|
pass |
|
return np.asarray(x) |
|
|
|
def _l2norm(x: np.ndarray) -> np.ndarray: |
|
x = x.astype(np.float32, copy=False) |
|
if x.ndim == 1: |
|
x = x.reshape(1, -1) |
|
return x / (np.linalg.norm(x, axis=1, keepdims=True) + 1e-12) |
|
|
|
|
|
|
|
@functools.lru_cache(maxsize=1)
def load_sample_df():
    """Load up to SAMPLE_SIZE tweets as a DataFrame with columns [text, clean_text].

    Tries the sentiment140 dataset first, then tweet_eval/sentiment; if both
    fail (offline, missing `datasets`), returns a small hard-coded fallback
    corpus. Results are cached so the corpus is built once per process.

    Returns:
        pandas.DataFrame with columns "text" (original) and "clean_text"
        (normalized with the module-level clean_text), shuffled with
        RANDOM_STATE for a stable order.
    """
    # `pandas` and `load_dataset` come from the top-of-file imports; the
    # previous redundant in-function re-imports have been removed.
    try:
        ds = load_dataset("sentiment140", split=f"train[:{SAMPLE_SIZE}]")
        df = ds.to_pandas()
    except Exception:
        try:
            ds = load_dataset("tweet_eval", "sentiment", split=f"train[:{SAMPLE_SIZE}]")
            df = ds.to_pandas()
        except Exception:
            # Offline / dataset unavailable: use a tiny built-in corpus so the
            # demo stays functional.
            fallback_texts = [
                "I love this product!", "This is terrible...", "Best purchase ever",
                "Pretty good overall", "I am not happy with the service",
                "Absolutely fantastic experience", "Could be better", "Super satisfied",
                "Worst ever", "Not bad at all", "Amazing quality", "I will buy again"
            ]
            return pd.DataFrame({"text": fallback_texts, "clean_text": fallback_texts})

    df = df.dropna(subset=["text"]).copy()
    # Keep plausibly tweet-sized texts only.
    df["text_length"] = df["text"].astype(str).str.len()
    df = df[(df["text_length"] >= 5) & (df["text_length"] <= 280)].copy()

    # Use the module-level clean_text so the corpus is normalized exactly like
    # user queries are (the previous local cleaner kept punctuation, so corpus
    # and query text were cleaned inconsistently).
    df["clean_text"] = df["text"].astype(str).apply(clean_text)
    df = df.sample(frac=1.0, random_state=RANDOM_STATE).reset_index(drop=True)

    return df[["text", "clean_text"]]
|
|
|
|
|
|
|
@functools.lru_cache(maxsize=None)
def load_sentence_model(model_id: str):
    """Load a SentenceTransformer for *model_id*, memoized per id.

    The import is deferred so the module can be imported without pulling in
    the heavy sentence-transformers dependency up front.
    """
    from sentence_transformers import SentenceTransformer

    model = SentenceTransformer(model_id)
    return model
|
|
|
@functools.lru_cache(maxsize=None)
def load_generator():
    """Build the distilgpt2 text-generation pipeline once, seeded for reproducibility."""
    from transformers import pipeline, set_seed

    set_seed(RANDOM_STATE)
    generator = pipeline("text-generation", model="distilgpt2")
    return generator
|
|
|
|
|
# UI display label -> Hugging Face model id for each selectable embedder.
EMBEDDERS = {
    "MiniLM (fast)": "sentence-transformers/all-MiniLM-L6-v2",
    "MPNet (heavier)": "sentence-transformers/all-mpnet-base-v2",
    "DistilRoBERTa (paraphrase)": "sentence-transformers/paraphrase-distilroberta-base-v1",
}
|
|
|
|
|
# Per-model cache of L2-normalized corpus embeddings: model label -> (n, d) array.
_CORPUS_CACHE = {}
|
|
|
def _encode_norm(model, texts):
    """Encode *texts* with *model*; return L2-normalized numpy embeddings (n, d).

    Compatible across sentence-transformers versions: encode() may return a
    list, ndarray or torch tensor, and _to_numpy normalizes the container.
    """
    raw = model.encode(texts, show_progress_bar=False)
    return _l2norm(_to_numpy(raw))
|
|
|
def ensure_corpus_embeddings(model_name: str, texts: list):
    """Return L2-normalized corpus embeddings for *model_name*, computing them once.

    Results are memoized in _CORPUS_CACHE keyed by the model label, so each
    embedder encodes the corpus at most once per process.
    """
    cached = _CORPUS_CACHE.get(model_name)
    if cached is not None:
        return cached
    embedder = load_sentence_model(EMBEDDERS[model_name])
    embeddings = _encode_norm(embedder, texts)
    _CORPUS_CACHE[model_name] = embeddings
    return embeddings
|
|
|
|
|
def top3_for_each_model(user_input: str, selected_models: list):
    """For each selected embedder, find the 3 corpus tweets most similar to the input.

    Returns a DataFrame with one row per (model, rank) pair; per-model failures
    are reported as an error row instead of aborting the whole comparison.
    """
    corpus_df = load_sample_df()
    corpus_texts = corpus_df["clean_text"].tolist()
    records = []
    for model_label in selected_models:
        try:
            embedder = load_sentence_model(EMBEDDERS[model_label])
            corpus_vecs = ensure_corpus_embeddings(model_label, corpus_texts)
            query_vec = _encode_norm(embedder, [clean_text(user_input)])
            scores = cosine_similarity(query_vec, corpus_vecs)[0]
            # Indices of the three highest scores, best first.
            best3 = np.argsort(scores)[-3:][::-1]
            for rank, idx in enumerate(best3, start=1):
                records.append({
                    "Model": model_label,
                    "Rank": rank,
                    "Similarity": float(scores[idx]),
                    "Tweet (clean)": corpus_texts[idx],
                    "Tweet (orig)": corpus_df.loc[idx, "text"],
                })
        except Exception as e:
            records.append({
                "Model": model_label, "Rank": "-", "Similarity": "-",
                "Tweet (clean)": f"[Error: {e}]", "Tweet (orig)": ""
            })
    return pd.DataFrame(records, columns=["Model", "Rank", "Similarity", "Tweet (clean)", "Tweet (orig)"])
|
|
|
|
|
def generate_and_pick_best(prompt: str, n_sequences: int, max_length: int,
                           temperature: float, scorer_model_name: str,
                           progress=gr.Progress()):
    """Sample candidate tweets from distilgpt2 and rank them by similarity to *prompt*.

    Args:
        prompt: Seed text for generation; also the similarity query.
        n_sequences: Number of candidate continuations to sample.
        max_length: Max new tokens per candidate.
        temperature: Sampling temperature.
        scorer_model_name: Key into EMBEDDERS selecting the scoring embedder.
        progress: Gradio progress tracker (injected by the UI).

    Returns:
        (best_text, best_similarity, table) where *table* lists all candidates
        in descending similarity order with ranks 1..n.
    """
    progress(0.0, desc="Loading models…")
    gen = load_generator()
    scorer = load_sentence_model(EMBEDDERS[scorer_model_name])

    progress(0.3, desc="Generating candidates…")
    outputs = gen(
        prompt,
        max_new_tokens=int(max_length),
        num_return_sequences=int(n_sequences),
        do_sample=True,
        temperature=float(temperature),
        pad_token_id=50256,  # GPT-2 EOS id; silences the missing-pad-token warning
    )
    candidates = [o["generated_text"].strip() for o in outputs]

    progress(0.7, desc="Scoring candidates…")
    q = _encode_norm(scorer, [prompt])
    cand_vecs = _encode_norm(scorer, candidates)
    sims = cosine_similarity(q, cand_vecs)[0]
    best_idx = int(sims.argmax())

    # Build every column from a single descending-similarity ordering.
    # (The previous code used `np.argsort(-sims) + 1` for the Rank column,
    # which are candidate *indices*, not ranks, so rows were mislabeled
    # whenever the best candidate wasn't the first one generated; it also
    # sorted each column independently, which could disagree on ties.)
    order = np.argsort(-sims)
    table = pd.DataFrame({
        "Rank": np.arange(1, len(candidates) + 1),
        "Similarity": sims[order],
        "Generated Tweet": [candidates[i] for i in order],
    })
    progress(1.0)
    return candidates[best_idx], float(sims[best_idx]), table
|
|
|
|
|
# ---------------------------------------------------------------------------
# Gradio UI: a similarity-search section followed by a generate-and-score
# section. Both sections share the same input textbox (`test_input`).
# ---------------------------------------------------------------------------
with gr.Blocks(title="Sentiment140 Embeddings + Generation") as demo:
    gr.Markdown(
        """
        # 🧪 Sentiment140 — Embeddings & Tweet Generator
        Type a tweet, get similar tweets from Sentiment140, and generate a new one.
        """
    )

    # --- Section 1: compare embedders on the user's input ---
    with gr.Row():
        test_input = gr.Textbox(label="Your input", value=DEFAULT_INPUT, lines=2)
        models = gr.CheckboxGroup(
            choices=list(EMBEDDERS.keys()),
            value=["MiniLM (fast)"],
            label="Embedding models to compare",
        )

    run_btn = gr.Button("🔎 Find Top‑3 Similar Tweets")
    table_out = gr.Dataframe(interactive=False)

    run_btn.click(top3_for_each_model, inputs=[test_input, models], outputs=table_out)

    gr.Markdown("---")
    gr.Markdown("## 📝 Generate Tweets and Pick the Best")

    # --- Section 2: sample candidates from distilgpt2, rank by similarity ---
    with gr.Row():
        n_seq = gr.Slider(1, 8, value=4, step=1, label="Number of candidates")
        max_len = gr.Slider(20, 80, value=40, step=1, label="Max length (new tokens)")
        temp = gr.Slider(0.7, 1.3, value=0.9, step=0.05, label="Temperature")
        scorer_model = gr.Dropdown(list(EMBEDDERS.keys()), value="MiniLM (fast)", label="Scorer embedding")

    gen_btn = gr.Button("✨ Generate & Score")
    best_txt = gr.Textbox(label="Best generated tweet")
    best_score = gr.Number(label="Similarity (best)")
    gen_table = gr.Dataframe(interactive=False)

    gen_btn.click(
        generate_and_pick_best,
        inputs=[test_input, n_seq, max_len, temp, scorer_model],
        outputs=[best_txt, best_score, gen_table],
    )

# queue() lets long-running callbacks (model downloads, generation) run
# without hitting request timeouts.
demo.queue(max_size=32).launch()
|
|
|
|