|
import streamlit as st |
|
import os |
|
import sys |
|
import tempfile |
|
from datetime import datetime |
|
import pandas as pd |
|
from typing import List, Dict, Any |
|
import time |
|
import logging |
|
|
|
|
|
# Configure root logging for this Streamlit entry point.
logging.basicConfig(level=logging.INFO)

logger = logging.getLogger(__name__)

# Make the project root importable when this file is run directly
# (three directory levels up from this file).
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
|
|
|
|
|
try: |
|
from app.core.agent import AssistantAgent |
|
from app.core.ingestion import DocumentProcessor |
|
from app.core.telegram_bot import TelegramBot |
|
from app.core.chat_history import ChatHistoryManager |
|
from app.utils.helpers import get_document_path, format_sources, save_conversation, copy_uploaded_file |
|
from app.config import ( |
|
LLM_MODEL, EMBEDDING_MODEL, TELEGRAM_ENABLED, |
|
TELEGRAM_BOT_TOKEN, TELEGRAM_ALLOWED_USERS, |
|
HF_DATASET_NAME |
|
) |
|
except ImportError: |
|
|
|
sys.path.append(os.path.abspath('.')) |
|
from app.core.agent import AssistantAgent |
|
from app.core.ingestion import DocumentProcessor |
|
from app.core.telegram_bot import TelegramBot |
|
from app.core.chat_history import ChatHistoryManager |
|
from app.utils.helpers import get_document_path, format_sources, save_conversation, copy_uploaded_file |
|
from app.config import ( |
|
LLM_MODEL, EMBEDDING_MODEL, TELEGRAM_ENABLED, |
|
TELEGRAM_BOT_TOKEN, TELEGRAM_ALLOWED_USERS, |
|
HF_DATASET_NAME |
|
) |
|
|
|
|
|
# Page setup; st.set_page_config must be the first Streamlit call in the script.
st.set_page_config(
    page_title="Personal AI Second Brain",
    page_icon="🧠",
    layout="wide"
)
|
|
|
|
|
@st.cache_resource
def get_agent():
    """Create the singleton AssistantAgent (cached across Streamlit reruns).

    Returns:
        An AssistantAgent, or a DummyAgent fallback exposing the same
        ``query`` / ``add_conversation_to_memory`` interface when
        initialization fails, so the chat UI keeps rendering.
    """
    logger.info("Initializing AssistantAgent (should only happen once)")
    try:
        return AssistantAgent()
    except Exception as e:
        # logger.exception captures the full traceback, not just the message,
        # which makes startup failures diagnosable from the logs.
        logger.exception("Error initializing agent")
        st.error(f"Could not initialize AI assistant: {str(e)}")

    class DummyAgent:
        """Stand-in agent so the UI degrades gracefully on startup failure."""

        def query(self, question):
            return {
                "answer": "I'm having trouble starting up. Please try refreshing the page.",
                "sources": []
            }

        def add_conversation_to_memory(self, *args, **kwargs):
            pass

    return DummyAgent()
|
|
|
|
|
@st.cache_resource
def get_document_processor(_agent):
    """Create the singleton DocumentProcessor (cached across Streamlit reruns).

    The leading underscore in ``_agent`` tells Streamlit not to hash this
    (unhashable) parameter.

    Returns:
        A DocumentProcessor, or a DummyProcessor fallback exposing the
        same ``ingest_file`` / ``ingest_text`` interface when
        initialization fails, so the upload UI keeps working.
    """
    logger.info("Initializing DocumentProcessor (should only happen once)")
    try:
        return DocumentProcessor(_agent.memory_manager)
    except Exception as e:
        # logger.exception captures the full traceback, not just the message.
        logger.exception("Error initializing document processor")
        st.error(f"Could not initialize document processor: {str(e)}")

    class DummyProcessor:
        """Stand-in processor so ingestion calls fail soft."""

        def ingest_file(self, *args, **kwargs):
            return ["dummy-id"]

        def ingest_text(self, *args, **kwargs):
            return ["dummy-id"]

    return DummyProcessor()
