import asyncio
import logging
import os
import threading
from typing import Any, Dict, List, Optional

import discord
from discord.ext import commands

# Configure the root logger at import time so INFO-level messages from this
# module are emitted even when the host application has not set up logging.
# basicConfig() is a no-op if the root logger already has handlers.
logging.basicConfig(level=logging.INFO)

# Module-level logger used by everything defined in this file.
logger = logging.getLogger(__name__)


class DiscordBot:
    """
    Discord bot integration for the AI second brain.

    Handles message ingestion, responses, and synchronization with the main app.

    The bot answers direct messages and messages that mention it, and exposes
    the ``!help`` and ``!search`` prefix commands. Queries are delegated to the
    supplied agent, which must provide ``query(text)`` returning a mapping
    (optionally holding ``"answer"`` and ``"sources"``) and
    ``add_conversation_to_memory(query, answer)``.
    """

    # Discord rejects messages whose content exceeds this many characters.
    MAX_MESSAGE_LENGTH = 2000

    # Sent when the agent response carries no usable "answer" text
    # (sending an empty message would be rejected by Discord).
    _NO_ANSWER_TEXT = "Sorry, I couldn't find an answer to that."

    # Sent when the agent raised while handling a query.
    _ERROR_TEXT = "Sorry, something went wrong while processing your request."

    def __init__(
        self,
        agent,
        token: Optional[str] = None,
        channel_whitelist: Optional[List[int]] = None,
    ):
        """
        Initialize the Discord bot.

        Args:
            agent: The AssistantAgent instance to use for processing queries
            token: Discord bot token (defaults to the DISCORD_BOT_TOKEN
                environment variable)
            channel_whitelist: List of channel IDs to listen to (None or
                empty for all)
        """
        self.agent = agent
        self.token = token or os.getenv("DISCORD_BOT_TOKEN")
        self.channel_whitelist = channel_whitelist or []
        self.message_history: List[Dict[str, Any]] = []

        intents = discord.Intents.default()
        intents.message_content = True
        # help_command=None removes the built-in "help" command; without it,
        # registering our own "help" command below raises a registration
        # error because the name is already taken.
        self.client = commands.Bot(
            command_prefix="!",
            intents=intents,
            help_command=None,
        )

        self.setup_event_handlers()

        # Background thread running the client; created by start().
        self.bot_thread: Optional[threading.Thread] = None

        logger.info("Discord bot initialized")

    def setup_event_handlers(self):
        """Register event handlers and prefix commands on the Discord client."""

        @self.client.event
        async def on_ready():
            logger.info("Discord bot logged in as %s", self.client.user)

        @self.client.event
        async def on_message(message):
            # Never react to our own messages.
            if message.author == self.client.user:
                return

            # Same steps as Bot.process_commands() (skip bot authors, build
            # the context, invoke it), but the context is kept so a
            # recognised command is not *also* answered as a free-form query.
            # Commands are dispatched before the whitelist check, so they
            # work in every channel.
            if not message.author.bot:
                ctx = await self.client.get_context(message)
                await self.client.invoke(ctx)
                if ctx.valid:
                    return

            # With a non-empty whitelist, only listed channel IDs are served.
            if self.channel_whitelist and message.channel.id not in self.channel_whitelist:
                return

            is_dm = isinstance(message.channel, discord.DMChannel)
            is_mentioned = self.client.user in message.mentions

            if is_dm or is_mentioned:
                await self.process_message(message)

        @self.client.command(name="help")
        async def help_command(ctx):
            # NOTE(review): the text advertises `!upload`, but no such command
            # is registered in this class - confirm it is added elsewhere.
            help_text = (
                "**AI Assistant Commands**\n"
                "- Mention me with a question to get an answer\n"
                "- Send me a DM with your query\n"
                "- Use `!search <query>` to search your knowledge base\n"
                "- Use `!upload` with an attachment to add to your knowledge base\n"
            )
            await ctx.send(help_text)

        @self.client.command(name="search")
        async def search_command(ctx, *, query):
            response = await self._run_query(ctx, query)
            if response is not None:
                await self._send_response(ctx, response)

    @staticmethod
    def _split_message(text, limit=MAX_MESSAGE_LENGTH):
        """Split *text* into non-blank chunks of at most *limit* characters.

        Chunks are cut at the last newline that fits when there is one, and
        at exactly *limit* characters otherwise.
        """
        chunks = []
        remaining = text or ""
        while len(remaining) > limit:
            cut = remaining.rfind("\n", 0, limit + 1)
            if cut <= 0:
                # No usable newline in range: fall back to a hard cut.
                cut = limit
            chunks.append(remaining[:cut])
            remaining = remaining[cut:].lstrip("\n")
        chunks.append(remaining)
        # Blank chunks are dropped so no empty message is ever sent.
        return [chunk for chunk in chunks if chunk.strip()]

    @staticmethod
    def _format_sources(sources):
        """Render the agent's source list as a Markdown bullet list.

        Returns an empty string when there are no sources.
        """
        if not sources:
            return ""
        lines = [
            f"- {src.get('file_name', 'unknown')} ({src.get('source', 'unknown')})"
            for src in sources
        ]
        return "**Sources:**\n" + "\n".join(lines)

    async def _send_response(self, destination, response):
        """Send the answer, then the sources (if any), to *destination*.

        *destination* is anything with an awaitable ``send(text)`` (a channel
        or a command context). Long texts are split to fit Discord's limit.
        """
        answer = response.get("answer") or self._NO_ANSWER_TEXT
        for chunk in self._split_message(str(answer)):
            await destination.send(chunk)

        sources_text = self._format_sources(response.get("sources"))
        for chunk in self._split_message(sources_text):
            await destination.send(chunk)

    async def _run_query(self, destination, query):
        """Run *query* while showing the typing indicator on *destination*.

        Returns the agent response, or None after logging the failure and
        sending an apology if processing raised.
        """
        try:
            async with destination.typing():
                return await self.process_query(query)
        except Exception:
            # Boundary handler: keep the bot alive and tell the user.
            logger.exception("Failed to process query")
            await destination.send(self._ERROR_TEXT)
            return None

    async def process_message(self, message):
        """Process a Discord message and send a response.

        Mention tokens are stripped from the content; the remainder is used
        as the query. The exchange is appended to the message history before
        the reply is sent.
        """
        content = message.content
        for mention in message.mentions:
            # Both the plain and the legacy nickname mention formats.
            content = content.replace(f"<@{mention.id}>", "").replace(f"<@!{mention.id}>", "")

        query = content.strip()
        if not query:
            await message.channel.send("How can I help you?")
            return

        response = await self._run_query(message.channel, query)
        if response is None:
            return

        self.message_history.append({
            "user": str(message.author),
            "query": query,
            "response": response.get("answer", ""),
            "timestamp": message.created_at.isoformat(),
            "channel": str(message.channel),
        })

        await self._send_response(message.channel, response)

    async def process_query(self, query) -> Dict[str, Any]:
        """Process a query using the agent and return a response.

        The agent calls are synchronous, so they run in the default executor
        to avoid blocking the event loop. When the response contains an
        ``"answer"``, the exchange is also stored in the agent's memory.
        """
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(None, self.agent.query, query)

        if "answer" in response:
            await loop.run_in_executor(
                None,
                self.agent.add_conversation_to_memory,
                query,
                response["answer"],
            )

        return response

    def start(self) -> bool:
        """Start the Discord bot in a separate daemon thread.

        Returns:
            False if no token is configured, True otherwise (including when
            the bot thread is already running).
        """
        if not self.token:
            logger.error("Discord bot token not found")
            return False

        if self.bot_thread is not None and self.bot_thread.is_alive():
            logger.warning("Discord bot is already running")
            return True

        def run_bot():
            # Non-main threads have no event loop by default; install one for
            # code that looks up the current loop, and close it afterwards so
            # it is not leaked.
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            try:
                self.client.run(self.token)
            except Exception:
                logger.exception("Discord bot terminated unexpectedly")
            finally:
                if not loop.is_closed():
                    loop.close()

        self.bot_thread = threading.Thread(target=run_bot, daemon=True)
        self.bot_thread.start()
        logger.info("Discord bot started in background thread")
        return True

    def stop(self):
        """Stop the Discord bot.

        Schedules ``client.close()`` on the client's event loop without
        waiting for it to finish. Does nothing if the client is not ready.
        """
        if self.client and self.client.is_ready():
            asyncio.run_coroutine_threadsafe(self.client.close(), self.client.loop)
            logger.info("Discord bot stopped")

    def get_message_history(self):
        """Get the message history (the live list, in insertion order)."""
        return self.message_history