|
""" |
|
Reasoning System for GAIA-Ready AI Agent |
|
|
|
This module provides advanced reasoning capabilities for the AI agent, |
|
implementing the ReAct approach (Reasoning + Acting) and supporting |
|
the Think-Act-Observe workflow. |
|
""" |
|
|
|
import os |
|
import json |
|
from typing import List, Dict, Any, Optional, Union, Tuple |
|
from datetime import datetime |
|
import traceback |
|
import re |
|
|
|
try: |
|
from smolagents import Agent, InferenceClientModel, Tool |
|
except ImportError: |
|
import subprocess |
|
subprocess.check_call(["pip", "install", "smolagents"]) |
|
from smolagents import Agent, InferenceClientModel, Tool |
|
|
|
|
|
class ReasoningSystem:
    """
    Advanced reasoning system implementing the ReAct approach
    and supporting the Think-Act-Observe workflow.

    Both collaborators are duck-typed. As used by this class:

    * ``agent`` provides ``chat(prompt)`` (its return value is treated as the
      model's text response) and an iterable ``tools`` whose items have a
      ``name`` attribute and are invoked through their ``function``
      attribute (or called directly when they expose no callable
      ``function``).
    * ``memory_manager`` provides ``get_relevant_memories(query)``,
      ``add_to_short_term(item)`` and ``add_to_long_term(item)``.
    """

    def __init__(self, agent, memory_manager):
        """
        Store the collaborators and load the prompt templates.

        Args:
            agent: Object used to chat with the model and to look up tools
            memory_manager: Object used to read and record memories
        """
        self.agent = agent
        self.memory_manager = memory_manager
        # Not read anywhere in this class; kept as a public attribute.
        self.max_reasoning_depth = 5
        self.reasoning_templates = self._load_reasoning_templates()

    def _load_reasoning_templates(self) -> Dict[str, str]:
        """Load reasoning templates for different stages of the workflow"""
        # Each template is filled in with str.format(); the only replacement
        # fields are the {name} placeholders, square brackets are literal.
        return {
            "think": """
# Task Analysis and Planning

## Task
{query}

## Relevant Context
{context}

## Analysis
Let me analyze this task step by step:
1. What is being asked?
2. What information do I need?
3. What challenges might I encounter?

## Plan
Based on my analysis, here's my plan:
1. [First step]
2. [Second step]
3. [Third step]
...

## Tools Needed
To accomplish this task, I'll need:
- [Tool 1]: For [purpose]
- [Tool 2]: For [purpose]
...

## Expected Outcome
If successful, I expect to:
[Description of expected outcome]
""",
            "act": """
# Action Execution

## Current Task
{query}

## Current Plan
{plan}

## Previous Results
{previous_results}

## Next Action
Based on my plan and previous results, I'll now:
1. Use the [tool name] tool
2. With parameters: [parameters]
3. Purpose: [why this action is needed]

## Execution
[Detailed description of how I'll execute this action]
""",
            "observe": """
# Result Analysis

## Current Task
{query}

## Action Taken
{action}

## Results Obtained
{results}

## Analysis
Let me analyze these results:
1. What did I learn?
2. Does this answer the original question?
3. Are there any inconsistencies or gaps?

## Next Steps
Based on my analysis:
- [Next step recommendation]
- [Alternative approach if needed]

## Progress Assessment
Task completion status: [percentage]%
[Explanation of current progress]
""",
        }

    # ------------------------------------------------------------------
    # Memory helpers
    # ------------------------------------------------------------------

    def _record_short_term(self, entry_type: str, content: Any) -> None:
        """Add a timestamped entry of the given type to short-term memory"""
        self.memory_manager.add_to_short_term({
            "type": entry_type,
            "content": content,
            "timestamp": datetime.now().isoformat(),
        })

    def _record_long_term(self, entry_type: str, query: str, content: Any,
                          importance: float) -> None:
        """Add a timestamped, query-tagged entry to long-term memory"""
        self.memory_manager.add_to_long_term({
            "type": entry_type,
            "query": query,
            "content": content,
            "timestamp": datetime.now().isoformat(),
            "importance": importance,
        })

    # ------------------------------------------------------------------
    # Think
    # ------------------------------------------------------------------

    def think(self, query: str) -> Dict[str, Any]:
        """
        Analyze the task and plan an approach (Think phase)

        Args:
            query: The user's query or task

        Returns:
            Dictionary with the keys ``raw_response``, ``analysis``, ``plan``,
            ``tools_needed`` and ``expected_outcome``. If the model call or
            the parsing fails, a generic fallback plan is returned instead of
            raising.
        """
        relevant_memories = self.memory_manager.get_relevant_memories(query)

        if relevant_memories:
            context_items = []
            for memory in relevant_memories:
                memory_type = memory.get("type", "general")
                content = memory.get("content", "")
                relevance = memory.get("relevance_score", 0)
                context_items.append(
                    f"- [{memory_type.upper()}] (Relevance: {relevance:.2f}): {content}"
                )
            context = "\n".join(context_items)
        else:
            context = "No relevant prior knowledge found."

        thinking_prompt = self.reasoning_templates["think"].format(
            query=query,
            context=context,
        )

        try:
            response = self.agent.chat(thinking_prompt)
            self._record_short_term("thinking", response)

            return {
                "raw_response": response,
                "analysis": self._extract_section(response, "Analysis"),
                "plan": self._extract_section(response, "Plan"),
                "tools_needed": self._extract_section(response, "Tools Needed"),
                "expected_outcome": self._extract_section(response, "Expected Outcome"),
            }
        except Exception as e:
            # Best-effort boundary: the cycle continues with a generic plan.
            error_msg = f"Error during thinking phase: {str(e)}\n{traceback.format_exc()}"
            print(error_msg)
            self._record_short_term("error", error_msg)

            return {
                "raw_response": "Error occurred during thinking phase.",
                "analysis": "Could not analyze the task due to an error.",
                "plan": "1. Try a simpler approach\n2. Break down the task into smaller steps",
                "tools_needed": "web_search: To find basic information",
                "expected_outcome": "Partial answer to the query",
            }

    # ------------------------------------------------------------------
    # Act
    # ------------------------------------------------------------------

    def act(self, plan: Dict[str, Any], query: str, previous_results: str = "") -> Dict[str, Any]:
        """
        Execute actions based on the plan (Act phase)

        The model is asked which tool to use. If its answer names no tool, it
        is asked once more with a stricter format; if that also fails, the
        ``web_search`` tool is tried with the raw query as a fallback.

        Args:
            plan: The plan generated by the think step
            query: The original query
            previous_results: Results from previous actions

        Returns:
            Dictionary with the keys ``tool``, ``parameters``, ``result``,
            ``success`` and ``error``; fallback web searches additionally
            carry ``fallback: True``. Failures are reported through
            ``success``/``error`` rather than raised.
        """
        plan_text = plan.get("plan", "No plan available.")
        action_prompt = self.reasoning_templates["act"].format(
            query=query,
            plan=plan_text,
            previous_results=previous_results if previous_results else "No previous results.",
        )

        try:
            action_response = self.agent.chat(action_prompt)
            self._record_short_term("action_planning", action_response)

            tool_info = self._extract_tool_info(action_response)

            if not tool_info:
                # Second attempt with an explicit, easy-to-parse format.
                direct_prompt = f"""
Based on the task "{query}" and the plan:
{plan_text}

Which specific tool should I use next and with what parameters?
Respond in this format:
TOOL: [tool name]
PARAMETERS: [parameter1=value1, parameter2=value2, ...]
"""
                direct_response = self.agent.chat(direct_prompt)
                tool_info = self._extract_tool_info(direct_response)

            if not tool_info:
                return self._fallback_web_search(query)

            return self._run_tool(tool_info["tool"], tool_info["parameters"])
        except Exception as e:
            error_msg = f"Error during action phase: {str(e)}\n{traceback.format_exc()}"
            print(error_msg)
            self._record_short_term("error", error_msg)

            return {
                "tool": "none",
                "parameters": "none",
                "result": f"Error during action planning: {str(e)}",
                "success": False,
                "error": str(e),
            }

    def _find_tool(self, tool_name: str) -> Optional[Any]:
        """Return the first agent tool whose ``name`` equals ``tool_name``, or None"""
        for tool in self.agent.tools:
            if tool.name == tool_name:
                return tool
        return None

    @staticmethod
    def _invoke_tool(tool: Any, params: Any) -> Any:
        """Call a tool with keyword arguments (dict params) or one positional argument"""
        runner = getattr(tool, "function", None)
        if not callable(runner):
            # Tools without a callable ``function`` attribute are invoked
            # directly; a non-callable tool raises TypeError for the caller.
            runner = tool
        if isinstance(params, dict):
            return runner(**params)
        return runner(params)

    def _run_tool(self, tool_name: str, tool_params: Any) -> Dict[str, Any]:
        """Look up and execute the named tool, recording the outcome in short-term memory"""
        matching_tool = self._find_tool(tool_name)

        if matching_tool is None:
            error_msg = f"Tool '{tool_name}' not found."
            print(error_msg)
            self._record_short_term("error", error_msg)

            return {
                "tool": tool_name,
                "parameters": tool_params,
                "result": f"Error: Tool '{tool_name}' not found.",
                "success": False,
                "error": "Tool not found",
            }

        try:
            result = self._invoke_tool(matching_tool, tool_params)
        except Exception as e:
            error_msg = f"Error executing tool {tool_name}: {str(e)}\n{traceback.format_exc()}"
            print(error_msg)
            self._record_short_term("error", error_msg)

            return {
                "tool": tool_name,
                "parameters": tool_params,
                "result": f"Error: {str(e)}",
                "success": False,
                "error": str(e),
            }

        self._record_short_term(
            "action_result",
            f"Tool: {tool_name}\nParameters: {tool_params}\nResult: {result}",
        )

        return {
            "tool": tool_name,
            "parameters": tool_params,
            "result": result,
            "success": True,
            "error": None,
        }

    def _fallback_web_search(self, query: str) -> Dict[str, Any]:
        """Run ``web_search`` on the raw query when no tool could be determined"""
        error_msg = "Could not determine which tool to use."
        print(error_msg)
        self._record_short_term("error", error_msg)

        try:
            web_search_tool = self._find_tool("web_search")

            if web_search_tool is None:
                return {
                    "tool": "none",
                    "parameters": "none",
                    "result": "Could not determine which tool to use and web_search fallback not available.",
                    "success": False,
                    "error": "No tool selected",
                }

            result = self._invoke_tool(web_search_tool, query)
            return {
                "tool": "web_search",
                "parameters": query,
                "result": result,
                "success": True,
                "error": None,
                "fallback": True,
            }
        except Exception as e:
            return {
                "tool": "web_search",
                "parameters": query,
                "result": f"Error in fallback web search: {str(e)}",
                "success": False,
                "error": str(e),
                "fallback": True,
            }

    # ------------------------------------------------------------------
    # Observe
    # ------------------------------------------------------------------

    def observe(self, action_result: Dict[str, Any], plan: Dict[str, Any], query: str) -> Dict[str, Any]:
        """
        Analyze the results of actions and determine next steps (Observe phase)

        Args:
            action_result: Results from the act step
            plan: The original plan (accepted for interface symmetry; not
                read by this method)
            query: The original query

        Returns:
            Dictionary with the keys ``raw_response``, ``analysis``,
            ``next_steps``, ``progress`` and ``continue``. ``continue`` is
            False only when the model's response contains one of the
            completion phrases; on error it stays True.
        """
        tool_label = action_result.get('tool', 'none')
        params_label = action_result.get('parameters', 'none')
        observation_prompt = self.reasoning_templates["observe"].format(
            query=query,
            action=f"Tool: {tool_label}\nParameters: {params_label}",
            results=action_result.get('result', 'No results.'),
        )

        try:
            observation_response = self.agent.chat(observation_prompt)
            self._record_short_term("observation", observation_response)

            analysis = self._extract_section(observation_response, "Analysis")
            next_steps = self._extract_section(observation_response, "Next Steps")
            progress = self._extract_section(observation_response, "Progress Assessment")

            # Heuristic: plain substring match on the lower-cased response.
            completion_phrases = [
                "task complete", "question answered", "fully answered",
                "100%", "task is complete", "fully resolved",
            ]
            lowered = observation_response.lower()
            continue_execution = not any(phrase in lowered for phrase in completion_phrases)

            if not continue_execution:
                self._record_long_term("final_answer", query, observation_response, 0.8)

            return {
                "raw_response": observation_response,
                "analysis": analysis,
                "next_steps": next_steps,
                "progress": progress,
                "continue": continue_execution,
            }
        except Exception as e:
            error_msg = f"Error during observation phase: {str(e)}\n{traceback.format_exc()}"
            print(error_msg)
            self._record_short_term("error", error_msg)

            return {
                "raw_response": f"Error occurred during observation phase: {str(e)}",
                "analysis": "Could not analyze the results due to an error.",
                "next_steps": "Try a different approach or tool.",
                "progress": "Unknown due to error.",
                "continue": True,
            }

    # ------------------------------------------------------------------
    # Parsing helpers
    # ------------------------------------------------------------------

    def _extract_section(self, text: str, section_name: str) -> str:
        """
        Extract a section from the response text

        Tries a heading-style match first (``## Name``, ``**Name**`` or a
        line starting with ``Name``), then a looser match anywhere in the
        text. Returns ``"No <name> found."`` when neither matches.
        """
        name = re.escape(section_name)
        patterns = (
            rf"(?:^|\n)(?:#+\s*{name}:?|\*\*{name}:?\*\*|{name}:?)\s*(.*?)(?:\n(?:#+\s*|$)|\Z)",
            rf"{name}:?\s*(.*?)(?:\n\n|\n[A-Z]|\Z)",
        )

        for pattern in patterns:
            match = re.search(pattern, text, re.DOTALL | re.IGNORECASE)
            if match:
                return match.group(1).strip()

        return f"No {section_name.lower()} found."

    @staticmethod
    def _unquote(value: str) -> str:
        """Drop one pair of matching single or double quotes around a value"""
        if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
            return value[1:-1]
        return value

    def _extract_tool_info(self, text: str) -> Optional[Dict[str, Any]]:
        """
        Extract tool name and parameters from the response text

        Returns:
            None when no ``TOOL:`` marker is present. Otherwise a dictionary
            with ``tool`` and ``parameters``, where ``parameters`` is:

            * ``{}`` when there is no ``PARAMETERS:`` marker,
            * a dict of string values when the text contains ``key=value``
              pairs (an enclosing ``[...]`` and quotes around individual
              values are removed),
            * the raw parameter text otherwise.
        """
        tool_match = re.search(r"(?:TOOL|Tool|tool):\s*(\w+)", text)
        if not tool_match:
            return None

        tool_name = tool_match.group(1).strip()

        params_match = re.search(
            r"(?:PARAMETERS|Parameters|parameters):\s*(.*?)(?:\n\n|\n[A-Z]|\Z)",
            text,
            re.DOTALL,
        )
        if not params_match:
            return {"tool": tool_name, "parameters": {}}

        params_text = params_match.group(1).strip()

        if "=" not in params_text:
            # Free-form text is passed to the tool as one positional argument.
            return {"tool": tool_name, "parameters": params_text}

        # The prompt asks for "PARAMETERS: [k=v, ...]"; remove that single
        # enclosing bracket pair so the last value does not end in "]".
        pairs_text = params_text
        if pairs_text.startswith("[") and pairs_text.endswith("]"):
            inner = pairs_text[1:-1]
            if "[" not in inner and "]" not in inner:
                pairs_text = inner

        params_dict = {}
        for key, value in re.findall(r"(\w+)\s*=\s*([^,\n]+)", pairs_text):
            params_dict[key.strip()] = self._unquote(value.strip())

        return {"tool": tool_name, "parameters": params_dict}

    # ------------------------------------------------------------------
    # Full cycle
    # ------------------------------------------------------------------

    def execute_reasoning_cycle(self, query: str, max_iterations: int = 5) -> str:
        """
        Execute a complete Think-Act-Observe reasoning cycle

        Args:
            query: The user's query or task
            max_iterations: Maximum number of iterations. With a value of 0
                or less no iteration runs and the "partial answer" message is
                returned.

        Returns:
            Final answer to the query, or a partial-answer message when the
            iteration limit is reached without the observe step signalling
            completion.
        """
        # The working prompt is rewritten after every iteration; keep the
        # user's query so memory records stay keyed by what was asked.
        original_query = query
        self._record_short_term("query", original_query)

        iteration = 0
        final_answer = None
        all_results: List[Dict[str, Any]] = []
        # Stays empty when the loop body never runs (max_iterations <= 0).
        observation: Dict[str, Any] = {}

        while iteration < max_iterations:
            print(f"Iteration {iteration + 1}/{max_iterations}")

            print("Thinking...")
            plan = self.think(query)

            print("Acting...")
            # Tool results may be any type; stringify before joining.
            previous_results = "\n".join(str(r.get("result", "")) for r in all_results)
            action_result = self.act(plan, query, previous_results)
            all_results.append(action_result)

            print("Observing...")
            observation = self.observe(action_result, plan, query)

            if not observation["continue"]:
                actions_taken = ', '.join(
                    f"{r.get('tool', 'unknown')}({r.get('parameters', '')})" for r in all_results
                )
                final_answer_prompt = f"""
TASK: {query}

REASONING PROCESS:
{plan.get('raw_response', 'No thinking process available.')}

ACTIONS TAKEN:
{actions_taken}

RESULTS:
{previous_results}
{action_result.get('result', '')}

OBSERVATION:
{observation.get('raw_response', 'No observation available.')}

Based on all the above, provide a comprehensive final answer to the original task.
"""
                final_answer = self.agent.chat(final_answer_prompt)
                self._record_long_term("final_answer", original_query, final_answer, 0.9)
                break

            # Fold the latest observation into the prompt for the next round.
            query = f"""
Original task: {query}

Progress so far:
{observation.get('raw_response', 'No observation available.')}

Please continue solving this task.
"""
            iteration += 1

        if final_answer is None:
            final_answer = f"""
I've spent {max_iterations} iterations trying to solve this task.
Here's my best answer based on what I've learned:

{observation.get('raw_response', 'No final observation available.')}

Note: This answer may be incomplete as I reached the maximum number of iterations.
"""
            self._record_long_term("partial_answer", original_query, final_answer, 0.6)

        return final_answer
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Manual smoke test: drives one full reasoning cycle with stand-in
    # collaborators, so no real model or memory backend is contacted.
    from smolagents import Agent, InferenceClientModel, Tool

    class MockAgent:
        """Stand-in agent with two demo tools and a canned chat() reply."""

        def __init__(self):
            search_tool = Tool(
                name="web_search",
                description="Search the web",
                function=lambda x: f"Search results for: {x}",
            )
            # NOTE(review): eval() runs whatever expression text reaches this
            # tool. Acceptable only in this local demo; do not reuse it with
            # model- or user-supplied input.
            calculator_tool = Tool(
                name="calculator",
                description="Calculate",
                function=lambda x: f"Result: {eval(x)}",
            )
            self.tools = [search_tool, calculator_tool]

        def chat(self, message):
            # Echo the first 50 characters of the prompt.
            preview = message[:50]
            return f"Response to: {preview}..."

    class MockMemoryManager:
        """Stand-in memory manager that prints writes and recalls nothing."""

        def add_to_short_term(self, item):
            print(f"Added to short-term: {item['type']}")

        def add_to_long_term(self, item):
            print(f"Added to long-term: {item['type']}")

        def get_relevant_memories(self, query):
            return []

    demo_system = ReasoningSystem(MockAgent(), MockMemoryManager())
    outcome = demo_system.execute_reasoning_cycle("What is 2+2?")
    print(f"\nFinal result: {outcome}")
|
|