from database_interaction.config import create_or_update_config, load_config_to_env, init_config_db
from database_interaction.user import get_user_by_id, create_or_update_user, init_user_db
from langchain_community.chat_message_histories import SQLChatMessageHistory
from langchain_core.messages import SystemMessage, AnyMessage, AIMessage
from langchain_core.messages.utils import count_tokens_approximately
from sqlalchemy.ext.asyncio import create_async_engine
from langgraph_supervisor import create_supervisor
from langmem.short_term import SummarizationNode
from typing_extensions import TypedDict
import os


class State(TypedDict):
    message: AnyMessage
    user_id: str
    first_name: str
    last_name: str
    assistant_name: str
    latitude: str
    longitude: str
    location: str
    openweathermap_api_key: str
    github_token: str
    tavily_api_key: str
    groq_api_key: str
    clear_history: bool
    messages: list


class Assistant:
    def __init__(self, state: State):
        self.state = state
        self.engine = create_async_engine("sqlite+aiosqlite:///./database_files/main.db", echo=False)
        # Per-user chat history, keyed by user_id, stored in the shared SQLite database
        self.message_history = SQLChatMessageHistory(
            session_id=state['user_id'],
            connection=self.engine,
            async_mode=True
        )

    async def authorization(self):
        """Handle user authorization and configuration setup"""
        try:
            await init_user_db()
            await init_config_db()

            await create_or_update_user(
                user_id=self.state['user_id'],
                first_name=self.state.get('first_name'),
                last_name=self.state.get('last_name'),
                latitude=float(self.state['latitude']) if self.state.get('latitude') else None,
                longitude=float(self.state['longitude']) if self.state.get('longitude') else None,
            )

            # Persist only the config fields that were actually provided
            config_data = {}
            config_fields = [
                'assistant_name', 'openweathermap_api_key', 'github_token',
                'tavily_api_key', 'groq_api_key'
            ]
            for field in config_fields:
                if self.state.get(field):
                    config_data[field] = self.state[field]
            if config_data:
                await create_or_update_config(user_id=self.state['user_id'], **config_data)
            await load_config_to_env(user_id=self.state['user_id'])

            if self.state.get('clear_history'):
                await self.message_history.aclear()
        except Exception as e:
            print(f"Authorization/setup error: {e}")

    def compile_multi_agent_system(self):
        """Create and return the multi-agent system"""
        try:
            from agent.deep_research_agent import deep_research_agent
            from agent.coder_agent import coder_agent
            from agent.prompts import supervisor_instructions
            from agent.models import llm_supervisor, llm_peripheral
            from agent.tools import supervisor_tools

            # Summarize older conversation turns before they reach the supervisor model
            summarization_node = SummarizationNode(
                token_counter=count_tokens_approximately,
                model=llm_peripheral,
                max_tokens=4000,
                max_summary_tokens=1000,
                output_messages_key="llm_input_messages",
            )

            agents = [coder_agent, deep_research_agent]
            supervisor = create_supervisor(
                model=llm_supervisor,
                tools=supervisor_tools,
                agents=agents,
                prompt=supervisor_instructions(supervisor_tools, agents),
                add_handoff_back_messages=False,
                add_handoff_messages=False,
                output_mode="full_history",
                pre_model_hook=summarization_node
            )
            return supervisor.compile()
        except Exception as e:
            print(f"Error creating multi-agent system: {e}")
            # Return a simple fallback system with proper async interface
            from langchain_core.messages import HumanMessage
            from langgraph.graph import StateGraph
            from typing import Dict, Any

            def fallback_node(state: Dict[str, Any]):
                return {"messages": state.get("messages", []) + [
                    HumanMessage(content=f"System error: {str(e)}. Please check configuration and try again.")]}

            fallback_graph = StateGraph(dict)
            fallback_graph.add_node("fallback", fallback_node)
            fallback_graph.set_entry_point("fallback")
            fallback_graph.set_finish_point("fallback")
            return fallback_graph.compile()

    async def run(self):
        """Process messages through the multi-agent system"""
        try:
            user_info = await get_user_by_id(user_id=self.state['user_id'])

            # Expose the stored location to tools via environment variables
            if user_info.get('location'):
                os.environ['LOCATION'] = user_info['location']
            if user_info.get('latitude'):
                os.environ['LATITUDE'] = str(user_info['latitude'])
            if user_info.get('longitude'):
                os.environ['LONGITUDE'] = str(user_info['longitude'])

            system_msg = SystemMessage(
                content=f"""
You are an intelligent personal assistant named {os.getenv('ASSISTANT_NAME', 'Assistant')}, built using a multi-agent system architecture. Your tools include web search, weather and time lookups, code execution, and GitHub integration. You work inside a Telegram interface and respond concisely, clearly, and informatively.
The user you are assisting is:
- **Name**: {user_info.get('first_name', 'Unknown') or 'Unknown'} {user_info.get('last_name', '') or ''}
- **User ID**: {self.state['user_id']}
- **Location**: {user_info.get('location', 'Unknown') or 'Unknown'}
- **Coordinates**: ({user_info.get('latitude', 'N/A') or 'N/A'}, {user_info.get('longitude', 'N/A') or 'N/A'})
You may use their location when answering weather or time-related queries. If the location is unknown, you may ask the user to share it.
Stay helpful, respectful, and relevant to the user's query.
""".strip()
            )

            await self.message_history.aadd_message(self.state['message'])
            messages = await self.message_history.aget_messages()
            # Keep up to seven prior messages, inject the system prompt, then end with the newest message
            self.state['messages'] = messages[-8:-1] + [system_msg, messages[-1]]

            multi_agent_system = self.compile_multi_agent_system()
            result = await multi_agent_system.ainvoke({"messages": self.state["messages"]},
                                                      generation_config=dict(response_modalities=["TEXT"]))
            await self.message_history.aadd_message(result['messages'][-1])
            return {"messages": result.get("messages", [])}
        except Exception as e:
            print(f"Multi-agent node error: {e}")
            return {"messages": [AIMessage(content=f"I encountered an error: {str(e)}. Please try again.")]}