"""
Reasoning System for GAIA-Ready AI Agent

This module provides advanced reasoning capabilities for the AI agent,
implementing the ReAct approach (Reasoning + Acting) and supporting the
Think-Act-Observe workflow.
"""

import os
import json
from typing import List, Dict, Any, Optional, Union, Tuple
from datetime import datetime
import traceback
import re

# Names this module re-exports from smolagents.
_SMOLAGENTS_NAMES = ("Agent", "InferenceClientModel", "Tool")


def _import_smolagents():
    """Return the ``smolagents`` module, installing it with pip if it is missing.

    Raises:
        subprocess.CalledProcessError: If the pip installation fails.
        ImportError: If the package still cannot be imported after installing.
    """
    try:
        import smolagents
    except ImportError:
        import importlib
        import subprocess
        import sys

        # Invoke pip through the running interpreter so the package is
        # installed into the environment this process actually uses.
        subprocess.check_call([sys.executable, "-m", "pip", "install", "smolagents"])
        # Make the import system notice the freshly installed package.
        importlib.invalidate_caches()
        import smolagents
    return smolagents


try:
    from smolagents import Agent, InferenceClientModel, Tool
except ImportError:
    # ReasoningSystem itself needs none of these names, so a missing package
    # must not make the module unimportable. Installation is deferred until
    # one of the names is actually requested (see __getattr__ below).
    pass


def __getattr__(name: str) -> Any:
    """Resolve the smolagents re-exports lazily when the eager import failed."""
    if name in _SMOLAGENTS_NAMES:
        value = getattr(_import_smolagents(), name)
        globals()[name] = value
        return value
    raise AttributeError(f"module {__name__!r} has no attribute {name!r}")


# Prompt templates for the three workflow stages. Placeholders in braces are
# filled with str.format(); bracketed text is guidance for the model.
_THINK_TEMPLATE = """
# Task Analysis and Planning

## Task
{query}

## Relevant Context
{context}

## Analysis
Let me analyze this task step by step:
1. What is being asked?
2. What information do I need?
3. What challenges might I encounter?

## Plan
Based on my analysis, here's my plan:
1. [First step]
2. [Second step]
3. [Third step]
...

## Tools Needed
To accomplish this task, I'll need:
- [Tool 1]: For [purpose]
- [Tool 2]: For [purpose]
...

## Expected Outcome
If successful, I expect to:
[Description of expected outcome]
"""

_ACT_TEMPLATE = """
# Action Execution

## Current Task
{query}

## Current Plan
{plan}

## Previous Results
{previous_results}

## Next Action
Based on my plan and previous results, I'll now:
1. Use the [tool name] tool
2. With parameters: [parameters]
3. Purpose: [why this action is needed]

## Execution
[Detailed description of how I'll execute this action]
"""

_OBSERVE_TEMPLATE = """
# Result Analysis

## Current Task
{query}

## Action Taken
{action}

## Results Obtained
{results}

## Analysis
Let me analyze these results:
1. What did I learn?
2. Does this answer the original question?
3. Are there any inconsistencies or gaps?

## Next Steps
Based on my analysis:
- [Next step recommendation]
- [Alternative approach if needed]

## Progress Assessment
Task completion status: [percentage]%
[Explanation of current progress]
"""


