"""
Main Gradio interface for the Professional RAG Assistant.
"""
import gradio as gr
import asyncio
import threading
import time
import json
import sys
import signal
import logging
from typing import Any, Dict, List, Optional, Tuple, Union
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeoutError
from .themes import get_theme, get_custom_css
from .components import (
create_header, create_file_upload_section, create_search_interface,
create_results_display, create_document_management, create_system_status,
create_analytics_dashboard, format_document_list, format_search_results,
create_analytics_charts, format_system_overview, create_error_display,
create_success_display, create_loading_display
)
from .utils import (
save_uploaded_files, validate_file_types, cleanup_temp_files,
generate_session_id, format_file_size, parse_search_filters
)
sys.path.append(str(Path(__file__).parent.parent))
from src.rag_system import RAGSystem
from src.error_handler import RAGError
class RAGInterface:
    """Gradio front-end that wires UI events to a ``RAGSystem`` instance.

    A single ``RAGSystem`` is created (and warmed up) in ``__init__``. Every
    event handler first checks ``self.rag_system`` and ``self._initialized``
    and returns an error display instead of raising when the backend is not
    available.
    """

    def __init__(self, config_path: Optional[str] = None):
        """Initialize the interface and eagerly start the RAG backend.

        Args:
            config_path: Path to the RAG configuration file; ``"config.yaml"``
                is used when not given.
        """
        self.rag_system: Optional[RAGSystem] = None
        self.config_path = config_path or "config.yaml"
        # Per-session bookkeeping slot; not read anywhere else in this class.
        self.active_sessions: Dict[str, Dict[str, Any]] = {}
        self._initialization_lock = threading.Lock()
        self._initialized = False
        self.logger = logging.getLogger(__name__)
        self._initialize_rag_system()

    def _initialize_rag_system(self) -> None:
        """Create and warm up the RAG backend, at most once per instance.

        On a successful warmup ``self._initialized`` becomes True. If warmup
        reports failure, ``self.rag_system`` is kept but ``self._initialized``
        stays False, so every handler treats the system as unavailable. Any
        exception during construction/warmup is reported (not re-raised) and
        leaves ``self.rag_system`` as None.
        """
        try:
            with self._initialization_lock:
                if self._initialized:
                    return
                print("Initializing RAG system...")
                self.rag_system = RAGSystem(config_path=self.config_path)
                # Warm up the system so the first user request is not the slow one.
                warmup_result = self.rag_system.warmup()
                if warmup_result.get("success"):
                    print("RAG system initialized and warmed up successfully")
                    self._initialized = True
                else:
                    print(f"RAG system warmup failed: {warmup_result.get('error', {}).get('message')}")
        except Exception as e:
            print(f"Failed to initialize RAG system: {e}")
            # Keep the traceback; the print above only carries the message.
            self.logger.exception("RAG system initialization failed")
            self.rag_system = None

    def get_system_status(self) -> str:
        """Return an HTML status banner describing backend readiness."""
        if not self.rag_system or not self._initialized:
            return create_error_display("System not initialized. Please check configuration and restart.")
        try:
            stats_response = self.rag_system.get_system_stats()
            if not stats_response.get("success"):
                return create_error_display(
                    f"Failed to get system status: {stats_response.get('error', {}).get('message')}"
                )
            status_info = stats_response["data"].get("status", {})
            if status_info.get("ready"):
                status_message = f"System ready - {status_info.get('documents_indexed', 0)} documents indexed"
                return create_success_display(status_message)
            return create_error_display("System not ready")
        except Exception as e:
            return create_error_display(f"Error getting system status: {str(e)}")

    def process_documents(
        self,
        files: List[gr.File],
        session_id: str,
        progress=gr.Progress()
    ) -> Tuple[str, str, Any, str]:
        """Validate, save, index and clean up a batch of uploaded documents.

        Args:
            files: Files received from the Gradio upload component.
            session_id: Per-session identifier forwarded to the RAG system.
            progress: Gradio progress tracker (injected by Gradio).

        Returns:
            A 4-tuple matching the outputs wired in ``create_interface``
            (``upload_status, system_status, search_button, document_list``):
            detailed status HTML, summary status HTML, a ``gr.update`` for the
            search button, and the refreshed document list.
        """
        if not files:
            return (
                create_error_display("No files uploaded"),
                create_error_display("Please select files to upload"),
                # The third output is the *search* button. It must receive a
                # component update; a bare bool would be written into its value.
                gr.update(interactive=False),
                "No documents uploaded yet."
            )
        if not self.rag_system or not self._initialized:
            return (
                create_error_display("System not initialized"),
                create_error_display("Please restart the application"),
                gr.update(interactive=False),
                "No documents uploaded yet."
            )

        start_time = time.time()
        # Initialized before `try` so `finally` can always clean up safely.
        saved_files: List[Tuple[str, str]] = []
        try:
            self.logger.info(f"Starting document upload process - {len(files)} files received")

            allowed_extensions = [".pdf", ".docx", ".txt"]
            valid_files, validation_errors = validate_file_types(files, allowed_extensions)
            if validation_errors:
                self.logger.warning(f"File validation errors: {validation_errors}")
                error_html = create_error_display("\n".join(validation_errors))
                # The whole batch is rejected; nothing has been saved or indexed yet.
                return (
                    error_html,
                    error_html,
                    gr.update(interactive=len(valid_files) > 0),
                    self.get_document_list()
                )
            self.logger.info(f"File validation passed - {len(valid_files)} valid files")

            progress(0.1, desc="Saving uploaded files...")
            self.logger.info("Saving uploaded files to temporary directory...")
            saved_files = save_uploaded_files(valid_files)
            for file_path, original_name in saved_files:
                size_mb = Path(file_path).stat().st_size / (1024 * 1024)
                self.logger.info(f"Saved file: {original_name} ({size_mb:.2f} MB) -> {file_path}")

            if not saved_files:
                return (
                    create_error_display("No valid files to process"),
                    create_error_display("Please check your files and try again"),
                    gr.update(interactive=False),
                    self.get_document_list()
                )

            results, processed_count = self._index_saved_files(saved_files, session_id, progress)

            progress(1.0, desc="Cleaning up...")
            status_html, upload_status = self._summarize_upload(
                results, processed_count, len(saved_files), time.time() - start_time
            )
            return (
                status_html,
                upload_status,
                gr.update(interactive=True),  # Enable search button
                self.get_document_list()
            )
        except Exception as e:
            self.logger.exception("Document processing failed")
            error_html = create_error_display(f"Document processing failed: {str(e)}")
            return error_html, error_html, gr.update(interactive=False), self.get_document_list()
        finally:
            self._cleanup_saved_files(saved_files)

    def _index_saved_files(
        self,
        saved_files: List[Tuple[str, str]],
        session_id: str,
        progress,
        timeout_seconds: int = 600,
    ) -> Tuple[List[str], int]:
        """Index each saved file via ``rag_system.add_document`` with a per-file wait limit.

        Args:
            saved_files: ``(temp_path, original_name)`` pairs to index.
            session_id: Forwarded to ``add_document`` as ``user_session``.
            progress: Gradio progress tracker; updated once per file.
            timeout_seconds: How long to wait for each file before reporting a timeout.

        Returns:
            ``(results, processed_count)``. ``results`` holds one human-readable
            status line per file, and ``processed_count`` is the number of
            files ``add_document`` reported as successful.
        """
        total_files = len(saved_files)
        timeout_minutes = timeout_seconds // 60
        results: List[str] = []
        processed_count = 0

        def process_single_file(file_path: str, original_name: str) -> Dict[str, Any]:
            """Run ``add_document`` for one file and log how long it took."""
            self.logger.info(f"Processing file: {original_name}")
            started = time.time()
            result = self.rag_system.add_document(
                file_path=file_path,
                filename=original_name,
                user_session=session_id
            )
            self.logger.info(
                f"File processing completed: {original_name} (took {time.time() - started:.2f}s)"
            )
            return result

        self.logger.info(
            f"Starting processing of {total_files} files with {timeout_minutes}-minute timeout per file"
        )
        # NOTE: Python threads cannot be interrupted. A timeout only stops *waiting*:
        # the worker keeps running, later files queue behind it (max_workers=1), and
        # leaving this `with` block blocks until it finishes. That final wait also
        # guarantees no worker is still reading a temp file when the caller cleans up.
        with ThreadPoolExecutor(max_workers=1) as executor:
            for i, (file_path, original_name) in enumerate(saved_files):
                # Fraction of files finished so far, mapped into the 0.1-0.9 band.
                progress(0.1 + 0.8 * i / total_files, desc=f"Processing {original_name}...")
                self.logger.info(f"Processing file {i + 1}/{total_files}: {original_name}")
                future = None
                try:
                    future = executor.submit(process_single_file, file_path, original_name)
                    result = future.result(timeout=timeout_seconds)
                    if result.get("success"):
                        data = result.get("data") or {}
                        # Read with .get so a missing key can't turn a success into a
                        # "failure" after it has already been counted.
                        chunks_created = data.get("chunks_created", 0)
                        processed_count += 1
                        self.logger.info(f"SUCCESS: {original_name} - {chunks_created} chunks created")
                        if "sample_chunks" in data:
                            self.logger.info(f"Sample chunks from {original_name}:")
                            for idx, chunk in enumerate(data["sample_chunks"][:3]):  # Show first 3 chunks
                                content = chunk["content"]
                                preview = content[:100] + "..." if len(content) > 100 else content
                                self.logger.info(f"  Chunk {idx}: {preview}")
                        results.append(f"✅ {original_name}: {chunks_created} chunks created")
                    else:
                        error_msg = result.get("error", {}).get("message", "Unknown error")
                        self.logger.error(f"FAILED: {original_name} - {error_msg}")
                        results.append(f"❌ {original_name}: {error_msg}")
                except FutureTimeoutError:
                    self.logger.error(f"TIMEOUT: {original_name} exceeded {timeout_minutes} minute limit")
                    results.append(f"⏰ {original_name}: Processing timed out after {timeout_minutes} minutes")
                    # Only effective while the task is still queued (behind an earlier
                    # timed-out task); a task that has started cannot be cancelled.
                    if future is not None:
                        future.cancel()
                except Exception as e:
                    self.logger.exception(f"EXCEPTION: {original_name} - {str(e)}")
                    results.append(f"❌ {original_name}: {str(e)}")
        return results, processed_count

    def _summarize_upload(
        self,
        results: List[str],
        processed_count: int,
        total_files: int,
        elapsed_seconds: float,
    ) -> Tuple[str, str]:
        """Log a run summary and build ``(status_html, upload_status)`` for the UI."""
        success_rate = (processed_count / total_files * 100) if total_files else 0.0
        self.logger.info("Document upload process completed:")
        self.logger.info(f" - Total files: {total_files}")
        self.logger.info(f" - Successfully processed: {processed_count}")
        self.logger.info(f" - Failed: {total_files - processed_count}")
        self.logger.info(f" - Success rate: {success_rate:.1f}%")
        self.logger.info(f" - Total time: {elapsed_seconds:.2f}s")

        details = "\n".join(results)
        if processed_count == total_files:
            self.logger.info(f"✅ ALL DOCUMENTS PROCESSED SUCCESSFULLY ({processed_count}/{total_files})")
            status_html = create_success_display(
                f"Successfully processed {processed_count} documents:\n" + details
            )
            upload_status = create_success_display(f"All {processed_count} documents processed successfully!")
        elif processed_count > 0:
            self.logger.warning(f"⚠️ PARTIAL SUCCESS ({processed_count}/{total_files} documents processed)")
            status_html = (
                f"⚠️ Partially successful ({processed_count}/{total_files} files processed)\n"
                + details
            )
            upload_status = status_html
        else:
            self.logger.error(f"❌ NO DOCUMENTS PROCESSED SUCCESSFULLY (0/{total_files})")
            status_html = create_error_display("Failed to process any documents:\n" + details)
            upload_status = create_error_display("Document processing failed")
        return status_html, upload_status

    def _cleanup_saved_files(self, saved_files: Optional[List[Tuple[str, str]]]) -> None:
        """Best-effort removal of temporary upload copies; failures are logged, not raised."""
        if not saved_files:
            return
        self.logger.info("Cleaning up temporary files...")
        try:
            cleanup_temp_files([fp for fp, _ in saved_files])
        except Exception:
            self.logger.exception("Failed to clean up temporary files")

    def perform_search(
        self,
        query: str,
        search_mode: str,
        num_results: int,
        enable_reranking: bool,
        metadata_filters: str,
        session_id: str
    ) -> Tuple[str, str, str]:
        """Run a search and return ``(results_html, results_json, stats_html)``.

        Failures are rendered as an error display with ``"{}"`` JSON and an
        empty stats panel rather than raised.
        """
        if not self.rag_system or not self._initialized:
            return create_error_display("System not initialized"), "{}", ""
        if not query or not query.strip():
            return create_error_display("Please enter a search query"), "{}", ""
        try:
            filters = parse_search_filters(metadata_filters) if metadata_filters else None
            result = self.rag_system.search(
                query=query.strip(),
                # UI number/slider inputs may deliver floats; k is a count.
                k=int(num_results),
                search_mode=search_mode,
                enable_reranking=enable_reranking,
                metadata_filter=filters,
                user_session=session_id
            )
            if not result.get("success"):
                error_msg = result.get("error", {}).get("message", "Search failed")
                return create_error_display(error_msg), "{}", ""

            search_data = result["data"]
            results = search_data.get("results", [])
            search_time = search_data.get("search_time", 0)
            results_html, stats_html = format_search_results(results, search_time, query)
            json_data = {
                "query": query,
                "search_mode": search_mode,
                "results_count": len(results),
                "search_time": search_time,
                "results": results[:5],  # Limit JSON display
                "query_suggestions": search_data.get("query_suggestions", [])
            }
            # This panel is display-only: stringify values json can't encode (e.g.
            # numpy scalars, if the backend returns them) instead of failing the search.
            return results_html, json.dumps(json_data, indent=2, default=str), stats_html
        except Exception as e:
            return create_error_display(f"Search failed: {str(e)}"), "{}", ""

    def get_document_list(self) -> str:
        """Return the indexed-document list formatted for display."""
        if not self.rag_system or not self._initialized:
            return create_error_display("System not initialized")
        try:
            result = self.rag_system.get_document_list()
            if result.get("success"):
                return format_document_list(result["data"]["documents"])
            return create_error_display("Failed to load document list")
        except Exception as e:
            return create_error_display(f"Error loading documents: {str(e)}")

    def clear_documents(self) -> Tuple[str, str]:
        """Remove all indexed documents; return ``(status_html, document_list)``."""
        if not self.rag_system or not self._initialized:
            error_html = create_error_display("System not initialized")
            return error_html, error_html
        try:
            result = self.rag_system.clear_all_documents()
            if result.get("success"):
                success_msg = f"Cleared {result['data']['documents_removed']} documents"
                return create_success_display(success_msg), self.get_document_list()
            error_msg = result.get("error", {}).get("message", "Failed to clear documents")
            return create_error_display(error_msg), self.get_document_list()
        except Exception as e:
            return create_error_display(f"Error clearing documents: {str(e)}"), self.get_document_list()

    def get_analytics_data(self) -> Tuple[str, gr.Plot, gr.Plot, List[List[str]]]:
        """Return ``(overview_html, query_chart, modes_chart, activity_rows)`` for the dashboard.

        On any failure the charts are empty ``gr.Plot()`` values and the
        activity table is empty.
        """
        if not self.rag_system or not self._initialized:
            return create_error_display("System not initialized"), gr.Plot(), gr.Plot(), []
        try:
            result = self.rag_system.get_analytics_dashboard()
            if not result.get("success"):
                return create_error_display("Failed to load analytics data"), gr.Plot(), gr.Plot(), []

            analytics_data = result["data"]
            overview_html = format_system_overview(analytics_data)
            query_chart, modes_chart = create_analytics_charts(analytics_data)

            system_data = analytics_data.get("system", {})
            activity_data = [[
                "System Started",
                "System Initialization",
                f"Uptime: {system_data.get('uptime_hours', 0):.1f} hours",
                "✅ Active"
            ]]
            if system_data.get("total_queries", 0) > 0:
                activity_data.append([
                    "Recent",
                    "Search Queries",
                    f"{system_data.get('total_queries')} total queries",
                    "📊 Active"
                ])
            if system_data.get("total_documents_processed", 0) > 0:
                activity_data.append([
                    "Recent",
                    "Document Processing",
                    f"{system_data.get('total_documents_processed')} documents processed",
                    "📄 Complete"
                ])
            return overview_html, query_chart, modes_chart, activity_data
        except Exception as e:
            return create_error_display(f"Error loading analytics: {str(e)}"), gr.Plot(), gr.Plot(), []

    def create_interface(self) -> gr.Blocks:
        """Build the Gradio Blocks app: layout, components and event wiring."""
        theme = get_theme()
        css = get_custom_css()
        with gr.Blocks(
            theme=theme,
            css=css,
            title="Professional RAG Assistant",
            analytics_enabled=False
        ) as interface:
            # Pass the factory, not its result: Gradio evaluates a callable State
            # value per session, so each visitor gets their own id. Calling it here
            # would freeze a single id into the app and share it across all users.
            session_id_state = gr.State(value=generate_session_id)

            create_header()
            system_status = create_system_status()

            with gr.Tabs():
                # Document Upload Tab
                with gr.Tab("📁 Document Upload", id="upload"):
                    gr.Markdown("Upload your documents to build the knowledge base. Supports PDF, DOCX, and TXT files.")
                    file_upload, upload_status, upload_button = create_file_upload_section()
                    with gr.Accordion("Upload Settings", open=False):
                        gr.Markdown(
                            "\n"
                            "**Supported formats:** PDF, DOCX, TXT\n"
                            "**Maximum file size:** 50MB per file\n"
                            "**Processing:** Documents are split into chunks and indexed for search\n"
                        )

                # Search Tab
                with gr.Tab("🔍 Search", id="search"):
                    gr.Markdown("Search your uploaded documents using advanced AI-powered retrieval.")
                    with gr.Row():
                        with gr.Column(scale=4):
                            search_components = create_search_interface()
                            search_query, search_controls, search_button = search_components[:3]
                            search_mode, num_results, enable_reranking = search_components[3:]
                        with gr.Column(scale=1):
                            with gr.Accordion("Advanced Options", open=False):
                                metadata_filters = gr.Textbox(
                                    label="Metadata Filters",
                                    placeholder='{"source": "document.pdf"}',
                                    lines=3,
                                    info="JSON or key:value,key2:value2 format"
                                )
                    results_html, results_json, search_stats = create_results_display()
                    with gr.Accordion("Detailed Results (JSON)", open=False):
                        # NOTE(review): a bare reference does not move a component that
                        # create_results_display() presumably already rendered; confirm layout.
                        results_json

                # Document Management Tab
                with gr.Tab("📚 Documents", id="documents"):
                    gr.Markdown("Manage your uploaded documents and view indexing status.")
                    document_list, refresh_docs_btn, clear_docs_btn = create_document_management()

                # Analytics Tab
                with gr.Tab("📊 Analytics", id="analytics"):
                    gr.Markdown("View system performance and usage analytics.")
                    system_overview, query_chart, search_modes_chart, activity_table = create_analytics_dashboard()
                    # NOTE(review): the bare references below are no-ops for layout; the
                    # components render where create_analytics_dashboard() created them.
                    with gr.Row():
                        with gr.Column():
                            query_chart
                        with gr.Column():
                            search_modes_chart
                    with gr.Accordion("Recent Activity", open=False):
                        activity_table
                    refresh_analytics_btn = gr.Button("Refresh Analytics", variant="secondary")

            # File upload events
            file_upload.change(
                fn=lambda files: (
                    create_success_display(f"✅ {len(files)} file(s) selected! Click the green '🚀 Process Documents' button below to continue.") if files and len(files) > 0 else create_loading_display("No files selected"),
                    gr.update(interactive=files is not None and len(files) > 0)
                ),
                inputs=[file_upload],
                outputs=[upload_status, upload_button],
                show_progress=False
            )
            upload_button.click(
                fn=self.process_documents,
                inputs=[file_upload, session_id_state],
                outputs=[upload_status, system_status, search_button, document_list],
                show_progress=True
            )

            # Search events
            search_query.change(
                fn=lambda query: gr.update(interactive=len(query.strip()) > 0 if query else False),
                inputs=[search_query],
                outputs=[search_button],
                show_progress=False
            )
            search_button.click(
                fn=lambda: create_loading_display("Searching..."),
                inputs=[],
                outputs=[results_html],
                show_progress=False
            ).then(
                fn=self.perform_search,
                inputs=[
                    search_query, search_mode, num_results,
                    enable_reranking, metadata_filters, session_id_state
                ],
                outputs=[results_html, results_json, search_stats],
                show_progress=True
            )

            # Document management events
            refresh_docs_btn.click(
                fn=self.get_document_list,
                inputs=[],
                outputs=[document_list],
                show_progress=False
            )
            clear_docs_btn.click(
                fn=self.clear_documents,
                inputs=[],
                outputs=[system_status, document_list],
                show_progress=True
            )

            # Analytics events
            refresh_analytics_btn.click(
                fn=self.get_analytics_data,
                inputs=[],
                outputs=[system_overview, query_chart, search_modes_chart, activity_table],
                show_progress=True
            )

            # Populate status/doc list/analytics on page load; both action buttons start disabled.
            interface.load(
                fn=lambda: (
                    self.get_system_status(),
                    gr.update(interactive=False),  # Upload button disabled initially
                    gr.update(interactive=False),  # Search button disabled initially
                    self.get_document_list(),
                    *self.get_analytics_data()
                ),
                inputs=[],
                outputs=[
                    system_status, upload_button, search_button, document_list,
                    system_overview, query_chart, search_modes_chart, activity_table
                ],
                show_progress=False
            )
        return interface
def create_interface(config_path: Optional[str] = None) -> gr.Blocks:
    """Construct a ``RAGInterface`` for *config_path* and return its Gradio app.

    Args:
        config_path: Optional path to the RAG configuration file; ``None``
            lets ``RAGInterface`` fall back to its default.

    Returns:
        The ``gr.Blocks`` application built by ``RAGInterface.create_interface``.
    """
    app_builder = RAGInterface(config_path)
    return app_builder.create_interface()
if __name__ == "__main__":
    # Manual smoke test: build the app with the default config and serve it
    # with Gradio's debug mode enabled.
    demo = create_interface()
    demo.launch(debug=True)