"""Streamlit front end for the Personal AI Second Brain.

The script renders four tabs (Chat, Documents, History, Settings) on top of
the project's ``AssistantAgent``, ``DocumentProcessor``, ``ChatHistoryManager``
and optional ``TelegramBot``. Streamlit re-executes this file top to bottom on
every user interaction, so expensive objects are created through
``st.cache_resource`` and per-session values live in ``st.session_state``.
"""

import logging
import os
import sys
import tempfile
import time
from datetime import datetime
from typing import List, Dict, Any

import pandas as pd
import streamlit as st

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Add project root to path for imports
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))

# Use package imports when running as part of the app package
try:
    from app.core.agent import AssistantAgent
    from app.core.ingestion import DocumentProcessor
    from app.core.telegram_bot import TelegramBot
    from app.core.chat_history import ChatHistoryManager
    from app.utils.helpers import get_document_path, format_sources, save_conversation, copy_uploaded_file
    from app.config import (
        LLM_MODEL,
        EMBEDDING_MODEL,
        TELEGRAM_ENABLED,
        TELEGRAM_BOT_TOKEN,
        TELEGRAM_ALLOWED_USERS,
        HF_DATASET_NAME
    )
except ImportError:
    # Fallback: make the current working directory importable and retry
    sys.path.append(os.path.abspath('.'))
    from app.core.agent import AssistantAgent
    from app.core.ingestion import DocumentProcessor
    from app.core.telegram_bot import TelegramBot
    from app.core.chat_history import ChatHistoryManager
    from app.utils.helpers import get_document_path, format_sources, save_conversation, copy_uploaded_file
    from app.config import (
        LLM_MODEL,
        EMBEDDING_MODEL,
        TELEGRAM_ENABLED,
        TELEGRAM_BOT_TOKEN,
        TELEGRAM_ALLOWED_USERS,
        HF_DATASET_NAME
    )

# Set page config (must be the first Streamlit command executed)
st.set_page_config(
    page_title="Personal AI Second Brain",
    page_icon="🧠",
    layout="wide"
)


@st.cache_resource
def get_agent():
    """Return the shared ``AssistantAgent``, or a stub if construction fails.

    The stub exposes ``query`` and ``add_conversation_to_memory`` so the chat
    tab keeps working and tells the user something went wrong.
    """
    logger.info("Initializing AssistantAgent (should only happen once)")
    try:
        return AssistantAgent()
    except Exception as e:
        logger.exception("Error initializing agent")
        st.error(f"Could not initialize AI assistant: {str(e)}")

        # Return a dummy agent as fallback
        class DummyAgent:
            def query(self, question):
                return {
                    "answer": "I'm having trouble starting up. Please try refreshing the page.",
                    "sources": []
                }

            def add_conversation_to_memory(self, *args, **kwargs):
                pass

        return DummyAgent()


@st.cache_resource
def get_document_processor(_agent):
    """Return the shared ``DocumentProcessor`` built on ``_agent.memory_manager``.

    The leading underscore in ``_agent`` tells Streamlit not to hash this
    parameter. If construction fails (including when ``_agent`` has no
    ``memory_manager``), a stub with ``ingest_file``/``ingest_text`` is returned.
    """
    logger.info("Initializing DocumentProcessor (should only happen once)")
    try:
        return DocumentProcessor(_agent.memory_manager)
    except Exception as e:
        logger.exception("Error initializing document processor")
        st.error(f"Could not initialize document processor: {str(e)}")

        # Return a dummy processor as fallback
        class DummyProcessor:
            def ingest_file(self, *args, **kwargs):
                return ["dummy-id"]

            def ingest_text(self, *args, **kwargs):
                return ["dummy-id"]

        return DummyProcessor()


