tweet-UI / app.py
omer15699's picture
Update app.py
a54bd7b verified
# app.py
import os, re, functools, numpy as np, pandas as pd
import gradio as gr
from datasets import load_dataset
from sklearn.metrics.pairwise import cosine_similarity
# -------- Config --------
# Number of rows sliced from the dataset's train split; override with the SAMPLE_SIZE env var.
SAMPLE_SIZE = int(os.getenv("SAMPLE_SIZE", "3000")) # small by default for CPU Spaces
# Seed shared by the corpus shuffle and the text-generation set_seed call.
RANDOM_STATE = 42
# Text pre-filled in the UI input box.
DEFAULT_INPUT = "I am so happy with this product"
# -------- Helpers --------
def clean_text(text: str) -> str:
    """Normalize a tweet-like string for embedding.

    Lower-cases the input, then removes (in this order) URLs starting with
    ``http``, ``@mentions``, ``#hashtags`` and every remaining character that
    is neither a word character nor whitespace. Whitespace runs are collapsed
    to one space and the result is stripped. Falsy input (``None``, ``""``)
    yields ``""``.
    """
    cleaned = (text or "").lower()
    # Order matters: URLs, mentions and hashtags must go before the
    # punctuation pass, otherwise their marker characters would be lost first.
    for pattern in (r"http\S+", r"@\w+", r"#\w+", r"[^\w\s]"):
        cleaned = re.sub(pattern, "", cleaned)
    return re.sub(r"\s+", " ", cleaned).strip()
def _to_numpy(x):
    """Return ``x`` as a numpy array.

    If torch can be imported and ``x`` is a ``torch.Tensor``, the tensor is
    detached, moved to CPU and converted. Any exception raised while probing
    or converting (including torch being unavailable) is ignored and the
    value goes through ``np.asarray`` instead.
    """
    try:
        import torch

        tensor_cls = getattr(torch, "Tensor", None)
        if tensor_cls is not None and isinstance(x, tensor_cls):
            return x.detach().cpu().numpy()
    except Exception:
        # Best effort only: fall through to the generic conversion below.
        pass
    return np.asarray(x)
def _l2norm(x: np.ndarray) -> np.ndarray:
    """Scale each row of ``x`` to unit Euclidean length.

    The input is viewed as float32; a 1-D vector is promoted to a single-row
    matrix, so the result always has at least two dimensions. A small epsilon
    is added to every row norm so all-zero rows divide cleanly and stay zero.
    """
    rows = x.astype(np.float32, copy=False)
    if rows.ndim == 1:
        rows = rows.reshape(1, -1)
    row_norms = np.linalg.norm(rows, axis=1, keepdims=True)
    return rows / (row_norms + 1e-12)
# -------- Load sample data once (FAST: only a slice) --------
@functools.lru_cache(maxsize=1)
def load_sample_df():
    """Load the tweet corpus once and return it as a DataFrame.

    Sources are tried in order: the first ``SAMPLE_SIZE`` rows of the
    ``sentiment140`` train split, then the same slice of ``tweet_eval`` /
    ``sentiment``. If both fail for any reason, a small built-in list of
    example sentences is used instead.

    Returns:
        A DataFrame with exactly two columns: ``text`` (original string) and
        ``clean_text`` (output of the corpus cleaner below). Dataset rows are
        restricted to 5..280 characters and shuffled with ``RANDOM_STATE``;
        the built-in fallback keeps its literal order. The result is cached,
        so callers share one object and should not mutate it.
    """

    def _clean_corpus_text(raw):
        # Deliberately not named clean_text: the module-level helper of that
        # name strips all punctuation, while this one keeps . , ! ? and '.
        cleaned = str(raw).lower()
        cleaned = re.sub(r"http\S+", "", cleaned)
        cleaned = re.sub(r"[@#]\w+", "", cleaned)
        cleaned = re.sub(r"[^a-z0-9\s.,!?']", " ", cleaned)
        return re.sub(r"\s+", " ", cleaned).strip()

    df = None
    for dataset_args in (("sentiment140",), ("tweet_eval", "sentiment")):
        try:
            split = f"train[:{SAMPLE_SIZE}]"
            df = load_dataset(*dataset_args, split=split).to_pandas()
            break
        except Exception:
            # Best effort: any load failure moves on to the next source.
            continue

    if df is None:
        fallback_texts = [
            "I love this product!", "This is terrible...", "Best purchase ever",
            "Pretty good overall", "I am not happy with the service",
            "Absolutely fantastic experience", "Could be better", "Super satisfied",
            "Worst ever", "Not bad at all", "Amazing quality", "I will buy again"
        ]
        # Run the fallback through the same cleaner as real data so that
        # "clean_text" means the same thing regardless of which source won.
        return pd.DataFrame({
            "text": fallback_texts,
            "clean_text": [_clean_corpus_text(t) for t in fallback_texts],
        })

    df = df.dropna(subset=["text"]).copy()
    df["text_length"] = df["text"].astype(str).str.len()
    df = df[(df["text_length"] >= 5) & (df["text_length"] <= 280)].copy()
    df["clean_text"] = df["text"].apply(_clean_corpus_text)
    # reset_index keeps positional and label indexing interchangeable for callers.
    df = df.sample(frac=1.0, random_state=RANDOM_STATE).reset_index(drop=True)
    return df[["text", "clean_text"]]