|
|
|
|
|
@st.cache_resource
def get_chat_history_manager():
    """Create the singleton ChatHistoryManager (cached across Streamlit reruns).

    Returns:
        A ChatHistoryManager, or a DummyHistoryManager fallback when
        initialization fails. The fallback mirrors every attribute the UI
        touches (including ``search_conversations``,
        ``get_conversations_by_date`` and ``local_file``, which the History
        and Settings tabs access) so the app degrades gracefully instead
        of raising AttributeError.
    """
    logger.info("Initializing ChatHistoryManager")
    try:
        return ChatHistoryManager(dataset_name=HF_DATASET_NAME)
    except Exception as e:
        # logger.exception captures the full traceback, not just the message.
        logger.exception("Error initializing chat history manager")
        st.error(f"Could not initialize chat history: {str(e)}")

    class DummyHistoryManager:
        """Stand-in manager: empty history, no-op persistence."""

        # Settings tab reads this attribute directly (outside any try block).
        local_file = "unavailable"

        def load_history(self, *args, **kwargs):
            return []

        def search_conversations(self, *args, **kwargs):
            # History tab calls this whenever a search query is set.
            return []

        def get_conversations_by_date(self, *args, **kwargs):
            # History tab calls this whenever a date filter is set.
            return []

        def save_conversation(self, *args, **kwargs):
            return True

        def sync_to_hub(self, *args, **kwargs):
            return False

    return DummyHistoryManager()
|
|
|
|
|
@st.cache_resource
def get_telegram_bot(_agent):
    """Build the TelegramBot singleton, or return None when unavailable.

    The leading underscore in ``_agent`` tells Streamlit not to hash this
    (unhashable) parameter. Returns None when the integration is disabled,
    the token is missing, or construction fails.
    """
    # Guard clause: nothing to do when the feature is off or unconfigured.
    if not (TELEGRAM_ENABLED and TELEGRAM_BOT_TOKEN):
        logger.info("Telegram bot disabled or token missing")
        return None

    logger.info("Initializing Telegram bot")
    try:
        return TelegramBot(
            agent=_agent,
            token=TELEGRAM_BOT_TOKEN,
            allowed_user_ids=TELEGRAM_ALLOWED_USERS,
        )
    except Exception as e:
        logger.error(f"Error initializing Telegram bot: {e}")
        return None
|
|
|
|
|
# Seed default session-state keys once per browser session. A defaults dict
# replaces four copy-pasted "if key not in st.session_state" blocks; the dict
# is rebuilt on every rerun, so each missing key gets a fresh default object.
_SESSION_DEFAULTS = {
    "messages": [],                      # chat transcript: {"role", "content"} dicts
    "telegram_status": "Not started",    # "Not started" | "Running" | "Stopped" | "Error"
    "history_filter": "",                # last text search in the History tab
    "current_tab": "Chat",               # name of the tab the user last viewed
}
for _key, _default in _SESSION_DEFAULTS.items():
    if _key not in st.session_state:
        st.session_state[_key] = _default

# Cached singletons shared across reruns of this script.
agent = get_agent()
document_processor = get_document_processor(agent)
chat_history_manager = get_chat_history_manager()
telegram_bot = get_telegram_bot(agent)
|
|
|
|
|
# Pre-populate the chat transcript from persisted history, but only when the
# session has no messages yet (i.e. a fresh browser session).
if not st.session_state.messages:
    try:
        recent_history = chat_history_manager.load_history()

        # Replay at most the last 10 stored exchanges into the chat UI.
        for conv in recent_history[-10:]:
            if "user_query" in conv and "assistant_response" in conv:
                st.session_state.messages.append({"role": "user", "content": conv["user_query"]})
                st.session_state.messages.append({"role": "assistant", "content": conv["assistant_response"]})
    except Exception as e:
        # Preloading is best-effort; a failure must not block the app.
        logger.error(f"Error loading initial history: {e}")
|
|
|
|
|
st.title("🧠 Personal AI Second Brain")

