File size: 13,818 Bytes
c922f8b 60e4487 c922f8b 3f73bce |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 |
"""
Main agent implementation for the GAIA benchmark.
This module contains the GAIAAgent class which is responsible for:
- Processing questions from the GAIA benchmark
- Selecting and executing appropriate tools
- Formulating precise answers
- Handling errors and logging
- Addressing special cases like reversed text and word unscrambling
The agent uses LangGraph for workflow management and Supabase for memory,
with configuration from the config module. It can be extended with
additional capabilities as needed.
"""
import logging
import time
import traceback
import re
import hashlib
from typing import Dict, Any, Optional, List, Union, Tuple
from src.gaia.agent.config import (
get_logging_config,
get_model_config,
get_tool_config,
get_memory_config,
get_agent_config,
VERBOSE
)
# Import LangGraph workflow
from src.gaia.agent.graph import run_agent_graph
from src.gaia.memory import SupabaseMemory
from src.gaia.memory.supabase_memory import ConversationMemory, ResultCache, WorkingMemory
logging_config = get_logging_config()
logging.basicConfig(
level=logging_config["level"],
format=logging_config["format"],
filename=logging_config["filename"]
)
logger = logging.getLogger("gaia_agent")
class GAIAAgent:
    """
    Agent for answering questions from the GAIA benchmark.

    The agent processes questions, runs a LangGraph workflow to select and
    execute tools, and formulates answers. When Supabase-backed memory is
    enabled, answers are cached and conversation history is recorded.

    Public entry points: ``run`` / ``process_question`` (plain string
    answer) and ``query`` (answer plus metadata dict).
    """

    def __init__(self, config: Optional[Any] = None):
        """
        Initialize the GAIA agent.

        Args:
            config: Optional configuration. A dict is used directly (with
                missing sections filled from defaults). ``None``, strings
                (commonly passed from tests), and Config-like objects fall
                back to the default config; the original object is kept in
                ``_original_config`` so the ``get``/``set`` compatibility
                shims can delegate to it.
        """
        default_config = {
            "model": get_model_config(),
            "tools": get_tool_config(),
            "memory": get_memory_config(),
            "agent": get_agent_config(),
        }
        # Keep the caller's original object so get()/set() can delegate to
        # Config-like objects expected by the test suite.
        self._original_config = config

        # Memory handles default to None; _initialize_memory may set them.
        self.supabase_memory = None
        self.conversation_memory = None
        self.result_cache = None
        self.working_memory = None

        if isinstance(config, dict):
            # Use the caller's dict, filling in any missing sections.
            self.config = config
            for section, loader in (
                ("model", get_model_config),
                ("tools", get_tool_config),
                ("memory", get_memory_config),
                ("agent", get_agent_config),
            ):
                if section not in self.config:
                    self.config[section] = loader()
        else:
            # None, strings, and arbitrary objects all use the defaults.
            self.config = default_config

        if "memory" not in self.config:
            self.config["memory"] = get_memory_config()
        self.memory_config = self.config["memory"]

        # Cache backing the get()/set() compatibility shims.
        self._config_cache = {}

        # Initialize memory only when explicitly enabled in config.
        if self.memory_config.get("enabled", False):
            self._initialize_memory()

        self.verbose = VERBOSE
        logger.info("GAIA Agent initialized")

    # Methods to support tests that use Config objects
    def get(self, key, default=None):
        """Get a configuration value (compatibility shim for tests).

        Delegates to the original Config-like object's ``get`` when one was
        supplied; otherwise reads the dict config, then the local cache.
        """
        if hasattr(self._original_config, 'get'):
            return self._original_config.get(key, default)
        if isinstance(self.config, dict) and key in self.config:
            return self.config[key]
        return self._config_cache.get(key, default)

    def set(self, key, value):
        """Set a configuration value (compatibility shim for tests).

        Writes through to the original Config-like object or the dict
        config, and always mirrors the value into the local cache.
        """
        if hasattr(self._original_config, 'set'):
            self._original_config.set(key, value)
        elif isinstance(self.config, dict):
            self.config[key] = value
        self._config_cache[key] = value

    def _initialize_memory(self):
        """Initialize Supabase-backed memory systems.

        On any failure all memory handles are reset to ``None`` so the
        agent degrades gracefully to memory-less operation.
        """
        try:
            self.supabase_memory = SupabaseMemory({})
            logger.info("Default memory initialized")
            self.conversation_memory = ConversationMemory(self.supabase_memory, "conversation")
            self.result_cache = ResultCache(self.supabase_memory)
            self.working_memory = WorkingMemory(self.supabase_memory, "working")
            logger.info("All memory systems initialized with defaults")
        except Exception as e:
            logger.error("Failed to initialize memory: %s", e)
            logger.debug(traceback.format_exc())
            # Fall back to no memory rather than failing the whole agent.
            self.supabase_memory = None
            self.conversation_memory = None
            self.result_cache = None
            self.working_memory = None

    def process_question(self, question: str) -> str:
        """
        Process a question and return a plain-string answer.

        Args:
            question (str): The question to process.

        Returns:
            str: The answer, or an error message if processing failed.
        """
        start_time = time.time()
        try:
            logger.info("Processing question: %s...", question[:100])
            answer = self._process_question(question)
            logger.info("Question processed in %.2f seconds", time.time() - start_time)
            return answer
        except Exception as e:
            logger.error("Error processing question: %s", e)
            logger.debug(traceback.format_exc())
            return f"Error processing your question: {str(e)}"

    def query(self, question: str) -> Dict[str, Any]:
        """
        Query the agent with a question to get an answer.

        This is the main API method used by test harnesses and applications.

        Args:
            question (str): The question to answer.

        Returns:
            Dict[str, Any]: ``answer``, ``reasoning``, ``time_taken``,
            ``tools_used`` and ``success``; an ``error`` key is added on
            failure.
        """
        start_time = time.time()
        try:
            answer = self.process_question(question)
            processing_time = time.time() - start_time

            # Pull reasoning / tool metadata from working memory if available.
            reasoning = ""
            tools_used = []
            if self.working_memory:
                plan = self.working_memory.get_intermediate_result("plan")
                if plan:
                    reasoning = str(plan)
                tool_results = self.working_memory.get_intermediate_result("tool_results")
                if tool_results:
                    tools_used = [r.get("tool_name") for r in tool_results if r.get("tool_name")]
            return {
                "answer": answer,
                "reasoning": reasoning,
                "time_taken": processing_time,
                "tools_used": tools_used,
                "success": True
            }
        except Exception as e:
            logger.error("Error in query: %s", e)
            logger.debug(traceback.format_exc())
            return {
                "answer": f"Error: {str(e)}",
                "reasoning": f"An error occurred: {str(e)}",
                # BUG FIX: report actual elapsed time instead of a hard-coded 0.
                "time_taken": time.time() - start_time,
                "tools_used": [],
                "success": False,
                "error": str(e)
            }

    def _process_question(self, question: str) -> str:
        """
        Internal method to process a question using the LangGraph workflow.

        Checks the result cache first; on a miss, runs the workflow,
        stores intermediate results in working memory, caches successful
        answers, and records the exchange in conversation memory.

        Args:
            question (str): The question to process.

        Returns:
            str: The answer to the question.
        """
        # Content-addressed cache key; md5 is fine here (not security-sensitive).
        cache_key = hashlib.md5(question.encode()).hexdigest()
        if self.result_cache:
            cached_answer = self.result_cache.get_result(cache_key)
            if cached_answer:
                logger.info("Retrieved answer from cache")
                return cached_answer

        answer = ""
        workflow_failed = False
        try:
            logger.info("Running LangGraph workflow")
            result = run_agent_graph({"question": question}, self.config)
            if result and isinstance(result, dict):
                answer = result.get("answer", "")
                logger.info("Got answer from run_agent_graph: %s...", answer[:100])
                # Persist intermediate results so query() can report metadata.
                if self.working_memory:
                    if result.get("plan"):
                        self.working_memory.store_intermediate_result(
                            "plan", result["plan"]
                        )
                    if result.get("tool_results"):
                        self.working_memory.store_intermediate_result(
                            "tool_results", result["tool_results"]
                        )
        except Exception as e:
            logger.error("Error running LangGraph workflow: %s", e)
            workflow_failed = True
            answer = ("I encountered an error while processing your question. "
                      "Please try rephrasing or asking a different question.")

        if not answer:
            answer = "I don't have enough information to answer this question accurately."

        # BUG FIX: never cache failure responses — a transient workflow error
        # would otherwise be served from cache for this question forever.
        if self.result_cache and not workflow_failed:
            self.result_cache.cache_result(cache_key, answer)
        if self.conversation_memory:
            self.conversation_memory.add_message("user", question)
            self.conversation_memory.add_message("assistant", answer)
        return answer

    def get_memory_snapshot(self) -> Dict[str, Any]:
        """
        Get a snapshot of the agent's memory.

        Returns:
            Dict[str, Any]: ``conversation`` (list of messages) and
            ``working`` (intermediate results) for each memory that exists;
            failed reads degrade to an empty list/dict.
        """
        snapshot = {}
        if getattr(self, 'conversation_memory', None):
            try:
                snapshot["conversation"] = self.conversation_memory.get_messages()
            except Exception as e:
                logger.warning("Error getting conversation memory: %s", e)
                snapshot["conversation"] = []
        if getattr(self, 'working_memory', None):
            try:
                snapshot["working"] = self.working_memory.get_all_results()
            except Exception as e:
                logger.warning("Error getting working memory: %s", e)
                snapshot["working"] = {}
        return snapshot

    def clear_memory(self):
        """Clear conversation, working, and cached-result memory (if present)."""
        if getattr(self, 'conversation_memory', None):
            self.conversation_memory.clear()
        if getattr(self, 'working_memory', None):
            self.working_memory.clear()
        if getattr(self, 'result_cache', None):
            self.result_cache.clear()
        logger.info("Agent memory cleared")

    def run(self, question: str) -> str:
        """
        Run the agent on a question and return the answer.

        This method is required by the app.py interface; it delegates to
        ``process_question``.

        Args:
            question (str): The question to process.

        Returns:
            str: The answer to the question.
        """
        return self.process_question(question)