# 2B / app / core / telegram_bot.py
# 37-AN
# Update for Hugging Face Space deployment
# 2a735cc
import asyncio
import logging
import os
import threading
import time
from datetime import datetime
from typing import Any, Dict, List

from telegram import Bot, Update
from telegram.ext import Application, CommandHandler, ContextTypes, MessageHandler, filters
from telegram.helpers import escape_markdown
# Configure logging.
# NOTE(review): basicConfig() is executed at import time, so merely importing
# this module sets up root logging at INFO level (it is a no-op only if the
# root logger already has handlers) — confirm this is intended for the host app.
logging.basicConfig(level=logging.INFO)
# Module-level logger named after this module; used by TelegramBot below.
logger = logging.getLogger(__name__)
class TelegramBot:
    """
    Telegram bot integration for the AI second brain.

    Handles message ingestion, responses, and synchronization with the main app.

    The bot polls Telegram from a background daemon thread that owns its own
    asyncio event loop. Blocking agent calls are pushed to the loop's default
    executor so that the polling loop is not blocked while a query runs.
    """

    # Telegram rejects text messages longer than this many characters.
    _MAX_MESSAGE_LENGTH = 4096
    # Seconds stop() waits for the polling thread to finish shutting down.
    _STOP_TIMEOUT_SECONDS = 10

    def __init__(self, agent, token=None, allowed_user_ids=None):
        """
        Initialize the Telegram bot.

        Args:
            agent: The AssistantAgent instance to use for processing queries
            token: Telegram bot token (defaults to the TELEGRAM_BOT_TOKEN
                environment variable)
            allowed_user_ids: Telegram user IDs that can use the bot. Accepts a
                comma-separated string, a single int, or an iterable of
                ints/numeric strings. None or empty means everyone is allowed.
        """
        self.agent = agent
        self.token = token or os.getenv("TELEGRAM_BOT_TOKEN")
        self.allowed_user_ids = self._normalize_user_ids(allowed_user_ids)
        self.message_history = []
        # The application, its thread and its event loop are created by start().
        self.application = None
        self.bot_thread = None
        self._loop = None
        logger.info("Telegram bot initialized")

    @staticmethod
    def _normalize_user_ids(allowed_user_ids):
        """Return the allow-list as a list of ints ([] when nothing is given)."""
        if not allowed_user_ids:
            return []
        if isinstance(allowed_user_ids, int):
            return [allowed_user_ids]
        if isinstance(allowed_user_ids, str):
            # Convert comma-separated string to individual entries
            allowed_user_ids = allowed_user_ids.split(',')
        # Telegram user IDs are ints; coerce so that string entries still match.
        return [
            int(str(uid).strip())
            for uid in allowed_user_ids
            if str(uid).strip()
        ]

    @staticmethod
    def _split_text(text, limit=_MAX_MESSAGE_LENGTH):
        """Split text into consecutive chunks of at most `limit` characters."""
        if len(text) <= limit:
            return [text]
        return [text[i:i + limit] for i in range(0, len(text), limit)]

    @staticmethod
    def _format_sources(sources):
        """Build the Markdown "Sources" message for a list of source dicts."""
        # File names routinely contain '_' or '*', which legacy Markdown treats
        # as formatting; unescaped they make Telegram reject the message.
        lines = [
            f"- {escape_markdown(str(s['file_name']))} ({escape_markdown(str(s['source']))})"
            for s in sources
        ]
        return "*Sources:*\n" + "\n".join(lines)

    def _is_authorized(self, update):
        """Return True if the update's sender may use the bot."""
        if not self.allowed_user_ids:
            return True
        user = update.effective_user
        return user is not None and user.id in self.allowed_user_ids

    async def _send_response(self, message, response):
        """Reply with the answer and, if present, a follow-up list of sources."""
        # Long answers are sent in several messages to stay under the size limit.
        for chunk in self._split_text(response["answer"]):
            await message.reply_text(chunk)
        # If there are sources, send them in a followup message
        sources = response.get("sources")
        if sources:
            await message.reply_text(self._format_sources(sources), parse_mode='Markdown')

    async def start_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Handle the /start command."""
        # effective_message also covers edited messages, where update.message is None.
        message = update.effective_message
        if message is None:
            return
        user = update.effective_user
        user_name = user.first_name if user else "there"
        await message.reply_text(
            f"Hello {user_name}! I'm your AI Second Brain assistant. Ask me anything or use /help to see available commands."
        )

    async def help_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Handle the /help command."""
        message = update.effective_message
        if message is None:
            return
        help_text = (
            "\n"
            "*AI Second Brain Commands*\n"
            "- Just send me a message with your question\n"
            "- /search query - Search your knowledge base\n"
            "- /help - Show this help message\n"
        )
        await message.reply_text(help_text, parse_mode='Markdown')

    async def search_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Handle the /search command."""
        message = update.effective_message
        if message is None:
            return
        # Check if user is allowed
        if not self._is_authorized(update):
            await message.reply_text("You're not authorized to use this bot.")
            return
        query = ' '.join(context.args or [])
        if not query:
            await message.reply_text("Please provide a search query: /search your query here")
            return
        # Show typing status
        await context.bot.send_chat_action(chat_id=update.effective_chat.id, action="typing")
        # Process the query
        try:
            response = await self.process_query(query)
            await self._send_response(message, response)
        except Exception as e:
            logger.exception("Error processing search: %s", e)
            await message.reply_text(f"Error processing your search: {str(e)}")

    async def handle_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Handle normal messages."""
        message = update.effective_message
        if message is None:
            return
        # Check if user is allowed
        if not self._is_authorized(update):
            await message.reply_text("You're not authorized to use this bot.")
            return
        # Get the message text
        query = message.text
        # Show typing status
        await context.bot.send_chat_action(chat_id=update.effective_chat.id, action="typing")
        # Process the query
        try:
            response = await self.process_query(query)
            # Store in message history
            user = update.effective_user
            self.message_history.append({
                "user": (user.username or str(user.id)) if user else "unknown",
                "user_id": user.id if user else None,
                "query": query,
                "response": response["answer"],
                "timestamp": datetime.now().isoformat(),
                "chat_id": update.effective_chat.id
            })
            # Send the response (and sources, if any)
            await self._send_response(message, response)
        except Exception as e:
            logger.exception("Error processing message: %s", e)
            await message.reply_text(f"I encountered an error: {str(e)}")

    async def error_handler(self, update, context):
        """Handle errors raised while processing an update."""
        logger.error(
            "Error: %s - caused by update %s",
            context.error,
            update,
            exc_info=context.error,
        )
        # Send a message to the user. `update` is not guaranteed to be an
        # Update instance, so check before touching its attributes.
        if isinstance(update, Update) and update.effective_chat:
            await context.bot.send_message(
                chat_id=update.effective_chat.id,
                text="Sorry, an error occurred while processing your message."
            )

    async def process_query(self, query):
        """
        Process a query using the agent and return a response.

        Args:
            query: The text to pass to the agent.

        Returns:
            Whatever `agent.query(query)` returns. When it contains an
            "answer" key, the exchange is also stored via
            `agent.add_conversation_to_memory`.
        """
        # We are inside a coroutine, so use the loop that is running us.
        loop = asyncio.get_running_loop()
        # Run the (blocking) agent call in the default executor to avoid
        # blocking the event loop
        response = await loop.run_in_executor(None, self.agent.query, query)
        # Add the conversation to the agent's memory
        if "answer" in response:
            await loop.run_in_executor(
                None,
                self.agent.add_conversation_to_memory,
                query,
                response["answer"],
            )
        return response

    def setup_application(self):
        """Set up the Telegram application with handlers."""
        # Create the Application
        self.application = Application.builder().token(self.token).build()
        # Add command handlers
        self.application.add_handler(CommandHandler("start", self.start_command))
        self.application.add_handler(CommandHandler("help", self.help_command))
        self.application.add_handler(CommandHandler("search", self.search_command))
        # Add message handler
        self.application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, self.handle_message))
        # Add error handler
        self.application.add_error_handler(self.error_handler)
        logger.info("Telegram application set up successfully")

    def start(self):
        """
        Start the Telegram bot in a separate thread.

        Returns:
            True if the polling thread was started (or is already running),
            False if no token is configured or setup failed.
        """
        if not self.token:
            logger.error("Telegram bot token not found")
            return False
        if self.bot_thread is not None and self.bot_thread.is_alive():
            # A second poller on the same token would conflict with the first.
            logger.warning("Telegram bot is already running")
            return True
        try:
            # Set up the application
            self.setup_application()
            # Create the loop here and remember it so stop() can reach it.
            loop = asyncio.new_event_loop()
            self._loop = loop
            application = self.application

            # Run the bot in a separate thread
            def run_bot():
                asyncio.set_event_loop(loop)
                try:
                    # Signal handlers can only be installed from the main
                    # thread, hence stop_signals=None.
                    application.run_polling(stop_signals=None)
                except Exception:
                    logger.exception("Telegram bot polling stopped unexpectedly")

            self.bot_thread = threading.Thread(target=run_bot, daemon=True)
            self.bot_thread.start()
            logger.info("Telegram bot started in background thread")
            return True
        except Exception as e:
            logger.exception("Error starting Telegram bot: %s", e)
            return False

    def stop(self):
        """
        Stop the Telegram bot.

        Returns:
            True if the bot was asked to stop and its thread finished within
            the timeout, False if it was never set up or is still running.
        """
        if not self.application:
            return False
        logger.info("Stopping Telegram bot...")
        loop, thread = self._loop, self.bot_thread
        if loop is not None and not loop.is_closed():
            try:
                # The application lives on the polling thread's loop, so it
                # must be stopped from there; stopping that loop lets
                # run_polling() perform its own orderly shutdown.
                loop.call_soon_threadsafe(loop.stop)
            except RuntimeError:
                # The loop was closed between the check and the call.
                pass
        if thread is not None and thread is not threading.current_thread():
            thread.join(timeout=self._STOP_TIMEOUT_SECONDS)
            if thread.is_alive():
                logger.warning("Telegram bot thread did not stop in time")
                return False
        self._loop = None
        logger.info("Telegram bot stopped")
        return True

    def get_message_history(self):
        """Get the message history (the live list of recorded exchanges)."""
        return self.message_history