Spaces:
Running
Running
import os | |
os.environ["TRANSFORMERS_CACHE"] = "/tmp/hf-cache" | |
os.environ["HF_HOME"] = "/tmp/hf-home" | |
import nltk | |
nltk.download("punkt", download_dir="/tmp/nltk_data") | |
from sklearn.feature_extraction.text import TfidfVectorizer | |
from sklearn.cluster import KMeans | |
from sklearn.metrics.pairwise import cosine_similarity | |
from nltk.tokenize import sent_tokenize | |
from transformers import pipeline | |
import numpy as np | |
import logging | |
import re | |
# === Pipelines === | |
summarizer = pipeline("summarization", model="google/pegasus-xsum") | |
qa_pipeline = pipeline( | |
"question-answering", | |
model="distilbert-base-cased-distilled-squad", | |
tokenizer="distilbert-base-cased-distilled-squad" | |
) | |
emotion_model = pipeline("text-classification", model="j-hartmann/emotion-english-distilroberta-base", top_k=1) | |
# === Brief Summarization === | |
def summarize_review(text, max_len=100, min_len=30): | |
try: | |
result = summarizer(text, max_length=max_len, min_length=min_len, do_sample=False) | |
if result and isinstance(result, list) and "summary_text" in result[0]: | |
return result[0]["summary_text"] | |
else: | |
logging.warning("Summarizer output malformed, falling back.") | |
return text | |
except Exception as e: | |
logging.warning(f"Fallback to raw text due to summarization error: {e}") | |
return text | |
# === Smart Summarization with Clustering === | |
def smart_summarize(text, n_clusters=1): | |
try: | |
sentences = sent_tokenize(text) | |
if len(sentences) <= 1: | |
return text | |
tfidf = TfidfVectorizer(stop_words="english") | |
tfidf_matrix = tfidf.fit_transform(sentences) | |
if len(sentences) <= n_clusters: | |
return " ".join(sentences) | |
kmeans = KMeans(n_clusters=n_clusters, random_state=42).fit(tfidf_matrix) | |
summary_sentences = [] | |
for i in range(n_clusters): | |
idx = np.where(kmeans.labels_ == i)[0] | |
if not len(idx): | |
continue | |
avg_vector = np.asarray(tfidf_matrix[idx].mean(axis=0)) | |
sim = cosine_similarity(avg_vector, tfidf_matrix[idx].toarray()) | |
most_representative = sentences[idx[np.argmax(sim)]] | |
summary_sentences.append(most_representative) | |
return " ".join(sorted(summary_sentences, key=sentences.index)) | |
except Exception as e: | |
logging.error(f"Smart summarize error: {e}") | |
return text | |
# === Emotion Detection (Fixed) === | |
def detect_emotion(text): | |
if not text.strip(): | |
return "neutral" | |
try: | |
result = emotion_model(text, top_k=1) | |
if isinstance(result, list) and isinstance(result[0], dict): | |
return result[0]["label"] | |
elif isinstance(result, dict) and "label" in result: | |
return result["label"] | |
else: | |
return "neutral" | |
except Exception as e: | |
logging.warning(f"Emotion detection failed: {e}") | |
return "neutral" | |
# === Follow-up Q&A === | |
def answer_followup(text, question, verbosity="brief"): | |
try: | |
if not question: | |
return "No question provided." | |
if isinstance(question, list): | |
answers = [] | |
for q in question: | |
if not q.strip(): | |
continue | |
response = qa_pipeline({"question": q, "context": text}) | |
ans = response.get("answer", "") | |
answers.append(f"**{q}** → {ans}" if verbosity.lower() == "detailed" else ans) | |
return answers | |
else: | |
response = qa_pipeline({"question": question, "context": text}) | |
ans = response.get("answer", "") | |
return f"**{question}** → {ans}" if verbosity.lower() == "detailed" else ans | |
except Exception as e: | |
logging.warning(f"Follow-up error: {e}") | |
return "Sorry, I couldn't generate a follow-up answer." | |
# === Direct follow-up route handler === | |
def answer_only(text, question): | |
try: | |
if not question: | |
return "No question provided." | |
return qa_pipeline({"question": question, "context": text}).get("answer", "No answer found.") | |
except Exception as e: | |
logging.warning(f"Answer-only failed: {e}") | |
return "Q&A failed." | |
# === Explanation Generator === | |
def generate_explanation(text): | |
try: | |
explanation = summarizer(text, max_length=60, min_length=20, do_sample=False)[0]["summary_text"] | |
return f"🧠 This review can be explained as: {explanation}" | |
except Exception as e: | |
logging.warning(f"Explanation failed: {e}") | |
return "⚠️ Explanation could not be generated." | |
# === Churn Risk Estimator === | |
def assess_churn_risk(sentiment_label, emotion_label): | |
if sentiment_label.lower() == "negative" and emotion_label.lower() in ["anger", "fear", "sadness", "frustrated"]: | |
return "High Risk" | |
return "Low Risk" | |
# === Pain Point Extractor === | |
def extract_pain_points(text): | |
common_issues = [ | |
"slow", "crash", "lag", "expensive", "confusing", "noisy", "poor", "rude", | |
"unhelpful", "bug", "broken", "unresponsive", "not working", "error", "delay", "disconnect", | |
"incomplete", "overpriced", "difficult", "conflict", "unclear", "inconsistent", | |
"missing", "locked", "freeze", "freeze-up", "conflicting", "conflicting answers", "outdated" | |
] | |
text_lower = text.lower() | |
matches = [kw for kw in common_issues if re.search(rf"\b{re.escape(kw)}\b", text_lower)] | |
return list(set(matches))[:5] | |
# === Industry Detector === | |
def detect_industry(text): | |
text = text.lower() | |
if any(k in text for k in ["doctor", "hospital", "health", "pill", "med"]): return "Healthcare" | |
if any(k in text for k in ["flight", "hotel", "trip", "booking"]): return "Travel" | |
if any(k in text for k in ["bank", "loan", "credit", "payment"]): return "Banking" | |
if any(k in text for k in ["gym", "trainer", "fitness", "workout"]): return "Fitness" | |
if any(k in text for k in ["movie", "series", "stream", "video"]): return "Entertainment" | |
if any(k in text for k in ["game", "gaming", "console"]): return "Gaming" | |
if any(k in text for k in ["food", "delivery", "restaurant", "order"]): return "Food Delivery" | |
if any(k in text for k in ["school", "university", "teacher", "course"]): return "Education" | |
if any(k in text for k in ["insurance", "policy", "claim"]): return "Insurance" | |
if any(k in text for k in ["property", "rent", "apartment", "house"]): return "Real Estate" | |
if any(k in text for k in ["shop", "buy", "product", "phone", "amazon", "flipkart"]): return "E-commerce" | |
return "Generic" | |
# === Product Category Detector === | |
def detect_product_category(text): | |
text = text.lower() | |
if any(k in text for k in ["mobile", "smartphone", "iphone", "samsung", "phone"]): return "Mobile Devices" | |
if any(k in text for k in ["laptop", "macbook", "notebook", "chromebook"]): return "Laptops" | |
if any(k in text for k in ["tv", "refrigerator", "microwave", "washer"]): return "Home Appliances" | |
if any(k in text for k in ["watch", "band", "fitbit", "wearable"]): return "Wearables" | |
if any(k in text for k in ["app", "portal", "site", "website"]): return "Web App" | |
return "General" | |