# -------- Lazy model loaders --------
@functools.lru_cache(maxsize=None)
def load_sentence_model(model_id: str):
    """Build a ``SentenceTransformer`` for ``model_id``, once per distinct id.

    The sentence_transformers import happens on first call rather than at
    module import time; later calls with the same id return the cached
    instance.
    """
    import sentence_transformers

    return sentence_transformers.SentenceTransformer(model_id)
@functools.lru_cache(maxsize=None)
def load_generator():
    """Create the ``distilgpt2`` text-generation pipeline on first use.

    The transformers seed is set to ``RANDOM_STATE`` immediately before the
    pipeline is built. Because the result is cached, that seeding runs only
    on the first call, not on each generation request.
    """
    import transformers

    transformers.set_seed(RANDOM_STATE)
    return transformers.pipeline("text-generation", model="distilgpt2")
# HF model ids: maps the label shown in the UI to the id passed to load_sentence_model.
EMBEDDERS = {
"MiniLM (fast)": "sentence-transformers/all-MiniLM-L6-v2",
"MPNet (heavier)": "sentence-transformers/all-mpnet-base-v2",
"DistilRoBERTa (paraphrase)": "sentence-transformers/paraphrase-distilroberta-base-v1",
}
# Cache for corpus embeddings per model, keyed by the UI label (a key of EMBEDDERS).
# Entries are never evicted and do not depend on the texts that were encoded.
_CORPUS_CACHE = {}
def _encode_norm(model, texts):
    """Embed ``texts`` with ``model`` and return unit-length rows.

    Whatever ``model.encode`` returns is first converted to a numpy array and
    then L2-normalized row by row, giving an ``(n, d)`` float array.
    """
    raw = model.encode(texts, show_progress_bar=False)
    return _l2norm(_to_numpy(raw))
def ensure_corpus_embeddings(model_name: str, texts: list):
    """Return normalized embeddings of ``texts`` for the given embedder label.

    Results are memoized in ``_CORPUS_CACHE`` under ``model_name`` only: once
    a label has an entry, it is returned as-is and ``texts`` is not consulted
    again.
    """
    try:
        return _CORPUS_CACHE[model_name]
    except KeyError:
        pass  # first request for this label: compute below
    encoder = load_sentence_model(EMBEDDERS[model_name])
    vectors = _encode_norm(encoder, texts)
    _CORPUS_CACHE[model_name] = vectors
    return vectors
# -------- Retrieval --------
def top3_for_each_model(user_input: str, selected_models: list):
    """Find the three corpus tweets closest to ``user_input`` per embedder.

    For every label in ``selected_models`` the cleaned input is embedded,
    compared against the cached corpus embeddings by cosine similarity, and
    the three highest-scoring tweets are reported in descending order. If
    anything fails for a label, a single placeholder row carrying the error
    message is emitted for it instead of raising.

    Returns:
        A DataFrame with columns ``Model``, ``Rank``, ``Similarity``,
        ``Tweet (clean)`` and ``Tweet (orig)``.
    """
    columns = ["Model","Rank","Similarity","Tweet (clean)","Tweet (orig)"]
    sample = load_sample_df()
    corpus = sample["clean_text"].tolist()
    records = []
    for label in selected_models:
        try:
            encoder = load_sentence_model(EMBEDDERS[label])
            corpus_vecs = ensure_corpus_embeddings(label, corpus)
            query_vec = _encode_norm(encoder, [clean_text(user_input)])
            scores = cosine_similarity(query_vec, corpus_vecs)[0]
            # argsort is ascending: take the last three, then flip to best-first.
            best_first = scores.argsort()[-3:][::-1]
            for position, idx in enumerate(best_first, start=1):
                record = {
                    "Model": label,
                    "Rank": position,
                    "Similarity": float(scores[idx]),
                    "Tweet (clean)": corpus[idx],
                    "Tweet (orig)": sample.loc[idx, "text"],
                }
                records.append(record)
        except Exception as exc:
            # Surface the failure in the table so the other models still render.
            records.append({
                "Model": label, "Rank": "-", "Similarity": "-",
                "Tweet (clean)": f"[Error: {exc}]", "Tweet (orig)": ""
            })
    return pd.DataFrame(records, columns=columns)
