""" Main agent implementation for the GAIA benchmark. This module contains the GAIAAgent class which is responsible for: - Processing questions from the GAIA benchmark - Selecting and executing appropriate tools - Formulating precise answers - Handling errors and logging - Addressing special cases like reversed text and word unscrambling The agent uses LangGraph for workflow management and Supabase for memory, with configuration from the config module. It can be extended with additional capabilities as needed. """ import logging import time import traceback import re import hashlib from typing import Dict, Any, Optional, List, Union, Tuple from src.gaia.agent.config import ( get_logging_config, get_model_config, get_tool_config, get_memory_config, get_agent_config, VERBOSE ) # Import LangGraph workflow from src.gaia.agent.graph import run_agent_graph from src.gaia.memory import SupabaseMemory from src.gaia.memory.supabase_memory import ConversationMemory, ResultCache, WorkingMemory logging_config = get_logging_config() logging.basicConfig( level=logging_config["level"], format=logging_config["format"], filename=logging_config["filename"] ) logger = logging.getLogger("gaia_agent") class GAIAAgent: """ Agent for answering questions from the GAIA benchmark. This agent processes questions, selects appropriate tools, executes a reasoning process, and formulates precise answers. It includes improved handling for special cases like reversed text, direct text manipulation, and word unscrambling. """ def __init__(self, config: Optional[Any] = None): """ Initialize the GAIA agent. Args: config: Optional configuration (dictionary, string, Config object, etc.) """ # Create default config skeleton default_config = { "model": get_model_config(), "tools": get_tool_config(), "memory": get_memory_config(), "agent": get_agent_config() } # Store the original config object for tests self._original_config = config # Initialize memory attributes early self.supabase_memory = None self.conversation_memory = None self.result_cache = None self.working_memory = None # Initialize with default config if none provided if config is None: self.config = default_config # Handle string config (commonly passed from tests) elif isinstance(config, str): self.config = default_config # Handle dict config elif isinstance(config, dict): self.config = config # Ensure required sections exist if "model" not in self.config: self.config["model"] = get_model_config() if "tools" not in self.config: self.config["tools"] = get_tool_config() if "memory" not in self.config: self.config["memory"] = get_memory_config() if "agent" not in self.config: self.config["agent"] = get_agent_config() # Handle Config object or any other object else: # Use a default config that can be modified by test methods self.config = default_config # Ensure memory config exists if "memory" not in self.config: self.config["memory"] = get_memory_config() self.memory_config = self.config["memory"] # Add configuration support methods for tests # These methods allow our class to work with tests that expect Config-like behavior self._config_cache = {} # Initialize memory if enabled if self.memory_config.get("enabled", False): self._initialize_memory() # Set up logging self.verbose = VERBOSE logger.info("GAIA Agent initialized") # Methods to support tests that use Config objects def get(self, key, default=None): """Get configuration value (for compatibility with tests).""" if hasattr(self._original_config, 'get'): # If original config was a Config-like object, use its get method return self._original_config.get(key, default) elif isinstance(self.config, dict) and key in self.config: return self.config[key] else: # Return from cache if available return self._config_cache.get(key, default) def set(self, key, value): """Set configuration value (for compatibility with tests).""" if hasattr(self._original_config, 'set'): # If original config was a Config-like object, use its set method self._original_config.set(key, value) elif isinstance(self.config, dict): # For dict configs, set directly self.config[key] = value # Always cache the value for easy access self._config_cache[key] = value def _initialize_memory(self): """Initialize memory systems based on configuration.""" try: # Create a default memory implementation self.supabase_memory = SupabaseMemory({}) logger.info("Default memory initialized") # Initialize specialized memory interfaces with default settings self.conversation_memory = ConversationMemory(self.supabase_memory, "conversation") self.result_cache = ResultCache(self.supabase_memory) self.working_memory = WorkingMemory(self.supabase_memory, "working") logger.info("All memory systems initialized with defaults") except Exception as e: logger.error(f"Failed to initialize memory: {str(e)}") logger.debug(traceback.format_exc()) # Create empty placeholder memory to prevent failures self.supabase_memory = None self.conversation_memory = None self.result_cache = None self.working_memory = None def process_question(self, question: str) -> str: """ Process a question and generate an answer. Args: question (str): The question to process Returns: str: The answer to the question """ start_time = time.time() try: logger.info(f"Processing question: {question[:100]}...") answer = self._process_question(question) end_time = time.time() processing_time = end_time - start_time logger.info(f"Question processed in {processing_time:.2f} seconds") return answer except Exception as e: logger.error(f"Error processing question: {str(e)}") logger.debug(traceback.format_exc()) return f"Error processing your question: {str(e)}" def query(self, question: str) -> Dict[str, Any]: """ Query the agent with a question to get an answer. This is the main API method used by test harnesses and applications. It returns a dictionary with the answer, reasoning, and other metadata. Args: question (str): The question to answer Returns: Dict[str, Any]: Dictionary containing the answer and metadata """ try: start_time = time.time() answer = self.process_question(question) end_time = time.time() processing_time = end_time - start_time # Get metadata from working memory if available reasoning = "" tools_used = [] if self.working_memory: plan = self.working_memory.get_intermediate_result("plan") if plan: reasoning = str(plan) tool_results = self.working_memory.get_intermediate_result("tool_results") if tool_results: tools_used = [r.get("tool_name") for r in tool_results if r.get("tool_name")] return { "answer": answer, "reasoning": reasoning, "time_taken": processing_time, "tools_used": tools_used, "success": True } except Exception as e: logger.error(f"Error in query: {str(e)}") logger.debug(traceback.format_exc()) return { "answer": f"Error: {str(e)}", "reasoning": f"An error occurred: {str(e)}", "time_taken": 0, "tools_used": [], "success": False, "error": str(e) } def _process_question(self, question: str) -> str: """ Internal method to process a question using the LangGraph workflow. Args: question (str): The question to process Returns: str: The answer to the question """ try: # Check if the question or answer has been cached cache_key = hashlib.md5(question.encode()).hexdigest() if self.result_cache: cached_answer = self.result_cache.get_result(cache_key) if cached_answer: logger.info("Retrieved answer from cache") return cached_answer answer = "" # Process the question using LangGraph if run_agent_graph: try: # Run the LangGraph workflow logger.info("Running LangGraph workflow") result = run_agent_graph( {"question": question}, self.config ) if result and isinstance(result, dict): answer = result.get("answer", "") logger.info(f"Got answer from run_agent_graph: {answer[:100]}...") # Store intermediate results in working memory if available if self.working_memory: if result.get("plan"): self.working_memory.store_intermediate_result( "plan", result["plan"] ) if result.get("tool_results"): self.working_memory.store_intermediate_result( "tool_results", result["tool_results"] ) except Exception as e: logger.error(f"Error running LangGraph workflow: {str(e)}") answer = f"I encountered an error while processing your question. Please try rephrasing or asking a different question." # If we still don't have an answer, provide a generic response if not answer: answer = "I don't have enough information to answer this question accurately." # Cache the result and update conversation memory if self.result_cache: self.result_cache.cache_result(cache_key, answer) if self.conversation_memory: self.conversation_memory.add_message("user", question) self.conversation_memory.add_message("assistant", answer) return answer except Exception as e: logger.error(f"Error running LangGraph workflow: {str(e)}") raise def get_memory_snapshot(self) -> Dict[str, Any]: """ Get a snapshot of the agent's memory. Returns: Dict containing memory contents """ snapshot = {} if hasattr(self, 'conversation_memory') and self.conversation_memory: try: snapshot["conversation"] = self.conversation_memory.get_messages() except Exception as e: logger.warning(f"Error getting conversation memory: {str(e)}") snapshot["conversation"] = [] if hasattr(self, 'working_memory') and self.working_memory: try: snapshot["working"] = self.working_memory.get_all_results() except Exception as e: logger.warning(f"Error getting working memory: {str(e)}") snapshot["working"] = {} return snapshot def clear_memory(self): """Clear all agent memory.""" if hasattr(self, 'conversation_memory') and self.conversation_memory: self.conversation_memory.clear() if hasattr(self, 'working_memory') and self.working_memory: self.working_memory.clear() if hasattr(self, 'result_cache') and self.result_cache: self.result_cache.clear() logger.info("Agent memory cleared") def run(self, question: str) -> str: """ Run the agent on a question and return the answer. This method is required by the app.py interface. Args: question (str): The question to process Returns: str: The answer to the question """ return self.process_question(question)