@st.cache_resource
def get_chat_history_manager():
    """Return the shared ``ChatHistoryManager``, or a stub if construction fails.

    The stub implements every member this script uses (``load_history``,
    ``search_conversations``, ``get_conversations_by_date``,
    ``save_conversation``, ``sync_to_hub``, ``local_file``) so no tab raises
    ``AttributeError`` when the real manager is unavailable.
    """
    logger.info("Initializing ChatHistoryManager")
    try:
        return ChatHistoryManager(dataset_name=HF_DATASET_NAME)
    except Exception as e:
        logger.exception("Error initializing chat history manager")
        st.error(f"Could not initialize chat history: {str(e)}")

        # Return a dummy manager as fallback
        class DummyHistoryManager:
            local_file = None

            def load_history(self, *args, **kwargs):
                return []

            def search_conversations(self, *args, **kwargs):
                return []

            def get_conversations_by_date(self, *args, **kwargs):
                return []

            def save_conversation(self, *args, **kwargs):
                return True

            def sync_to_hub(self, *args, **kwargs):
                return False

        return DummyHistoryManager()


@st.cache_resource
def get_telegram_bot(_agent):
    """Return the shared ``TelegramBot``, or ``None`` when disabled or on error.

    The leading underscore in ``_agent`` tells Streamlit not to hash this
    parameter.
    """
    if not TELEGRAM_ENABLED or not TELEGRAM_BOT_TOKEN:
        logger.info("Telegram bot disabled or token missing")
        return None

    logger.info("Initializing Telegram bot")
    try:
        return TelegramBot(
            agent=_agent,
            token=TELEGRAM_BOT_TOKEN,
            allowed_user_ids=TELEGRAM_ALLOWED_USERS
        )
    except Exception:
        logger.exception("Error initializing Telegram bot")
        return None


@st.cache_resource
def get_telegram_runtime_state():
    """Return a process-wide dict holding the Telegram bot's lifecycle status.

    The bot is a cached, process-wide object while ``st.session_state`` is
    per browser session. Keeping the status here stops every new session from
    seeing "Not started" and starting the same bot again.
    """
    return {"status": "Not started"}


# Initialize session state variables
if "messages" not in st.session_state:
    st.session_state.messages = []
if "history_filter" not in st.session_state:
    st.session_state.history_filter = ""
if "current_tab" not in st.session_state:
    st.session_state.current_tab = "Chat"

# Initialize agent and other components with caching
agent = get_agent()
document_processor = get_document_processor(agent)
chat_history_manager = get_chat_history_manager()
telegram_bot = get_telegram_bot(agent)
telegram_state = get_telegram_runtime_state()

# Mirror the shared bot status into this session on every run
st.session_state.telegram_status = telegram_state["status"]


def set_telegram_status(status):
    """Record ``status`` in both the shared state and the current session."""
    telegram_state["status"] = status
    st.session_state.telegram_status = status


# Run Telegram bot on startup if enabled. This happens before the UI is drawn
# so the Settings tab shows the real status on the very first run.
if telegram_bot and TELEGRAM_ENABLED and telegram_state["status"] == "Not started":
    try:
        if telegram_bot.start():
            set_telegram_status("Running")
            logger.info("Telegram bot started automatically")
    except Exception:
        logger.exception("Error auto-starting Telegram bot")
        set_telegram_status("Error")

# Load initial messages from history
if not st.session_state.messages:
    try:
        recent_history = chat_history_manager.load_history()
        # Take the last 10 conversations and convert to messages format
        for conv in recent_history[-10:]:
            if "user_query" in conv and "assistant_response" in conv:
                st.session_state.messages.append({"role": "user", "content": conv["user_query"]})
                st.session_state.messages.append({"role": "assistant", "content": conv["assistant_response"]})
    except Exception:
        logger.exception("Error loading initial history")

# Main UI
st.title("🧠 Personal AI Second Brain")

# Create tabs for different functionality
tabs = st.tabs(["Chat", "Documents", "History", "Settings"])

