import streamlit as st from streamlit_chat import message from streamlit_mic_recorder import speech_to_text import base64 import asyncio import edge_tts import gtts from hashlib import md5 from io import BytesIO from app.db import supabase import os import glob import time from dotenv import load_dotenv from uuid import uuid4 load_dotenv() # Bersihkan cache audio lama (opsional) def clean_old_cache(tts_dir="cache_tts", max_age_hours=12): now = time.time() for f in glob.glob(os.path.join(tts_dir, "*.mp3")): if os.stat(f).st_mtime < now - max_age_hours * 3600: os.remove(f) # Jalankan pembersihan saat startup clean_old_cache() def save_feedback_to_supabase(feedback_text): try: data = {"message": feedback_text} supabase.table("feedback").insert(data).execute() return True except Exception as e: st.error(f"Gagal menyimpan feedback: {e}") return False def initialize_session_state(): if 'history' not in st.session_state: st.session_state['history'] = [] if 'generated' not in st.session_state: st.session_state['generated'] = ["Halo! Saya bisa membantu anda menjawab pertanyaan seputar Politeknik Negeri Padang!"] if 'past' not in st.session_state: st.session_state['past'] = ["Hai! 👋"] if 'data_len' not in st.session_state: st.session_state['data_len'] = 0 if 'vector_store' not in st.session_state: st.session_state['vector_store'] = None if 'should_speak' not in st.session_state: st.session_state['should_speak'] = True if 'input_text' not in st.session_state: st.session_state['input_text'] = "" if 'tts_output' not in st.session_state: st.session_state['tts_output'] = "" if 'tts_played' not in st.session_state: st.session_state['tts_played'] = True # Jangan auto-generate session_id di sini agar tidak membuat sesi baru tiap refresh. # Session akan dibuat/ditetapkan hanya oleh ensure_active_session() berdasarkan data di DB. def _current_user(): return st.session_state.get('user') def save_message_to_supabase(user_id: str, session_id: str, role: str, content: str): try: supabase.table("chat_messages").insert({ "user_id": user_id, "session_id": session_id, "role": role, "content": content, }).execute() except Exception as e: # Non-fatal in UI; log to console print(f"Gagal menyimpan pesan: {e}") def load_history_from_supabase(user_id: str, session_id: str, limit: int = 100): try: res = ( supabase .table("chat_messages") .select("role, content, created_at") .eq("user_id", user_id) .eq("session_id", session_id) .order("created_at", desc=False) .limit(limit) .execute() ) rows = getattr(res, 'data', None) or [] past, generated, history = [], [], [] # Bootstrap greeting if empty if not rows: return past, generated, history for r in rows: if r["role"] == "user": past.append(r["content"]) elif r["role"] == "assistant": generated.append(r["content"]) # Build history pairs progressively when both sides exist # Reconstruct history as list of (user, assistant) tuples for i in range(min(len(past), len(generated))): history.append((past[i], generated[i])) return past, generated, history except Exception as e: print(f"Gagal memuat riwayat: {e}") return [], [], [] def create_chat_session(user_id: str, title: str = None) -> str: """Create a new chat session for user and return session_id.""" try: payload = {"user_id": user_id} if title: payload["title"] = title res = ( supabase .table("chat_sessions") .insert(payload) .execute() ) data = getattr(res, 'data', None) or [] if isinstance(data, list) and data: return data[0].get('id') or str(uuid4()) if isinstance(data, dict) and data.get('id'): return data['id'] return str(uuid4()) except Exception as e: print(f"Gagal membuat sesi: {e}") return str(uuid4()) def list_chat_sessions(user_id: str, limit: int = 20): """List recent chat sessions for a user, newest first.""" try: res = ( supabase .table("chat_sessions") .select("id, title, created_at") .eq("user_id", user_id) .order("created_at", desc=True) .limit(limit) .execute() ) data = getattr(res, 'data', None) or [] return data except Exception as e: print(f"Gagal memuat daftar sesi: {e}") return [] def delete_chat_session(user_id: str, session_id: str) -> bool: """Delete a single chat session and its messages for a user.""" try: # Delete messages first (FK safety) try: supabase.table("chat_messages").delete().eq("user_id", user_id).eq("session_id", session_id).execute() except Exception as e: # If messages table missing/empty, continue print(f"Info: hapus pesan sesi gagal/abaikan: {e}") # Delete the session row supabase.table("chat_sessions").delete().eq("user_id", user_id).eq("id", session_id).execute() return True except Exception as e: print(f"Gagal menghapus sesi: {e}") return False def delete_all_chat_sessions(user_id: str) -> bool: """Delete all chat sessions and all messages for a user.""" try: # Delete all messages for user try: supabase.table("chat_messages").delete().eq("user_id", user_id).execute() except Exception as e: print(f"Info: hapus semua pesan gagal/abaikan: {e}") # Delete all sessions for user supabase.table("chat_sessions").delete().eq("user_id", user_id).execute() return True except Exception as e: print(f"Gagal menghapus semua sesi: {e}") return False def ensure_chat_session(user_id: str, session_id: str, title: str = None) -> str: """Ensure a chat session with given id exists; create if missing. Returns the session id.""" try: # Check existence chk = ( supabase .table("chat_sessions") .select("id") .eq("id", session_id) .limit(1) .execute() ) data = getattr(chk, 'data', None) or [] if (isinstance(data, list) and data) or (isinstance(data, dict) and data.get('id')): return session_id except Exception: pass # Create with explicit id try: payload = {"id": session_id, "user_id": user_id} if title: payload["title"] = title ins = ( supabase .table("chat_sessions") .insert(payload) .execute() ) data = getattr(ins, 'data', None) or [] if isinstance(data, list) and data: return data[0].get('id', session_id) if isinstance(data, dict) and data.get('id'): return data.get('id', session_id) return session_id except Exception as e: print(f"Gagal memastikan sesi: {e}") return session_id def _generate_session_title_from_text(text: str, max_len: int = 60) -> str: """Generate a concise session title from the first user message.""" if not text: return "Percakapan Baru" # Normalize whitespace and strip t = " ".join(text.strip().split()) # Remove surrounding quotes or trailing punctuation if too noisy t = t.strip('"\'\u201c\u201d') if len(t) > max_len: t = t[:max_len - 1].rstrip() + "…" return t or "Percakapan Baru" def update_chat_session_title_if_empty(user_id: str, session_id: str, candidate_title: str) -> None: """If session has no title, set it to candidate_title.""" try: chk = ( supabase .table("chat_sessions") .select("id, title") .eq("id", session_id) .eq("user_id", user_id) .limit(1) .execute() ) rows = getattr(chk, 'data', None) or [] title_val = None if isinstance(rows, list) and rows: title_val = rows[0].get("title") elif isinstance(rows, dict): title_val = rows.get("title") if not title_val: safe_title = _generate_session_title_from_text(candidate_title) ( supabase .table("chat_sessions") .update({"title": safe_title}) .eq("id", session_id) .eq("user_id", user_id) .execute() ) except Exception as e: # Non-fatal; just log print(f"Gagal mengatur judul sesi: {e}") # edge-tts fallback (cadangan) async def generate_audio_edge(text, path, voice="id-ID-GadisNeural"): communicate = edge_tts.Communicate(text, voice=voice) await communicate.save(path) # fungsi utama TTS dengan fallback def text_to_speech(text): cache_dir = "cache_tts" os.makedirs(cache_dir, exist_ok=True) filename = f"{md5(text.encode()).hexdigest()}.mp3" path = os.path.join(cache_dir, filename) if not os.path.exists(path): try: # ✅ Utama: gTTS tts = gtts.gTTS(text, lang="id") tts.save(path) except Exception as e: print(f"[gTTS gagal] {e}") try: # ✅ Cadangan: edge-tts asyncio.run(generate_audio_edge(text, path)) except Exception as e2: print(f"[Edge-TTS juga gagal] {e2}") st.warning("🔇 Gagal membuat audio TTS.") return "" try: with open(path, "rb") as audio_file: audio_base64 = base64.b64encode(audio_file.read()).decode() return f""" """ except Exception as e: print(f"[Error saat membaca audio] {e}") return "" def conversation_chat(query, chain, history): # Save user message first user = _current_user() if user: try: save_message_to_supabase(user_id=user["id"], session_id=st.session_state.get('session_id'), role="user", content=query) except Exception: pass # Try auto-name the session on first user message try: update_chat_session_title_if_empty(user_id=user["id"], session_id=st.session_state.get('session_id'), candidate_title=query) except Exception: pass result = chain({"question": query, "chat_history": history}) answer = result["answer"] history.append((query, answer)) # Save assistant reply if user: try: save_message_to_supabase(user_id=user["id"], session_id=st.session_state.get('session_id'), role="assistant", content=answer) except Exception: pass return answer def display_chat_history(chain): reply_container = st.container() user_input_obj = st.chat_input("Masukkan pertanyaan", key="chat_input_field") col2, col3 = st.columns([1, 1]) # Tombol TTS Aktif / Nonaktif with col2: if st.button("🔊 Text-to-Speech Aktif" if st.session_state['should_speak'] else "🔇 Text-to-Speech Nonaktif", key="toggle_tts", help="Aktifkan/Nonaktifkan Text-to-Speech", use_container_width=True): st.session_state['should_speak'] = not st.session_state['should_speak'] st.rerun() # Tombol Input Suara with col3: stt_text = speech_to_text( start_prompt="🎤 Input Suara", stop_prompt="🛑 Stop", language='id', just_once=True, key='stt_input', use_container_width=True, ) # Jika ada STT if stt_text: st.session_state.input_text = stt_text st.rerun() # Ambil input user user_input = user_input_obj or st.session_state.get("input_text", "") if user_input: with st.spinner('Sedang membuat jawaban...'): output = conversation_chat(user_input, chain, st.session_state['history']) st.session_state['past'].append(user_input) st.session_state['generated'].append(output) st.session_state.input_text = "" # Reset flag supaya TTS siap memutar lagi if st.session_state['should_speak'] and output: st.session_state['tts_output'] = output st.session_state['tts_played'] = False # If user just logged in and no local history, try loading from DB if not st.session_state['history']: user = _current_user() if user: past, generated, history = load_history_from_supabase(user_id=user['id'], session_id=st.session_state.get('session_id')) if past or generated: st.session_state['past'] = past or st.session_state['past'] st.session_state['generated'] = generated or st.session_state['generated'] st.session_state['history'] = history or st.session_state['history'] # Tampilkan Riwayat Chat if st.session_state['generated']: with reply_container: for i in range(len(st.session_state['generated'])): message(st.session_state["past"][i], is_user=True, key=str(i) + '_user', avatar_style="no-avatar") message(st.session_state["generated"][i], key=str(i), avatar_style="no-avatar") # Pemutaran TTS if st.session_state.get('tts_output') and not st.session_state.get('tts_played'): st.markdown(text_to_speech(st.session_state['tts_output']), unsafe_allow_html=True) st.session_state['tts_played'] = True