# Top-level navigation: one tab per feature area, rendered below in order.
tabs = st.tabs(["Chat", "Documents", "History", "Settings"])
|
|
|
|
|
with tabs[0]:
    # Record the active tab in session state.
    if st.session_state.current_tab != "Chat":
        st.session_state.current_tab = "Chat"

    # Re-render the full transcript on every rerun.
    for message in st.session_state.messages:
        with st.chat_message(message["role"]):
            st.markdown(message["content"])

    # Walrus assignment: the body runs only when the user submits a prompt.
    if prompt := st.chat_input("Ask me anything..."):
        st.session_state.messages.append({"role": "user", "content": prompt})

        with st.chat_message("user"):
            st.markdown(prompt)

        with st.chat_message("assistant"):
            # Placeholder lets us swap "Thinking..." for the answer in place.
            message_placeholder = st.empty()
            message_placeholder.markdown("Thinking...")

            try:
                response = agent.query(prompt)
                answer = response["answer"]
                sources = response["sources"]

                message_placeholder.markdown(answer)

                st.session_state.messages.append({"role": "assistant", "content": answer})

                # Persist the exchange; sources are flattened to their names.
                chat_history_manager.save_conversation({
                    "user_query": prompt,
                    "assistant_response": answer,
                    "sources": [s["source"] for s in sources] if sources else [],
                    "timestamp": datetime.now().isoformat()
                })

                if sources:
                    with st.expander("Sources"):
                        st.markdown(format_sources(sources))

                # Feed the exchange back into the agent's long-term memory.
                agent.add_conversation_to_memory(prompt, answer)

            except Exception as e:
                # Surface the failure in-chat instead of crashing the rerun.
                logger.error(f"Error generating response: {e}")
                error_message = f"I'm sorry, I encountered an error: {str(e)}"
                message_placeholder.markdown(error_message)
                st.session_state.messages.append({"role": "assistant", "content": error_message})
