File size: 9,223 Bytes
2a735cc |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 |
import os
import logging
import threading
import asyncio
from typing import Dict, List, Any
import time
from datetime import datetime
from telegram import Update, Bot
from telegram.ext import Application, CommandHandler, MessageHandler, ContextTypes, filters
# Configure logging
# Ask the root logger for INFO-level output; per the stdlib docs basicConfig()
# does nothing if the root logger already has handlers configured.
logging.basicConfig(level=logging.INFO)
# Module-level logger, named after this module, used throughout the file.
logger = logging.getLogger(__name__)
class TelegramBot:
    """
    Telegram bot integration for the AI second brain.

    Handles message ingestion, responses, and synchronization with the main app.
    The bot polls Telegram from a background daemon thread that owns its own
    asyncio event loop; blocking agent calls are pushed to the default executor
    so that loop stays responsive.
    """

    # Maximum text length accepted by Telegram for a single message.
    MAX_MESSAGE_LENGTH = 4096
    # Seconds stop() waits for the polling thread to finish shutting down.
    STOP_TIMEOUT = 10.0
    # Characters that carry meaning in Telegram's legacy "Markdown" parse mode.
    _MARKDOWN_ESCAPES = str.maketrans({char: "\\" + char for char in "_*`["})
    _UNAUTHORIZED_TEXT = "You're not authorized to use this bot."

    def __init__(self, agent, token=None, allowed_user_ids=None):
        """
        Initialize the Telegram bot.

        Args:
            agent: The AssistantAgent instance to use for processing queries
            token: Telegram bot token (defaults to environment variable)
            allowed_user_ids: Telegram user IDs that can use the bot. Accepts a
                comma-separated string, a single ID, or an iterable of IDs
                (ints or numeric strings). None/empty allows everyone.

        Raises:
            ValueError: If an entry of allowed_user_ids is not numeric.
        """
        self.agent = agent
        self.token = token or os.getenv("TELEGRAM_BOT_TOKEN")
        self.allowed_user_ids = self._parse_user_ids(allowed_user_ids)
        self.message_history = []
        # Initialize bot application
        self.application = None
        self.bot_thread = None
        # Event loop owned by the polling thread; needed to stop it safely.
        self._loop = None
        logger.info("Telegram bot initialized")

    @staticmethod
    def _parse_user_ids(allowed_user_ids):
        """Normalize the allow-list into a list of integer user IDs."""
        if not allowed_user_ids:
            return []
        if isinstance(allowed_user_ids, str):
            # Convert comma-separated string to individual entries
            allowed_user_ids = allowed_user_ids.split(",")
        elif isinstance(allowed_user_ids, int):
            allowed_user_ids = [allowed_user_ids]
        parsed = []
        for entry in allowed_user_ids:
            text = str(entry).strip()
            if text:
                # Telegram user IDs are ints; "123" would never match otherwise.
                parsed.append(int(text))
        return parsed

    @classmethod
    def _escape_markdown(cls, text):
        """Backslash-escape legacy-Markdown control characters in text."""
        return str(text).translate(cls._MARKDOWN_ESCAPES)

    @classmethod
    def _format_sources(cls, sources):
        """Build the Markdown "Sources" follow-up message for a response."""
        lines = [
            "- {} ({})".format(
                cls._escape_markdown(source.get("file_name", "unknown")),
                cls._escape_markdown(source.get("source", "unknown")),
            )
            for source in sources
        ]
        return "*Sources:*\n" + "\n".join(lines)

    @classmethod
    def _split_text(cls, text):
        """Split text into chunks no longer than MAX_MESSAGE_LENGTH."""
        text = str(text)
        limit = cls.MAX_MESSAGE_LENGTH
        chunks = [text[start:start + limit] for start in range(0, len(text), limit)]
        # Keep a single (empty) chunk so callers always send exactly what they got.
        return chunks or [text]

    @staticmethod
    def _message_and_user(update):
        """
        Return (message, sender) for a regular incoming message.

        Returns (None, None) for updates without a regular message or sender
        (e.g. edited messages or channel posts), which the handlers ignore.
        """
        message = update.message
        user = message.from_user if message is not None else None
        if user is None:
            return None, None
        return message, user

    def _is_authorized(self, user_id):
        """Return True if user_id may use the bot (an empty allow-list allows all)."""
        return not self.allowed_user_ids or user_id in self.allowed_user_ids

    async def _send_response(self, message, response):
        """Reply with the answer and, if present, a follow-up listing sources."""
        for chunk in self._split_text(response["answer"]):
            await message.reply_text(chunk)
        # If there are sources, send them in a followup message
        sources = response.get("sources")
        if sources:
            await message.reply_text(self._format_sources(sources), parse_mode='Markdown')

    async def start_command(self, update: "Update", context: "ContextTypes.DEFAULT_TYPE") -> None:
        """Handle the /start command."""
        message, user = self._message_and_user(update)
        if message is None:
            return
        user_name = user.first_name
        await message.reply_text(
            f"Hello {user_name}! I'm your AI Second Brain assistant. Ask me anything or use /help to see available commands."
        )

    async def help_command(self, update: "Update", context: "ContextTypes.DEFAULT_TYPE") -> None:
        """Handle the /help command."""
        message, _ = self._message_and_user(update)
        if message is None:
            return
        help_text = (
            "\n"
            "*AI Second Brain Commands*\n"
            "- Just send me a message with your question\n"
            "- /search query - Search your knowledge base\n"
            "- /help - Show this help message\n"
        )
        await message.reply_text(help_text, parse_mode='Markdown')

    async def search_command(self, update: "Update", context: "ContextTypes.DEFAULT_TYPE") -> None:
        """Handle the /search command."""
        message, user = self._message_and_user(update)
        if message is None:
            return
        # Check if user is allowed
        if not self._is_authorized(user.id):
            await message.reply_text(self._UNAUTHORIZED_TEXT)
            return
        query = ' '.join(context.args or [])
        if not query:
            await message.reply_text("Please provide a search query: /search your query here")
            return
        # Show typing status
        await context.bot.send_chat_action(chat_id=update.effective_chat.id, action="typing")
        # Process the query
        try:
            response = await self.process_query(query)
            await self._send_response(message, response)
        except Exception as e:
            logger.exception("Error processing search: %s", e)
            await message.reply_text(f"Error processing your search: {str(e)}")

    async def handle_message(self, update: "Update", context: "ContextTypes.DEFAULT_TYPE") -> None:
        """Handle normal messages."""
        message, user = self._message_and_user(update)
        if message is None:
            return
        # Check if user is allowed
        if not self._is_authorized(user.id):
            await message.reply_text(self._UNAUTHORIZED_TEXT)
            return
        # Get the message text
        query = message.text
        # Show typing status
        await context.bot.send_chat_action(chat_id=update.effective_chat.id, action="typing")
        # Process the query
        try:
            response = await self.process_query(query)
            # Store in message history
            self.message_history.append({
                "user": user.username or str(user.id),
                "user_id": user.id,
                "query": query,
                "response": response["answer"],
                "timestamp": datetime.now().isoformat(),
                "chat_id": update.effective_chat.id,
            })
            await self._send_response(message, response)
        except Exception as e:
            logger.exception("Error processing message: %s", e)
            await message.reply_text(f"I encountered an error: {str(e)}")

    async def error_handler(self, update, context):
        """Handle errors."""
        logger.error(
            "Error: %s - caused by update %s",
            context.error,
            update,
            exc_info=context.error,
        )
        # Send a message to the user. `update` is not guaranteed to be an
        # Update instance here, so look the chat up defensively.
        chat = getattr(update, "effective_chat", None)
        if chat:
            await context.bot.send_message(
                chat_id=chat.id,
                text="Sorry, an error occurred while processing your message."
            )

    async def process_query(self, query):
        """
        Process a query using the agent and return a response.

        The agent calls are synchronous, so they run in the default executor
        to avoid blocking the bot's event loop.

        Args:
            query: The user's question text.

        Returns:
            The response object returned by agent.query(query).
        """
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(None, self.agent.query, query)
        # Add the conversation to the agent's memory
        if "answer" in response:
            await loop.run_in_executor(
                None, self.agent.add_conversation_to_memory, query, response["answer"]
            )
        return response

    def setup_application(self):
        """Set up the Telegram application with handlers."""
        # Create the Application
        self.application = Application.builder().token(self.token).build()
        # Add command handlers
        self.application.add_handler(CommandHandler("start", self.start_command))
        self.application.add_handler(CommandHandler("help", self.help_command))
        self.application.add_handler(CommandHandler("search", self.search_command))
        # Add message handler
        self.application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, self.handle_message))
        # Add error handler
        self.application.add_error_handler(self.error_handler)
        logger.info("Telegram application set up successfully")

    def start(self):
        """
        Start the Telegram bot in a separate thread.

        Returns:
            True if the bot is running (newly started or already running),
            False if the token is missing or startup failed.
        """
        if not self.token:
            logger.error("Telegram bot token not found")
            return False
        if self.bot_thread is not None and self.bot_thread.is_alive():
            # A second poller on the same token would only fight the first one.
            logger.warning("Telegram bot is already running")
            return True
        try:
            # Set up the application
            self.setup_application()
            application = self.application
            # Create the loop here so stop() can reach it as soon as start() returns.
            loop = asyncio.new_event_loop()
            self._loop = loop

            # Run the bot in a separate thread
            def run_bot():
                asyncio.set_event_loop(loop)
                try:
                    # Signal handlers can only be installed from the main thread.
                    application.run_polling(stop_signals=None)
                except Exception:
                    logger.exception("Telegram bot polling stopped unexpectedly")

            self.bot_thread = threading.Thread(target=run_bot, daemon=True)
            self.bot_thread.start()
            logger.info("Telegram bot started in background thread")
            return True
        except Exception as e:
            logger.exception("Error starting Telegram bot: %s", e)
            return False

    def stop(self):
        """
        Stop the Telegram bot.

        The application lives on the polling thread's event loop, so it cannot
        be awaited from another loop. Instead the polling loop is asked to
        stop and the thread is joined.
        NOTE(review): this relies on run_polling() running its own shutdown
        sequence when its loop is stopped - confirm for the installed
        python-telegram-bot version.

        Returns:
            True if a running bot was stopped, False if nothing was running
            or the thread did not finish within STOP_TIMEOUT seconds.
        """
        thread = self.bot_thread
        loop = self._loop
        if not self.application or thread is None or loop is None or not thread.is_alive():
            return False
        logger.info("Stopping Telegram bot...")
        try:
            loop.call_soon_threadsafe(loop.stop)
        except RuntimeError:
            # The loop is already closed: the polling thread is finishing on its own.
            pass
        thread.join(timeout=self.STOP_TIMEOUT)
        if thread.is_alive():
            logger.warning("Telegram bot did not stop within %s seconds", self.STOP_TIMEOUT)
            return False
        self._loop = None
        self.bot_thread = None
        logger.info("Telegram bot stopped")
        return True

    def get_message_history(self):
        """Get the message history."""
        return self.message_history