# Chat tab
with tabs[0]:
    # NOTE(review): every tab body runs on each script execution, so
    # current_tab always ends up as "Settings"; it does not track the
    # tab the user is looking at.
    st.session_state.current_tab = "Chat"

    # Display chat messages from history
    for message in st.session_state.messages:
        with st.chat_message(message["role"]):
            st.markdown(message["content"])

    # Accept user input
    if prompt := st.chat_input("Ask me anything..."):
        # Add user message to chat history
        st.session_state.messages.append({"role": "user", "content": prompt})

        # Display user message in chat
        with st.chat_message("user"):
            st.markdown(prompt)

        # Generate and display assistant response
        with st.chat_message("assistant"):
            message_placeholder = st.empty()
            message_placeholder.markdown("Thinking...")

            try:
                response = agent.query(prompt)
                answer = response["answer"]
                sources = response["sources"]
            except Exception as e:
                logger.exception("Error generating response")
                error_message = f"I'm sorry, I encountered an error: {str(e)}"
                message_placeholder.markdown(error_message)
                st.session_state.messages.append({"role": "assistant", "content": error_message})
            else:
                # Update the placeholder with the response
                message_placeholder.markdown(answer)

                # Add assistant response to chat history
                st.session_state.messages.append({"role": "assistant", "content": answer})

                # Everything below is best-effort: a failure here must not
                # replace the answer the user has already been shown.

                # Display sources if available
                if sources:
                    try:
                        formatted_sources = format_sources(sources)
                    except Exception:
                        logger.exception("Error formatting sources")
                    else:
                        with st.expander("Sources"):
                            st.markdown(formatted_sources)

                # Save conversation to history manager
                try:
                    chat_history_manager.save_conversation({
                        "user_query": prompt,
                        "assistant_response": answer,
                        "sources": [s["source"] for s in sources] if sources else [],
                        "timestamp": datetime.now().isoformat()
                    })
                except Exception:
                    logger.exception("Error saving conversation to history")

                # Add to agent's memory
                try:
                    agent.add_conversation_to_memory(prompt, answer)
                except Exception:
                    logger.exception("Error adding conversation to agent memory")