|
|
|
with tabs[1]:
    # Record the active tab in session state.
    if st.session_state.current_tab != "Documents":
        st.session_state.current_tab = "Documents"

    st.header("Upload & Manage Documents")

    col1, col2 = st.columns(2)

    # Left column: ingest a file from disk.
    with col1:
        st.subheader("Upload a File")

        with st.expander("Supported File Types"):
            st.markdown("""
            - **PDF** (.pdf) - Best for formatted documents
            - **Text** (.txt) - Simple text files
            - **CSV** (.csv) - Structured data
            - **Word** (.doc, .docx) - Microsoft Word documents
            - **Markdown** (.md) - Formatted text
            - **HTML** (.html, .htm) - Web pages

            Other file types may work but are not fully supported.
            """)

        uploaded_file = st.file_uploader("Choose a file", type=["pdf", "txt", "csv", "doc", "docx", "md", "html", "htm", "xml", "json"])

        if uploaded_file is not None:
            # Echo basic metadata so the user can confirm the right file.
            file_details = {
                "Filename": uploaded_file.name,
                "File size": f"{uploaded_file.size / 1024:.1f} KB",
                "File type": uploaded_file.type
            }

            st.json(file_details)

            if st.button("Process Document"):
                with st.spinner("Processing document..."):
                    status_placeholder = st.empty()
                    status_placeholder.info("Starting document processing...")

                    try:
                        # Stage 1: write the upload bytes to a temp file.
                        status_placeholder.info("Creating temporary file...")
                        temp_dir = tempfile.gettempdir()
                        temp_path = os.path.join(temp_dir, uploaded_file.name)

                        logger.info(f"Saving uploaded file to temporary path: {temp_path}")

                        with open(temp_path, "wb") as temp_file:
                            temp_file.write(uploaded_file.getvalue())

                        # Stage 2: copy into the permanent documents directory.
                        status_placeholder.info("Preparing document storage location...")
                        doc_path = get_document_path(uploaded_file.name)

                        logger.info(f"Copying file to documents directory: {doc_path}")
                        copy_success = copy_uploaded_file(temp_path, doc_path)

                        if not copy_success:
                            # Fall back to the temp file; document won't persist.
                            logger.warning("Using temporary file path instead of documents directory")
                            doc_path = temp_path
                            status_placeholder.warning("Using temporary storage (document won't be permanently saved)")

                        # Stage 3: index the content, retrying transient errors.
                        status_placeholder.info("Analyzing and indexing document content...")
                        progress_bar = st.progress(0)
                        max_retries = 3

                        for attempt in range(max_retries):
                            try:
                                progress_bar.progress((attempt * 30) / 100)
                                # NOTE(review): ingestion reads temp_path even when the
                                # copy to doc_path succeeded — confirm this is intended.
                                ids = document_processor.ingest_file(temp_path, {"original_name": uploaded_file.name})
                                progress_bar.progress(100)
                                break
                            except Exception as e:
                                error_str = str(e).lower()
                                # Permission-looking errors get their own message.
                                if ("403" in error_str or "forbidden" in error_str or "permission" in error_str) and attempt < max_retries - 1:
                                    status_placeholder.warning(f"Permission error ({attempt+1}/{max_retries}), retrying...")
                                    logger.warning(f"Permission error ({attempt+1}/{max_retries}), retrying...")
                                    time.sleep(1.5)
                                elif attempt < max_retries - 1:
                                    status_placeholder.warning(f"Error ({attempt+1}/{max_retries}), retrying...")
                                    logger.warning(f"Error during ingestion ({attempt+1}/{max_retries}): {e}")
                                    time.sleep(1.5)
                                else:
                                    # Out of retries: escalate to the outer handler.
                                    raise

                        # Remove the temp copy once a permanent copy exists.
                        if temp_path != doc_path and os.path.exists(temp_path):
                            try:
                                os.unlink(temp_path)
                                logger.info(f"Temporary file removed: {temp_path}")
                            except Exception as e:
                                logger.warning(f"Could not remove temporary file: {e}")

                        # ids entries prefixed "error-" mark chunks that failed.
                        if ids and not all(str(id).startswith("error-") for id in ids):
                            status_placeholder.success(f"✅ Document processed successfully!")
                            st.balloons()
                        else:
                            status_placeholder.warning("⚠️ Document processed with warnings. Some content may not be fully indexed.")

                    except Exception as e:
                        progress_bar = st.progress(100) if 'progress_bar' in locals() else st.progress(0)
                        logger.error(f"Error processing document: {str(e)}")
                        status_placeholder.error(f"❌ Error processing document: {str(e)}")

                        # Best-effort hints for the common failure classes.
                        if "403" in str(e) or "forbidden" in str(e).lower():
                            st.warning("This appears to be a permissions issue. Try using a different file format or using the text input option instead.")
                        elif "unsupported" in str(e).lower() or "not supported" in str(e).lower() or "no specific loader" in str(e).lower():
                            st.warning("This file format may not be supported. Try converting to PDF or TXT first.")

    # Right column: ingest pasted text directly.
    with col2:
        st.subheader("Add Text Directly")

        text_content = st.text_area("Enter text to add to your knowledge base:", height=200)
        text_title = st.text_input("Give this text a title:")

        # Requires both a body and a title before processing.
        if st.button("Process Text") and text_content and text_title:
            with st.spinner("Processing text..."):
                status_placeholder = st.empty()
                status_placeholder.info("Processing your text...")

                try:
                    metadata = {"title": text_title, "source": "direct_input"}
                    ids = document_processor.ingest_text(text_content, metadata)

                    if ids:
                        status_placeholder.success("✅ Text processed successfully!")
                    else:
                        status_placeholder.warning("⚠️ Text processed with warnings.")
                except Exception as e:
                    logger.error(f"Error processing text: {str(e)}")
                    status_placeholder.error(f"❌ Error processing text: {str(e)}")
