# Page-capture residue (not part of the program): "Spaces: Sleeping".
import logging
import os
import shutil
from typing import Dict, List, Optional
from urllib.parse import quote

import requests
import torch
from fastapi import Depends, FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from sentence_transformers import SentenceTransformer, util

# Rate limiting
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address
# Configuration
class Config:
    """Static settings shared by the whole service.

    ``SUPABASE_URL`` and ``SUPABASE_KEY`` may be overridden through environment
    variables of the same name. The literals are kept only as fallbacks so an
    existing deployment that sets nothing keeps working unchanged.
    """

    # Base URL of the Supabase project; REST endpoints are appended to it.
    SUPABASE_URL = (
        os.environ.get("SUPABASE_URL")
        or "https://olbjfxlclotxtnpjvpfj.supabase.co"
    )
    # NOTE(review): this credential is committed in source. Prefer supplying it
    # through the environment and dropping the fallback once deployments do so.
    SUPABASE_KEY = (
        os.environ.get("SUPABASE_KEY")
        or "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZSIsInJlZiI6Im9sYmpmeGxjbG90eHRucGp2cGZqIiwicm9sZSI6ImFub24iLCJpYXQiOjE3NTIyMzYwMDEsImV4cCI6MjA2NzgxMjAwMX0.7q_o5DCFEAAysnWXMChH4MI5qNhIVc4OgpT5JvgYxc0"
    )
    # Sentence-embedding model used to match questions against FAQ entries.
    MODEL_NAME = "sentence-transformers/paraphrase-MiniLM-L3-v2"
    # NOTE(review): not referenced by the code visible in this file; the
    # prediction handler compares against a literal 0.3 instead. Confirm which
    # value is intended before wiring this in.
    SIMILARITY_THRESHOLD = 0.7
    # Directory used for the Hugging Face download cache.
    HF_CACHE = "/tmp/hf"
    # Rate-limit expression; no use of it is visible in this file.
    RATE_LIMIT = "10/minute"
# Initialize FastAPI
app = FastAPI(title="Biruu Chatbot API", version="1.0.0")

# CORS middleware: any origin, any method and any header is accepted.
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
# Rate limiter: requests are bucketed by the caller's remote address.
limiter = Limiter(key_func=get_remote_address)
# Turn RateLimitExceeded into the handler response supplied by slowapi.
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
# Expose the limiter on the application state.
app.state.limiter = limiter
# Setup Hugging Face cache: make sure the directory exists and point the
# cache-related environment variables at it.
# NOTE(review): these variables are assigned after the model library has been
# imported at the top of the file, so the library may already have resolved its
# cache location -- confirm they take effect.
os.makedirs(Config.HF_CACHE, exist_ok=True)
os.environ.update(
    {
        "TRANSFORMERS_CACHE": Config.HF_CACHE,
        "HF_HOME": Config.HF_CACHE,
    }
)

# Clean up locked cache: drop a stale lock file and any previously cached copy
# of the model directory under HF_CACHE before loading.
model_cache = f"{Config.HF_CACHE}/models--{Config.MODEL_NAME.replace('/', '--')}"
lock_file = f"{Config.HF_CACHE}/models--{Config.MODEL_NAME.replace('/', '--')}.lock"
if os.path.exists(lock_file):
    os.remove(lock_file)
if os.path.exists(model_cache):
    shutil.rmtree(model_cache, ignore_errors=True)
# Initialize model
try:
    # The cache directory is passed explicitly: the cache environment variables
    # in this file are only assigned after the library import, so they may not
    # be honoured on their own.
    model = SentenceTransformer(Config.MODEL_NAME, cache_folder=Config.HF_CACHE)
except Exception as e:
    # Abort start-up with a clear message, keeping the original cause chained.
    raise RuntimeError(f"Failed to load model: {str(e)}") from e
# Pydantic Models
class ChatMessage(BaseModel):
    """Request body describing one chat message to be stored."""

    # Identifier of the admin the conversation belongs to.
    admin_id: str
    # Identifier of the chat session.
    session_id: str
    # Flag stored with the row as ``is_bot``.
    is_bot: bool
    # Flag stored with the row as ``is_admin``.
    is_admin: bool
    # Message text.
    message: str
class DeleteRequest(BaseModel):
    """Request body for the delete handlers; both fields are optional."""

    # Row id of a single chat message (checked by ``delete_chat``).
    id: Optional[str] = None
    # Identifier whose messages are removed in bulk (checked by ``delete_all_by_uid``).
    uid: Optional[str] = None