# Documents tab
with tabs[1]:
    st.session_state.current_tab = "Documents"

    st.header("Upload & Manage Documents")

    col1, col2 = st.columns(2)

    with col1:
        st.subheader("Upload a File")

        # Show supported file types info
        with st.expander("Supported File Types"):
            st.markdown("""
            - **PDF** (.pdf) - Best for formatted documents
            - **Text** (.txt) - Simple text files
            - **CSV** (.csv) - Structured data
            - **Word** (.doc, .docx) - Microsoft Word documents
            - **Markdown** (.md) - Formatted text
            - **HTML** (.html, .htm) - Web pages

            Other file types may work but are not fully supported.
            """)

        uploaded_file = st.file_uploader(
            "Choose a file",
            type=["pdf", "txt", "csv", "doc", "docx", "md", "html", "htm", "xml", "json"]
        )

        if uploaded_file is not None:
            # Display file info
            file_details = {
                "Filename": uploaded_file.name,
                "File size": f"{uploaded_file.size / 1024:.1f} KB",
                "File type": uploaded_file.type
            }
            st.json(file_details)

            # Handle the uploaded file
            if st.button("Process Document"):
                with st.spinner("Processing document..."):
                    status_placeholder = st.empty()
                    status_placeholder.info("Starting document processing...")

                    # Pre-initialized so the except/finally clauses can tell
                    # how far processing got.
                    temp_path = None
                    doc_path = None
                    progress_bar = None

                    try:
                        # The filename comes from the client; keep only its
                        # final component so it cannot escape the temp dir.
                        safe_name = os.path.basename(uploaded_file.name)

                        status_placeholder.info("Creating temporary file...")
                        temp_dir = tempfile.gettempdir()
                        temp_path = os.path.join(temp_dir, safe_name)

                        logger.info("Saving uploaded file to temporary path: %s", temp_path)

                        # Write the file data to the temporary file
                        with open(temp_path, "wb") as temp_file:
                            temp_file.write(uploaded_file.getvalue())

                        # Get a path to store the document permanently
                        status_placeholder.info("Preparing document storage location...")
                        doc_path = get_document_path(safe_name)

                        # Copy the file to the documents directory
                        logger.info("Copying file to documents directory: %s", doc_path)
                        copy_success = copy_uploaded_file(temp_path, doc_path)

                        if not copy_success:
                            logger.warning("Using temporary file path instead of documents directory")
                            doc_path = temp_path
                            status_placeholder.warning("Using temporary storage (document won't be permanently saved)")

                        # Ingest the document, retrying on any error
                        status_placeholder.info("Analyzing and indexing document content...")
                        progress_bar = st.progress(0)

                        # NOTE(review): ingestion reads temp_path, which is
                        # deleted afterwards when the copy succeeded; confirm
                        # whether doc_path should be indexed instead.
                        max_retries = 3
                        for attempt in range(max_retries):
                            try:
                                progress_bar.progress(attempt * 30)  # Show progress as we attempt
                                ids = document_processor.ingest_file(temp_path, {"original_name": uploaded_file.name})
                                progress_bar.progress(100)
                                break
                            except Exception as e:
                                if attempt >= max_retries - 1:
                                    raise  # Re-raise on last attempt

                                error_str = str(e).lower()
                                if "403" in error_str or "forbidden" in error_str or "permission" in error_str:
                                    status_placeholder.warning(f"Permission error ({attempt+1}/{max_retries}), retrying...")
                                    logger.warning(f"Permission error ({attempt+1}/{max_retries}), retrying...")
                                else:
                                    # General retry for any other error
                                    status_placeholder.warning(f"Error ({attempt+1}/{max_retries}), retrying...")
                                    logger.warning(f"Error during ingestion ({attempt+1}/{max_retries}): {e}")
                                time.sleep(1.5)  # Delay between retries

                        # Check if ingestion was successful based on IDs
                        if ids and not all(str(doc_id).startswith("error-") for doc_id in ids):
                            status_placeholder.success(f"✅ Document processed successfully!")
                            st.balloons()  # Celebrate success
                        else:
                            status_placeholder.warning("⚠️ Document processed with warnings. Some content may not be fully indexed.")

                    except Exception as e:
                        # Finish the existing bar, or draw an empty one if
                        # the failure happened before ingestion started.
                        if progress_bar is not None:
                            progress_bar.progress(100)
                        else:
                            st.progress(0)

                        logger.exception("Error processing document")
                        status_placeholder.error(f"❌ Error processing document: {str(e)}")

                        error_text = str(e).lower()
                        if "403" in error_text or "forbidden" in error_text:
                            st.warning("This appears to be a permissions issue. Try using a different file format or using the text input option instead.")
                        elif "unsupported" in error_text or "not supported" in error_text or "no specific loader" in error_text:
                            st.warning("This file format may not be supported. Try converting to PDF or TXT first.")

                    finally:
                        # Clean up the temporary file on success and failure
                        # alike, unless it doubles as the stored document.
                        if temp_path and temp_path != doc_path and os.path.exists(temp_path):
                            try:
                                os.unlink(temp_path)
                                logger.info("Temporary file removed: %s", temp_path)
                            except OSError as cleanup_error:
                                logger.warning(f"Could not remove temporary file: {cleanup_error}")

    with col2:
        st.subheader("Add Text Directly")

        # Text input for adding content directly
        text_content = st.text_area("Enter text to add to your knowledge base:", height=200)
        text_title = st.text_input("Give this text a title:")

        if st.button("Process Text"):
            if not (text_content and text_title):
                # Tell the user why nothing happened instead of ignoring the click
                st.warning("Please enter both the text and a title before processing.")
            else:
                with st.spinner("Processing text..."):
                    status_placeholder = st.empty()
                    status_placeholder.info("Processing your text...")

                    try:
                        # Process the text content
                        metadata = {"title": text_title, "source": "direct_input"}
                        ids = document_processor.ingest_text(text_content, metadata)

                        if ids:
                            status_placeholder.success("✅ Text processed successfully!")
                        else:
                            status_placeholder.warning("⚠️ Text processed with warnings.")
                    except Exception as e:
                        logger.exception("Error processing text")
                        status_placeholder.error(f"❌ Error processing text: {str(e)}")

