import asyncio
import logging
import os
import threading
import time
from datetime import datetime
from typing import Any, Dict, Iterable, List, Optional, Union

from telegram import Bot, Update
from telegram.ext import (
    Application,
    CommandHandler,
    ContextTypes,
    MessageHandler,
    filters,
)

# Configure root logging once at import time so the bot's messages are
# visible even when the host application has not set up logging itself.
# (basicConfig is a no-op when the root logger already has handlers.)
logging.basicConfig(level=logging.INFO)

# Module-level logger shared by everything in this file.
logger = logging.getLogger(__name__)


class TelegramBot:
    """
    Telegram bot integration for the AI second brain.

    Handles message ingestion, responses, and synchronization with the main app.

    The bot polls Telegram from a background daemon thread that owns its own
    asyncio event loop. Queries are forwarded to ``agent.query`` in a worker
    thread so the blocking agent call does not stall the polling loop.

    Attributes:
        agent: Object used to answer queries (see ``process_query``).
        token: Telegram bot token, or ``None`` when none was found.
        allowed_user_ids: List of integer user IDs allowed to query the bot;
            an empty list means everybody is allowed.
        message_history: List of dicts describing answered plain messages.
        application: The telegram ``Application`` once set up, else ``None``.
        bot_thread: The polling thread once started, else ``None``.
    """

    # Telegram rejects text messages longer than this many characters.
    MAX_MESSAGE_LENGTH = 4096

    # Seconds stop() waits for the polling thread to finish.
    STOP_TIMEOUT = 10.0

    # Characters that must be backslash-escaped in Telegram's legacy
    # "Markdown" parse mode when they appear outside an entity.
    _MARKDOWN_ESCAPES = str.maketrans({ch: "\\" + ch for ch in "_*`["})

    HELP_TEXT = (
        "\n"
        "*AI Second Brain Commands*\n"
        "- Just send me a message with your question\n"
        "- /search query - Search your knowledge base\n"
        "- /help - Show this help message\n"
    )

    def __init__(
        self,
        agent,
        token: Optional[str] = None,
        allowed_user_ids: Union[str, Iterable[int], None] = None,
    ):
        """
        Initialize the Telegram bot.

        Args:
            agent: The AssistantAgent instance to use for processing queries
            token: Telegram bot token (defaults to the TELEGRAM_BOT_TOKEN
                environment variable)
            allowed_user_ids: Telegram user IDs that can use the bot, given as
                an iterable of IDs, a single ID, or a comma-separated string
                (None or empty for all)

        Raises:
            ValueError: If an entry of ``allowed_user_ids`` is not an integer.
        """
        self.agent = agent
        self.token = token or os.getenv("TELEGRAM_BOT_TOKEN")
        self.allowed_user_ids = self._normalize_user_ids(allowed_user_ids)
        self.message_history = []

        self.application = None
        self.bot_thread = None
        # Event loop owned by the polling thread; kept so stop() can reach it.
        self._loop = None

        logger.info("Telegram bot initialized")

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _normalize_user_ids(raw) -> List[int]:
        """Turn None, an int, a comma-separated string or an iterable into a list of ints."""
        if not raw:
            return []
        if isinstance(raw, int):
            return [raw]
        if isinstance(raw, str):
            raw = raw.split(",")
        # Going through str() lets numeric strings inside a list match too;
        # Telegram user IDs are always compared as integers.
        return [int(str(uid).strip()) for uid in raw if str(uid).strip()]

    def _is_authorized(self, user) -> bool:
        """Return True when ``user`` may query the bot (empty allow-list admits all)."""
        if not self.allowed_user_ids:
            return True
        return user is not None and user.id in self.allowed_user_ids

    @classmethod
    def _escape_markdown(cls, text) -> str:
        """Backslash-escape the legacy-Markdown special characters in ``text``."""
        return str(text).translate(cls._MARKDOWN_ESCAPES)

    @classmethod
    def _format_sources(cls, sources) -> str:
        """Build the Markdown "Sources" message, or "" when there are no sources."""
        if not sources:
            return ""
        lines = [
            "- {} ({})".format(
                cls._escape_markdown(s.get("file_name", "unknown")),
                cls._escape_markdown(s.get("source", "unknown")),
            )
            for s in sources
        ]
        return "*Sources:*\n" + "\n".join(lines)

    @staticmethod
    def _split_text(text, limit=MAX_MESSAGE_LENGTH):
        """
        Split ``text`` into chunks of at most ``limit`` characters.

        Breaks at the last newline inside the limit when there is one,
        otherwise cuts hard at ``limit``. Text that already fits (or is not a
        string) is returned unchanged as a single-element list.
        """
        if limit < 1:
            raise ValueError("limit must be at least 1")
        if not isinstance(text, str) or len(text) <= limit:
            return [text]

        chunks = []
        remaining = text
        while len(remaining) > limit:
            cut = remaining.rfind("\n", 0, limit + 1)
            if cut <= 0:
                # No usable newline: fall back to a hard cut.
                cut = limit
            chunks.append(remaining[:cut])
            remaining = remaining[cut:].lstrip("\n")
        if remaining:
            chunks.append(remaining)
        # Telegram refuses whitespace-only messages, so drop such chunks.
        return [chunk for chunk in chunks if chunk.strip()]

    async def _send_response(self, message, response):
        """Reply to ``message`` with the answer and, if present, its sources."""
        for chunk in self._split_text(response["answer"], self.MAX_MESSAGE_LENGTH):
            await message.reply_text(chunk)

        sources_text = self._format_sources(response.get("sources"))
        if sources_text:
            for chunk in self._split_text(sources_text, self.MAX_MESSAGE_LENGTH):
                await message.reply_text(chunk, parse_mode='Markdown')

    # ------------------------------------------------------------------
    # Handlers
    # ------------------------------------------------------------------

    async def start_command(self, update: "Update", context: "ContextTypes.DEFAULT_TYPE"):
        """Handle the /start command."""
        # NOTE(review): this command is not gated by the allow-list - confirm
        # that is intended.
        # effective_message/effective_user also cover edited messages, for
        # which update.message is None.
        message, user = update.effective_message, update.effective_user
        if message is None or user is None:
            return
        await message.reply_text(
            f"Hello {user.first_name}! I'm your AI Second Brain assistant. Ask me anything or use /help to see available commands."
        )

    async def help_command(self, update: "Update", context: "ContextTypes.DEFAULT_TYPE"):
        """Handle the /help command."""
        message = update.effective_message
        if message is None:
            return
        await message.reply_text(self.HELP_TEXT, parse_mode='Markdown')

    async def search_command(self, update: "Update", context: "ContextTypes.DEFAULT_TYPE"):
        """Handle the /search command."""
        message = update.effective_message
        if message is None:
            return

        if not self._is_authorized(update.effective_user):
            await message.reply_text("You're not authorized to use this bot.")
            return

        query = ' '.join(context.args or [])
        if not query:
            await message.reply_text("Please provide a search query: /search your query here")
            return

        # Show the "typing..." indicator while the agent works.
        await context.bot.send_chat_action(chat_id=update.effective_chat.id, action="typing")

        try:
            response = await self.process_query(query)
            await self._send_response(message, response)
        except Exception as e:
            logger.exception("Error processing search: %s", e)
            await message.reply_text(f"Error processing your search: {str(e)}")

    async def handle_message(self, update: "Update", context: "ContextTypes.DEFAULT_TYPE"):
        """Handle normal messages."""
        message, user = update.effective_message, update.effective_user
        if message is None or user is None:
            # Nothing to answer, or no sender to attribute the history entry to.
            return

        if not self._is_authorized(user):
            await message.reply_text("You're not authorized to use this bot.")
            return

        query = message.text

        # Show the "typing..." indicator while the agent works.
        await context.bot.send_chat_action(chat_id=update.effective_chat.id, action="typing")

        try:
            response = await self.process_query(query)

            # Record the exchange before replying so it is kept even if
            # sending the reply fails.
            self.message_history.append({
                "user": user.username or str(user.id),
                "user_id": user.id,
                "query": query,
                "response": response["answer"],
                "timestamp": datetime.now().isoformat(),
                "chat_id": update.effective_chat.id,
            })

            await self._send_response(message, response)
        except Exception as e:
            logger.exception("Error processing message: %s", e)
            await message.reply_text(f"I encountered an error: {str(e)}")

    async def error_handler(self, update, context):
        """Handle errors."""
        logger.error(
            "Error: %s - caused by update %s",
            context.error,
            update,
            exc_info=context.error,
        )

        # ``update`` may be None or not an Update at all, hence getattr.
        chat = getattr(update, "effective_chat", None)
        if chat is None:
            return
        try:
            await context.bot.send_message(
                chat_id=chat.id,
                text="Sorry, an error occurred while processing your message."
            )
        except Exception:
            # Never let the error handler itself raise.
            logger.exception("Failed to notify chat %s about the error", chat.id)

    # ------------------------------------------------------------------
    # Agent bridge
    # ------------------------------------------------------------------

    async def process_query(self, query):
        """
        Process a query using the agent and return a response.

        Calls ``agent.query(query)`` in the default executor and, when the
        response contains an "answer", also calls
        ``agent.add_conversation_to_memory(query, answer)`` there.

        Returns:
            Whatever ``agent.query`` returned.
        """
        # We are inside a coroutine, so the running loop is the right one.
        loop = asyncio.get_running_loop()

        response = await loop.run_in_executor(None, self.agent.query, query)

        if "answer" in response:
            await loop.run_in_executor(
                None,
                self.agent.add_conversation_to_memory,
                query,
                response["answer"],
            )

        return response

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------

    def setup_application(self):
        """Set up the Telegram application with handlers."""
        self.application = Application.builder().token(self.token).build()

        self.application.add_handler(CommandHandler("start", self.start_command))
        self.application.add_handler(CommandHandler("help", self.help_command))
        self.application.add_handler(CommandHandler("search", self.search_command))

        # Every text message that is not a command goes to the agent.
        self.application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, self.handle_message))

        self.application.add_error_handler(self.error_handler)

        logger.info("Telegram application set up successfully")

    def start(self):
        """
        Start the Telegram bot in a separate thread.

        Returns:
            True when the bot is running (or was already running), False when
            no token is available or start-up failed.
        """
        if not self.token:
            logger.error("Telegram bot token not found")
            return False

        if self.bot_thread is not None and self.bot_thread.is_alive():
            logger.warning("Telegram bot is already running")
            return True

        try:
            self.setup_application()
            application = self.application

            # Create the loop here so stop() can reach it as soon as start()
            # returns, without racing the new thread.
            loop = asyncio.new_event_loop()
            self._loop = loop

            def run_bot():
                asyncio.set_event_loop(loop)
                try:
                    # stop_signals=None: signal handlers can only be installed
                    # from the main thread, and this runs in a worker thread.
                    application.run_polling(stop_signals=None)
                except Exception:
                    logger.exception("Telegram bot polling terminated with an error")

            self.bot_thread = threading.Thread(target=run_bot, daemon=True)
            self.bot_thread.start()

            logger.info("Telegram bot started in background thread")
            return True
        except Exception as e:
            logger.error(f"Error starting Telegram bot: {e}")
            return False

    def stop(self):
        """
        Stop the Telegram bot.

        Returns:
            False when the application was never set up or the polling thread
            did not finish within ``STOP_TIMEOUT`` seconds, True otherwise.
        """
        if not self.application:
            return False

        logger.info("Stopping Telegram bot...")

        loop, thread = self._loop, self.bot_thread
        if loop is not None and thread is not None and thread.is_alive():
            # The application lives on the polling thread's loop, so it must
            # not be awaited from a different loop here. Stopping that loop
            # ends run_polling()'s run_forever(), after which run_polling()
            # performs its own updater/application shutdown.
            # NOTE(review): relies on python-telegram-bot's run_polling
            # cleanup-on-loop-stop behaviour - confirm for the pinned version.
            try:
                loop.call_soon_threadsafe(loop.stop)
            except RuntimeError:
                # Loop already closed: polling has finished on its own.
                pass

            # A thread cannot join itself (e.g. stop() called from a handler).
            if thread is not threading.current_thread():
                thread.join(timeout=self.STOP_TIMEOUT)
                if thread.is_alive():
                    logger.warning(
                        "Telegram bot thread did not stop within %s seconds",
                        self.STOP_TIMEOUT,
                    )
                    return False

        logger.info("Telegram bot stopped")
        return True

    def get_message_history(self):
        """Get the message history (the live list, not a copy)."""
        return self.message_history