import os import logging import threading import asyncio from typing import Dict, List, Any import time from datetime import datetime from telegram import Update, Bot from telegram.ext import Application, CommandHandler, MessageHandler, ContextTypes, filters # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class TelegramBot: """ Telegram bot integration for the AI second brain. Handles message ingestion, responses, and synchronization with the main app. """ def __init__(self, agent, token=None, allowed_user_ids=None): """ Initialize the Telegram bot. Args: agent: The AssistantAgent instance to use for processing queries token: Telegram bot token (defaults to environment variable) allowed_user_ids: List of Telegram user IDs that can use the bot (None for all) """ self.agent = agent self.token = token or os.getenv("TELEGRAM_BOT_TOKEN") self.allowed_user_ids = allowed_user_ids or [] if isinstance(self.allowed_user_ids, str): # Convert comma-separated string to list of integers self.allowed_user_ids = [int(uid.strip()) for uid in self.allowed_user_ids.split(',') if uid.strip()] self.message_history = [] # Initialize bot application self.application = None self.bot_thread = None logger.info("Telegram bot initialized") async def start_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """Handle the /start command.""" user_name = update.message.from_user.first_name await update.message.reply_text( f"Hello {user_name}! I'm your AI Second Brain assistant. Ask me anything or use /help to see available commands." ) async def help_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """Handle the /help command.""" help_text = """ *AI Second Brain Commands* - Just send me a message with your question - /search query - Search your knowledge base - /help - Show this help message """ await update.message.reply_text(help_text, parse_mode='Markdown') async def search_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """Handle the /search command.""" # Check if user is allowed if self.allowed_user_ids and update.message.from_user.id not in self.allowed_user_ids: await update.message.reply_text("You're not authorized to use this bot.") return query = ' '.join(context.args) if not query: await update.message.reply_text("Please provide a search query: /search your query here") return # Show typing status await context.bot.send_chat_action(chat_id=update.effective_chat.id, action="typing") # Process the query try: response = await self.process_query(query) # Send the response await update.message.reply_text(response["answer"]) # If there are sources, send them in a followup message if response["sources"]: sources_text = "*Sources:*\n" + "\n".join([ f"- {s['file_name']} ({s['source']})" for s in response["sources"] ]) await update.message.reply_text(sources_text, parse_mode='Markdown') except Exception as e: logger.error(f"Error processing search: {e}") await update.message.reply_text(f"Error processing your search: {str(e)}") async def handle_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """Handle normal messages.""" # Check if user is allowed if self.allowed_user_ids and update.message.from_user.id not in self.allowed_user_ids: await update.message.reply_text("You're not authorized to use this bot.") return # Get the message text query = update.message.text # Show typing status await context.bot.send_chat_action(chat_id=update.effective_chat.id, action="typing") # Process the query try: # Process the message response = await self.process_query(query) # Store in message history self.message_history.append({ "user": update.message.from_user.username or str(update.message.from_user.id), "user_id": update.message.from_user.id, "query": query, "response": response["answer"], "timestamp": datetime.now().isoformat(), "chat_id": update.effective_chat.id }) # Send the response await update.message.reply_text(response["answer"]) # If there are sources, send them in a followup message if response["sources"]: sources_text = "*Sources:*\n" + "\n".join([ f"- {s['file_name']} ({s['source']})" for s in response["sources"] ]) await update.message.reply_text(sources_text, parse_mode='Markdown') except Exception as e: logger.error(f"Error processing message: {e}") await update.message.reply_text(f"I encountered an error: {str(e)}") async def error_handler(self, update, context): """Handle errors.""" logger.error(f"Error: {context.error} - caused by update {update}") # Send a message to the user if update and update.effective_chat: await context.bot.send_message( chat_id=update.effective_chat.id, text="Sorry, an error occurred while processing your message." ) async def process_query(self, query): """Process a query using the agent and return a response.""" # Create a new event loop for the thread loop = asyncio.get_event_loop() # Run the query in a separate thread to avoid blocking def run_query(): return self.agent.query(query) # Execute the query response = await loop.run_in_executor(None, run_query) # Add the conversation to the agent's memory if "answer" in response: def add_to_memory(): self.agent.add_conversation_to_memory(query, response["answer"]) await loop.run_in_executor(None, add_to_memory) return response def setup_application(self): """Set up the Telegram application with handlers.""" # Create the Application self.application = Application.builder().token(self.token).build() # Add command handlers self.application.add_handler(CommandHandler("start", self.start_command)) self.application.add_handler(CommandHandler("help", self.help_command)) self.application.add_handler(CommandHandler("search", self.search_command)) # Add message handler self.application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, self.handle_message)) # Add error handler self.application.add_error_handler(self.error_handler) logger.info("Telegram application set up successfully") def start(self): """Start the Telegram bot in a separate thread.""" if not self.token: logger.error("Telegram bot token not found") return False try: # Set up the application self.setup_application() # Run the bot in a separate thread def run_bot(): asyncio.set_event_loop(asyncio.new_event_loop()) self.application.run_polling(stop_signals=None) self.bot_thread = threading.Thread(target=run_bot, daemon=True) self.bot_thread.start() logger.info("Telegram bot started in background thread") return True except Exception as e: logger.error(f"Error starting Telegram bot: {e}") return False def stop(self): """Stop the Telegram bot.""" if self.application: logger.info("Stopping Telegram bot...") async def stop_app(): await self.application.stop() await self.application.shutdown() # Run the stop function in a new event loop loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: loop.run_until_complete(stop_app()) finally: loop.close() logger.info("Telegram bot stopped") return True return False def get_message_history(self): """Get the message history.""" return self.message_history