import discord import asyncio import logging import os from typing import Dict, List, Any import threading from discord.ext import commands # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class DiscordBot: """ Discord bot integration for the AI second brain. Handles message ingestion, responses, and synchronization with the main app. """ def __init__(self, agent, token=None, channel_whitelist=None): """ Initialize the Discord bot. Args: agent: The AssistantAgent instance to use for processing queries token: Discord bot token (defaults to environment variable) channel_whitelist: List of channel IDs to listen to (None for all) """ self.agent = agent self.token = token or os.getenv("DISCORD_BOT_TOKEN") self.channel_whitelist = channel_whitelist or [] self.message_history = [] # Set up Discord client with intents intents = discord.Intents.default() intents.message_content = True # Required to read message content self.client = commands.Bot(command_prefix="!", intents=intents) # Register event handlers self.setup_event_handlers() # Thread for bot self.bot_thread = None logger.info("Discord bot initialized") def setup_event_handlers(self): """Register event handlers for the Discord client.""" @self.client.event async def on_ready(): logger.info(f"Discord bot logged in as {self.client.user}") @self.client.event async def on_message(message): # Don't respond to self if message.author == self.client.user: return # Check if this is a command await self.client.process_commands(message) # Only process messages in whitelisted channels if whitelist exists if self.channel_whitelist and message.channel.id not in self.channel_whitelist: return # Only respond to messages that mention the bot or are DMs is_dm = isinstance(message.channel, discord.DMChannel) is_mentioned = self.client.user in message.mentions if is_dm or is_mentioned: await self.process_message(message) # Add a !help command @self.client.command(name="help") async def help_command(ctx): help_text = """ **AI Assistant Commands** - Mention me with a question to get an answer - Send me a DM with your query - Use `!search ` to search your knowledge base - Use `!upload` with an attachment to add to your knowledge base """ await ctx.send(help_text) # Add a search command @self.client.command(name="search") async def search_command(ctx, *, query): async with ctx.typing(): response = await self.process_query(query) await ctx.send(response["answer"]) # If there are sources, show them in a followup message if response["sources"]: sources_text = "**Sources:**\n" + "\n".join([ f"- {s['file_name']} ({s['source']})" for s in response["sources"] ]) await ctx.send(sources_text) async def process_message(self, message): """Process a Discord message and send a response.""" # Clean up mention and extract query content = message.content for mention in message.mentions: content = content.replace(f'<@{mention.id}>', '').replace(f'<@!{mention.id}>', '') query = content.strip() if not query: await message.channel.send("How can I help you?") return # Show typing indicator async with message.channel.typing(): # Process the query and get a response response = await self.process_query(query) # Store in message history self.message_history.append({ "user": str(message.author), "query": query, "response": response["answer"], "timestamp": message.created_at.isoformat(), "channel": str(message.channel) }) # Send the response await message.channel.send(response["answer"]) # If there are sources, send them in a followup message if response["sources"]: sources_text = "**Sources:**\n" + "\n".join([ f"- {s['file_name']} ({s['source']})" for s in response["sources"] ]) await message.channel.send(sources_text) async def process_query(self, query): """Process a query using the agent and return a response.""" # Run the query in a thread to avoid blocking the event loop loop = asyncio.get_event_loop() response = await loop.run_in_executor(None, self.agent.query, query) # Add the conversation to the agent's memory if "answer" in response: await loop.run_in_executor( None, self.agent.add_conversation_to_memory, query, response["answer"] ) return response def start(self): """Start the Discord bot in a separate thread.""" if not self.token: logger.error("Discord bot token not found") return False def run_bot(): asyncio.set_event_loop(asyncio.new_event_loop()) self.client.run(self.token) self.bot_thread = threading.Thread(target=run_bot, daemon=True) self.bot_thread.start() logger.info("Discord bot started in background thread") return True def stop(self): """Stop the Discord bot.""" if self.client and self.client.is_ready(): asyncio.run_coroutine_threadsafe(self.client.close(), self.client.loop) logger.info("Discord bot stopped") def get_message_history(self): """Get the message history.""" return self.message_history