# History tab
with tabs[2]:
    st.session_state.current_tab = "History"

    st.header("Chat History")

    # Search and filtering options
    col1, col2, col3 = st.columns([2, 1, 1])

    with col1:
        search_query = st.text_input("Search conversations:", st.session_state.history_filter)
        if search_query != st.session_state.history_filter:
            st.session_state.history_filter = search_query

    with col2:
        st.text("Date Range (optional)")
        start_date = st.date_input("Start date", None)

    with col3:
        st.text("\u00A0")  # Non-breaking space for alignment
        end_date = st.date_input("End date", None)

    # Load and filter history
    try:
        if search_query:
            history = chat_history_manager.search_conversations(search_query)
        else:
            history = chat_history_manager.load_history()

        # Apply date filtering if provided
        if start_date or end_date:
            # Convert datetime.date to datetime.datetime for filtering
            start_datetime = datetime.combine(start_date, datetime.min.time()) if start_date else None
            end_datetime = datetime.combine(end_date, datetime.max.time()) if end_date else None
            in_date_range = chat_history_manager.get_conversations_by_date(start_datetime, end_datetime)

            if search_query:
                # Keep only conversations that satisfy BOTH filters
                history = [conv for conv in history if conv in in_date_range]
            else:
                history = in_date_range

        # Display history
        if not history:
            st.info("No conversation history found matching your criteria.")
        else:
            # Sort by timestamp (newest first). sorted() leaves the manager's
            # own list untouched; missing/None timestamps sort last.
            history = sorted(history, key=lambda conv: str(conv.get("timestamp") or ""), reverse=True)

            # Create a DataFrame for display
            df = pd.DataFrame(history)

            if not df.empty:
                # Select and rename columns for display
                required_columns = ["timestamp", "user_query", "assistant_response"]
                if all(column in df.columns for column in required_columns):
                    display_df = df[required_columns].rename(columns={
                        "timestamp": "Date",
                        "user_query": "Your Question",
                        "assistant_response": "AI Response"
                    })

                    # Format timestamp; unparseable values become NaT
                    # instead of aborting the whole table
                    display_df["Date"] = pd.to_datetime(display_df["Date"], errors="coerce").dt.strftime('%Y-%m-%d %H:%M')

                    # Truncate long text
                    for column in ["Your Question", "AI Response"]:
                        display_df[column] = display_df[column].apply(
                            lambda x: x[:100] + "..." if isinstance(x, str) and len(x) > 100 else x
                        )

                    # Display as table
                    st.dataframe(display_df, use_container_width=True)

                    def describe_conversation(i):
                        """Build the select-box label for row ``i`` of ``df``."""
                        row = df.iloc[i]
                        # str() guards against a non-string (e.g. NaN) query
                        return f"{row.get('timestamp', 'Unknown')} - {str(row.get('user_query', ''))[:30]}..."

                    # Add option to view full conversation
                    selected_idx = st.selectbox(
                        "Select conversation to view details:",
                        range(len(df)),
                        format_func=describe_conversation
                    )

                    if selected_idx is not None:
                        selected_conv = df.iloc[selected_idx]
                        st.subheader("Conversation Details")

                        st.markdown("**Your Question:**")
                        st.markdown(selected_conv.get("user_query", ""))

                        st.markdown("**AI Response:**")
                        st.markdown(selected_conv.get("assistant_response", ""))

                        # Display sources if available. Rows without sources
                        # hold NaN in the DataFrame, so check the type rather
                        # than relying on truthiness.
                        conv_sources = selected_conv.get("sources")
                        if hasattr(conv_sources, "tolist"):
                            conv_sources = conv_sources.tolist()
                        if isinstance(conv_sources, (list, tuple)) and conv_sources:
                            st.markdown("**Sources:**")
                            for src in conv_sources:
                                st.markdown(f"- {src}")

                        # Option to use this conversation in chat
                        if st.button("Continue this conversation"):
                            # Add to current chat session
                            st.session_state.messages.append({"role": "user", "content": selected_conv.get("user_query", "")})
                            st.session_state.messages.append({"role": "assistant", "content": selected_conv.get("assistant_response", "")})

                            # NOTE(review): st.tabs offers no programmatic
                            # selection, so this records the intent only.
                            st.session_state.current_tab = "Chat"

                            # st.experimental_rerun was removed from newer
                            # Streamlit releases; prefer st.rerun when present.
                            rerun = getattr(st, "rerun", None) or st.experimental_rerun
                            rerun()
                else:
                    st.error("Unexpected history format. Some columns are missing.")
            else:
                st.info("No conversation history found.")

    except Exception as e:
        logger.exception("Error displaying history")
        st.error(f"Error loading conversation history: {str(e)}")

    # Sync to Hugging Face Hub button
    if HF_DATASET_NAME:
        if st.button("Sync History to Hugging Face Hub"):
            with st.spinner("Syncing history..."):
                try:
                    success = chat_history_manager.sync_to_hub()
                except Exception:
                    logger.exception("Error syncing history to Hugging Face Hub")
                    success = False

                if success:
                    st.success("History successfully synced to Hugging Face Hub!")
                else:
                    st.error("Failed to sync history. Check logs for details.")

