File size: 7,970 Bytes
c5cf83b 3814bb1 c5cf83b 3814bb1 c5cf83b 3814bb1 c5cf83b a54bd7b c5cf83b 889615c c5cf83b 889615c c5cf83b 889615c c5cf83b 889615c c5cf83b 889615c c5cf83b 3814bb1 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 |
# app.py
import os, re, functools, numpy as np, pandas as pd
import gradio as gr
from datasets import load_dataset
from sklearn.metrics.pairwise import cosine_similarity
# -------- Config --------
# Number of rows requested from the dataset split. Overridable through the
# SAMPLE_SIZE environment variable; small by default for CPU Spaces.
SAMPLE_SIZE = int(os.environ.get("SAMPLE_SIZE", "3000"))
# Seed shared by the corpus shuffle and the text-generation pipeline.
RANDOM_STATE = 42
# Text pre-filled in the input box of the UI.
DEFAULT_INPUT = "I am so happy with this product"
# -------- Helpers --------
def clean_text(text: str) -> str:
    """Normalize free text before it is embedded.

    Lower-cases the input, then removes URLs, @mentions, #hashtags and
    every character that is neither a word character nor whitespace, and
    finally collapses whitespace runs to a single space.

    Args:
        text: Raw text; ``None`` or an empty string is treated as "".

    Returns:
        The cleaned, stripped string.
    """
    # Applied strictly in this order: URLs and tags must go before the
    # punctuation pass strips the characters that identify them.
    substitutions = (
        (r"http\S+", ""),
        (r"@\w+", ""),
        (r"#\w+", ""),
        (r"[^\w\s]", ""),
        (r"\s+", " "),
    )
    cleaned = (text or "").lower()
    for pattern, replacement in substitutions:
        cleaned = re.sub(pattern, replacement, cleaned)
    return cleaned.strip()
def _to_numpy(x):
try:
import torch
if hasattr(torch, "Tensor") and isinstance(x, torch.Tensor):
return x.detach().cpu().numpy()
except Exception:
pass
return np.asarray(x)
def _l2norm(x: np.ndarray) -> np.ndarray:
x = x.astype(np.float32, copy=False)
if x.ndim == 1:
x = x.reshape(1, -1)
return x / (np.linalg.norm(x, axis=1, keepdims=True) + 1e-12)
# -------- Load sample data once (FAST: only a slice) --------
@functools.lru_cache(maxsize=1)
def load_sample_df():
    """Return the tweet corpus as a DataFrame with ``text`` and ``clean_text``.

    Sources are tried in order: the first ``SAMPLE_SIZE`` training rows of
    ``sentiment140``, then of ``tweet_eval`` / ``sentiment``. If neither
    can be loaded, a small built-in list of sentences is used instead.

    Dataset rows are filtered to texts of 5..280 characters, cleaned, and
    shuffled with ``RANDOM_STATE``. The built-in fallback is cleaned the
    same way but is neither filtered nor shuffled.

    The result is memoized, so whichever source succeeded on the first
    call (including the fallback) is reused for the rest of the process.
    """
    import logging
    import re

    import pandas as pd

    log = logging.getLogger(__name__)

    def _clean_corpus_text(t):
        # Deliberately not named ``clean_text``: that would shadow the
        # module-level query cleaner, which uses different rules.
        t = str(t).lower()
        t = re.sub(r"http\S+", "", t)
        t = re.sub(r"[@#]\w+", "", t)
        t = re.sub(r"[^a-z0-9\s.,!?']", " ", t)
        t = re.sub(r"\s+", " ", t).strip()
        return t

    # (dataset path, optional config name), tried in this order.
    sources = (("sentiment140", None), ("tweet_eval", "sentiment"))
    df = None
    for path, config in sources:
        try:
            from datasets import load_dataset

            args = (path,) if config is None else (path, config)
            ds = load_dataset(*args, split=f"train[:{SAMPLE_SIZE}]")
            df = ds.to_pandas()
            break
        except Exception as exc:
            # Best effort by design: report the failure and try the next
            # source rather than failing the whole app.
            log.warning("Could not load dataset %r: %s", path, exc)

    if df is None:
        fallback_texts = [
            "I love this product!", "This is terrible...", "Best purchase ever",
            "Pretty good overall", "I am not happy with the service",
            "Absolutely fantastic experience", "Could be better", "Super satisfied",
            "Worst ever", "Not bad at all", "Amazing quality", "I will buy again",
        ]
        # Clean the fallback too, so ``clean_text`` means the same thing
        # regardless of which source produced the corpus.
        return pd.DataFrame({
            "text": fallback_texts,
            "clean_text": [_clean_corpus_text(t) for t in fallback_texts],
        })

    df = df.dropna(subset=["text"]).copy()
    lengths = df["text"].astype(str).str.len()
    df = df[(lengths >= 5) & (lengths <= 280)].copy()
    df["clean_text"] = df["text"].apply(_clean_corpus_text)
    # reset_index keeps row labels equal to row positions after the shuffle.
    df = df.sample(frac=1.0, random_state=RANDOM_STATE).reset_index(drop=True)
    return df[["text", "clean_text"]]