class ReasoningSystem:
    """
    Advanced reasoning system implementing the ReAct approach
    and supporting the Think-Act-Observe workflow.

    The system is driven by two collaborators supplied by the caller:

    * ``agent`` - must provide ``chat(prompt) -> str`` and a ``tools`` iterable
      whose items expose ``name`` and a callable ``function``.
    * ``memory_manager`` - must provide ``get_relevant_memories(query)``,
      ``add_to_short_term(item)`` and ``add_to_long_term(item)``.
    """

    def __init__(self, agent, memory_manager):
        """Store the collaborators and load the prompt templates."""
        self.agent = agent
        self.memory_manager = memory_manager
        self.max_reasoning_depth = 5
        self.reasoning_templates = self._load_reasoning_templates()

    def _load_reasoning_templates(self) -> Dict[str, str]:
        """Load reasoning templates for different stages of the workflow"""
        return {
            "think": _THINK_TEMPLATE,
            "act": _ACT_TEMPLATE,
            "observe": _OBSERVE_TEMPLATE,
        }

    # ------------------------------------------------------------------
    # Small private helpers
    # ------------------------------------------------------------------

    def _remember(self, entry_type: str, content: Any) -> None:
        """Add a timestamped entry of the given type to short-term memory."""
        self.memory_manager.add_to_short_term({
            "type": entry_type,
            "content": content,
            "timestamp": datetime.now().isoformat(),
        })

    def _record_error(self, error_msg: str) -> None:
        """Print an error message and store it in short-term memory."""
        print(error_msg)
        self._remember("error", error_msg)

    def _find_tool(self, tool_name: str) -> Any:
        """Return the first agent tool whose name matches, or None."""
        return next(
            (tool for tool in self.agent.tools if tool.name == tool_name),
            None,
        )

    # ------------------------------------------------------------------
    # Think
    # ------------------------------------------------------------------

    def think(self, query: str) -> Dict[str, Any]:
        """
        Analyze the task and plan an approach (Think phase)

        Args:
            query: The user's query or task

        Returns:
            Dictionary with the keys ``raw_response``, ``analysis``, ``plan``,
            ``tools_needed`` and ``expected_outcome``. If the agent call fails,
            a generic fallback plan is returned instead of raising.
        """
        # Retrieve relevant memories and format them as context
        relevant_memories = self.memory_manager.get_relevant_memories(query)
        if relevant_memories:
            context_items = []
            for memory in relevant_memories:
                memory_type = memory.get("type", "general")
                content = memory.get("content", "")
                relevance = memory.get("relevance_score", 0)
                context_items.append(
                    f"- [{memory_type.upper()}] (Relevance: {relevance:.2f}): {content}"
                )
            context = "\n".join(context_items)
        else:
            context = "No relevant prior knowledge found."

        thinking_prompt = self.reasoning_templates["think"].format(
            query=query,
            context=context,
        )

        try:
            response = self.agent.chat(thinking_prompt)
            self._remember("thinking", response)

            # Parse the response to extract structured information
            return {
                "raw_response": response,
                "analysis": self._extract_section(response, "Analysis"),
                "plan": self._extract_section(response, "Plan"),
                "tools_needed": self._extract_section(response, "Tools Needed"),
                "expected_outcome": self._extract_section(response, "Expected Outcome"),
            }
        except Exception as e:
            self._record_error(
                f"Error during thinking phase: {str(e)}\n{traceback.format_exc()}"
            )
            # Return a basic plan despite the error
            return {
                "raw_response": "Error occurred during thinking phase.",
                "analysis": "Could not analyze the task due to an error.",
                "plan": "1. Try a simpler approach\n2. Break down the task into smaller steps",
                "tools_needed": "web_search: To find basic information",
                "expected_outcome": "Partial answer to the query",
            }

    # ------------------------------------------------------------------
    # Act
    # ------------------------------------------------------------------

    def act(self, plan: Dict[str, Any], query: str, previous_results: str = "") -> Dict[str, Any]:
        """
        Execute actions based on the plan (Act phase)

        Args:
            plan: The plan generated by the think step
            query: The original query
            previous_results: Results from previous actions

        Returns:
            Dictionary with the keys ``tool``, ``parameters``, ``result``,
            ``success`` and ``error``; ``fallback`` is added when the
            web_search fallback was attempted. Failures are reported through
            ``success``/``error`` rather than raised.
        """
        plan_text = plan.get("plan", "No plan available.")
        action_prompt = self.reasoning_templates["act"].format(
            query=query,
            plan=plan_text,
            previous_results=previous_results if previous_results else "No previous results.",
        )

        try:
            # Use the agent to determine the next action
            action_response = self.agent.chat(action_prompt)
            self._remember("action_planning", action_response)

            tool_info = self._extract_tool_info(action_response)
            if not tool_info:
                # No tool was identified: ask again with a strict answer format
                direct_prompt = (
                    f'\nBased on the task "{query}" and the plan:\n'
                    f"{plan_text}\n"
                    "\n"
                    "Which specific tool should I use next and with what parameters?\n"
                    "Respond in this format:\n"
                    "TOOL: [tool name]\n"
                    "PARAMETERS: [parameter1=value1, parameter2=value2, ...]\n"
                )
                direct_response = self.agent.chat(direct_prompt)
                tool_info = self._extract_tool_info(direct_response)

            if not tool_info:
                return self._fallback_web_search(query)
            return self._run_tool(tool_info["tool"], tool_info["parameters"])
        except Exception as e:
            self._record_error(
                f"Error during action phase: {str(e)}\n{traceback.format_exc()}"
            )
            return {
                "tool": "none",
                "parameters": "none",
                "result": f"Error during action planning: {str(e)}",
                "success": False,
                "error": str(e),
            }

    def _run_tool(self, tool_name: str, tool_params: Any) -> Dict[str, Any]:
        """Look up the named tool, execute it and describe the outcome."""
        matching_tool = self._find_tool(tool_name)
        if matching_tool is None:
            self._record_error(f"Tool '{tool_name}' not found.")
            return {
                "tool": tool_name,
                "parameters": tool_params,
                "result": f"Error: Tool '{tool_name}' not found.",
                "success": False,
                "error": "Tool not found",
            }

        try:
            # Dict parameters are passed as keyword arguments, anything else
            # as a single positional argument.
            if isinstance(tool_params, dict):
                result = matching_tool.function(**tool_params)
            else:
                result = matching_tool.function(tool_params)

            self._remember(
                "action_result",
                f"Tool: {tool_name}\nParameters: {tool_params}\nResult: {result}",
            )
            return {
                "tool": tool_name,
                "parameters": tool_params,
                "result": result,
                "success": True,
                "error": None,
            }
        except Exception as e:
            self._record_error(
                f"Error executing tool {tool_name}: {str(e)}\n{traceback.format_exc()}"
            )
            return {
                "tool": tool_name,
                "parameters": tool_params,
                "result": f"Error: {str(e)}",
                "success": False,
                "error": str(e),
            }

    def _fallback_web_search(self, query: str) -> Dict[str, Any]:
        """Record that no tool could be chosen and fall back to web_search."""
        self._record_error("Could not determine which tool to use.")

        try:
            web_search_tool = self._find_tool("web_search")
            if web_search_tool is None:
                return {
                    "tool": "none",
                    "parameters": "none",
                    "result": "Could not determine which tool to use and web_search fallback not available.",
                    "success": False,
                    "error": "No tool selected",
                }
            result = web_search_tool.function(query)
            return {
                "tool": "web_search",
                "parameters": query,
                "result": result,
                "success": True,
                "error": None,
                "fallback": True,
            }
        except Exception as e:
            return {
                "tool": "web_search",
                "parameters": query,
                "result": f"Error in fallback web search: {str(e)}",
                "success": False,
                "error": str(e),
                "fallback": True,
            }

    # ------------------------------------------------------------------
    # Observe
    # ------------------------------------------------------------------

    def observe(self, action_result: Dict[str, Any], plan: Dict[str, Any], query: str) -> Dict[str, Any]:
        """
        Analyze the results of actions and determine next steps (Observe phase)

        Args:
            action_result: Results from the act step
            plan: The original plan
            query: The original query

        Returns:
            Dictionary with the keys ``raw_response``, ``analysis``,
            ``next_steps``, ``progress`` and ``continue``. ``continue`` is
            False only when the agent's response contains a completion phrase;
            on error it defaults to True.
        """
        action_summary = (
            f"Tool: {action_result.get('tool', 'none')}\n"
            f"Parameters: {action_result.get('parameters', 'none')}"
        )
        observation_prompt = self.reasoning_templates["observe"].format(
            query=query,
            action=action_summary,
            results=action_result.get("result", "No results."),
        )

        try:
            # Use the agent to analyze the results
            observation_response = self.agent.chat(observation_prompt)
            self._remember("observation", observation_response)

            # Check for completion indicators
            completion_phrases = [
                "task complete",
                "question answered",
                "fully answered",
                "100%",
                "task is complete",
                "fully resolved",
            ]
            lowered = observation_response.lower()
            continue_execution = not any(phrase in lowered for phrase in completion_phrases)

            if not continue_execution:
                # Store the final answer in long-term memory
                self.memory_manager.add_to_long_term({
                    "type": "final_answer",
                    "query": query,
                    "content": observation_response,
                    "timestamp": datetime.now().isoformat(),
                    "importance": 0.8,  # High importance for final answers
                })

            return {
                "raw_response": observation_response,
                "analysis": self._extract_section(observation_response, "Analysis"),
                "next_steps": self._extract_section(observation_response, "Next Steps"),
                "progress": self._extract_section(observation_response, "Progress Assessment"),
                "continue": continue_execution,
            }
        except Exception as e:
            self._record_error(
                f"Error during observation phase: {str(e)}\n{traceback.format_exc()}"
            )
            # Default observation with continuation
            return {
                "raw_response": f"Error occurred during observation phase: {str(e)}",
                "analysis": "Could not analyze the results due to an error.",
                "next_steps": "Try a different approach or tool.",
                "progress": "Unknown due to error.",
                "continue": True,  # Continue by default on error
            }

    # ------------------------------------------------------------------
    # Response parsing
    # ------------------------------------------------------------------

    def _extract_section(self, text: str, section_name: str) -> str:
        """Extract a section from the response text

        A heading-style match is tried first (``## Name``, ``**Name**`` or
        ``Name:`` at the start of a line), ending at the next ``#`` heading.
        If that fails, the first occurrence of the name anywhere is used and
        the section ends at a blank line or at a line starting with an
        uppercase letter.

        Returns:
            The stripped section text, or ``"No <name> found."``.
        """
        name = re.escape(section_name)

        pattern = (
            rf"(?:^|\n)(?:#+\s*{name}:?|\*\*{name}:?\*\*|{name}:?)"
            rf"\s*(.*?)(?:\n(?:#+\s*|$)|\Z)"
        )
        match = re.search(pattern, text, re.DOTALL | re.IGNORECASE)
        if match:
            return match.group(1).strip()

        # Lenient fallback. Only the section name is matched case-insensitively:
        # with a global IGNORECASE flag the "\n[A-Z]" terminator would match any
        # letter and cut multi-line sections off after their first line.
        pattern = rf"(?i:{name}):?\s*(.*?)(?:\n\n|\n[A-Z]|\Z)"
        match = re.search(pattern, text, re.DOTALL)
        if match:
            return match.group(1).strip()

        return f"No {section_name.lower()} found."

    def _extract_tool_info(self, text: str) -> Optional[Dict[str, Any]]:
        """Extract tool name and parameters from the response text

        Returns:
            None when no ``TOOL:`` marker is present. Otherwise a dict with
            ``tool`` (str) and ``parameters``: a dict for ``key=value`` lists,
            the raw text for a single value, or ``{}`` when there is no
            ``PARAMETERS:`` marker.
        """
        tool_match = re.search(r"(?:TOOL|Tool|tool):\s*(\w+)", text)
        if not tool_match:
            return None
        tool_name = tool_match.group(1).strip()

        params_match = re.search(
            r"(?:PARAMETERS|Parameters|parameters):\s*(.*?)(?:\n\n|\n[A-Z]|\Z)",
            text,
            re.DOTALL,
        )
        if not params_match:
            # No parameters found, use empty dict
            return {"tool": tool_name, "parameters": {}}

        params_text = params_match.group(1).strip()
        # The format requested in act() wraps the list in [...]; without this
        # the closing bracket would end up inside the last value.
        if params_text.startswith("[") and params_text.endswith("]"):
            params_text = params_text[1:-1].strip()

        if "=" in params_text:
            # Parse key=value pairs, dropping quotes around the values
            param_pairs = re.findall(r"(\w+)\s*=\s*([^,\n]+)", params_text)
            params_dict = {
                key.strip(): value.strip().strip("\"'")
                for key, value in param_pairs
            }
            return {"tool": tool_name, "parameters": params_dict}

        # Treat as a single string parameter
        return {"tool": tool_name, "parameters": params_text}

    # ------------------------------------------------------------------
    # Full cycle
    # ------------------------------------------------------------------

    def execute_reasoning_cycle(self, query: str, max_iterations: int = 5) -> str:
        """
        Execute a complete Think-Act-Observe reasoning cycle

        Args:
            query: The user's query or task
            max_iterations: Maximum number of iterations

        Returns:
            Final answer to the query, or a best-effort partial answer when
            the iteration limit is reached (including a limit of zero).
        """
        self._remember("query", query)

        original_query = query   # kept intact for memory entries and prompts
        current_query = query    # what the agent sees in the current iteration
        progress_notes = []      # one observation per completed iteration
        all_results = []
        observation = {}         # stays empty if the loop never runs
        final_answer = None

        for iteration in range(max_iterations):
            print(f"Iteration {iteration + 1}/{max_iterations}")

            print("Thinking...")
            plan = self.think(current_query)

            print("Acting...")
            # Tool results may be of any type, so convert before joining.
            previous_results = "\n".join(str(r.get("result", "")) for r in all_results)
            action_result = self.act(plan, current_query, previous_results)
            all_results.append(action_result)

            print("Observing...")
            observation = self.observe(action_result, plan, current_query)

            if not observation["continue"]:
                actions_taken = ", ".join(
                    f"{r.get('tool', 'unknown')}({r.get('parameters', '')})"
                    for r in all_results
                )
                final_answer_prompt = (
                    f"\nTASK: {current_query}\n"
                    "\n"
                    "REASONING PROCESS:\n"
                    f"{plan.get('raw_response', 'No thinking process available.')}\n"
                    "\n"
                    "ACTIONS TAKEN:\n"
                    f"{actions_taken}\n"
                    "\n"
                    "RESULTS:\n"
                    f"{previous_results}\n"
                    f"{action_result.get('result', '')}\n"
                    "\n"
                    "OBSERVATION:\n"
                    f"{observation.get('raw_response', 'No observation available.')}\n"
                    "\n"
                    "Based on all the above, provide a comprehensive final answer "
                    "to the original task.\n"
                )
                final_answer = self.agent.chat(final_answer_prompt)

                # Store the final answer in long-term memory
                self.memory_manager.add_to_long_term({
                    "type": "final_answer",
                    "query": original_query,
                    "content": final_answer,
                    "timestamp": datetime.now().isoformat(),
                    "importance": 0.9,  # Very high importance
                })
                break

            # Rebuild the next prompt from the untouched original task plus all
            # observations so far, instead of wrapping the previous prompt in
            # yet another "Original task:" layer.
            progress_notes.append(
                observation.get("raw_response", "No observation available.")
            )
            progress_text = "\n\n".join(str(note) for note in progress_notes)
            current_query = (
                f"\nOriginal task: {original_query}\n"
                "\n"
                "Progress so far:\n"
                f"{progress_text}\n"
                "\n"
                "Please continue solving this task.\n"
            )

        # If we reached max iterations without a final answer
        if final_answer is None:
            final_answer = (
                f"\nI've spent {max_iterations} iterations trying to solve this task.\n"
                "Here's my best answer based on what I've learned:\n"
                "\n"
                f"{observation.get('raw_response', 'No final observation available.')}\n"
                "\n"
                "Note: This answer may be incomplete as I reached the maximum "
                "number of iterations.\n"
            )

            # Store the partial answer in long-term memory
            self.memory_manager.add_to_long_term({
                "type": "partial_answer",
                "query": original_query,
                "content": final_answer,
                "timestamp": datetime.now().isoformat(),
                "importance": 0.6,  # Medium importance for partial answers
            })

        return final_answer


# Example usage
if __name__ == "__main__":
    # This would be imported from your agent.py
    Tool = _import_smolagents().Tool

    # Mock agent for testing
    class MockAgent:
        def __init__(self):
            self.tools = [
                Tool(
                    name="web_search",
                    description="Search the web",
                    function=lambda x: f"Search results for: {x}",
                ),
                # NOTE(security): eval() executes arbitrary code. It is only
                # tolerable in this demo; never feed it untrusted input.
                Tool(
                    name="calculator",
                    description="Calculate",
                    function=lambda x: f"Result: {eval(x)}",
                ),
            ]

        def chat(self, message):
            return f"Response to: {message[:50]}..."

    # Mock memory manager
    class MockMemoryManager:
        def add_to_short_term(self, item):
            print(f"Added to short-term: {item['type']}")

        def add_to_long_term(self, item):
            print(f"Added to long-term: {item['type']}")

        def get_relevant_memories(self, query):
            return []

    # Test the reasoning system
    agent = MockAgent()
    memory_manager = MockMemoryManager()
    reasoning = ReasoningSystem(agent, memory_manager)

    result = reasoning.execute_reasoning_cycle("What is 2+2?")
    print(f"\nFinal result: {result}")