from fastapi import FastAPI, Request, Header, HTTPException, Query
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.openapi.docs import get_swagger_ui_html
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from datetime import datetime
import uuid
import os
import openai
from transformers import pipeline
import logging, traceback
from typing import Optional, List, Union
from model import (
    summarize_review, smart_summarize, detect_industry,
    detect_product_category, detect_emotion, answer_followup, answer_only,
    assess_churn_risk, extract_pain_points  # ✅ Added extract_pain_points
)

app = FastAPI(
    title="🧠 ChurnSight AI",
    description="Multilingual GenAI for smarter feedback: summarization, sentiment, emotion, aspects, Q&A and tags.",
    version="2025.1.0",
    openapi_url="/openapi.json",
    docs_url=None,
    redoc_url="/redoc"
)
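
# NOTE: docs_url=None turns off the built-in Swagger UI; the custom route further down (assumed to be
# /docs) serves it instead, while ReDoc stays available at /redoc.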

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
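
# NOTE: CORS is wide open here (every origin, method and header is allowed); tighten allow_origins
# before exposing the API publicly.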

logging.basicConfig(level=logging.INFO)

VALID_API_KEY = "my-secret-key"
log_store = []  # ✅ Shared in-memory churn log
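# NOTE: log_store lives only in this process's memory: it is capped at 1000 entries by the
# endpoints below and is lost on restart (and not shared across workers).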

@app.get("/", response_class=HTMLResponse)  # route decorator restored; path assumed
def root():
    return "<h1>ChurnSight AI Backend is Running</h1>"

@app.get("/docs", include_in_schema=False)  # route decorator restored; path assumed
def custom_swagger_ui():
    return get_swagger_ui_html(
        openapi_url=app.openapi_url,
        title="🧠 Swagger UI - ChurnSight AI",
        swagger_favicon_url="https://cdn-icons-png.flaticon.com/512/3794/3794616.png",
        swagger_js_url="https://cdn.jsdelivr.net/npm/swagger-ui-dist@4.18.3/swagger-ui-bundle.js",
        swagger_css_url="https://cdn.jsdelivr.net/npm/swagger-ui-dist@4.18.3/swagger-ui.css",
    )

@app.exception_handler(Exception)  # decorator restored: registers this as the global fallback handler
async def exception_handler(request: Request, exc: Exception):
    logging.error(f"Unhandled Exception: {traceback.format_exc()}")
    return JSONResponse(status_code=500, content={"detail": "Internal Server Error. Please contact support."})

# ==== SCHEMAS ====
class ReviewInput(BaseModel):
    text: str
    model: str = "distilbert-base-uncased-finetuned-sst-2-english"
    industry: Optional[str] = None
    aspects: bool = False
    follow_up: Optional[Union[str, List[str]]] = None
    product_category: Optional[str] = None
    device: Optional[str] = None
    intelligence: Optional[bool] = False
    verbosity: Optional[str] = "detailed"
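
# Illustrative request body for the single-review endpoint (hypothetical example values):
# {
#   "text": "<customer review of at least 20 words>",
#   "model": "distilbert-base-uncased-finetuned-sst-2-english",
#   "industry": "auto-detect",
#   "aspects": true,
#   "follow_up": "What frustrated the customer most?",
#   "verbosity": "brief"
# }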

class BulkReviewInput(BaseModel):
    reviews: List[str]
    model: str = "distilbert-base-uncased-finetuned-sst-2-english"
    industry: Optional[List[str]] = None
    aspects: bool = False
    product_category: Optional[List[str]] = None
    device: Optional[List[str]] = None
    follow_up: Optional[List[Union[str, List[str]]]] = None
    intelligence: Optional[bool] = False
    explain_bulk: Optional[bool] = False

class FollowUpRequest(BaseModel):
    text: str
    question: str
    verbosity: Optional[str] = "brief"

# ==== HELPERS ====
def auto_fill(value: Optional[str], fallback: str) -> str:
    if not value or value.lower() == "auto-detect":
        return fallback
    return value
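# For example (hypothetical values): auto_fill(None, "Retail") and auto_fill("Auto-Detect", "Retail")
# both return "Retail", while auto_fill("Fintech", "Retail") returns "Fintech".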

# ==== ENDPOINTS ====
@app.post("/analyze")  # route decorator restored; path assumed
async def analyze(data: ReviewInput, x_api_key: str = Header(None)):
    if x_api_key and x_api_key != VALID_API_KEY:
        raise HTTPException(status_code=401, detail="❌ Invalid API key")
    if len(data.text.split()) < 20:
        raise HTTPException(status_code=400, detail="⚠️ Review too short for analysis (min. 20 words).")
    global log_store
    try:
        # === Generate Summary ===
        summary = (
            summarize_review(data.text, max_len=40, min_len=8)
            if (data.verbosity or "detailed").lower() == "brief"
            else smart_summarize(data.text, n_clusters=2 if data.intelligence else 1)
        )
        # === Sentiment + Emotion ===
        sentiment_pipeline = pipeline("sentiment-analysis", model=data.model)
        sentiment = sentiment_pipeline(data.text)[0]
        emotion = detect_emotion(data.text)
        churn_risk = assess_churn_risk(sentiment["label"], emotion)
        # === Auto-detect metadata ===
        industry = detect_industry(data.text) if not data.industry or "auto" in data.industry.lower() else data.industry
        product_category = detect_product_category(data.text) if not data.product_category or "auto" in data.product_category.lower() else data.product_category
        # === Optional: Pain Points ===
        pain_points = extract_pain_points(data.text) if data.aspects else []
        # === Log entry ===
        log_store.append({
            "timestamp": datetime.now(),
            "product": product_category,
            "churn_risk": churn_risk,
            "user_id": str(uuid.uuid4())
        })
        if len(log_store) > 1000:
            log_store = log_store[-1000:]
        # === Final API Response ===
        response = {
            "summary": summary,
            "sentiment": sentiment,
            "emotion": emotion,
            "product_category": product_category,
            "device": auto_fill(data.device, "Web"),
            "industry": industry,
            "churn_risk": churn_risk,
            "pain_points": pain_points
        }
        if data.follow_up:
            response["follow_up"] = answer_followup(data.text, data.follow_up, verbosity=data.verbosity)
        return response
    except Exception:
        logging.error(f"🔥 Unexpected analysis failure: {traceback.format_exc()}")
        raise HTTPException(status_code=500, detail="Internal Server Error during analysis.")

@app.post("/followup")  # route decorator restored; path assumed
async def followup(request: FollowUpRequest, x_api_key: str = Header(None)):
    if x_api_key and x_api_key != VALID_API_KEY:
        raise HTTPException(status_code=401, detail="Invalid API key")
    # Validate before entering the try block so the 400 is not swallowed by the generic handler below
    if not request.question or len(request.text.split()) < 10:
        raise HTTPException(status_code=400, detail="Question or text is too short.")
    try:
        return {"answer": answer_only(request.text, request.question)}
    except Exception:
        logging.error(f"❌ Follow-up failed: {traceback.format_exc()}")
        raise HTTPException(status_code=500, detail="Follow-up generation failed.")

@app.get("/churn-log")  # route decorator restored; path assumed
async def get_churn_log(x_api_key: str = Header(None)):
    if x_api_key and x_api_key != VALID_API_KEY:
        raise HTTPException(status_code=401, detail="Unauthorized")
    return {"log": log_store}

@app.post("/bulk-analyze")  # route decorator restored; path assumed
async def bulk_analyze(data: BulkReviewInput, token: str = Query(None)):
    if token != VALID_API_KEY:
        raise HTTPException(status_code=401, detail="❌ Unauthorized: Invalid API token")
    global log_store
    try:
        results = []
        sentiment_pipeline = pipeline("sentiment-analysis", model=data.model)
        for i, review_text in enumerate(data.reviews):
            if not review_text.strip():
                continue  # Skip empty reviews
            if len(review_text.split()) < 20:
                results.append({
                    "review": review_text,
                    "error": "Too short to analyze"
                })
                continue
            summary = smart_summarize(review_text, n_clusters=2 if data.intelligence else 1)
            sentiment = sentiment_pipeline(review_text)[0]
            emotion = detect_emotion(review_text)
            churn = assess_churn_risk(sentiment["label"], emotion)
            pain = extract_pain_points(review_text) if data.aspects else []
            ind = auto_fill(data.industry[i] if data.industry and i < len(data.industry) else None, detect_industry(review_text))
            prod = auto_fill(data.product_category[i] if data.product_category and i < len(data.product_category) else None, detect_product_category(review_text))
            dev = auto_fill(data.device[i] if data.device and i < len(data.device) else None, "Web")
            result = {
                "review": review_text,
                "summary": summary,
                "sentiment": sentiment["label"],
                "score": sentiment["score"],
                "emotion": emotion,
                "industry": ind,
                "product_category": prod,
                "device": dev,
                "churn_risk": churn,
                "pain_points": pain
            }
            # ✅ Optional follow-up
            if data.follow_up and i < len(data.follow_up):
                follow_q = data.follow_up[i]
                result["follow_up"] = answer_followup(review_text, follow_q)
            # ✅ Log churn entry
            log_store.append({
                "timestamp": datetime.now(),
                "product": prod,
                "churn_risk": churn,
                "user_id": str(uuid.uuid4())
            })
            results.append(result)
        # ✅ Cap log size
        if len(log_store) > 1000:
            log_store = log_store[-1000:]
        return {"results": results}
    except Exception:
        logging.error(f"🔥 Bulk processing failed: {traceback.format_exc()}")
        raise HTTPException(status_code=500, detail="Failed to analyze bulk reviews")

# ✅ OpenAI API key is already set via os.environ (OPENAI_API_KEY); nothing else needed
@app.post("/root-cause")  # route decorator restored; path assumed
async def root_cause_analysis(payload: dict, x_api_key: str = Header(None)):
    if x_api_key and x_api_key != VALID_API_KEY:
        raise HTTPException(status_code=401, detail="Invalid API key")
    # Validate before entering the try block so the 400 is not converted into a 500 below
    text = payload.get("text", "").strip()
    if not text or len(text.split()) < 5:
        raise HTTPException(status_code=400, detail="Insufficient input for root cause analysis.")
    try:
prompt = f""" | |
Analyze the following customer feedback and extract: | |
1. The main problem | |
2. The possible root cause | |
3. A suggested fix or which team might need to handle it | |
Feedback: '''{text}''' | |
Format your answer as: | |
Problem: ... | |
Cause: ... | |
Suggestion: ... | |
""" | |
        # Models to try in order
        models_to_try = ["gpt-4", "gpt-4o-mini", "gpt-3.5-turbo"]
        last_error = None
        for model_name in models_to_try:
            try:
                response = openai.chat.completions.create(
                    model=model_name,
                    messages=[{"role": "user", "content": prompt}]
                )
                output = response.choices[0].message.content
                lines = output.splitlines()

                def extract_line(tag):
                    # Pull the text after "Problem:", "Cause:" or "Suggestion:"; fall back to "N/A" if missing
                    for line in lines:
                        if line.lower().startswith(tag.lower()):
                            return line.split(":", 1)[-1].strip()
                    return "N/A"

                return {
                    "problem": extract_line("Problem"),
                    "cause": extract_line("Cause"),
                    "suggestion": extract_line("Suggestion"),
                    "model_used": model_name
                }
            except Exception as e:
                last_error = str(e)
                logging.warning(f"Model {model_name} failed: {last_error}")
                continue
        # If all models fail
        raise HTTPException(status_code=500, detail=f"All model attempts failed. Last error: {last_error}")
    except HTTPException:
        raise
    except Exception as e:
        logging.error(f"Root cause analysis failed: {traceback.format_exc()}")
        return JSONResponse(status_code=500, content={"detail": f"Root cause generation failed: {str(e)}"})
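
# Example client call (a sketch only: it assumes the route paths restored above and a local server on
# port 7860, the Hugging Face Spaces default; adjust the URL for your deployment):
#
#   import requests
#   resp = requests.post(
#       "http://localhost:7860/analyze",
#       headers={"x-api-key": "my-secret-key"},
#       json={"text": "<customer review of at least 20 words>", "aspects": True, "verbosity": "brief"},
#   )
#   print(resp.json())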