# Helper Functions
def make_supabase_request(
    method: str,
    endpoint: str,
    params: Optional[Dict] = None,
    data: Optional[Dict] = None,
) -> requests.Response:
    """Send one request to the Supabase REST API and return the response.

    Args:
        method: ``"GET"``, ``"POST"`` or ``"DELETE"``.
        endpoint: Path appended to ``Config.SUPABASE_URL``.
        params: Query-string parameters, sent for GET and DELETE.
        data: JSON body, sent for POST.

    Returns:
        The ``requests.Response`` of a successful (non-4xx/5xx) call.

    Raises:
        ValueError: If ``method`` is not one of the supported verbs.
        HTTPException: Status 500 when the request fails, times out or the
            server answers with an error status.
    """
    if method not in ("GET", "POST", "DELETE"):
        raise ValueError("Unsupported HTTP method")

    url = f"{Config.SUPABASE_URL}{endpoint}"
    headers = {
        "apikey": Config.SUPABASE_KEY,
        "Authorization": f"Bearer {Config.SUPABASE_KEY}",
        "Content-Type": "application/json",
    }
    # Without a timeout a stalled upstream would block the worker indefinitely.
    timeout_seconds = 15

    try:
        if method == "GET":
            response = requests.get(
                url, headers=headers, params=params, timeout=timeout_seconds
            )
        elif method == "POST":
            # Ask for the inserted row(s) in the response body; callers read
            # the new row's id from it.
            headers["Prefer"] = "return=representation"
            response = requests.post(
                url, headers=headers, json=data, timeout=timeout_seconds
            )
        else:
            # params were previously dropped for DELETE; forwarding them lets
            # callers pass filters without building the query string by hand.
            response = requests.delete(
                url, headers=headers, params=params, timeout=timeout_seconds
            )
        response.raise_for_status()
        return response
    except requests.exceptions.RequestException as e:
        raise HTTPException(
            status_code=500,
            detail=f"Supabase request failed: {str(e)}",
        ) from e
def get_faq_from_supabase(admin_id: str) -> List[Dict]:
    """Fetch the ``faq_items`` rows whose ``admin_id`` equals the given one.

    Best effort: when the Supabase call fails with an ``HTTPException`` an
    empty list is returned instead of propagating the error.
    """
    admin_filter = {"admin_id": f"eq.{admin_id}"}
    try:
        faq_response = make_supabase_request(
            "GET", "/rest/v1/faq_items", params=admin_filter
        )
    except HTTPException:
        return []
    return faq_response.json()
# API Endpoints
# NOTE(review): no route or rate-limit decorator is visible on the handlers in
# this file -- confirm they are registered on `app`.
async def predict(request: Request):
    """Answer a question with the closest FAQ entry of the given admin.

    The JSON body is expected to look like ``{"data": [admin_id, question]}``.
    The question is embedded together with every FAQ question and the answer
    of the most similar entry is returned as ``{"data": [answer]}``.

    Returns:
        * 400 with an explanatory message when the body is not valid JSON, is
          not shaped as above, or either value is empty.
        * A "no FAQ available" message when the admin has no usable FAQ rows.
        * A "not understood" message when the best similarity is below 0.3.
        * 500 with a generic message on any unexpected processing error.
    """

    def _invalid_input():
        return JSONResponse(
            {"data": ["Admin ID atau pertanyaan tidak valid"]},
            status_code=400,
        )

    # Malformed JSON is a client error, not a server failure.
    try:
        body = await request.json()
    except ValueError:
        return _invalid_input()

    payload = body.get("data") if isinstance(body, dict) else None
    if not isinstance(payload, (list, tuple)) or len(payload) < 2:
        return _invalid_input()

    admin_id, question = payload[0], payload[1]
    # The question must be a single string so that exactly one query embedding
    # is produced below.
    if not admin_id or not question or not isinstance(question, str):
        return _invalid_input()

    try:
        # Get FAQs for this admin, skipping rows without a question or answer.
        faqs = get_faq_from_supabase(admin_id)
        pairs = [
            (row["question"], row["answer"])
            for row in faqs
            if isinstance(row, dict) and row.get("question") and "answer" in row
        ]
        if not pairs:
            return {"data": ["Maaf, belum ada FAQ yang tersedia."]}

        questions = [faq_question for faq_question, _ in pairs]
        answers = [faq_answer for _, faq_answer in pairs]

        # Get embeddings
        embeddings = model.encode(questions, convert_to_tensor=True)
        query_embedding = model.encode(question, convert_to_tensor=True)

        # Cosine similarity of the query against every FAQ question; the best
        # column index is also the index into `answers`.
        similarity = util.pytorch_cos_sim(query_embedding, embeddings)
        best_idx = torch.argmax(similarity).item()
        best_score = similarity[0][best_idx].item()

        # Minimum similarity of 0.3.
        # NOTE(review): Config.SIMILARITY_THRESHOLD (0.7) is not used here;
        # confirm which value is intended.
        if best_score < 0.3:
            return {"data": ["Maaf, saya tidak mengerti pertanyaan Anda"]}

        return {"data": [answers[best_idx]]}
    except Exception:
        # Top-level boundary: log with traceback, answer with a generic error.
        logging.getLogger(__name__).exception("Error in prediction")
        return JSONResponse(
            {"data": ["Terjadi kesalahan saat memproses pertanyaan"]},
            status_code=500,
        )
