""" Main Gradio interface for the Professional RAG Assistant. """ import gradio as gr import asyncio import threading import time import json import sys import signal import logging from typing import Any, Dict, List, Optional, Tuple, Union from pathlib import Path from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeoutError def ensure_dict_safe(value: Any) -> Dict[str, Any]: """Ensure a value is a safe dictionary.""" if isinstance(value, dict): return value elif value is None or value == "" or value == []: return {} elif isinstance(value, str): # Handle HTML-formatted JSON strings from our HTML components if value.startswith('
') and value.endswith('
'): try: json_str = value[5:-6] # Remove
 and 
tags return json.loads(json_str) except: return {} elif value.startswith('{') and value.endswith('}'): try: return json.loads(value) except: return {} else: return {"value": value} elif isinstance(value, (list, tuple)): return {"items": list(value)} elif isinstance(value, (int, float, bool)): return {"value": value} else: return {"data": str(value)} def dict_to_html_json(value: Any) -> str: """Convert a value to HTML-formatted JSON for display.""" try: safe_dict = ensure_dict_safe(value) return f"
{json.dumps(safe_dict, indent=2)}
" except Exception: return "
{}
" from .themes import get_theme, get_custom_css from .components import ( create_header, create_file_upload_section, create_search_interface, create_results_display, create_document_management, create_system_status, create_analytics_dashboard, format_document_list, format_search_results, create_analytics_charts, format_system_overview, create_error_display, create_success_display, create_loading_display ) from .conversation_components import ( create_chat_interface, create_chat_history_display, create_chat_input, create_conversation_controls, create_conversation_state, create_conversation_info_panel, create_typing_indicator, create_conversation_analytics, update_conversation_analytics, update_chat_history, format_conversation_response, extract_conversation_stats, get_conversation_css, get_conversation_javascript, create_chat_message ) from .utils import ( save_uploaded_files, validate_file_types, cleanup_temp_files, generate_session_id, format_file_size, parse_search_filters ) sys.path.append(str(Path(__file__).parent.parent)) from src.rag_system import RAGSystem, EnhancedRAGSystem from src.error_handler import RAGError from src.logging_utils import setup_logging, get_safe_logger, sanitize_log_message class RAGInterface: """Main interface class for the RAG system.""" def __init__(self, config_path: str = None, use_enhanced_rag: bool = True): """Initialize the RAG interface.""" self.rag_system: Optional[Union[RAGSystem, EnhancedRAGSystem]] = None self.config_path = config_path or "config.yaml" self.use_enhanced_rag = use_enhanced_rag self.active_sessions: Dict[str, Dict[str, Any]] = {} self._initialization_lock = threading.Lock() self._initialized = False self._conversation_enabled = False # Setup logging with Unicode safety self.logger = get_safe_logger(__name__) # Initialize RAG system self._initialize_rag_system() def _initialize_rag_system(self) -> None: """Initialize the RAG system.""" try: with self._initialization_lock: if not self._initialized: if self.use_enhanced_rag: print("Initializing Enhanced RAG system with conversation capabilities...") try: self.rag_system = EnhancedRAGSystem(config_path=self.config_path) self._conversation_enabled = hasattr(self.rag_system, 'process_conversation') print(f"Enhanced RAG system initialized. Conversation enabled: {self._conversation_enabled}") except Exception as e: print(f"Failed to initialize Enhanced RAG system: {e}") print("Falling back to standard RAG system...") self.rag_system = RAGSystem(config_path=self.config_path) self._conversation_enabled = False else: print("Initializing standard RAG system...") self.rag_system = RAGSystem(config_path=self.config_path) self._conversation_enabled = False # Warm up the system warmup_result = self.rag_system.warmup() if warmup_result.get("success"): system_type = "Enhanced RAG" if self._conversation_enabled else "Standard RAG" print(f"{system_type} system initialized and warmed up successfully") self._initialized = True else: print(f"RAG system warmup failed: {warmup_result.get('error', {}).get('message')}") except Exception as e: print(f"Failed to initialize RAG system: {e}") self.rag_system = None def get_system_status(self) -> str: """Get current system status HTML.""" if not self.rag_system or not self._initialized: return create_error_display("System not initialized. Please check configuration and restart.") try: stats_response = self.rag_system.get_system_stats() if not stats_response.get("success"): return create_error_display(f"Failed to get system status: {stats_response.get('error', {}).get('message')}") stats = stats_response["data"] status_info = stats.get("status", {}) if status_info.get("ready"): status_message = f"System ready - {status_info.get('documents_indexed', 0)} documents indexed" return create_success_display(status_message) else: return create_error_display("System not ready") except Exception as e: return create_error_display(f"Error getting system status: {str(e)}") def process_documents( self, files: List[gr.File], session_id: str, progress=gr.Progress() ) -> Tuple[str, str, bool, str]: """Process uploaded documents.""" if not files: return ( create_error_display("No files uploaded"), create_error_display("Please select files to upload"), False, # upload button disabled "No documents uploaded yet." ) if not self.rag_system or not self._initialized: return ( create_error_display("System not initialized"), create_error_display("Please restart the application"), False, "No documents uploaded yet." ) try: self.logger.info(f"Starting document upload process - {len(files)} files received") # Validate file types allowed_extensions = [".pdf", ".docx", ".txt"] valid_files, validation_errors = validate_file_types(files, allowed_extensions) if validation_errors: self.logger.warning(f"File validation errors: {validation_errors}") error_html = create_error_display("\\n".join(validation_errors)) return error_html, error_html, len(valid_files) > 0, self.get_document_list() self.logger.info(f"File validation passed - {len(valid_files)} valid files") # Save uploaded files progress(0.1, desc="Saving uploaded files...") self.logger.info("Saving uploaded files to temporary directory...") saved_files = save_uploaded_files(valid_files) for file_path, original_name in saved_files: file_size = Path(file_path).stat().st_size / (1024 * 1024) # Size in MB self.logger.info(f"Saved file: {original_name} ({file_size:.2f} MB) -> {file_path}") if not saved_files: return ( create_error_display("No valid files to process"), create_error_display("Please check your files and try again"), False, self.get_document_list() ) # Process each file with timeout processed_count = 0 total_files = len(saved_files) results = [] timeout_seconds = 600 # 10 minutes def process_single_file(file_path, original_name, session_id): """Process a single file - to be run with timeout.""" self.logger.info(f"Processing file: {original_name}") start_time = time.time() result = self.rag_system.add_document( file_path=file_path, filename=original_name, user_session=session_id ) processing_time = time.time() - start_time self.logger.info(f"File processing completed: {original_name} (took {processing_time:.2f}s)") return result self.logger.info(f"Starting processing of {total_files} files with {timeout_seconds//60}-minute timeout per file") with ThreadPoolExecutor(max_workers=1) as executor: for i, (file_path, original_name) in enumerate(saved_files): progress((i + 1) / total_files * 0.8 + 0.1, desc=f"Processing {original_name}...") self.logger.info(f"Processing file {i+1}/{total_files}: {original_name}") try: # Submit the task with timeout future = executor.submit(process_single_file, file_path, original_name, session_id) result = future.result(timeout=timeout_seconds) if result.get("success"): processed_count += 1 chunks_created = result['data']['chunks_created'] # Log detailed success info self.logger.info(f"SUCCESS: {original_name} - {chunks_created} chunks created") # Log sample chunk info if available if 'sample_chunks' in result['data']: sample_chunks = result['data']['sample_chunks'] self.logger.info(f"Sample chunks from {original_name}:") for idx, chunk in enumerate(sample_chunks[:3]): # Show first 3 chunks chunk_preview = chunk['content'][:100] + "..." if len(chunk['content']) > 100 else chunk['content'] self.logger.info(f" Chunk {idx}: {chunk_preview}") results.append(f"✅ {original_name}: {chunks_created} chunks created") else: error_msg = result.get("error", {}).get("message", "Unknown error") self.logger.error(f"FAILED: {original_name} - {error_msg}") results.append(f"❌ {original_name}: {error_msg}") except FutureTimeoutError: self.logger.error(f"TIMEOUT: {original_name} exceeded {timeout_seconds//60} minute limit") results.append(f"⏰ {original_name}: Processing timed out after {timeout_seconds//60} minutes") future.cancel() # Cancel the task if possible except Exception as e: self.logger.error(f"EXCEPTION: {original_name} - {str(e)}") results.append(f"❌ {original_name}: {str(e)}") progress(1.0, desc="Cleaning up...") self.logger.info("Cleaning up temporary files...") # Clean up temporary files cleanup_temp_files([fp for fp, _ in saved_files]) # Log final summary total_processing_time = time.time() - time.time() # This will be updated properly self.logger.info(f"Document upload process completed:") self.logger.info(f" - Total files: {total_files}") self.logger.info(f" - Successfully processed: {processed_count}") self.logger.info(f" - Failed: {total_files - processed_count}") self.logger.info(f" - Success rate: {(processed_count/total_files*100):.1f}%") # Create result message if processed_count == total_files: self.logger.info(f"[SUCCESS] ALL DOCUMENTS PROCESSED SUCCESSFULLY ({processed_count}/{total_files})") status_html = create_success_display( f"Successfully processed {processed_count} documents:\\n" + "\\n".join(results) ) upload_status = create_success_display(f"All {processed_count} documents processed successfully!") elif processed_count > 0: self.logger.warning(f"[PARTIAL] PARTIAL SUCCESS ({processed_count}/{total_files} documents processed)") status_html = f"""
⚠️ Partially successful ({processed_count}/{total_files} files processed)
{"
".join(results)}
""" upload_status = status_html else: self.logger.error(f"[ERROR] NO DOCUMENTS PROCESSED SUCCESSFULLY (0/{total_files})") status_html = create_error_display( f"Failed to process any documents:\\n" + "\\n".join(results) ) upload_status = create_error_display("Document processing failed") return ( status_html, upload_status, gr.update(interactive=True), # Enable search button self.get_document_list() ) except Exception as e: # Clean up on error try: if 'saved_files' in locals(): cleanup_temp_files([fp for fp, _ in saved_files]) except: pass error_message = f"Document processing failed: {str(e)}" error_html = create_error_display(error_message) return error_html, error_html, gr.update(interactive=False), self.get_document_list() def perform_search( self, query: str, search_mode: str, num_results: int, enable_reranking: bool, metadata_filters: str, session_id: str ) -> Tuple[str, str, str]: """Perform search and return results.""" if not self.rag_system or not self._initialized: error_html = create_error_display("System not initialized") return error_html, "{}", "" if not query or not query.strip(): error_html = create_error_display("Please enter a search query") return error_html, "{}", "" try: # Parse metadata filters filters = parse_search_filters(metadata_filters) if metadata_filters else None # Perform search result = self.rag_system.search( query=query.strip(), k=num_results, search_mode=search_mode, enable_reranking=enable_reranking, metadata_filter=filters, user_session=session_id ) if not result.get("success"): error_msg = result.get("error", {}).get("message", "Search failed") error_html = create_error_display(error_msg) return error_html, "{}", "" # Format results search_data = result["data"] results = search_data.get("results", []) search_time = search_data.get("search_time", 0) # Create HTML display results_html, stats_html = format_search_results(results, search_time, query) # Create JSON data for detailed view json_data = { "query": query, "search_mode": search_mode, "results_count": len(results), "search_time": search_time, "results": results[:5], # Limit JSON display "query_suggestions": search_data.get("query_suggestions", []) } return results_html, json.dumps(json_data, indent=2), stats_html except Exception as e: error_html = create_error_display(f"Search failed: {str(e)}") return error_html, "{}", "" def get_document_list(self) -> str: """Get formatted document list.""" if not self.rag_system or not self._initialized: return "
System not initialized
" try: result = self.rag_system.get_document_list() if result.get("success"): documents = result["data"]["documents"] return format_document_list(documents) else: return create_error_display("Failed to load document list") except Exception as e: return create_error_display(f"Error loading documents: {str(e)}") def clear_documents(self) -> Tuple[str, str]: """Clear all documents.""" if not self.rag_system or not self._initialized: error_html = create_error_display("System not initialized") return error_html, error_html try: result = self.rag_system.clear_all_documents() if result.get("success"): success_msg = f"Cleared {result['data']['documents_removed']} documents" success_html = create_success_display(success_msg) return success_html, self.get_document_list() else: error_msg = result.get("error", {}).get("message", "Failed to clear documents") error_html = create_error_display(error_msg) return error_html, self.get_document_list() except Exception as e: error_html = create_error_display(f"Error clearing documents: {str(e)}") return error_html, self.get_document_list() def process_conversation( self, user_message: str, chat_history: str, session_state: Any = None, conversation_context: Any = None, mode: str = "hybrid", show_sources: bool = True, show_suggestions: bool = True, progress=gr.Progress() ) -> Tuple[str, str, str, str]: """ Process a conversation message. Args: user_message: User's input message chat_history: Current chat history HTML session_state: Current session state (can be dict or None) conversation_context: Current conversation context (can be dict or None) mode: Response mode (conversation, rag, hybrid) show_sources: Whether to show sources show_suggestions: Whether to show suggestions progress: Gradio progress indicator Returns: Tuple of (updated_chat_history, updated_session_state_html, updated_context_html, empty_input) """ # Handle session_state and conversation_context properly - ensure they're always dicts session_state = ensure_dict_safe(session_state) conversation_context = ensure_dict_safe(conversation_context) if not self.rag_system or not self._initialized: error_message = "System not initialized. Please check configuration and restart." error_html = create_chat_message(content=error_message, role="system") updated_history = update_chat_history(chat_history, error_message, "system") return updated_history, "", session_state, conversation_context if not user_message or not user_message.strip(): return chat_history, "", session_state, conversation_context try: # Show typing indicator progress(0.1, desc="Processing your message...") # Add user message to chat history user_message = user_message.strip() updated_history = update_chat_history(chat_history, user_message, "user") # Update session state if not session_state.get("session_id"): session_state["session_id"] = generate_session_id() session_state["started_at"] = time.time() session_state["message_count"] = 0 session_state["user_id"] = f"user_{int(time.time())}" session_state["message_count"] += 1 progress(0.3, desc="Generating response...") # Process with enhanced RAG system if available conversation_processed = False if self._conversation_enabled and hasattr(self.rag_system, 'process_conversation'): try: # Use enhanced conversation processing result = self.rag_system.process_conversation( user_input=user_message, session_id=session_state["session_id"], user_id=session_state.get("user_id") ) progress(0.8, desc="Formatting response...") if result.get("success"): response_data = result["data"] assistant_response = response_data.get("response", "I couldn't generate a response.") confidence = response_data.get("confidence", 0) sources = response_data.get("sources", []) if show_sources else [] suggestions = response_data.get("suggestions", []) if show_suggestions else [] # Update conversation context processing_info = response_data.get("processing_info", {}) conversation_context["last_intent"] = processing_info.get("intent", "unknown") conversation_context["last_route"] = processing_info.get("route", "unknown") conversation_context["last_confidence"] = confidence # Add assistant message to chat history updated_history = update_chat_history( updated_history, assistant_response, "assistant", sources=sources, confidence=confidence, suggestions=suggestions ) conversation_processed = True else: error_msg = result.get("error", {}).get("message", "Conversation processing failed") self.logger.warning(f"Conversation processing failed: {error_msg}") # Don't show error to user, fall back to simple response instead except Exception as e: self.logger.error(f"Exception in conversation processing: {e}") # Don't show error to user, fall back to simple response instead # Fallback: Simple greeting response for basic interactions if not conversation_processed: # Check if it's a simple greeting greeting_words = ["hi", "hello", "hey", "greetings", "good morning", "good afternoon", "good evening"] if any(word in user_message.lower() for word in greeting_words): # Simple greeting response assistant_response = "Hello! I'm your RAG assistant. I can help you search through your documents and answer questions. How can I assist you today?" updated_history = update_chat_history( updated_history, assistant_response, "assistant", suggestions=["What documents do you have?", "How can I search for information?", "What can you help me with?"] if show_suggestions else None ) conversation_processed = True # If still not processed, provide conversational response if not conversation_processed: # Try to search for relevant information first search_result = self.rag_system.search( query=user_message, k=3, search_mode=mode if mode in ["vector", "bm25", "hybrid"] else "hybrid" ) progress(0.8, desc="Generating response...") if search_result.get("success"): search_data = search_result["data"] results = search_data.get("results", []) if results: # Generate a conversational response based on the search results best_result = results[0] content_snippet = best_result.get("content", "")[:300] source_name = best_result.get("metadata", {}).get("source", "your documents") response_content = f"Based on {source_name}, I can help with that. {content_snippet}..." if len(results) > 1: response_content += f"\n\nI found {len(results)} related pieces of information that might be helpful." sources = [ { "title": result.get("metadata", {}).get("source", "Unknown Source"), "content": result.get("content", "")[:200] + "...", "score": result.get("scores", {}).get("final_score", 0) } for result in results[:2] ] if show_sources else [] suggestions = [ "Can you tell me more about this?", "What else should I know?", "Are there any related topics?" ] if show_suggestions else [] updated_history = update_chat_history( updated_history, response_content, "assistant", sources=sources, suggestions=suggestions ) else: # No documents found - provide helpful conversational response response_content = f"I understand you're asking about '{user_message}'. " if hasattr(self.rag_system, 'get_document_list'): doc_result = self.rag_system.get_document_list() if doc_result.get("success") and doc_result["data"]["documents"]: response_content += "I couldn't find specific information about this in your uploaded documents. You might want to try rephrasing your question or asking about topics that are covered in your documents." suggestions = [ "What documents do I have?", "What topics are covered in my documents?", "Can you help me search differently?" ] if show_suggestions else [] else: response_content += "It looks like you haven't uploaded any documents yet. Upload some documents first, and then I can help answer questions about them!" suggestions = [ "How do I upload documents?", "What file types do you support?", "What can you help me with?" ] if show_suggestions else [] else: response_content += "I'd be happy to help, but I need more context. Could you provide more details or try rephrasing your question?" suggestions = [ "Can you be more specific?", "What exactly are you looking for?", "How can I help you better?" ] if show_suggestions else [] updated_history = update_chat_history( updated_history, response_content, "assistant", suggestions=suggestions ) else: # Search failed - provide conversational fallback response_content = f"I'm having trouble processing your question about '{user_message}' right now. Could you try rephrasing it or asking something else?" suggestions = [ "What can you help me with?", "How does this system work?", "Can I try a different question?" ] if show_suggestions else [] updated_history = update_chat_history( updated_history, response_content, "assistant", suggestions=suggestions ) progress(1.0, desc="Complete!") # Ensure HTML-safe return values for display return updated_history, dict_to_html_json(session_state), dict_to_html_json(conversation_context), "" except Exception as e: error_msg = f"Error processing conversation: {str(e)}" updated_history = update_chat_history(chat_history, error_msg, "system") # Ensure HTML-safe return values in error case return updated_history, dict_to_html_json(session_state), dict_to_html_json(conversation_context), "" def clear_conversation( self, session_state: Any = None ) -> Tuple[str, Dict[str, Any], Dict[str, Any]]: """ Clear the conversation and reset state. Args: session_state: Current session state (can be dict or None) Returns: Tuple of (new_chat_history, reset_session_state, reset_context) """ # Handle session_state properly session_state = ensure_dict_safe(session_state) try: # Clear conversation session in enhanced RAG system if available if (self._conversation_enabled and hasattr(self.rag_system, 'clear_conversation_session') and session_state.get("session_id")): self.rag_system.clear_conversation_session(session_state["session_id"]) # Reset states new_session_state = { "session_id": None, "user_id": None, "started_at": None, "message_count": 0 } new_context = { "mentioned_entities": [], "active_topics": [], "last_query_type": None, "document_context": {} } # Create fresh chat history - get the HTML content directly initial_html = """
💬

Welcome to the RAG Assistant!

Ask questions about your documents or start a conversation. I can help you find information, explain concepts, and provide detailed answers based on your uploaded documents.

""" new_chat_history = initial_html return new_chat_history, dict_to_html_json(new_session_state), dict_to_html_json(new_context) except Exception as e: print(f"Error clearing conversation: {e}") # Still return reset values even if clearing failed initial_html = """
💬

Welcome to the RAG Assistant!

Ask questions about your documents or start a conversation. I can help you find information, explain concepts, and provide detailed answers based on your uploaded documents.

""" new_chat_history = initial_html new_session_state = {"session_id": None, "user_id": None, "started_at": None, "message_count": 0} new_context = {"mentioned_entities": [], "active_topics": [], "last_query_type": None, "document_context": {}} return new_chat_history, dict_to_html_json(new_session_state), dict_to_html_json(new_context) def get_analytics_data(self) -> Tuple[str, gr.Plot, gr.Plot, List[List[str]]]: """Get analytics dashboard data.""" if not self.rag_system or not self._initialized: return ( create_error_display("System not initialized"), gr.Plot(), gr.Plot(), [] ) try: result = self.rag_system.get_analytics_dashboard() if not result.get("success"): error_html = create_error_display("Failed to load analytics data") return error_html, gr.Plot(), gr.Plot(), [] analytics_data = result["data"] # Format system overview overview_html = format_system_overview(analytics_data) # Create charts query_chart, modes_chart = create_analytics_charts(analytics_data) # Create activity table data activity_data = [] system_data = analytics_data.get("system", {}) activity_data.append([ "System Started", "System Initialization", f"Uptime: {system_data.get('uptime_hours', 0):.1f} hours", "✅ Active" ]) if system_data.get("total_queries", 0) > 0: activity_data.append([ "Recent", "Search Queries", f"{system_data.get('total_queries')} total queries", "📊 Active" ]) # Add conversation metrics if enhanced system is enabled if self._conversation_enabled and hasattr(self.rag_system, 'conversation_manager'): try: conversation_stats = self.rag_system.conversation_manager.get_session_statistics() if conversation_stats.get("active_sessions", 0) > 0: activity_data.append([ "Now", "Active Conversations", f"{conversation_stats.get('active_sessions', 0)} sessions", "💬 Active" ]) total_conversations = conversation_stats.get("total_conversations", 0) if total_conversations > 0: activity_data.append([ "Recent", "Conversation Sessions", f"{total_conversations} total conversations", "💬 Complete" ]) total_messages = conversation_stats.get("total_messages", 0) if total_messages > 0: activity_data.append([ "Recent", "Chat Messages", f"{total_messages} messages exchanged", "📝 Active" ]) except Exception as e: self.logger.warning(f"Could not get conversation statistics: {e}") if system_data.get("total_documents_processed", 0) > 0: activity_data.append([ "Recent", "Document Processing", f"{system_data.get('total_documents_processed')} documents processed", "📄 Complete" ]) return overview_html, query_chart, modes_chart, activity_data except Exception as e: error_html = create_error_display(f"Error loading analytics: {str(e)}") return error_html, gr.Plot(), gr.Plot(), [] def create_interface(self) -> gr.Blocks: """Create the main Gradio interface.""" theme = get_theme() css = get_custom_css() # Add conversation CSS if conversation is enabled if self._conversation_enabled: css += "\n" + get_conversation_css() with gr.Blocks( theme=theme, css=css, title="Professional RAG Assistant", analytics_enabled=False ) as interface: # Add JavaScript for conversation if enabled if self._conversation_enabled: gr.HTML(get_conversation_javascript()) # Session state session_id_state = gr.State(value=generate_session_id()) # Header create_header() # System status system_status = create_system_status() # Main tabs with gr.Tabs() as main_tabs: # Chat Tab with gr.Tab("💬 Chat", id="chat"): gr.Markdown(""" ## 🤖 **Intelligent Document Chat** **Talk to your documents naturally!** This AI-powered chat interface understands your questions and provides intelligent responses based on your uploaded documents. ✨ **Key Features:** - 💬 **Natural Conversation**: Ask questions in plain English - 🎯 **Smart Responses**: Choose conversation, RAG, or hybrid modes - 📚 **Source Citations**: See exactly where information comes from - 💡 **Follow-up Suggestions**: Get AI-generated next questions 🚀 **Get Started:** Upload documents first, then return here to start chatting! """) # Chat interface components chat_components = create_chat_interface() chat_history_display, chat_input, send_button, clear_chat_button, session_state_display = chat_components[:5] conversation_context_display, mode_selector, show_sources_checkbox, show_suggestions_checkbox = chat_components[5:] with gr.Row(): with gr.Column(scale=3): chat_history_display with gr.Row(): chat_input send_button with gr.Row(): clear_chat_button mode_selector with gr.Column(scale=1): with gr.Accordion("Chat Options", open=True): show_sources_checkbox show_suggestions_checkbox # Hidden debug components - placed outside visible UI but still accessible for event handling with gr.Row(visible=False): session_state_display conversation_context_display # Document Upload Tab with gr.Tab("📁 Document Upload", id="upload"): gr.Markdown(""" ## 📁 **Document Upload & Processing** **Build your intelligent knowledge base!** Upload your documents and let our AI process them for instant search and conversation capabilities. 📄 **Supported Formats:** - 📊 **PDF files** - Research papers, reports, manuals (up to 50MB each) - 📝 **DOCX files** - Word documents, proposals, notes - 📃 **TXT files** - Plain text, transcripts, code files ⚡ **Smart Processing:** - 🧠 **Intelligent chunking** with context preservation - 📊 **Metadata extraction** for better organization - 🎯 **Vector embeddings** for semantic search - ⏱️ **Real-time progress** tracking 💡 **Pro Tip:** Upload related documents together for better cross-document insights! """) file_upload, upload_status, upload_button = create_file_upload_section() with gr.Accordion("Upload Settings", open=False): gr.Markdown(""" **Supported formats:** PDF, DOCX, TXT **Maximum file size:** 50MB per file **Processing:** Documents are split into chunks and indexed for search """) # Search Tab with gr.Tab("🔍 Search", id="search"): gr.Markdown(""" ## 🔍 **Advanced Document Search** **Find exactly what you need!** Our hybrid AI search combines vector similarity and keyword matching for superior results. 🎯 **Search Capabilities:** - 🧠 **Semantic Search** - Understands meaning, not just keywords - 📝 **Keyword Search** - Traditional exact text matching - 🔀 **Hybrid Mode** - Best of both worlds (recommended) - 📊 **Smart Re-ranking** - Improves relevance with cross-encoders ⚙️ **Advanced Features:** - 🎛️ **Configurable parameters** - Adjust search weights and result count - 🏷️ **Metadata filtering** - Filter by document properties - 📈 **Relevance scoring** - See confidence levels for each result - 📋 **JSON export** - Raw data for technical analysis 💡 **Perfect for:** Research, fact-finding, and detailed document analysis """) with gr.Row(): with gr.Column(scale=4): search_components = create_search_interface() search_query, search_controls, search_button = search_components[:3] search_mode, num_results, enable_reranking = search_components[3:] with gr.Column(scale=1): with gr.Accordion("Advanced Options", open=False): metadata_filters = gr.Textbox( label="Metadata Filters", placeholder='{"source": "document.pdf"}', lines=3, info="JSON or key:value,key2:value2 format" ) # Results display results_html, results_json, search_stats = create_results_display() with gr.Accordion("Detailed Results (JSON)", open=False): results_json # Document Management Tab with gr.Tab("📚 Documents", id="documents"): gr.Markdown(""" ## 📚 **Document Library Manager** **Organize your knowledge base!** View, manage, and monitor all your uploaded documents in one central location. 📊 **Document Overview:** - 📄 **File Details** - Name, size, format, and upload date - 🧩 **Processing Stats** - Number of chunks and processing status - 🔍 **Search Performance** - Track which documents are most useful - 📈 **Usage Analytics** - See query patterns and access frequency 🛠️ **Management Tools:** - 🗑️ **Individual Removal** - Delete specific documents - 🧹 **Bulk Clear** - Remove all documents at once - 🔄 **Refresh Status** - Update document list and statistics - 📋 **Export List** - Get document inventory 💡 **Best Practice:** Regularly review and organize your document library for optimal performance """) document_list, refresh_docs_btn, clear_docs_btn = create_document_management() # Analytics Tab with gr.Tab("📊 Analytics", id="analytics"): gr.Markdown(""" ## 📊 **System Analytics & Insights** **Monitor your RAG system performance!** Track usage patterns, system health, and optimization opportunities. 📈 **Performance Metrics:** - ⚡ **Response Times** - Average query processing speed - 🔍 **Search Accuracy** - Relevance scores and success rates - 💬 **Chat Analytics** - Conversation patterns and engagement - 📊 **System Health** - Memory usage and processing efficiency 🎯 **Usage Insights:** - 🔥 **Popular Queries** - Most frequently asked questions - 📄 **Document Utilization** - Which documents are accessed most - 🎭 **Mode Preferences** - Conversation vs RAG vs Hybrid usage - ⏰ **Activity Patterns** - Peak usage times and trends 🛠️ **Optimization Tools:** - 📋 **Performance Reports** - Detailed analytics export - 🔄 **Real-time Monitoring** - Live system status updates - 💡 **Recommendations** - AI-suggested improvements - 📈 **Trend Analysis** - Historical performance tracking 💡 **Use this data to:** Optimize document libraries, improve search strategies, and enhance user experience """) analytics_components = create_analytics_dashboard() system_overview, query_chart, search_modes_chart, activity_table = analytics_components with gr.Row(): with gr.Column(): query_chart with gr.Column(): search_modes_chart with gr.Accordion("Recent Activity", open=False): activity_table refresh_analytics_btn = gr.Button("Refresh Analytics", variant="secondary") # Event handlers # File upload events file_upload.change( fn=lambda files: ( create_success_display(f"✅ {len(files)} file(s) selected! Click the green '🚀 Process Documents' button below to continue.") if files and len(files) > 0 else "
📁 No files selected
", gr.update(interactive=files is not None and len(files) > 0) ), inputs=[file_upload], outputs=[upload_status, upload_button], show_progress=False ) upload_button.click( fn=self.process_documents, inputs=[file_upload, session_id_state], outputs=[upload_status, system_status, search_button, document_list], show_progress=True ) # Search events search_query.change( fn=lambda query: gr.update(interactive=len(query.strip()) > 0 if query else False), inputs=[search_query], outputs=[search_button], show_progress=False ) search_button.click( fn=lambda: create_loading_display("Searching..."), inputs=[], outputs=[results_html], show_progress=False ).then( fn=self.perform_search, inputs=[ search_query, search_mode, num_results, enable_reranking, metadata_filters, session_id_state ], outputs=[results_html, results_json, search_stats], show_progress=True ) # Document management events refresh_docs_btn.click( fn=self.get_document_list, inputs=[], outputs=[document_list], show_progress=False ) clear_docs_btn.click( fn=self.clear_documents, inputs=[], outputs=[system_status, document_list], show_progress=True ) # Analytics events refresh_analytics_btn.click( fn=self.get_analytics_data, inputs=[], outputs=[system_overview, query_chart, search_modes_chart, activity_table], show_progress=True ) # Chat events - full RAG with duplicate prevention def handle_chat_submit(message, history, mode, show_sources, show_suggestions): # Prevent empty messages if not message or not message.strip(): return history, dict_to_html_json({}), dict_to_html_json({}), "" user_message = message.strip() # Initialize session state session_state = { "session_id": f"session_{int(time.time())}", "message_count": 1 } conversation_context = {} # Generate intelligent response try: # Check for greetings first greeting_words = ["hi", "hello", "hey", "greetings", "good morning", "good afternoon", "good evening"] if any(word in user_message.lower() for word in greeting_words): response = "Hello! I'm your RAG assistant. I can help you search through your documents and answer questions. How can I assist you today?" suggestions = ["What documents do you have?", "How can I search for information?", "What can you help me with?"] if show_suggestions else [] sources = [] # Try to search for relevant information elif self.rag_system and self._initialized: search_result = self.rag_system.search( query=user_message, k=3, search_mode=mode if mode in ["vector", "bm25", "hybrid"] else "hybrid" ) if search_result.get("success"): results = search_result["data"].get("results", []) if results: # Generate conversational response from search results best_result = results[0] content_snippet = best_result.get("content", "")[:300] source_name = best_result.get("metadata", {}).get("source", "your documents") response = f"Based on {source_name}, I can help with that. {content_snippet}..." if len(results) > 1: response += f"\n\nI found {len(results)} related pieces of information that might be helpful." sources = [ { "title": result.get("metadata", {}).get("source", "Unknown Source"), "content": result.get("content", "")[:200] + "...", "score": result.get("scores", {}).get("final_score", 0) } for result in results[:2] ] if show_sources else [] suggestions = [ "Can you tell me more about this?", "What else should I know?", "Are there any related topics?" ] if show_suggestions else [] else: # No search results found doc_result = self.rag_system.get_document_list() if doc_result.get("success") and doc_result["data"]["documents"]: response = f"I understand you're asking about '{user_message}'. I couldn't find specific information about this in your uploaded documents. You might want to try rephrasing your question or asking about topics that are covered in your documents." suggestions = [ "What documents do I have?", "What topics are covered in my documents?", "Can you help me search differently?" ] if show_suggestions else [] else: response = f"I understand you're asking about '{user_message}'. It looks like you haven't uploaded any documents yet. Upload some documents first, and then I can help answer questions about them!" suggestions = [ "How do I upload documents?", "What file types do you support?", "What can you help me with?" ] if show_suggestions else [] sources = [] else: response = f"I'm having trouble processing your question about '{user_message}' right now. Could you try rephrasing it or asking something else?" suggestions = [ "What can you help me with?", "How does this system work?", "Can I try a different question?" ] if show_suggestions else [] sources = [] else: response = "The system is not ready yet. Please check the configuration and try again." suggestions = [] sources = [] except Exception as e: response = f"I encountered an error processing your message. Please try again." suggestions = ["Can you try rephrasing?", "What else can I help with?"] if show_suggestions else [] sources = [] # Build complete chat history from scratch to prevent duplicates new_history = f"""
{user_message}
{response}
""" # Add sources if available if sources: new_history += """

📚 Sources:

""" for source in sources: new_history += f"""
{source['title']} {source['score']:.1%}
{source['content']}
""" new_history += """
""" # Add suggestions if available if suggestions: new_history += """

💡 Suggestions:

""" for suggestion in suggestions: new_history += f""" """ new_history += """
""" new_history += """
""" return new_history, dict_to_html_json(session_state), dict_to_html_json(conversation_context), "" chat_input.submit( fn=handle_chat_submit, inputs=[ chat_input, chat_history_display, mode_selector, show_sources_checkbox, show_suggestions_checkbox ], outputs=[ chat_history_display, session_state_display, conversation_context_display, chat_input ], show_progress=True ) send_button.click( fn=handle_chat_submit, inputs=[ chat_input, chat_history_display, mode_selector, show_sources_checkbox, show_suggestions_checkbox ], outputs=[ chat_history_display, session_state_display, conversation_context_display, chat_input ], show_progress=True ) def handle_clear_chat(): # Return fresh chat history initial_html = """
🤖

Welcome to Your Intelligent Document Assistant!

🎯 What I can do for you:

💡 New here? Start by uploading documents in the "📁 Document Upload" tab, then come back to chat!

🚀 Try these to get started:

""" return initial_html, dict_to_html_json({}), dict_to_html_json({}) clear_chat_button.click( fn=handle_clear_chat, inputs=[], outputs=[chat_history_display, session_state_display, conversation_context_display], show_progress=False ) # Enable chat input when message is typed chat_input.change( fn=lambda message: gr.update(interactive=len(message.strip()) > 0 if message else False), inputs=[chat_input], outputs=[send_button], show_progress=False ) # Initialize interface interface.load( fn=lambda: ( self.get_system_status(), "
📁 No files selected
", # Initial upload status gr.update(interactive=False), # Upload button disabled initially gr.update(interactive=False), # Search button disabled initially gr.update(interactive=False), # Send button disabled initially self.get_document_list(), *self.get_analytics_data(), "", # Initial chat history dict_to_html_json({}), # Initial session state dict_to_html_json({}) # Initial conversation context ), inputs=[], outputs=[ system_status, upload_status, upload_button, search_button, send_button, document_list, system_overview, query_chart, search_modes_chart, activity_table, chat_history_display, session_state_display, conversation_context_display ], show_progress=False ) return interface def create_interface(config_path: str = None) -> gr.Blocks: """Create and return the RAG interface.""" try: # Setup logging configuration first import yaml config_file = config_path or "config.yaml" try: with open(config_file, 'r') as f: config = yaml.safe_load(f) setup_logging(config) except Exception as e: print(f"Warning: Could not setup logging from config: {e}") # Create interface rag_interface = RAGInterface(config_path) return rag_interface.create_interface() except Exception as e: print(f"Error creating interface: {e}") raise if __name__ == "__main__": # For testing interface = create_interface() interface.launch(debug=True)