# Settings tab
with tabs[3]:
    st.session_state.current_tab = "Settings"

    st.header("Settings")

    # System information
    st.subheader("System Information")
    system_info = {
        "LLM Model": LLM_MODEL,
        "Embedding Model": EMBEDDING_MODEL,
        "HF Dataset": HF_DATASET_NAME or "Not configured",
        "Telegram Enabled": "Yes" if TELEGRAM_ENABLED else "No"
    }

    for key, value in system_info.items():
        st.markdown(f"**{key}:** {value}")

    # Telegram settings
    st.subheader("Telegram Integration")

    telegram_status = st.session_state.telegram_status if telegram_bot else "Not configured"

    st.markdown(f"**Status:** {telegram_status}")

    col1, col2 = st.columns(2)

    with col1:
        if telegram_bot and st.session_state.telegram_status != "Running":
            if st.button("Start Telegram Bot"):
                try:
                    success = telegram_bot.start()
                    if success:
                        set_telegram_status("Running")
                        st.success("Telegram bot started!")
                    else:
                        st.error("Failed to start Telegram bot. Check logs for details.")
                except Exception as e:
                    logger.exception("Error starting Telegram bot")
                    st.error(f"Error: {str(e)}")

    with col2:
        if telegram_bot and st.session_state.telegram_status == "Running":
            if st.button("Stop Telegram Bot"):
                try:
                    telegram_bot.stop()
                    set_telegram_status("Stopped")
                    st.info("Telegram bot stopped.")
                except Exception as e:
                    logger.exception("Error stopping Telegram bot")
                    st.error(f"Error: {str(e)}")

    if telegram_bot:
        with st.expander("Telegram Bot Settings"):
            st.markdown("""
            To configure the Telegram bot, set these environment variables:

            - `TELEGRAM_ENABLED`: Set to `true` to enable the bot
            - `TELEGRAM_BOT_TOKEN`: Your Telegram bot token
            - `TELEGRAM_ALLOWED_USERS`: Comma-separated list of Telegram user IDs (optional)
            """)

            if telegram_bot.allowed_user_ids:
                st.markdown("**Allowed User IDs:**")
                for user_id in telegram_bot.allowed_user_ids:
                    st.markdown(f"- {user_id}")
            else:
                st.markdown("The bot will respond to all users (no user restrictions configured).")

        # Show Telegram bot instructions
        st.markdown("### Telegram Bot Commands")
        st.markdown("""
        - **/start**: Start a conversation with the bot
        - **/help**: Shows available commands
        - **/search**: Use `/search your query` to search your knowledge base
        - **Direct messages**: Send any message to chat with your second brain

        #### How to Set Up Your Telegram Bot

        1. Talk to [@BotFather](https://t.me/botfather) on Telegram
        2. Use the `/newbot` command to create a new bot
        3. Get your bot token and add it to your `.env` file
        4. Set `TELEGRAM_ENABLED=true` in your `.env` file
        5. To find your Telegram user ID, talk to [@userinfobot](https://t.me/userinfobot)
        """)
    else:
        st.info("Telegram integration is not enabled. Configure your .env file to enable it.")

    # Settings for Hugging Face Dataset persistence
    st.subheader("Hugging Face Dataset Settings")

    if HF_DATASET_NAME:
        # The fallback history manager may not have a usable local file
        local_history_file = getattr(chat_history_manager, "local_file", None) or "Unavailable"

        st.markdown(f"**Dataset Name:** {HF_DATASET_NAME}")
        st.markdown(f"**Local History File:** {local_history_file}")

        # HF Dataset instructions
        with st.expander("Setup Instructions"):
            st.markdown("""
            ### Setting up Hugging Face Dataset Persistence

            1. Create a private dataset repository on Hugging Face Hub
            2. Set your API token in the `.env` file as `HF_API_KEY`
            3. Set your dataset name as `HF_DATASET_NAME` (format: username/repo-name)

            Your chat history will be automatically synced to the Hub.
            """)
    else:
        st.info("Hugging Face Dataset persistence is not configured. Set HF_DATASET_NAME in your .env file.")

if __name__ == "__main__":
    # This is used when running the file directly
    pass