# -------- Lazy model loaders --------
@functools.lru_cache(maxsize=None)
def load_sentence_model(model_id: str):
    """Build a ``SentenceTransformer`` for ``model_id``, memoized per id.

    The library is imported inside the function, so it is only imported
    the first time a model is actually requested.
    """
    import sentence_transformers

    return sentence_transformers.SentenceTransformer(model_id)
@functools.lru_cache(maxsize=None)
def load_generator():
    """Create the ``distilgpt2`` text-generation pipeline, memoized.

    The transformers seed is set to ``RANDOM_STATE`` once, when the
    pipeline is first built; later calls return the cached pipeline
    without reseeding.
    """
    import transformers

    transformers.set_seed(RANDOM_STATE)
    return transformers.pipeline("text-generation", model="distilgpt2")
# HF model ids
# Maps the label offered in the UI (checkbox group and scorer dropdown) to
# the model id passed to load_sentence_model().
EMBEDDERS = {
    "MiniLM (fast)": "sentence-transformers/all-MiniLM-L6-v2",
    "MPNet (heavier)": "sentence-transformers/all-mpnet-base-v2",
    "DistilRoBERTa (paraphrase)": "sentence-transformers/paraphrase-distilroberta-base-v1",
}
# Cache for corpus embeddings per model
# Keyed by the model label (a key of EMBEDDERS); filled lazily by
# ensure_corpus_embeddings() and never evicted.
_CORPUS_CACHE = {}
def _encode_norm(model, texts):
    """Embed ``texts`` with ``model`` and return L2-normalized NumPy rows.

    The raw ``model.encode`` output goes through ``_to_numpy`` first, so
    tensor results are handled as well as plain arrays, and is then
    normalized row-wise by ``_l2norm``.
    """
    raw = model.encode(texts, show_progress_bar=False)
    return _l2norm(_to_numpy(raw))
def ensure_corpus_embeddings(model_name: str, texts: list):
    """Return normalized embeddings of ``texts`` for ``model_name``.

    Results are cached in ``_CORPUS_CACHE`` under the model name only, so
    on a cache hit ``texts`` is ignored and the stored matrix is returned.

    Raises:
        KeyError: if ``model_name`` is not a key of ``EMBEDDERS``.
    """
    try:
        return _CORPUS_CACHE[model_name]
    except KeyError:
        pass  # not embedded yet for this model
    encoder = load_sentence_model(EMBEDDERS[model_name])
    _CORPUS_CACHE[model_name] = _encode_norm(encoder, texts)
    return _CORPUS_CACHE[model_name]
# -------- Retrieval --------
def top3_for_each_model(user_input: str, selected_models: list):
    """Find the three corpus tweets most similar to ``user_input`` per model.

    Args:
        user_input: Query text; cleaned with ``clean_text`` before embedding.
        selected_models: Labels from ``EMBEDDERS`` to compare. ``None`` or
            an empty list yields an empty table.

    Returns:
        A DataFrame with columns ``Model``, ``Rank``, ``Similarity``,
        ``Tweet (clean)`` and ``Tweet (orig)``. A model that fails (for
        example an unknown label) contributes a single placeholder row
        carrying the error message instead of aborting the comparison.
    """
    columns = ["Model", "Rank", "Similarity", "Tweet (clean)", "Tweet (orig)"]
    if not selected_models:
        # Nothing selected: skip loading the corpus and models entirely.
        return pd.DataFrame([], columns=columns)

    df = load_sample_df()
    texts = df["clean_text"].tolist()
    # Positional list, so both tweet columns are indexed the same way.
    originals = df["text"].tolist()
    # The query does not depend on the model, so clean it once.
    query = clean_text(user_input)

    rows = []
    for name in selected_models:
        try:
            model = load_sentence_model(EMBEDDERS[name])
            corpus_emb = ensure_corpus_embeddings(name, texts)
            q = _encode_norm(model, [query])
            sims = cosine_similarity(q, corpus_emb)[0]
            # Indices of the three largest similarities, best first.
            top_idx = sims.argsort()[-3:][::-1]
            for rank, i in enumerate(top_idx, start=1):
                rows.append({
                    "Model": name,
                    "Rank": rank,
                    "Similarity": float(sims[i]),
                    "Tweet (clean)": texts[i],
                    "Tweet (orig)": originals[i],
                })
        except Exception as e:
            # Keep the other models' results; surface the failure in-table.
            rows.append({
                "Model": name, "Rank": "-", "Similarity": "-",
                "Tweet (clean)": f"[Error: {e}]", "Tweet (orig)": "",
            })
    return pd.DataFrame(rows, columns=columns)
