import streamlit as st import requests import os import json import uuid from datetime import datetime, timedelta from sentence_transformers import SentenceTransformer import chromadb from langchain_text_splitters import RecursiveCharacterTextSplitter import re import shutil from git import Repo # Page configuration st.set_page_config( page_title="RAG Chat Flow ✘", page_icon="✘", initial_sidebar_state="expanded" ) # Initialize dark mode state if 'dark_mode' not in st.session_state: st.session_state.dark_mode = False # Define personality questions - reduced to general ones PERSONALITY_QUESTIONS = [ "What is [name]'s personality like?", "What does [name] do for work?", "What are [name]'s hobbies?", "What makes [name] special?", "Tell me about [name]" ] # Enhanced CSS styling with dark mode support def get_css_styles(): if st.session_state.dark_mode: return """ """ else: return """ """ # Apply CSS styles st.markdown(get_css_styles(), unsafe_allow_html=True) # File paths HISTORY_FILE = "rag_chat_history.json" SESSIONS_FILE = "rag_chat_sessions.json" USERS_FILE = "online_users.json" # ================= GITHUB INTEGRATION ================= def clone_github_repo(): """Clone or update GitHub repository with documents""" github_token = os.getenv("GITHUB_TOKEN") if not github_token: st.error("🔑 GITHUB_TOKEN not found in environment variables") return False try: repo_url = f"https://{github_token}@github.com/Umer-K/family-profiles.git" repo_dir = "family_profiles" # Clean up existing directory if it exists if os.path.exists(repo_dir): shutil.rmtree(repo_dir) # Clone the repository with st.spinner("🔄 Cloning private repository..."): Repo.clone_from(repo_url, repo_dir) # Copy txt files to documents folder documents_dir = "documents" os.makedirs(documents_dir, exist_ok=True) # Clear existing documents for file in os.listdir(documents_dir): if file.endswith('.txt'): os.remove(os.path.join(documents_dir, file)) # Copy new txt files from repo txt_files_found = 0 for root, dirs, files in os.walk(repo_dir): for file in files: if file.endswith('.txt'): src_path = os.path.join(root, file) dst_path = os.path.join(documents_dir, file) shutil.copy2(src_path, dst_path) txt_files_found += 1 # Clean up repo directory shutil.rmtree(repo_dir) st.success(f"✅ Successfully synced {txt_files_found} documents from GitHub!") return True except Exception as e: st.error(f"❌ GitHub sync failed: {str(e)}") return False def check_github_status(): """Check GitHub token availability and repo access""" github_token = os.getenv("GITHUB_TOKEN") if not github_token: return { "status": "missing", "message": "No GitHub token found", "color": "red" } try: # Test token by making a simple API call headers = { "Authorization": f"token {github_token}", "Accept": "application/vnd.github.v3+json" } response = requests.get( "https://api.github.com/repos/Umer-K/family-profiles", headers=headers, timeout=10 ) if response.status_code == 200: return { "status": "connected", "message": "GitHub access verified", "color": "green" } elif response.status_code == 404: return { "status": "not_found", "message": "Repository not found or no access", "color": "orange" } elif response.status_code == 401: return { "status": "unauthorized", "message": "Invalid GitHub token", "color": "red" } else: return { "status": "error", "message": f"GitHub API error: {response.status_code}", "color": "orange" } except Exception as e: return { "status": "error", "message": f"Connection error: {str(e)}", "color": "orange" } # ================= RAG SYSTEM CLASS ================= @st.cache_resource def initialize_rag_system(): """Initialize RAG system with caching""" return ProductionRAGSystem() class ProductionRAGSystem: def __init__(self, collection_name="streamlit_rag_docs"): self.collection_name = collection_name # Initialize embedding model try: self.model = SentenceTransformer('all-mpnet-base-v2') except Exception as e: st.error(f"Error loading embedding model: {e}") self.model = None return # Initialize ChromaDB try: self.client = chromadb.PersistentClient(path="./chroma_db") try: self.collection = self.client.get_collection(collection_name) except: self.collection = self.client.create_collection(collection_name) except Exception as e: st.error(f"Error initializing ChromaDB: {e}") self.client = None return # Initialize text splitter self.text_splitter = RecursiveCharacterTextSplitter( chunk_size=800, chunk_overlap=100, length_function=len, separators=["\n\n", "\n", ". ", " ", ""] ) def get_collection_count(self): """Get number of documents in collection""" try: return self.collection.count() if self.collection else 0 except: return 0 def load_documents_from_folder(self, folder_path="documents"): """Load documents from folder""" if not os.path.exists(folder_path): return [] txt_files = [f for f in os.listdir(folder_path) if f.endswith('.txt')] if not txt_files: return [] all_chunks = [] for filename in txt_files: filepath = os.path.join(folder_path, filename) try: with open(filepath, 'r', encoding='utf-8') as f: content = f.read().strip() if content: chunks = self.text_splitter.split_text(content) for i, chunk in enumerate(chunks): all_chunks.append({ 'content': chunk, 'source_file': filename, 'chunk_index': i, 'char_count': len(chunk) }) except Exception as e: st.error(f"Error reading {filename}: {e}") return all_chunks def index_documents(self, document_folder="documents"): """Index documents with progress bar""" if not self.model or not self.client: return False chunks = self.load_documents_from_folder(document_folder) if not chunks: return False # Clear existing collection try: self.client.delete_collection(self.collection_name) self.collection = self.client.create_collection(self.collection_name) except: pass # Create embeddings with progress bar progress_bar = st.progress(0) status_text = st.empty() chunk_texts = [chunk['content'] for chunk in chunks] try: status_text.text("Creating embeddings...") embeddings = self.model.encode(chunk_texts, show_progress_bar=False) status_text.text("Storing in database...") for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)): chunk_id = f"{chunk['source_file']}_{chunk['chunk_index']}" metadata = { "source_file": chunk['source_file'], "chunk_index": chunk['chunk_index'], "char_count": chunk['char_count'] } self.collection.add( documents=[chunk['content']], ids=[chunk_id], embeddings=[embedding.tolist()], metadatas=[metadata] ) progress_bar.progress((i + 1) / len(chunks)) progress_bar.empty() status_text.empty() return True except Exception as e: st.error(f"Error during indexing: {e}") progress_bar.empty() status_text.empty() return False def expand_query_with_family_terms(self, query): """Expand query to include family relationship synonyms""" family_mappings = { 'mother': ['mama', 'mom', 'ammi'], 'mama': ['mother', 'mom', 'ammi'], 'father': ['papa', 'dad', 'abbu'], 'papa': ['father', 'dad', 'abbu'], 'brother': ['bhai', 'bro'], 'bhai': ['brother', 'bro'], 'sister': ['behn', 'sis'], 'behn': ['sister', 'sis'] } expanded_terms = [query] query_lower = query.lower() for key, synonyms in family_mappings.items(): if key in query_lower: for synonym in synonyms: expanded_terms.append(query_lower.replace(key, synonym)) return expanded_terms def search(self, query, n_results=5): """Search for relevant chunks with family relationship mapping""" if not self.model or not self.collection: return None try: # Expand query with family terms expanded_queries = self.expand_query_with_family_terms(query) all_results = [] # Search with all expanded terms for search_query in expanded_queries: query_embedding = self.model.encode([search_query])[0].tolist() results = self.collection.query( query_embeddings=[query_embedding], n_results=n_results ) if results['documents'][0]: for chunk, distance, metadata in zip( results['documents'][0], results['distances'][0], results['metadatas'][0] ): similarity = max(0, 1 - distance) all_results.append({ 'content': chunk, 'metadata': metadata, 'similarity': similarity, 'query_used': search_query }) if not all_results: return None # Remove duplicates and sort by similarity seen_chunks = set() unique_results = [] for result in all_results: chunk_id = f"{result['metadata']['source_file']}_{result['content'][:50]}" if chunk_id not in seen_chunks: seen_chunks.add(chunk_id) unique_results.append(result) # Sort by similarity and take top results unique_results.sort(key=lambda x: x['similarity'], reverse=True) search_results = unique_results[:n_results] # Debug: Show search results for troubleshooting print(f"Search for '{query}' (expanded to {len(expanded_queries)} terms) found {len(search_results)} results") for i, result in enumerate(search_results[:3]): print(f" {i+1}. Similarity: {result['similarity']:.3f} | Source: {result['metadata']['source_file']} | Query: {result['query_used']}") print(f" Content preview: {result['content'][:100]}...") return search_results except Exception as e: st.error(f"Search error: {e}") return None def extract_direct_answer(self, query, content): """Extract direct answer from content""" query_lower = query.lower() sentences = re.split(r'[.!?]+', content) sentences = [s.strip() for s in sentences if len(s.strip()) > 20] query_words = set(query_lower.split()) scored_sentences = [] for sentence in sentences: sentence_words = set(sentence.lower().split()) exact_matches = len(query_words.intersection(sentence_words)) # Bonus scoring for key terms bonus_score = 0 if '401k' in query_lower and ('401' in sentence.lower() or 'retirement' in sentence.lower()): bonus_score += 3 if 'sick' in query_lower and 'sick' in sentence.lower(): bonus_score += 3 if 'vacation' in query_lower and 'vacation' in sentence.lower(): bonus_score += 3 total_score = exact_matches * 2 + bonus_score if total_score > 0: scored_sentences.append((sentence, total_score)) if scored_sentences: scored_sentences.sort(key=lambda x: x[1], reverse=True) best_sentence = scored_sentences[0][0] if not best_sentence.endswith('.'): best_sentence += '.' return best_sentence # Fallback for sentence in sentences: if len(sentence) > 30: return sentence + ('.' if not sentence.endswith('.') else '') return content[:200] + "..." def generate_answer(self, query, search_results, use_ai_enhancement=True, unlimited_tokens=False): """Generate both AI and extracted answers with proper token handling""" if not search_results: return { 'ai_answer': "No information found in documents.", 'extracted_answer': "No information found in documents.", 'sources': [], 'confidence': 0, 'has_both': False } best_result = search_results[0] sources = list(set([r['metadata']['source_file'] for r in search_results[:2]])) avg_confidence = sum(r['similarity'] for r in search_results[:2]) / len(search_results[:2]) # Always generate extracted answer extracted_answer = self.extract_direct_answer(query, best_result['content']) # Try AI answer if requested and API key available ai_answer = None openrouter_key = os.environ.get("OPENROUTER_API_KEY") if use_ai_enhancement and openrouter_key: # Build context from search results context = "\n\n".join([f"Source: {r['metadata']['source_file']}\nContent: {r['content']}" for r in search_results[:3]]) # Create focused prompt for rich, engaging family responses if unlimited_tokens: prompt = f"""You are a warm, caring family assistant who knows everyone well. Based on the family information below, provide a rich, detailed, and engaging response. Family Document Context: {context} Question: {query} Instructions: - Use the document information as your foundation - Expand with logical personality traits and qualities someone like this would have - Add 3-4 additional lines of thoughtful insights about their character - Use 5-6 relevant emojis throughout the response to make it warm and engaging - Write in a caring, family-friend tone - If someone asks about relationships (like "mother" = "mama"), make those connections - Make the response feel personal and detailed, not just a basic fact - Include both strengths and endearing qualities - Keep it warm but informative (4-6 sentences total) - Sprinkle emojis naturally throughout, not just at the end Remember: You're helping someone learn about their family members in a meaningful way! 💝""" max_tokens = 400 # Increased for richer responses temperature = 0.3 # Slightly more creative else: # Shorter but still enhanced prompt for conservative mode prompt = f"""Based on this family info: {extracted_answer} Question: {query} Give a warm, detailed answer with 3-4 emojis spread throughout. Add 2-3 more qualities this person likely has. Make it caring and personal! 💝""" max_tokens = 150 # Better than 50 for family context temperature = 0.2 try: response = requests.post( "https://openrouter.ai/api/v1/chat/completions", headers={ "Authorization": f"Bearer {openrouter_key}", "Content-Type": "application/json", "HTTP-Referer": "https://huggingface.co/spaces", "X-Title": "RAG Chatbot" }, json={ "model": "openai/gpt-3.5-turbo", "messages": [{"role": "user", "content": prompt}], "max_tokens": max_tokens, "temperature": temperature }, timeout=15 ) if response.status_code == 200: ai_response = response.json()['choices'][0]['message']['content'].strip() ai_answer = ai_response if len(ai_response) > 10 else extracted_answer else: # Log the actual error for debugging error_detail = "" try: error_detail = response.json().get('error', {}).get('message', '') except: pass if response.status_code == 402: st.warning("💳 OpenRouter credits exhausted. Using extracted answers only.") elif response.status_code == 429: st.warning("⏱️ Rate limit reached. Using extracted answers only.") elif response.status_code == 401: st.error("🔑 Invalid API key. Check your OpenRouter key.") elif response.status_code == 400: st.error(f"❌ Bad request: {error_detail}") else: st.warning(f"API Error {response.status_code}: {error_detail}. Using extracted answers only.") except requests.exceptions.Timeout: st.warning("⏱️ API timeout. Using extracted answers only.") except Exception as e: st.warning(f"API Exception: {str(e)}. Using extracted answers only.") return { 'ai_answer': ai_answer, 'extracted_answer': extracted_answer, 'sources': sources, 'confidence': avg_confidence, 'has_both': ai_answer is not None } def get_general_ai_response(query, unlimited_tokens=False): """Get AI response for general questions with family-friendly enhancement""" openrouter_key = os.environ.get("OPENROUTER_API_KEY") if not openrouter_key: return "I can only answer questions about your family members from the uploaded documents. Please add an OpenRouter API key for general conversations. 💝" try: # Adjust parameters based on token availability if unlimited_tokens: max_tokens = 350 # Good limit for detailed family responses temperature = 0.5 prompt = f"""You are a caring family assistant. Someone is asking about their family but I couldn't find specific information in their family documents. Question: {query} Please provide a warm, helpful response that: - Acknowledges I don't have specific information about their family member - Suggests they might want to add more details to their family profiles - Offers to help in other ways - Uses a caring, family-friendly tone with appropriate emojis - Keep it supportive and understanding 💝""" else: max_tokens = 100 # Reasonable for conservative mode temperature = 0.4 prompt = f"Family question: {query[:100]} - I don't have info about this family member. Give a caring, helpful response with emojis 💝" response = requests.post( "https://openrouter.ai/api/v1/chat/completions", headers={ "Authorization": f"Bearer {openrouter_key}", "Content-Type": "application/json", "HTTP-Referer": "https://huggingface.co/spaces", "X-Title": "RAG Chatbot" }, json={ "model": "openai/gpt-3.5-turbo", "messages": [{"role": "user", "content": prompt}], "max_tokens": max_tokens, "temperature": temperature }, timeout=15 ) if response.status_code == 200: return response.json()['choices'][0]['message']['content'].strip() else: # Get detailed error information error_detail = "" try: error_detail = response.json().get('error', {}).get('message', '') except: pass if response.status_code == 402: return "Sorry, OpenRouter credits exhausted. Please add more credits or top up your account." elif response.status_code == 429: return "Rate limit reached. Please try again in a moment." elif response.status_code == 401: return "Invalid API key. Please check your OpenRouter API key configuration." elif response.status_code == 400: return f"Bad request: {error_detail}. Please try rephrasing your question." else: return f"API error (Status: {response.status_code}): {error_detail}. Please try again." except requests.exceptions.Timeout: return "Request timeout. Please try again." except Exception as e: return f"Error: {str(e)}" def get_user_id(): """Get unique ID for this user session""" if 'user_id' not in st.session_state: st.session_state.user_id = str(uuid.uuid4())[:8] return st.session_state.user_id def update_online_users(): """Update user status""" try: users = {} if os.path.exists(USERS_FILE): with open(USERS_FILE, 'r') as f: users = json.load(f) user_id = get_user_id() users[user_id] = { 'last_seen': datetime.now().isoformat(), 'name': f'User-{user_id}', 'session_start': users.get(user_id, {}).get('session_start', datetime.now().isoformat()) } # Clean up old users current_time = datetime.now() active_users = {} for uid, data in users.items(): try: last_seen = datetime.fromisoformat(data['last_seen']) if current_time - last_seen < timedelta(minutes=5): active_users[uid] = data except: continue with open(USERS_FILE, 'w') as f: json.dump(active_users, f, indent=2) return len(active_users) except: return 1 def load_chat_history(): """Load chat history""" try: if os.path.exists(HISTORY_FILE): with open(HISTORY_FILE, 'r', encoding='utf-8') as f: return json.load(f) except: pass return [] def save_chat_history(messages): """Save chat history""" try: with open(HISTORY_FILE, 'w', encoding='utf-8') as f: json.dump(messages, f, ensure_ascii=False, indent=2) except Exception as e: st.error(f"Error saving history: {e}") def start_new_chat(): """Start new chat session""" st.session_state.messages = [] st.session_state.session_id = str(uuid.uuid4()) # ================= MAIN APP ================= # Initialize session state if "messages" not in st.session_state: st.session_state.messages = load_chat_history() if "session_id" not in st.session_state: st.session_state.session_id = str(uuid.uuid4()) # Initialize RAG system rag_system = initialize_rag_system() # Header with dark mode toggle col1, col2 = st.columns([4, 1]) with col1: st.title("RAG Chat Flow ✘") st.caption("Ask questions about your documents with AI-powered retrieval") with col2: # Dark mode toggle button mode_text = "🌞 Light" if st.session_state.dark_mode else "🌙 Dark" if st.button(mode_text, use_container_width=True): st.session_state.dark_mode = not st.session_state.dark_mode st.rerun() # Sidebar with st.sidebar: # New Chat Button if st.button("➕ New Chat", use_container_width=True, type="primary"): start_new_chat() st.rerun() st.divider() # Dark Mode Toggle in Sidebar too st.header("🎨 Theme") theme_status = "Dark Mode ✨" if st.session_state.dark_mode else "Light Mode ☀️" if st.button(f"🔄 Switch to {'Light' if st.session_state.dark_mode else 'Dark'} Mode", use_container_width=True): st.session_state.dark_mode = not st.session_state.dark_mode st.rerun() st.info(f"Current: {theme_status}") st.divider() # Personality Questions Section st.header("🎭 Personality Questions") # Name input for personalizing questions name_input = st.text_input("Enter name for personalized questions:", placeholder="First name only", help="Replace [name] in questions with this name") if name_input.strip(): name = name_input.strip() st.markdown(f"""
💫 Quick Questions for {name}:
Click any question to ask about {name}
""", unsafe_allow_html=True) # Display personality questions as clickable buttons for i, question in enumerate(PERSONALITY_QUESTIONS): formatted_question = question.replace("[name]", name) if st.button(formatted_question, key=f"pq_{i}", use_container_width=True): # Add the question to chat and set flag to process it user_message = {"role": "user", "content": formatted_question} st.session_state.messages.append(user_message) st.session_state.process_personality_question = formatted_question st.rerun() else: st.markdown("""
💫 Sample Questions:
Enter a name above to personalize these questions
""", unsafe_allow_html=True) # Show sample questions without names for question in PERSONALITY_QUESTIONS[:5]: # Show first 5 as examples st.markdown(f"• {question}") st.divider() # GitHub Integration st.header("🐙 GitHub Integration") github_status = check_github_status() if github_status["status"] == "connected": st.markdown(f"""
🟢 GitHub: {github_status['message']}
📂 Repo: family-profiles (private)
""", unsafe_allow_html=True) # Sync from GitHub button if st.button("🔄 Sync from GitHub", use_container_width=True): if clone_github_repo(): # Auto-index after successful sync if rag_system and rag_system.model: with st.spinner("Auto-indexing synced documents..."): if rag_system.index_documents("documents"): st.success("✅ Documents synced and indexed!") st.rerun() else: st.warning("⚠️ Sync successful but indexing failed") else: color_map = {"red": "🔴", "orange": "🟠", "green": "🟢"} color_icon = color_map.get(github_status["color"], "🔴") st.markdown(f"""
{color_icon} GitHub: {github_status['message']}
📋 Setup: Add GITHUB_TOKEN to Hugging Face secrets
""", unsafe_allow_html=True) st.divider() # Document Management st.header("📂 Document Management") if rag_system and rag_system.model: doc_count = rag_system.get_collection_count() if doc_count > 0: st.markdown(f"""
📊 Documents Indexed: {doc_count} chunks
🔍 Status: Ready for queries
""", unsafe_allow_html=True) else: st.warning("No documents indexed. Sync from GitHub or upload documents to get started.") # Document indexing if st.button("🔄 Re-index Documents", use_container_width=True): with st.spinner("Indexing documents..."): if rag_system.index_documents("documents"): st.success("Documents indexed successfully!") st.rerun() else: st.error("Failed to index documents. Check your documents folder.") # Show document count only (hidden) if os.path.exists("documents"): txt_files = [f for f in os.listdir("documents") if f.endswith('.txt')] if txt_files: st.info(f"📄 {len(txt_files)} documents loaded (hidden)") # Manual upload interface (fallback) st.subheader("📤 Manual Upload") uploaded_files = st.file_uploader( "Upload text files (fallback)", type=['txt'], accept_multiple_files=True, help="Upload .txt files if GitHub sync is not available" ) if uploaded_files: if st.button("💾 Save & Index Files"): os.makedirs("documents", exist_ok=True) saved_files = [] for uploaded_file in uploaded_files: file_path = os.path.join("documents", uploaded_file.name) with open(file_path, "wb") as f: f.write(uploaded_file.getbuffer()) saved_files.append(uploaded_file.name) st.success(f"Saved {len(saved_files)} files!") # Auto-index with st.spinner("Auto-indexing new documents..."): if rag_system.index_documents("documents"): st.success("Documents indexed successfully!") st.rerun() else: st.error("RAG system initialization failed. Check your setup.") st.divider() # Online Users st.header("👥 Online Users") online_count = update_online_users() if online_count == 1: st.success("🟢 Just you online") else: st.success(f"🟢 {online_count} people online") st.divider() # Settings st.header("⚙️ Settings") # API Status with better checking openrouter_key = os.environ.get("OPENROUTER_API_KEY") if openrouter_key: st.success(" ✅ API Connected") # Quick API test if st.button("Test API Connection", use_container_width=True): try: test_response = requests.post( "https://openrouter.ai/api/v1/chat/completions", headers={ "Authorization": f"Bearer {openrouter_key}", "Content-Type": "application/json" }, json={ "model": "openai/gpt-3.5-turbo", "messages": [{"role": "user", "content": "test"}], "max_tokens": 5 }, timeout=5 ) if test_response.status_code == 200: st.success("✅ API working correctly!") elif test_response.status_code == 402: st.error("❌ Credits exhausted") elif test_response.status_code == 429: st.warning("⏱️ Rate limited") else: st.error(f"❌ API Error: {test_response.status_code}") except Exception as e: st.error(f"❌ API Test Failed: {str(e)}") else: st.error("❌ No OpenRouter API Key") st.info("Add OPENROUTER_API_KEY in Hugging Face Space settings → Variables and secrets") # Enhanced Settings st.subheader("🚀 Token Settings") unlimited_tokens = st.checkbox("🔥 Unlimited Tokens Mode", value=True, help="Use higher token limits for detailed responses") use_ai_enhancement = st.checkbox("👾 AI Enhancement", value=bool(openrouter_key), help="Enhance answers with AI when documents are found") st.subheader("🎛️ Display Settings") show_sources = st.checkbox("📁 Show Sources", value=True) show_confidence = st.checkbox("🎯 Show Confidence Scores", value=True) # Token mode indicator if unlimited_tokens: st.success("🔥 Unlimited mode: Detailed responses enabled") else: st.info("💰 Conservative mode: Limited tokens to save credits") st.divider() # Chat History Controls st.header("💾 Chat History") if st.session_state.messages: st.info(f"Messages: {len(st.session_state.messages)}") col1, col2 = st.columns(2) with col1: if st.button("💾 Save", use_container_width=True): save_chat_history(st.session_state.messages) st.success("Saved!") with col2: if st.button("🗑️ Clear", use_container_width=True): start_new_chat() st.success("Cleared!") st.rerun() # ================= MAIN CHAT AREA ================= # Display chat messages for message in st.session_state.messages: with st.chat_message(message["role"]): if message["role"] == "assistant" and "rag_info" in message: # Display AI answer st.markdown(message["content"]) # Display RAG information rag_info = message["rag_info"] if show_sources and rag_info.get("sources"): confidence_text = f"{rag_info['confidence']*100:.1f}%" if show_confidence else "" st.markdown(f"""
📁 Sources: {', '.join(rag_info['sources'])}
🎯 Confidence: {confidence_text}
""", unsafe_allow_html=True) # Show extracted answer if different if rag_info.get("extracted_answer") and rag_info["extracted_answer"] != message["content"]: st.markdown("**📄 Extracted Answer:**") st.markdown(f"_{rag_info['extracted_answer']}_") else: st.markdown(message["content"]) # Check if we need to process a personality question if hasattr(st.session_state, 'process_personality_question'): prompt = st.session_state.process_personality_question del st.session_state.process_personality_question # Clear the flag # Display user message with st.chat_message("user"): st.markdown(prompt) # Process the question using the same logic as chat input # Update user tracking update_online_users() # Get RAG response with st.chat_message("assistant"): if rag_system and rag_system.model and rag_system.get_collection_count() > 0: # Search documents first search_results = rag_system.search(prompt, n_results=5) # Debug output for troubleshooting if search_results: st.info(f"🔍 Found {len(search_results)} potential matches. Best similarity: {search_results[0]['similarity']:.3f}") else: st.warning("🔍 No search results returned from vector database") # Check if we found relevant documents (very low threshold) if search_results and search_results[0]['similarity'] > 0.001: # Ultra-low threshold # Generate document-based answer result = rag_system.generate_answer( prompt, search_results, use_ai_enhancement=use_ai_enhancement, unlimited_tokens=unlimited_tokens ) # Display AI answer or extracted answer if use_ai_enhancement and result['has_both']: answer_text = result['ai_answer'] st.markdown(f"👾 **AI Enhanced Answer:** {answer_text}") # Also show extracted answer for comparison if different if result['extracted_answer'] != answer_text: with st.expander("📄 View Extracted Answer"): st.markdown(result['extracted_answer']) else: answer_text = result['extracted_answer'] st.markdown(f"📄 **Document Answer:** {answer_text}") # Show why AI enhancement wasn't used if use_ai_enhancement and not result['has_both']: st.info("💡 AI enhancement failed - showing extracted answer from documents") # Show RAG info with more details if show_sources and result['sources']: confidence_text = f"{result['confidence']*100:.1f}%" if show_confidence else "" st.markdown(f"""
📁 Sources: {', '.join(result['sources'])}
🎯 Confidence: {confidence_text}
📊 Found: {len(search_results)} relevant sections
🔍 Best Match: {search_results[0]['similarity']:.3f} similarity
""", unsafe_allow_html=True) # Add to messages with RAG info assistant_message = { "role": "assistant", "content": answer_text, "rag_info": { "sources": result['sources'], "confidence": result['confidence'], "extracted_answer": result['extracted_answer'], "has_ai": result['has_both'] } } else: # No relevant documents found - show debug info if search_results: st.warning(f"📄 Found documents but similarity too low (best: {search_results[0]['similarity']:.3f}). Using general AI...") else: st.warning("📄 No documents found in search. Using general AI...") general_response = get_general_ai_response(prompt, unlimited_tokens=unlimited_tokens) st.markdown(f"💬 **General AI:** {general_response}") assistant_message = { "role": "assistant", "content": general_response, "rag_info": {"sources": [], "confidence": 0, "mode": "general"} } else: # RAG system not ready - use general AI if rag_system and rag_system.get_collection_count() == 0: st.warning("No documents indexed. Sync from GitHub or upload documents first...") else: st.error("RAG system not ready. Using general AI mode...") general_response = get_general_ai_response(prompt, unlimited_tokens=unlimited_tokens) st.markdown(f"💬 **General AI:** {general_response}") assistant_message = { "role": "assistant", "content": general_response, "rag_info": {"sources": [], "confidence": 0, "mode": "general"} } # Add assistant message to history st.session_state.messages.append(assistant_message) # Auto-save save_chat_history(st.session_state.messages) # Chat input if prompt := st.chat_input("Ask questions about your documents..."): # Update user tracking update_online_users() # Add user message user_message = {"role": "user", "content": prompt} st.session_state.messages.append(user_message) # Display user message with st.chat_message("user"): st.markdown(prompt) # Get RAG response with st.chat_message("assistant"): if rag_system and rag_system.model and rag_system.get_collection_count() > 0: # Search documents first search_results = rag_system.search(prompt, n_results=5) # Debug output for troubleshooting if search_results: st.info(f"🔍 Found {len(search_results)} potential matches. Best similarity: {search_results[0]['similarity']:.3f}") else: st.warning("🔍 No search results returned from vector database") # Check if we found relevant documents (very low threshold) if search_results and search_results[0]['similarity'] > 0.001: # Ultra-low threshold # Generate document-based answer result = rag_system.generate_answer( prompt, search_results, use_ai_enhancement=use_ai_enhancement, unlimited_tokens=unlimited_tokens ) # Display AI answer or extracted answer if use_ai_enhancement and result['has_both']: answer_text = result['ai_answer'] st.markdown(f"👾 **AI Enhanced Answer:** {answer_text}") # Also show extracted answer for comparison if different if result['extracted_answer'] != answer_text: with st.expander("📄 View Extracted Answer"): st.markdown(result['extracted_answer']) else: answer_text = result['extracted_answer'] st.markdown(f"📄 **Document Answer:** {answer_text}") # Show why AI enhancement wasn't used if use_ai_enhancement and not result['has_both']: st.info("💡 AI enhancement failed - showing extracted answer from documents") # Show RAG info with more details if show_sources and result['sources']: confidence_text = f"{result['confidence']*100:.1f}%" if show_confidence else "" st.markdown(f"""
📁 Sources: {', '.join(result['sources'])}
🎯 Confidence: {confidence_text}
📊 Found: {len(search_results)} relevant sections
🔍 Best Match: {search_results[0]['similarity']:.3f} similarity
""", unsafe_allow_html=True) # Add to messages with RAG info assistant_message = { "role": "assistant", "content": answer_text, "rag_info": { "sources": result['sources'], "confidence": result['confidence'], "extracted_answer": result['extracted_answer'], "has_ai": result['has_both'] } } else: # No relevant documents found - show debug info if search_results: st.warning(f"📄 Found documents but similarity too low (best: {search_results[0]['similarity']:.3f}). Using general AI...") else: st.warning("📄 No documents found in search. Using general AI...") general_response = get_general_ai_response(prompt, unlimited_tokens=unlimited_tokens) st.markdown(f"💬 **General AI:** {general_response}") assistant_message = { "role": "assistant", "content": general_response, "rag_info": {"sources": [], "confidence": 0, "mode": "general"} } else: # RAG system not ready - use general AI if rag_system and rag_system.get_collection_count() == 0: st.warning("No documents indexed. Sync from GitHub or upload documents first...") else: st.error("RAG system not ready. Using general AI mode...") general_response = get_general_ai_response(prompt, unlimited_tokens=unlimited_tokens) st.markdown(f"💬 **General AI:** {general_response}") assistant_message = { "role": "assistant", "content": general_response, "rag_info": {"sources": [], "confidence": 0, "mode": "general"} } # Add assistant message to history st.session_state.messages.append(assistant_message) # Auto-save save_chat_history(st.session_state.messages) # Footer info if rag_system and rag_system.model: doc_count = rag_system.get_collection_count() token_mode = "🔥 Unlimited" if unlimited_tokens else "💰 Conservative" github_status = check_github_status() github_icon = "🟢" if github_status["status"] == "connected" else "🔴" theme_icon = "🌙" if st.session_state.dark_mode else "☀️" st.caption(f"📚 Knowledge Base: {doc_count} indexed chunks | 🔍 RAG System Active | {token_mode} Token Mode | {github_icon} GitHub {github_status['status'].title()} | {theme_icon} {theme_status}")