# app.py
import os, re, functools, numpy as np, pandas as pd
import gradio as gr
from datasets import load_dataset
from sklearn.metrics.pairwise import cosine_similarity

# -------- Config --------
SAMPLE_SIZE = int(os.getenv("SAMPLE_SIZE", "3000"))  # small by default for CPU Spaces
RANDOM_STATE = 42
DEFAULT_INPUT = "I am so happy with this product"

# -------- Helpers --------
def clean_text(text: str) -> str:
    text = (text or "").lower()
    text = re.sub(r"http\S+", "", text)
    text = re.sub(r"@\w+", "", text)
    text = re.sub(r"#\w+", "", text)
    text = re.sub(r"[^\w\s]", "", text)
    text = re.sub(r"\s+", " ", text).strip()
    return text
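
# Example: clean_text("Check out http://x.co @user #wow!!") -> "check out"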

def _to_numpy(x):
    """Coerce model output (torch tensor, list, or array) to a numpy array."""
    try:
        import torch  # optional; only needed if encode() returns tensors
        if isinstance(x, torch.Tensor):
            return x.detach().cpu().numpy()
    except Exception:
        pass
    return np.asarray(x)

def _l2norm(x: np.ndarray) -> np.ndarray:
    """L2-normalize rows (1-D input becomes shape (1, d)); the epsilon avoids division by zero."""
    x = x.astype(np.float32, copy=False)
    if x.ndim == 1:
        x = x.reshape(1, -1)
    return x / (np.linalg.norm(x, axis=1, keepdims=True) + 1e-12)


# -------- Load sample data once (FAST: only a slice) --------
@functools.lru_cache(maxsize=1)
def load_sample_df():
    """Load a small slice of tweets, with a tiny built-in fallback corpus."""
    try:
        ds = load_dataset("sentiment140", split=f"train[:{SAMPLE_SIZE}]")
        df = ds.to_pandas()
    except Exception:
        try:
            ds = load_dataset("tweet_eval", "sentiment", split=f"train[:{SAMPLE_SIZE}]")
            df = ds.to_pandas()
        except Exception:
            fallback_texts = [
                "I love this product!", "This is terrible...", "Best purchase ever",
                "Pretty good overall", "I am not happy with the service",
                "Absolutely fantastic experience", "Could be better", "Super satisfied",
                "Worst ever", "Not bad at all", "Amazing quality", "I will buy again"
            ]
            return pd.DataFrame({"text": fallback_texts, "clean_text": fallback_texts})

    df = df.dropna(subset=["text"]).copy()
    df["text_length"] = df["text"].astype(str).str.len()
    df = df[(df["text_length"] >= 5) & (df["text_length"] <= 280)].copy()

    # Clean with the same helper used for queries, so corpus and query text match.
    df["clean_text"] = df["text"].apply(clean_text)
    df = df.sample(frac=1.0, random_state=RANDOM_STATE).reset_index(drop=True)

    return df[["text", "clean_text"]]


# -------- Lazy model loaders --------
@functools.lru_cache(maxsize=None)
def load_sentence_model(model_id: str):
    from sentence_transformers import SentenceTransformer
    return SentenceTransformer(model_id)

@functools.lru_cache(maxsize=None)
def load_generator():
    from transformers import pipeline, set_seed
    set_seed(RANDOM_STATE)
    return pipeline("text-generation", model="distilgpt2")
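
# distilgpt2 is a small GPT-2 distillation that stays responsive on CPU Spaces;
# any causal LM supported by the "text-generation" pipeline could be swapped in.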

# HF model ids
EMBEDDERS = {
    "MiniLM (fast)": "sentence-transformers/all-MiniLM-L6-v2",
    "MPNet (heavier)": "sentence-transformers/all-mpnet-base-v2",
    "DistilRoBERTa (paraphrase)": "sentence-transformers/paraphrase-distilroberta-base-v1",
}

# Cache for corpus embeddings per model
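# (embeddings from different models are not comparable, so entries are never mixed)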
_CORPUS_CACHE = {}

def _encode_norm(model, texts):
    """Encode compatibly across sentence-transformers versions; return L2-normalized numpy (n,d)."""
    out = model.encode(texts, show_progress_bar=False)
    out = _to_numpy(out)
    return _l2norm(out)

def ensure_corpus_embeddings(model_name: str, texts: list):
    if model_name in _CORPUS_CACHE:
        return _CORPUS_CACHE[model_name]
    model = load_sentence_model(EMBEDDERS[model_name])
    emb = _encode_norm(model, texts)
    _CORPUS_CACHE[model_name] = emb
    return emb

# -------- Retrieval --------
def top3_for_each_model(user_input: str, selected_models: list):
    df = load_sample_df()
    texts = df["clean_text"].tolist()
    rows = []
    for name in selected_models:
        try:
            model = load_sentence_model(EMBEDDERS[name])
            corpus_emb = ensure_corpus_embeddings(name, texts)
            q = _encode_norm(model, [clean_text(user_input)])
            sims = cosine_similarity(q, corpus_emb)[0]
            top_idx = sims.argsort()[-3:][::-1]
            for rank, i in enumerate(top_idx, start=1):
                rows.append({
                    "Model": name,
                    "Rank": rank,
                    "Similarity": float(sims[i]),
                    "Tweet (clean)": texts[i],
                    "Tweet (orig)": df.loc[i, "text"],
                })
        except Exception as e:
            rows.append({
                "Model": name, "Rank": "-", "Similarity": "-",
                "Tweet (clean)": f"[Error: {e}]", "Tweet (orig)": ""
            })
    return pd.DataFrame(rows, columns=["Model","Rank","Similarity","Tweet (clean)","Tweet (orig)"])
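
# Quick smoke test (hypothetical; run outside the Gradio UI):
#   print(top3_for_each_model("great phone, would buy again", ["MiniLM (fast)"]))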

# -------- Generation + scoring (with progress) --------
def generate_and_pick_best(prompt: str, n_sequences: int, max_length: int,
                           temperature: float, scorer_model_name: str,
                           progress=gr.Progress()):
    progress(0.0, desc="Loading models…")
    gen = load_generator()
    scorer = load_sentence_model(EMBEDDERS[scorer_model_name])

    progress(0.3, desc="Generating candidates…")
    outputs = gen(
        prompt,
        max_new_tokens=int(max_length),   # number of NEW tokens to generate
        num_return_sequences=int(n_sequences),
        do_sample=True,
        temperature=float(temperature),
        pad_token_id=50256,               # GPT-2's EOS token id; silences the missing-pad-token warning
    )
    candidates = [o["generated_text"].strip() for o in outputs]

    progress(0.7, desc="Scoring candidates…")
    q = _encode_norm(scorer, [prompt])
    cand_vecs = _encode_norm(scorer, candidates)
    sims = cosine_similarity(q, cand_vecs)[0]
    best_idx = int(sims.argmax())

    order = np.argsort(-sims)  # candidate indices, best first
    table = pd.DataFrame({
        "Rank": np.arange(1, len(candidates) + 1),
        "Similarity": sims[order],
        "Generated Tweet": [candidates[i] for i in order],
    })
    progress(1.0)
    return candidates[best_idx], float(sims[best_idx]), table
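
# The above is best-of-n reranking: sample n candidates from the generator,
# embed prompt and candidates with the scorer, and keep the candidate whose
# embedding is closest (by cosine similarity) to the prompt.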

# ---------------- UI ----------------
with gr.Blocks(title="Sentiment140 Embeddings + Generation") as demo:
    gr.Markdown(
        """
# 🧪 Sentiment140 — Embeddings & Tweet Generator
Type a tweet, get similar tweets from Sentiment140, and generate a new one.
        """
    )

    with gr.Row():
        test_input = gr.Textbox(label="Your input", value=DEFAULT_INPUT, lines=2)
        models = gr.CheckboxGroup(
            choices=list(EMBEDDERS.keys()),
            value=["MiniLM (fast)"],
            label="Embedding models to compare",
        )

    run_btn = gr.Button("🔎 Find Top‑3 Similar Tweets")
    table_out = gr.Dataframe(interactive=False)

    run_btn.click(top3_for_each_model, inputs=[test_input, models], outputs=table_out)

    gr.Markdown("---")
    gr.Markdown("## 📝 Generate Tweets and Pick the Best")

    with gr.Row():
        n_seq = gr.Slider(1, 8, value=4, step=1, label="Number of candidates")
        max_len = gr.Slider(20, 80, value=40, step=1, label="Max length (new tokens)")
        temp = gr.Slider(0.7, 1.3, value=0.9, step=0.05, label="Temperature")
        scorer_model = gr.Dropdown(list(EMBEDDERS.keys()), value="MiniLM (fast)", label="Scorer embedding")

    gen_btn = gr.Button("✨ Generate & Score")
    best_txt = gr.Textbox(label="Best generated tweet")
    best_score = gr.Number(label="Similarity (best)")
    gen_table = gr.Dataframe(interactive=False)

    gen_btn.click(
        generate_and_pick_best,
        inputs=[test_input, n_seq, max_len, temp, scorer_model],
        outputs=[best_txt, best_score, gen_table],
    )

demo.queue(max_size=32).launch()
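
# To run locally (assumed dependency set; this file does not pin versions):
#   pip install gradio datasets sentence-transformers transformers torch scikit-learn pandas numpy
#   python app.py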