|
|
|
|
|
with tabs[2]:
    # Record the active tab in session state.
    if st.session_state.current_tab != "History":
        st.session_state.current_tab = "History"

    st.header("Chat History")

    # Filter controls: text search plus an optional date range.
    col1, col2, col3 = st.columns([2, 1, 1])

    with col1:
        search_query = st.text_input("Search conversations:", st.session_state.history_filter)
        if search_query != st.session_state.history_filter:
            st.session_state.history_filter = search_query

    with col2:
        st.text("Date Range (optional)")
        start_date = st.date_input("Start date", None)

    with col3:
        # Non-breaking space keeps the two date widgets vertically aligned.
        st.text("\u00A0")
        end_date = st.date_input("End date", None)

    try:
        history = chat_history_manager.load_history()

        # A text search replaces the full history list.
        if search_query:
            history = chat_history_manager.search_conversations(search_query)

        # NOTE(review): a date filter overwrites the search result rather than
        # intersecting with it — confirm this precedence is intended.
        if start_date or end_date:
            # Expand dates to full-day datetime bounds.
            start_datetime = datetime.combine(start_date, datetime.min.time()) if start_date else None
            end_datetime = datetime.combine(end_date, datetime.max.time()) if end_date else None
            history = chat_history_manager.get_conversations_by_date(start_datetime, end_datetime)

        if not history:
            st.info("No conversation history found matching your criteria.")
        else:
            # Newest conversations first.
            history.sort(key=lambda x: x.get("timestamp", ""), reverse=True)

            df = pd.DataFrame(history)
            if not df.empty:
                # Only render the table when the expected columns exist.
                if all(col in df.columns for col in ["timestamp", "user_query", "assistant_response"]):
                    display_df = df[["timestamp", "user_query", "assistant_response"]]
                    display_df = display_df.rename(columns={
                        "timestamp": "Date",
                        "user_query": "Your Question",
                        "assistant_response": "AI Response"
                    })

                    if "Date" in display_df.columns:
                        display_df["Date"] = pd.to_datetime(display_df["Date"]).dt.strftime('%Y-%m-%d %H:%M')

                    # Truncate long cells so the table stays readable.
                    for col in ["Your Question", "AI Response"]:
                        if col in display_df.columns:
                            display_df[col] = display_df[col].apply(lambda x: x[:100] + "..." if isinstance(x, str) and len(x) > 100 else x)

                    st.dataframe(display_df, use_container_width=True)

                    # Drill-down view for a single selected conversation.
                    if not df.empty:
                        selected_idx = st.selectbox("Select conversation to view details:",
                                                    range(len(df)),
                                                    format_func=lambda i: f"{df.iloc[i].get('timestamp', 'Unknown')} - {df.iloc[i].get('user_query', '')[:30]}...")

                        if selected_idx is not None:
                            selected_conv = df.iloc[selected_idx]
                            st.subheader("Conversation Details")

                            st.markdown("**Your Question:**")
                            st.markdown(selected_conv.get("user_query", ""))

                            st.markdown("**AI Response:**")
                            st.markdown(selected_conv.get("assistant_response", ""))

                            if "sources" in selected_conv and selected_conv["sources"]:
                                st.markdown("**Sources:**")
                                for src in selected_conv["sources"]:
                                    st.markdown(f"- {src}")

                            # Replay the exchange into the live chat and jump there.
                            if st.button("Continue this conversation"):
                                st.session_state.messages.append({"role": "user", "content": selected_conv.get("user_query", "")})
                                st.session_state.messages.append({"role": "assistant", "content": selected_conv.get("assistant_response", "")})

                                st.session_state.current_tab = "Chat"
                                # NOTE(review): st.experimental_rerun is deprecated in
                                # newer Streamlit (st.rerun) — confirm the pinned version.
                                st.experimental_rerun()
                else:
                    st.error("Unexpected history format. Some columns are missing.")
            else:
                st.info("No conversation history found.")
    except Exception as e:
        logger.error(f"Error displaying history: {e}")
        st.error(f"Error loading conversation history: {str(e)}")

    # Manual push of local history to the configured HF dataset repo.
    if HF_DATASET_NAME:
        if st.button("Sync History to Hugging Face Hub"):
            with st.spinner("Syncing history..."):
                success = chat_history_manager.sync_to_hub()
                if success:
                    st.success("History successfully synced to Hugging Face Hub!")
                else:
                    st.error("Failed to sync history. Check logs for details.")