async def save_chat(chat: ChatMessage):
    """Save a chat message to the ``chat_logs`` table.

    Returns:
        ``{"message": ..., "id": ...}``. ``id`` is the id of the stored row
        when the backend echoes the inserted row, otherwise ``None``.

    Raises:
        HTTPException: Propagated from the Supabase call, or status 500 for
            any other failure.
    """
    try:
        response = make_supabase_request(
            "POST",
            "/rest/v1/chat_logs",
            data={
                "admin_id": chat.admin_id,
                "session_id": chat.session_id,
                "is_bot": chat.is_bot,
                "is_admin": chat.is_admin,
                "message": chat.message,
            },
        )

        # The insert has already succeeded at this point. The response body may
        # be empty when the backend does not return the inserted row, so a
        # missing id must not turn the call into an error (a client retry
        # would store the message twice).
        saved_id = None
        if response.content:
            rows = response.json()
            first_row = rows[0] if isinstance(rows, list) and rows else rows
            if isinstance(first_row, dict):
                saved_id = first_row.get("id")

        return {
            "message": "Pesan berhasil disimpan",
            "id": saved_id,
        }
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Failed to save chat: {str(e)}",
        ) from e
async def get_chat_history(admin_id: str, session_id: str):
    """Return the ``chat_logs`` rows for a session, oldest first.

    Rows are filtered by ``admin_id`` and by
    ``session_id == <session_id> OR is_bot == true``, ordered by
    ``created_at`` ascending.

    NOTE(review): the ``or`` filter also matches every bot message of this
    admin regardless of session -- confirm that this is intended.

    Raises:
        HTTPException: Propagated from the Supabase call, or status 500 for
            any other failure.
    """
    # session_id is caller-supplied and sits inside a filter expression where
    # ',' '(' ')' are syntax. Wrap it in double quotes (escaping backslashes
    # and quotes) so it is always treated as one literal value.
    escaped_session = session_id.replace("\\", "\\\\").replace('"', '\\"')
    try:
        response = make_supabase_request(
            "GET",
            "/rest/v1/chat_logs",
            params={
                "admin_id": f"eq.{admin_id}",
                "or": f'(session_id.eq."{escaped_session}",is_bot.eq.true)',
                "order": "created_at.asc",
            },
        )
        return response.json()
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Failed to get chat history: {str(e)}",
        ) from e
async def delete_chat(request: DeleteRequest):
    """Delete the chat message whose ``id`` equals ``request.id``.

    Raises:
        HTTPException: Status 400 when ``request.id`` is missing; otherwise
            propagated from the Supabase call, or status 500 for any other
            failure.
    """
    if not request.id:
        raise HTTPException(
            status_code=400,
            detail="Message ID is required",
        )
    # The id comes from the request body; percent-encode it so characters such
    # as '&', '#' or '=' cannot add or change query parameters of the DELETE.
    encoded_id = quote(request.id, safe="")
    try:
        make_supabase_request(
            "DELETE",
            f"/rest/v1/chat_logs?id=eq.{encoded_id}",
        )
        return {"message": f"Pesan dengan ID {request.id} berhasil dihapus."}
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Failed to delete message: {str(e)}",
        ) from e
async def delete_all_by_uid(request: DeleteRequest):
    """Delete every ``chat_logs`` row whose ``admin_id`` equals ``request.uid``.

    Raises:
        HTTPException: Status 400 when ``request.uid`` is missing; otherwise
            propagated from the Supabase call, or status 500 for any other
            failure.
    """
    if not request.uid:
        raise HTTPException(
            status_code=400,
            detail="UID is required",
        )
    # The uid comes from the request body; percent-encode it so it cannot add
    # or change query parameters of this bulk DELETE.
    encoded_uid = quote(request.uid, safe="")
    try:
        make_supabase_request(
            "DELETE",
            f"/rest/v1/chat_logs?admin_id=eq.{encoded_uid}",
        )
        return {"message": f"Semua pesan untuk UID {request.uid} berhasil dihapus."}
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Failed to delete messages: {str(e)}",
        ) from e
async def health_check():
    """Report that the service is up, together with its version string."""
    status_payload = {"status": "healthy"}
    status_payload["version"] = "1.0.0"
    return status_payload