# -------- Generation + scoring (with progress) --------
def generate_and_pick_best(prompt: str, n_sequences: int, max_length: int,
                           temperature: float, scorer_model_name: str,
                           progress=gr.Progress()):
    """Sample candidate texts from the generator and rank them against the prompt.

    Args:
        prompt: Text used both as the generation prompt and as the
            reference the candidates are scored against.
        n_sequences: Number of candidates to sample.
        max_length: Number of NEW tokens to generate per candidate
            (passed as ``max_new_tokens``).
        temperature: Sampling temperature.
        scorer_model_name: Label from ``EMBEDDERS`` selecting the
            embedding model used for scoring.
        progress: Gradio progress callback.

    Returns:
        A tuple ``(best_text, best_similarity, table)``. ``table`` is a
        DataFrame with columns ``Rank`` (1 = most similar), ``Similarity``
        and ``Generated Tweet``, sorted by descending similarity.

    Raises:
        KeyError: if ``scorer_model_name`` is not a key of ``EMBEDDERS``.
    """
    progress(0.0, desc="Loading models…")
    gen = load_generator()
    scorer = load_sentence_model(EMBEDDERS[scorer_model_name])

    progress(0.3, desc="Generating candidates…")
    outputs = gen(
        prompt,
        max_new_tokens=int(max_length),  # number of NEW tokens to generate
        num_return_sequences=int(n_sequences),
        do_sample=True,
        temperature=float(temperature),
        # NOTE(review): 50256 is presumably the GPT-2 end-of-text id that
        # distilgpt2 shares; confirm if the generator model changes.
        pad_token_id=50256,
    )
    candidates = [o["generated_text"].strip() for o in outputs]

    progress(0.7, desc="Scoring candidates…")
    q = _encode_norm(scorer, [prompt])
    cand_vecs = _encode_norm(scorer, candidates)
    sims = cosine_similarity(q, cand_vecs)[0]

    # One stable ordering drives every column, so a row's rank, score and
    # text always belong to the same candidate, even when scores tie.
    order = np.argsort(-sims, kind="stable")
    best_idx = int(order[0])
    table = pd.DataFrame({
        "Rank": np.arange(1, len(order) + 1),
        "Similarity": sims[order],
        "Generated Tweet": [candidates[i] for i in order],
    })
    progress(1.0)
    return candidates[best_idx], float(sims[best_idx]), table
# ---------------- UI ----------------
# Build the UI at import time so ``demo`` is available to importers; the
# server itself is only started when this file is run as a script.
with gr.Blocks(title="Sentiment140 Embeddings + Generation") as demo:
    gr.Markdown(
        """
# 🧪 Sentiment140 — Embeddings & Tweet Generator
Type a tweet, get similar tweets from Sentiment140, and generate a new one.
"""
    )

    # --- Retrieval section: shared input box plus embedding-model picker ---
    # NOTE(review): which widgets sit inside each Row is inferred; confirm
    # against the intended layout.
    with gr.Row():
        test_input = gr.Textbox(label="Your input", value=DEFAULT_INPUT, lines=2)
        models = gr.CheckboxGroup(
            choices=list(EMBEDDERS.keys()),
            value=["MiniLM (fast)"],
            label="Embedding models to compare",
        )
    run_btn = gr.Button("🔎 Find Top‑3 Similar Tweets")
    table_out = gr.Dataframe(interactive=False)
    run_btn.click(top3_for_each_model, inputs=[test_input, models], outputs=table_out)

    gr.Markdown("---")
    gr.Markdown("## 📝 Generate Tweets and Pick the Best")

    # --- Generation section: sampling controls and scorer selection ---
    with gr.Row():
        n_seq = gr.Slider(1, 8, value=4, step=1, label="Number of candidates")
        max_len = gr.Slider(20, 80, value=40, step=1, label="Max length (new tokens)")
        temp = gr.Slider(0.7, 1.3, value=0.9, step=0.05, label="Temperature")
        scorer_model = gr.Dropdown(list(EMBEDDERS.keys()), value="MiniLM (fast)", label="Scorer embedding")
    gen_btn = gr.Button("✨ Generate & Score")
    best_txt = gr.Textbox(label="Best generated tweet")
    best_score = gr.Number(label="Similarity (best)")
    gen_table = gr.Dataframe(interactive=False)
    # The input order must match generate_and_pick_best's positional parameters.
    gen_btn.click(
        generate_and_pick_best,
        inputs=[test_input, n_seq, max_len, temp, scorer_model],
        outputs=[best_txt, best_score, gen_table],
    )

if __name__ == "__main__":
    # The queue holds at most 32 pending requests.
    demo.queue(max_size=32).launch()
|