|
|
|
|
|
with tabs[3]:
    # Record the active tab in session state.
    if st.session_state.current_tab != "Settings":
        st.session_state.current_tab = "Settings"

    st.header("Settings")

    # Read-only snapshot of the active configuration.
    st.subheader("System Information")
    system_info = {
        "LLM Model": LLM_MODEL,
        "Embedding Model": EMBEDDING_MODEL,
        "HF Dataset": HF_DATASET_NAME or "Not configured",
        "Telegram Enabled": "Yes" if TELEGRAM_ENABLED else "No"
    }

    for key, value in system_info.items():
        st.markdown(f"**{key}:** {value}")

    st.subheader("Telegram Integration")

    # Status shows session-tracked state only when a bot instance exists.
    telegram_status = "Not configured"
    if telegram_bot:
        telegram_status = st.session_state.telegram_status

    st.markdown(f"**Status:** {telegram_status}")

    col1, col2 = st.columns(2)

    # Start button: shown only when the bot exists and is not running.
    with col1:
        if telegram_bot and st.session_state.telegram_status != "Running":
            if st.button("Start Telegram Bot"):
                try:
                    success = telegram_bot.start()
                    if success:
                        st.session_state.telegram_status = "Running"
                        st.success("Telegram bot started!")
                    else:
                        st.error("Failed to start Telegram bot. Check logs for details.")
                except Exception as e:
                    logger.error(f"Error starting Telegram bot: {e}")
                    st.error(f"Error: {str(e)}")

    # Stop button: shown only while the bot is running.
    with col2:
        if telegram_bot and st.session_state.telegram_status == "Running":
            if st.button("Stop Telegram Bot"):
                try:
                    telegram_bot.stop()
                    st.session_state.telegram_status = "Stopped"
                    st.info("Telegram bot stopped.")
                except Exception as e:
                    logger.error(f"Error stopping Telegram bot: {e}")
                    st.error(f"Error: {str(e)}")

    if telegram_bot:
        with st.expander("Telegram Bot Settings"):
            st.markdown("""
            To configure the Telegram bot, set these environment variables:
            - `TELEGRAM_ENABLED`: Set to `true` to enable the bot
            - `TELEGRAM_BOT_TOKEN`: Your Telegram bot token
            - `TELEGRAM_ALLOWED_USERS`: Comma-separated list of Telegram user IDs (optional)
            """)

            if telegram_bot.allowed_user_ids:
                st.markdown("**Allowed User IDs:**")
                for user_id in telegram_bot.allowed_user_ids:
                    st.markdown(f"- {user_id}")
            else:
                st.markdown("The bot will respond to all users (no user restrictions configured).")

            st.markdown("### Telegram Bot Commands")
            st.markdown("""
            - **/start**: Start a conversation with the bot
            - **/help**: Shows available commands
            - **/search**: Use `/search your query` to search your knowledge base
            - **Direct messages**: Send any message to chat with your second brain

            #### How to Set Up Your Telegram Bot
            1. Talk to [@BotFather](https://t.me/botfather) on Telegram
            2. Use the `/newbot` command to create a new bot
            3. Get your bot token and add it to your `.env` file
            4. Set `TELEGRAM_ENABLED=true` in your `.env` file
            5. To find your Telegram user ID, talk to [@userinfobot](https://t.me/userinfobot)
            """)
    else:
        st.info("Telegram integration is not enabled. Configure your .env file to enable it.")

    st.subheader("Hugging Face Dataset Settings")

    if HF_DATASET_NAME:
        st.markdown(f"**Dataset Name:** {HF_DATASET_NAME}")
        st.markdown(f"**Local History File:** {chat_history_manager.local_file}")

        with st.expander("Setup Instructions"):
            st.markdown("""
            ### Setting up Hugging Face Dataset Persistence

            1. Create a private dataset repository on Hugging Face Hub
            2. Set your API token in the `.env` file as `HF_API_KEY`
            3. Set your dataset name as `HF_DATASET_NAME` (format: username/repo-name)

            Your chat history will be automatically synced to the Hub.
            """)
    else:
        st.info("Hugging Face Dataset persistence is not configured. Set HF_DATASET_NAME in your .env file.")

# Auto-start the Telegram bot once per session ("Not started" guards against
# restarting on every Streamlit rerun).
if telegram_bot and TELEGRAM_ENABLED and st.session_state.telegram_status == "Not started":
    try:
        success = telegram_bot.start()
        if success:
            st.session_state.telegram_status = "Running"
            logger.info("Telegram bot started automatically")
    except Exception as e:
        logger.error(f"Error auto-starting Telegram bot: {e}")
        st.session_state.telegram_status = "Error"
|
|
|
# Streamlit executes this module top-to-bottom on every rerun; there is no
# separate entry point, so the main guard is intentionally a no-op.
if __name__ == "__main__":
    pass