# app.py import os, re, functools, numpy as np, pandas as pd import gradio as gr from datasets import load_dataset from sklearn.metrics.pairwise import cosine_similarity # -------- Config -------- SAMPLE_SIZE = int(os.getenv("SAMPLE_SIZE", "3000")) # small by default for CPU Spaces RANDOM_STATE = 42 DEFAULT_INPUT = "I am so happy with this product" # -------- Helpers -------- def clean_text(text: str) -> str: text = (text or "").lower() text = re.sub(r"http\S+", "", text) text = re.sub(r"@\w+", "", text) text = re.sub(r"#\w+", "", text) text = re.sub(r"[^\w\s]", "", text) text = re.sub(r"\s+", " ", text).strip() return text def _to_numpy(x): try: import torch if hasattr(torch, "Tensor") and isinstance(x, torch.Tensor): return x.detach().cpu().numpy() except Exception: pass return np.asarray(x) def _l2norm(x: np.ndarray) -> np.ndarray: x = x.astype(np.float32, copy=False) if x.ndim == 1: x = x.reshape(1, -1) return x / (np.linalg.norm(x, axis=1, keepdims=True) + 1e-12) # -------- Load sample data once (FAST: only a slice) -------- @functools.lru_cache(maxsize=1) def load_sample_df(): import pandas as pd try: from datasets import load_dataset ds = load_dataset("sentiment140", split=f"train[:{SAMPLE_SIZE}]") df = ds.to_pandas() except Exception: try: from datasets import load_dataset ds = load_dataset("tweet_eval", "sentiment", split=f"train[:{SAMPLE_SIZE}]") df = ds.to_pandas().rename(columns={"text": "text"}) except Exception: fallback_texts = [ "I love this product!", "This is terrible...", "Best purchase ever", "Pretty good overall", "I am not happy with the service", "Absolutely fantastic experience", "Could be better", "Super satisfied", "Worst ever", "Not bad at all", "Amazing quality", "I will buy again" ] return pd.DataFrame({"text": fallback_texts, "clean_text": fallback_texts}) df = df.dropna(subset=["text"]).copy() df["text_length"] = df["text"].astype(str).str.len() df = df[(df["text_length"] >= 5) & (df["text_length"] <= 280)].copy() def clean_text(t): import re t = str(t).lower() t = re.sub(r"http\S+", "", t) t = re.sub(r"[@#]\w+", "", t) t = re.sub(r"[^a-z0-9\s.,!?']", " ", t) t = re.sub(r"\s+", " ", t).strip() return t df["clean_text"] = df["text"].apply(clean_text) df = df.sample(frac=1.0, random_state=RANDOM_STATE).reset_index(drop=True) return df[["text", "clean_text"]] # -------- Lazy model loaders -------- @functools.lru_cache(maxsize=None) def load_sentence_model(model_id: str): from sentence_transformers import SentenceTransformer return SentenceTransformer(model_id) @functools.lru_cache(maxsize=None) def load_generator(): from transformers import pipeline, set_seed set_seed(RANDOM_STATE) return pipeline("text-generation", model="distilgpt2") # HF model ids EMBEDDERS = { "MiniLM (fast)": "sentence-transformers/all-MiniLM-L6-v2", "MPNet (heavier)": "sentence-transformers/all-mpnet-base-v2", "DistilRoBERTa (paraphrase)": "sentence-transformers/paraphrase-distilroberta-base-v1", } # Cache for corpus embeddings per model _CORPUS_CACHE = {} def _encode_norm(model, texts): """Encode compatibly across sentence-transformers versions; return L2-normalized numpy (n,d).""" out = model.encode(texts, show_progress_bar=False) out = _to_numpy(out) return _l2norm(out) def ensure_corpus_embeddings(model_name: str, texts: list): if model_name in _CORPUS_CACHE: return _CORPUS_CACHE[model_name] model = load_sentence_model(EMBEDDERS[model_name]) emb = _encode_norm(model, texts) _CORPUS_CACHE[model_name] = emb return emb # -------- Retrieval -------- def top3_for_each_model(user_input: str, selected_models: list): df = load_sample_df() texts = df["clean_text"].tolist() rows = [] for name in selected_models: try: model = load_sentence_model(EMBEDDERS[name]) corpus_emb = ensure_corpus_embeddings(name, texts) q = _encode_norm(model, [clean_text(user_input)]) sims = cosine_similarity(q, corpus_emb)[0] top_idx = sims.argsort()[-3:][::-1] for rank, i in enumerate(top_idx, start=1): rows.append({ "Model": name, "Rank": rank, "Similarity": float(sims[i]), "Tweet (clean)": texts[i], "Tweet (orig)": df.loc[i, "text"], }) except Exception as e: rows.append({ "Model": name, "Rank": "-", "Similarity": "-", "Tweet (clean)": f"[Error: {e}]", "Tweet (orig)": "" }) return pd.DataFrame(rows, columns=["Model","Rank","Similarity","Tweet (clean)","Tweet (orig)"]) # -------- Generation + scoring (with progress) -------- def generate_and_pick_best(prompt: str, n_sequences: int, max_length: int, temperature: float, scorer_model_name: str, progress=gr.Progress()): progress(0.0, desc="Loading models…") gen = load_generator() scorer = load_sentence_model(EMBEDDERS[scorer_model_name]) progress(0.3, desc="Generating candidates…") outputs = gen( prompt, max_new_tokens=int(max_length), # number of NEW tokens to generate num_return_sequences=int(n_sequences), do_sample=True, temperature=float(temperature), pad_token_id=50256, ) candidates = [o["generated_text"].strip() for o in outputs] progress(0.7, desc="Scoring candidates…") q = _encode_norm(scorer, [prompt]) cand_vecs = _encode_norm(scorer, candidates) sims = cosine_similarity(q, cand_vecs)[0] best_idx = int(sims.argmax()) table = pd.DataFrame({ "Rank": np.argsort(-sims) + 1, "Similarity": np.sort(sims)[::-1], "Generated Tweet": [c for _, c in sorted(zip(-sims, candidates))] }) progress(1.0) return candidates[best_idx], float(sims[best_idx]), table # ---------------- UI ---------------- with gr.Blocks(title="Sentiment140 Embeddings + Generation") as demo: gr.Markdown( """ # 🧪 Sentiment140 — Embeddings & Tweet Generator Type a tweet, get similar tweets from Sentiment140, and generate a new one. """ ) with gr.Row(): test_input = gr.Textbox(label="Your input", value=DEFAULT_INPUT, lines=2) models = gr.CheckboxGroup( choices=list(EMBEDDERS.keys()), value=["MiniLM (fast)"], label="Embedding models to compare", ) run_btn = gr.Button("🔎 Find Top‑3 Similar Tweets") table_out = gr.Dataframe(interactive=False) run_btn.click(top3_for_each_model, inputs=[test_input, models], outputs=table_out) gr.Markdown("---") gr.Markdown("## 📝 Generate Tweets and Pick the Best") with gr.Row(): n_seq = gr.Slider(1, 8, value=4, step=1, label="Number of candidates") max_len = gr.Slider(20, 80, value=40, step=1, label="Max length (new tokens)") temp = gr.Slider(0.7, 1.3, value=0.9, step=0.05, label="Temperature") scorer_model = gr.Dropdown(list(EMBEDDERS.keys()), value="MiniLM (fast)", label="Scorer embedding") gen_btn = gr.Button("✨ Generate & Score") best_txt = gr.Textbox(label="Best generated tweet") best_score = gr.Number(label="Similarity (best)") gen_table = gr.Dataframe(interactive=False) gen_btn.click( generate_and_pick_best, inputs=[test_input, n_seq, max_len, temp, scorer_model], outputs=[best_txt, best_score, gen_table], ) demo.queue(max_size=32).launch()