# -------- Generation + scoring (with progress) --------
def generate_and_pick_best(prompt: str, n_sequences: int, max_length: int,
                           temperature: float, scorer_model_name: str,
                           progress=gr.Progress()):
    """Sample several continuations of ``prompt`` and rank them by similarity.

    Args:
        prompt: Text handed to the generator and used as the similarity query.
        n_sequences: Number of candidates to sample.
        max_length: Number of NEW tokens to generate per candidate.
        temperature: Sampling temperature.
        scorer_model_name: Key of ``EMBEDDERS`` selecting the scoring embedder.
        progress: Gradio progress tracker, called at each stage.

    Returns:
        ``(best_text, best_similarity, table)``. ``table`` lists every
        candidate best-first with columns ``Rank`` (1..n), ``Similarity`` and
        ``Generated Tweet``; its first row is the returned best candidate.
    """
    progress(0.0, desc="Loading models…")
    gen = load_generator()
    scorer = load_sentence_model(EMBEDDERS[scorer_model_name])

    progress(0.3, desc="Generating candidates…")
    outputs = gen(
        prompt,
        max_new_tokens=int(max_length),  # number of NEW tokens to generate
        num_return_sequences=int(n_sequences),
        do_sample=True,
        temperature=float(temperature),
        # NOTE(review): presumably the GPT-2 end-of-text id reused as padding — confirm.
        pad_token_id=50256,
    )
    # NOTE(review): generated_text may include the prompt itself, which would
    # raise every similarity score — confirm against the pipeline defaults.
    candidates = [o["generated_text"].strip() for o in outputs]

    progress(0.7, desc="Scoring candidates…")
    q = _encode_norm(scorer, [prompt])
    cand_vecs = _encode_norm(scorer, candidates)
    sims = cosine_similarity(q, cand_vecs)[0]

    # A single best-first ordering drives every column so rows stay aligned.
    # The previous code sorted each column independently and filled "Rank"
    # with original candidate positions instead of 1..n.
    order = np.argsort(-sims, kind="stable")
    table = pd.DataFrame({
        "Rank": np.arange(1, len(order) + 1),
        "Similarity": sims[order],
        "Generated Tweet": [candidates[i] for i in order],
    })
    best_idx = int(order[0])

    progress(1.0)
    return candidates[best_idx], float(sims[best_idx]), table
# ---------------- UI ----------------
# Build the Gradio layout: a retrieval section and a generation section that
# share one input textbox.
with gr.Blocks(title="Sentiment140 Embeddings + Generation") as demo:
gr.Markdown(
"""
# 🧪 Sentiment140 — Embeddings & Tweet Generator
Type a tweet, get similar tweets from Sentiment140, and generate a new one.
"""
)
with gr.Row():
test_input = gr.Textbox(label="Your input", value=DEFAULT_INPUT, lines=2)
# One checkbox per EMBEDDERS label; only MiniLM is ticked initially.
models = gr.CheckboxGroup(
choices=list(EMBEDDERS.keys()),
value=["MiniLM (fast)"],
label="Embedding models to compare",
)
run_btn = gr.Button("🔎 Find Top‑3 Similar Tweets")
table_out = gr.Dataframe(interactive=False)
# Clicking feeds (textbox, selected models) to top3_for_each_model and shows its DataFrame.
run_btn.click(top3_for_each_model, inputs=[test_input, models], outputs=table_out)
gr.Markdown("---")
gr.Markdown("## 📝 Generate Tweets and Pick the Best")
with gr.Row():
n_seq = gr.Slider(1, 8, value=4, step=1, label="Number of candidates")
max_len = gr.Slider(20, 80, value=40, step=1, label="Max length (new tokens)")
temp = gr.Slider(0.7, 1.3, value=0.9, step=0.05, label="Temperature")
scorer_model = gr.Dropdown(list(EMBEDDERS.keys()), value="MiniLM (fast)", label="Scorer embedding")
gen_btn = gr.Button("✨ Generate & Score")
best_txt = gr.Textbox(label="Best generated tweet")
best_score = gr.Number(label="Similarity (best)")
gen_table = gr.Dataframe(interactive=False)
# The shared textbox doubles as the generation prompt. Inputs follow the
# parameter order of generate_and_pick_best; outputs receive its 3-tuple
# (best text, best similarity, ranking table).
gen_btn.click(
generate_and_pick_best,
inputs=[test_input, n_seq, max_len, temp, scorer_model],
outputs=[best_txt, best_score, gen_table],
)
# Enable the request queue with max_size=32, then start the app.
demo.queue(max_size=32).launch()