File size: 9,223 Bytes
2a735cc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
import os
import logging
import threading
import asyncio
from typing import Dict, List, Any
import time
from datetime import datetime
from telegram import Update, Bot
from telegram.ext import Application, CommandHandler, MessageHandler, ContextTypes, filters

# Module-wide logger, named after this module so records can be traced to it.
logger = logging.getLogger(__name__)
# Root logging setup: emit INFO and above. basicConfig leaves an already
# configured root logger untouched.
logging.basicConfig(level=logging.INFO)

class TelegramBot:
    """
    Telegram bot integration for the AI second brain.

    Incoming text messages and ``/search`` queries are forwarded to the
    wrapped agent and the agent's answer (plus its sources, if any) is sent
    back to the chat. Polling runs in a background daemon thread that owns
    its own asyncio event loop, so ``start()`` and ``stop()`` can be called
    from ordinary synchronous code.
    """

    # Telegram refuses message texts longer than this many characters, so
    # longer answers are sent as several consecutive messages.
    MAX_MESSAGE_LENGTH = 4096

    # Sent when the agent's response carries no (or an empty) "answer".
    NO_ANSWER_TEXT = "Sorry, I couldn't come up with an answer."

    # In Telegram's legacy "Markdown" parse mode these characters start an
    # entity; an unbalanced one makes the API reject the whole message, so
    # they are backslash-escaped in any text we did not author ourselves.
    _MARKDOWN_ESCAPES = str.maketrans({ch: "\\" + ch for ch in "_*`["})

    def __init__(self, agent, token=None, allowed_user_ids=None):
        """
        Initialize the Telegram bot.

        Args:
            agent: The AssistantAgent instance to use for processing queries.
                It must provide ``query(text)`` and
                ``add_conversation_to_memory(query, answer)``.
            token: Telegram bot token (defaults to the ``TELEGRAM_BOT_TOKEN``
                environment variable).
            allowed_user_ids: Telegram user IDs that may use the bot. Accepts
                a list of IDs, a single ID, or a comma-separated string.
                ``None`` or empty means everyone is allowed.

        Raises:
            ValueError: If an entry of ``allowed_user_ids`` is not an integer.
        """
        self.agent = agent
        self.token = token or os.getenv("TELEGRAM_BOT_TOKEN")
        self.allowed_user_ids = self._parse_user_ids(allowed_user_ids)
        self.message_history = []

        # Created lazily by setup_application() / start().
        self.application = None
        self.bot_thread = None
        # Event loop owned by the polling thread; stop() needs it to hand the
        # shutdown request over to that thread.
        self._loop = None

        logger.info("Telegram bot initialized")

    @staticmethod
    def _parse_user_ids(raw) -> List[int]:
        """Normalise the ``allowed_user_ids`` argument to a list of ints."""
        if not raw:
            return []
        if isinstance(raw, str):
            raw = raw.split(',')
        elif isinstance(raw, int):
            raw = [raw]
        # Telegram user IDs are ints; coercing here means IDs supplied as
        # strings (e.g. read from a config file) still match incoming users.
        return [int(uid) for uid in raw if str(uid).strip()]

    def _is_authorized(self, update) -> bool:
        """Return True if the update's sender may use the bot."""
        if not self.allowed_user_ids:
            return True
        user = update.effective_user
        return user is not None and user.id in self.allowed_user_ids

    @classmethod
    def _escape_markdown(cls, text) -> str:
        """Escape characters that are special in legacy Markdown mode."""
        return str(text).translate(cls._MARKDOWN_ESCAPES)

    @classmethod
    def _format_sources(cls, sources) -> str:
        """Render the agent's sources as a Markdown bullet list.

        Each source is expected to be a dict with ``file_name`` and
        ``source`` keys; a missing key is shown as ``unknown``.
        """
        lines = ["*Sources:*"]
        for entry in sources:
            file_name = cls._escape_markdown(entry.get("file_name", "unknown"))
            origin = cls._escape_markdown(entry.get("source", "unknown"))
            lines.append(f"- {file_name} ({origin})")
        return "\n".join(lines)

    @classmethod
    def _answer_text(cls, response) -> str:
        """Return the answer text of a response, or a fallback if absent."""
        return response.get("answer") or cls.NO_ANSWER_TEXT

    @classmethod
    def _split_text(cls, text) -> List[str]:
        """Split text into pieces that fit into one Telegram message each."""
        limit = cls.MAX_MESSAGE_LENGTH
        return [text[i:i + limit] for i in range(0, len(text), limit)]

    async def _send_response(self, message, response):
        """Reply to ``message`` with the answer and, if present, the sources."""
        for chunk in self._split_text(self._answer_text(response)):
            await message.reply_text(chunk)

        # Sources go into a follow-up message so the answer stays plain text.
        sources = response.get("sources")
        if sources:
            await message.reply_text(self._format_sources(sources), parse_mode='Markdown')

    async def start_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Handle the /start command by greeting the user."""
        message = update.effective_message
        if message is None:
            return
        user = update.effective_user
        user_name = user.first_name if user else "there"
        await message.reply_text(
            f"Hello {user_name}! I'm your AI Second Brain assistant. Ask me anything or use /help to see available commands."
        )

    async def help_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Handle the /help command by listing the available commands."""
        message = update.effective_message
        if message is None:
            return
        help_text = """
*AI Second Brain Commands*
- Just send me a message with your question
- /search query - Search your knowledge base
- /help - Show this help message
        """
        await message.reply_text(help_text, parse_mode='Markdown')

    async def search_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Handle the /search command: run the arguments as a query."""
        # effective_message also covers edited messages, for which
        # update.message is None.
        message = update.effective_message
        if message is None:
            return

        # Check if user is allowed
        if not self._is_authorized(update):
            await message.reply_text("You're not authorized to use this bot.")
            return

        query = ' '.join(context.args or [])
        if not query:
            await message.reply_text("Please provide a search query: /search your query here")
            return

        # Show typing status
        await context.bot.send_chat_action(chat_id=update.effective_chat.id, action="typing")

        try:
            response = await self.process_query(query)
            await self._send_response(message, response)
        except Exception as e:
            # Handler boundary: log with traceback and tell the user.
            logger.exception("Error processing search: %s", e)
            await message.reply_text(f"Error processing your search: {str(e)}")

    async def handle_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Handle normal text messages: answer them and record the exchange."""
        # effective_message also covers edited messages, for which
        # update.message is None.
        message = update.effective_message
        if message is None:
            return

        # Check if user is allowed
        if not self._is_authorized(update):
            await message.reply_text("You're not authorized to use this bot.")
            return

        query = message.text

        # Show typing status
        await context.bot.send_chat_action(chat_id=update.effective_chat.id, action="typing")

        try:
            response = await self.process_query(query)

            # Store in message history (before sending, so the exchange is
            # recorded even if delivery of the reply fails).
            user = update.effective_user
            self.message_history.append({
                "user": (user.username or str(user.id)) if user else "unknown",
                "user_id": user.id if user else None,
                "query": query,
                "response": self._answer_text(response),
                "timestamp": datetime.now().isoformat(),
                "chat_id": update.effective_chat.id
            })

            await self._send_response(message, response)
        except Exception as e:
            # Handler boundary: log with traceback and tell the user.
            logger.exception("Error processing message: %s", e)
            await message.reply_text(f"I encountered an error: {str(e)}")

    async def error_handler(self, update, context):
        """Log an error raised while handling an update and notify the chat."""
        logger.error(
            "Error: %s - caused by update %s", context.error, update,
            exc_info=context.error,
        )

        # `update` is not guaranteed to be an Update with a chat attached.
        chat = getattr(update, "effective_chat", None)
        if chat is None:
            return

        try:
            await context.bot.send_message(
                chat_id=chat.id,
                text="Sorry, an error occurred while processing your message."
            )
        except Exception:
            # Never let the error handler itself raise.
            logger.exception("Could not notify chat %s about the error", chat.id)

    async def process_query(self, query):
        """
        Process a query using the agent and return its response.

        The agent's ``query`` and ``add_conversation_to_memory`` calls are
        run in the event loop's default executor so that they do not block
        the bot's event loop while they work.

        Args:
            query: The user's question as text.

        Returns:
            Whatever ``agent.query(query)`` returned. The handlers in this
            class treat it as a dict with an ``"answer"`` string and an
            optional ``"sources"`` list.
        """
        # We are inside a coroutine, so a loop is guaranteed to be running.
        loop = asyncio.get_running_loop()

        response = await loop.run_in_executor(None, self.agent.query, query)

        # Add the conversation to the agent's memory
        if "answer" in response:
            await loop.run_in_executor(
                None, self.agent.add_conversation_to_memory, query, response["answer"]
            )

        return response

    def setup_application(self):
        """Build the Telegram application and register all handlers."""
        self.application = Application.builder().token(self.token).build()

        # Command handlers
        self.application.add_handler(CommandHandler("start", self.start_command))
        self.application.add_handler(CommandHandler("help", self.help_command))
        self.application.add_handler(CommandHandler("search", self.search_command))

        # Every text message that is not a command goes to the agent.
        self.application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, self.handle_message))

        self.application.add_error_handler(self.error_handler)

        logger.info("Telegram application set up successfully")

    def start(self):
        """
        Start the Telegram bot in a separate daemon thread.

        Returns:
            True if the polling thread was started (or is already running),
            False if no token is configured or start-up failed.
        """
        if not self.token:
            logger.error("Telegram bot token not found")
            return False

        # A second polling thread for the same token would fight the first
        # one over updates, so starting twice is a no-op.
        if self.bot_thread is not None and self.bot_thread.is_alive():
            logger.warning("Telegram bot is already running")
            return True

        try:
            self.setup_application()
            application = self.application

            # Create the loop here rather than in the thread so that stop()
            # can reach it as soon as start() has returned.
            loop = asyncio.new_event_loop()
            self._loop = loop

            def run_bot():
                asyncio.set_event_loop(loop)
                try:
                    # stop_signals=None: signal handlers can only be
                    # installed from the main thread.
                    application.run_polling(stop_signals=None)
                except Exception:
                    # Without this the failure would vanish with the thread.
                    logger.exception("Telegram bot polling stopped unexpectedly")

            self.bot_thread = threading.Thread(target=run_bot, daemon=True)
            self.bot_thread.start()

            logger.info("Telegram bot started in background thread")
            return True
        except Exception as e:
            logger.error(f"Error starting Telegram bot: {e}")
            return False

    def stop(self, timeout=10.0):
        """
        Stop the Telegram bot and wait for its thread to finish.

        The application lives on the polling thread's event loop, so it must
        not be awaited from another loop. Instead the polling thread is asked
        to leave ``run_polling``, which then stops and shuts down the
        application itself.

        Args:
            timeout: Seconds to wait for the polling thread to exit
                (``None`` waits indefinitely).

        Returns:
            True if the bot was running and has stopped, False if it was not
            running or did not stop within ``timeout``.
        """
        thread = self.bot_thread
        loop = self._loop
        if not self.application or thread is None or loop is None or not thread.is_alive():
            return False

        logger.info("Stopping Telegram bot...")

        if thread is threading.current_thread():
            # Called from a handler on the bot thread: joining ourselves
            # would deadlock, so only signal the shutdown.
            self.application.stop_running()
            return True

        try:
            loop.call_soon_threadsafe(self.application.stop_running)
        except RuntimeError:
            # The loop was closed in the meantime: the bot is already on its
            # way out, so just wait for the thread below.
            pass

        thread.join(timeout)
        if thread.is_alive():
            logger.warning("Telegram bot did not stop within %s seconds", timeout)
            return False

        self._loop = None
        logger.info("Telegram bot stopped")
        return True

    def get_message_history(self):
        """Return the list of recorded exchanges (the live list, not a copy)."""
        return self.message_history