|
""" |
|
LangGraph workflow definition for the GAIA agent. |
|
|
|
This module defines the workflow graph for the GAIA agent using LangGraph. |
|
It implements nodes for question analysis, planning, tool selection, |
|
tool execution, and answer formulation, as well as the edges between them. |
|
|
|
The graph is designed to be integrated with the GAIAAgent class and can be |
|
extended with additional nodes and edges as needed. |
|
""" |
|
|
|
import logging |
|
import json |
|
import re |
|
import traceback |
|
import openai |
|
from typing import Dict, Any, List, Tuple, Optional, Annotated, TypedDict, Literal, Union, cast |
|
|
|
from langchain.prompts import PromptTemplate |
|
from langchain_core.messages import HumanMessage, AIMessage |
|
from langchain_core.runnables import RunnableConfig |
|
from langchain_core.output_parsers import StrOutputParser |
|
|
|
from langgraph.graph import StateGraph, END |
|
|
|
from src.gaia.agent.config import get_model_config, get_agent_config, VERBOSE |
|
from src.gaia.agent.tool_registry import ( |
|
ToolRegistry, |
|
create_default_registry, |
|
search, |
|
analyze_query, |
|
resolve_question_type |
|
) |
|
from src.gaia.memory import WorkingMemory, ResultCache |
|
|
|
logger = logging.getLogger("gaia_agent.graph") |
|
|
|
class QuestionAnalysis(TypedDict): |
|
"""Type for question analysis results.""" |
|
question_type: str |
|
required_tools: List[str] |
|
information_sources: List[str] |
|
complexity: str |
|
|
|
class PlanStep(TypedDict): |
|
"""Type for a single step in the plan.""" |
|
step_number: int |
|
description: str |
|
tool: Optional[str] |
|
tool_input: Optional[Dict[str, Any]] |
|
|
|
class ToolResult(TypedDict): |
|
"""Type for tool execution results.""" |
|
tool_name: str |
|
success: bool |
|
result: Any |
|
error: Optional[str] |
|
|
|
class AgentState(TypedDict): |
|
"""Type for agent state.""" |
|
question: str |
|
analysis: Optional[QuestionAnalysis] |
|
plan: Optional[List[PlanStep]] |
|
current_step: Optional[int] |
|
tool_results: Optional[List[ToolResult]] |
|
reasoning: Optional[str] |
|
answer: Optional[str] |
|
|
|
def analyze_question(state: AgentState) -> AgentState: |
|
""" |
|
Analyze the question to determine its type and required tools. |
|
|
|
Args: |
|
state: Current agent state |
|
|
|
Returns: |
|
Updated agent state with analysis |
|
""" |
|
logger.info("Analyzing question") |
|
|
|
question = state["question"] |
|
|
|
|
|
question_analysis = analyze_query(question) |
|
question_type = resolve_question_type(question) |
|
|
|
|
|
required_tools = [] |
|
|
|
|
|
required_tools.append("web_search") |
|
|
|
|
|
if "video" in question.lower() or "youtube" in question.lower(): |
|
required_tools.append("youtube_video") |
|
|
|
if "article" in question.lower() or "paper" in question.lower() or "research" in question.lower(): |
|
required_tools.append("arxiv_search") |
|
|
|
|
|
if question_analysis["depth_needed"] == "high": |
|
required_tools.append("perplexity_search") |
|
|
|
|
|
if question_type == "reversed_text": |
|
required_tools.append("text_manipulation") |
|
|
|
|
|
analysis = { |
|
"question_type": question_type, |
|
"required_tools": required_tools, |
|
"information_sources": question_analysis["preferred_sources"], |
|
"complexity": question_analysis["depth_needed"] |
|
} |
|
|
|
logger.info(f"Question analysis: {analysis}") |
|
|
|
|
|
return { |
|
**state, |
|
"analysis": analysis |
|
} |
|
|
|
def create_plan(state: AgentState) -> AgentState: |
|
""" |
|
Create a plan for answering the question. |
|
|
|
Args: |
|
state: Current agent state |
|
|
|
Returns: |
|
Updated agent state with plan |
|
""" |
|
logger.info("Creating plan") |
|
|
|
question = state["question"] |
|
analysis = state["analysis"] |
|
|
|
if not analysis: |
|
logger.warning("No question analysis available, creating default plan") |
|
analysis = { |
|
"question_type": "factual", |
|
"required_tools": ["web_search"], |
|
"information_sources": ["web"], |
|
"complexity": "medium" |
|
} |
|
|
|
|
|
plan = [] |
|
step_num = 1 |
|
|
|
|
|
if analysis["question_type"] == "reversed_text": |
|
|
|
plan.append({ |
|
"step_number": step_num, |
|
"description": "Reverse the text in the question to understand it", |
|
"tool": "text_manipulation", |
|
"tool_input": {"text": question, "operation": "reverse"} |
|
}) |
|
step_num += 1 |
|
|
|
|
|
if "web_search" in analysis["required_tools"]: |
|
plan.append({ |
|
"step_number": step_num, |
|
"description": "Search for information using web search", |
|
"tool": "web_search", |
|
"tool_input": {"query": question} |
|
}) |
|
step_num += 1 |
|
|
|
|
|
if "youtube_video" in analysis["required_tools"]: |
|
plan.append({ |
|
"step_number": step_num, |
|
"description": "Analyze video content using YouTube tool", |
|
"tool": "youtube_video", |
|
"tool_input": {"query": question} |
|
}) |
|
step_num += 1 |
|
|
|
if "arxiv_search" in analysis["required_tools"]: |
|
plan.append({ |
|
"step_number": step_num, |
|
"description": "Search for academic papers on arXiv", |
|
"tool": "arxiv_search", |
|
"tool_input": {"query": question} |
|
}) |
|
step_num += 1 |
|
|
|
if "perplexity_search" in analysis["required_tools"]: |
|
plan.append({ |
|
"step_number": step_num, |
|
"description": "Use Perplexity for in-depth information", |
|
"tool": "perplexity_search", |
|
"tool_input": {"query": question} |
|
}) |
|
step_num += 1 |
|
|
|
|
|
plan.append({ |
|
"step_number": step_num, |
|
"description": "Analyze all retrieved information and formulate answer", |
|
"tool": None, |
|
"tool_input": None |
|
}) |
|
|
|
logger.info(f"Created plan with {len(plan)} steps") |
|
|
|
|
|
return { |
|
**state, |
|
"plan": plan, |
|
"current_step": 0, |
|
"tool_results": [] |
|
} |
|
|
|
def execute_tool(state: AgentState) -> AgentState: |
|
""" |
|
Execute the current tool in the plan. |
|
|
|
Args: |
|
state: Current agent state |
|
|
|
Returns: |
|
Updated agent state with tool results |
|
""" |
|
current_step = state["current_step"] |
|
plan = state["plan"] |
|
|
|
if current_step is None or plan is None or current_step >= len(plan): |
|
return state |
|
|
|
step = plan[current_step] |
|
tool_name = step.get("tool") |
|
|
|
logger.info(f"Executing step {current_step}: {step}") |
|
|
|
if not tool_name: |
|
return { |
|
**state, |
|
"current_step": current_step + 1 |
|
} |
|
|
|
tool_input = step.get("tool_input", {}) |
|
|
|
|
|
tool_results = state.get("tool_results", []) or [] |
|
|
|
try: |
|
logger.info(f"Executing tool: {tool_name}") |
|
|
|
|
|
registry = create_default_registry() |
|
|
|
|
|
result = None |
|
if tool_name == "web_search": |
|
query = tool_input.get("query", state["question"]) |
|
search_results = search(registry, query, format_type="unified") |
|
result = search_results |
|
|
|
elif tool_name == "youtube_video": |
|
query = tool_input.get("query", state["question"]) |
|
|
|
youtube_urls = re.findall(r'https?://(?:www\.)?youtube\.com/watch\?v=[\w-]+', query) |
|
if not youtube_urls: |
|
youtube_urls = re.findall(r'https?://(?:www\.)?youtu\.be/[\w-]+', query) |
|
|
|
if youtube_urls: |
|
video_url = youtube_urls[0] |
|
if registry.get_tool("youtube_video"): |
|
result = registry.execute_tool("youtube_video", url=video_url) |
|
else: |
|
raise Exception("YouTube video tool not available") |
|
else: |
|
raise Exception("No YouTube URL found in query") |
|
|
|
elif tool_name == "arxiv_search": |
|
query = tool_input.get("query", state["question"]) |
|
if registry.get_tool("arxiv_search"): |
|
result = registry.execute_tool("arxiv_search", query=query) |
|
else: |
|
raise Exception("ArXiv search tool not available") |
|
|
|
elif tool_name == "perplexity_search": |
|
query = tool_input.get("query", state["question"]) |
|
if registry.get_tool("perplexity_search"): |
|
result = registry.execute_tool("perplexity_search", query=query) |
|
else: |
|
raise Exception("Perplexity search tool not available") |
|
|
|
elif tool_name == "text_manipulation": |
|
text = tool_input.get("text", state["question"]) |
|
operation = tool_input.get("operation", "reverse") |
|
|
|
if operation == "reverse": |
|
result = {'reversed_text': text[::-1]} |
|
else: |
|
result = {'original_text': text} |
|
|
|
|
|
tool_result = { |
|
"tool_name": tool_name, |
|
"success": True, |
|
"result": result, |
|
"error": None |
|
} |
|
|
|
logger.info(f"Tool {tool_name} executed successfully") |
|
|
|
except Exception as e: |
|
logger.error(f"Error executing tool {tool_name}: {str(e)}") |
|
logger.debug(traceback.format_exc()) |
|
|
|
|
|
tool_result = { |
|
"tool_name": tool_name, |
|
"success": False, |
|
"result": None, |
|
"error": str(e) |
|
} |
|
|
|
|
|
return { |
|
**state, |
|
"tool_results": tool_results + [tool_result], |
|
"current_step": current_step + 1 |
|
} |
|
|
|
def formulate_answer(state: AgentState) -> AgentState: |
|
""" |
|
Formulate an answer based on tool results and reasoning. |
|
|
|
Args: |
|
state: Current agent state |
|
|
|
Returns: |
|
Updated agent state with answer |
|
""" |
|
logger.info("Formulating answer") |
|
logger.info(f"State: {state}") |
|
|
|
question = state["question"] |
|
tool_results = state["tool_results"] or [] |
|
|
|
|
|
web_search_data = [] |
|
youtube_data = None |
|
arxiv_data = [] |
|
perplexity_data = None |
|
reversed_text = None |
|
|
|
for tool_result in tool_results: |
|
if not tool_result["success"]: |
|
continue |
|
|
|
if tool_result["tool_name"] == "web_search": |
|
result = tool_result["result"] |
|
if result and "results" in result: |
|
web_search_data = result["results"] |
|
|
|
elif tool_result["tool_name"] == "youtube_video": |
|
youtube_data = tool_result["result"] |
|
|
|
elif tool_result["tool_name"] == "arxiv_search": |
|
arxiv_data = tool_result["result"] |
|
|
|
elif tool_result["tool_name"] == "perplexity_search": |
|
result = tool_result["result"] |
|
if result and "content" in result: |
|
perplexity_data = result["content"] |
|
|
|
elif tool_result["tool_name"] == "text_manipulation": |
|
result = tool_result["result"] |
|
if result and "reversed_text" in result: |
|
reversed_text = result["reversed_text"] |
|
|
|
|
|
if reversed_text: |
|
logger.info("Processing reversed text question") |
|
|
|
|
|
original_question = reversed_text |
|
|
|
|
|
|
|
if "opposite" in original_question and "word" in original_question: |
|
|
|
match = re.search(r'opposite of the word (?:"|")?(\w+)(?:"|")?', original_question) |
|
if match: |
|
word = match.group(1).lower() |
|
|
|
|
|
opposites = { |
|
"left": "right", |
|
"right": "left", |
|
"up": "down", |
|
"down": "up", |
|
"black": "white", |
|
"white": "black", |
|
"yes": "no", |
|
"no": "yes", |
|
"hot": "cold", |
|
"cold": "hot", |
|
"big": "small", |
|
"small": "big", |
|
"tall": "short", |
|
"short": "tall" |
|
} |
|
|
|
if word in opposites: |
|
answer = opposites[word] |
|
return { |
|
**state, |
|
"reasoning": f"This is a reversed text question asking for the opposite of '{word}'.", |
|
"answer": answer |
|
} |
|
|
|
|
|
reasoning = "Based on the information I've gathered:\n\n" |
|
|
|
|
|
if web_search_data: |
|
reasoning += "From web search results:\n" |
|
for i, result in enumerate(web_search_data[:3]): |
|
reasoning += f"- {result.get('title', 'Untitled')}: {result.get('snippet', 'No snippet')}...\n" |
|
|
|
|
|
if youtube_data: |
|
reasoning += "\nFrom the YouTube video:\n" |
|
if isinstance(youtube_data, dict): |
|
reasoning += f"- Title: {youtube_data.get('title', 'Unknown')}\n" |
|
reasoning += f"- Content: {youtube_data.get('content', 'No content extracted')}...\n" |
|
elif isinstance(youtube_data, str): |
|
reasoning += f"- Content: {youtube_data}...\n" |
|
|
|
|
|
if arxiv_data: |
|
reasoning += "\nFrom academic papers:\n" |
|
for i, paper in enumerate(arxiv_data[:2]): |
|
if isinstance(paper, dict): |
|
reasoning += f"- {paper.get('title', 'Untitled')}: {paper.get('summary', 'No summary')[:100]}...\n" |
|
|
|
|
|
if perplexity_data: |
|
reasoning += "\nFrom Perplexity AI:\n" |
|
reasoning += f"{perplexity_data[:500]}...\n" |
|
|
|
|
|
answer = "" |
|
|
|
|
|
if perplexity_data: |
|
|
|
answer = perplexity_data.split('\n')[0] |
|
|
|
|
|
if "how many" in question.lower(): |
|
|
|
numbers = re.findall(r'\d+', perplexity_data) |
|
if numbers: |
|
|
|
answer = numbers[0] |
|
|
|
elif "who" in question.lower(): |
|
|
|
names = re.findall(r'[A-Z][a-z]+ [A-Z][a-z]+', perplexity_data) |
|
if names: |
|
answer = names[0] |
|
|
|
|
|
if not answer and web_search_data: |
|
|
|
if web_search_data: |
|
top_result = web_search_data[0] |
|
answer = top_result.get('snippet', '') |
|
|
|
|
|
if "video" in question.lower() and youtube_data: |
|
if isinstance(youtube_data, dict) and "content" in youtube_data: |
|
answer = youtube_data["content"] |
|
|
|
|
|
if not answer: |
|
answer = "I couldn't find enough information to answer your question accurately." |
|
|
|
|
|
answer = answer.strip() |
|
if not answer.endswith((".", "!", "?")): |
|
answer += "." |
|
|
|
logger.info("Answer formulation complete") |
|
|
|
|
|
return { |
|
**state, |
|
"reasoning": reasoning, |
|
"answer": answer |
|
} |
|
|
|
def should_continue(state: AgentState) -> Literal["continue", "complete"]: |
|
""" |
|
Determine if the agent should continue executing the plan or is done. |
|
|
|
Args: |
|
state: Current agent state |
|
|
|
Returns: |
|
"continue" if there are more steps to execute, "complete" otherwise |
|
""" |
|
current_step = state.get("current_step") |
|
plan = state.get("plan", []) |
|
|
|
if current_step is None: |
|
return "continue" |
|
|
|
if current_step < len(plan): |
|
return "continue" |
|
|
|
return "complete" |
|
|
|
def run_agent_graph(initial_state: Dict[str, Any], config: Dict[str, Any]) -> Dict[str, Any]: |
|
""" |
|
Run the agent graph with the initial state. |
|
|
|
Args: |
|
initial_state: Initial state dictionary, must include "question" |
|
config: Agent configuration dictionary |
|
|
|
Returns: |
|
Final state with answer and reasoning |
|
""" |
|
logger.info("Running agent graph") |
|
logger.info(f"Initial state: {initial_state}") |
|
logger.info(f"Config: {config}") |
|
|
|
try: |
|
|
|
state: AgentState = { |
|
"question": initial_state.get("question", ""), |
|
"analysis": None, |
|
"plan": None, |
|
"current_step": None, |
|
"tool_results": [], |
|
"reasoning": None, |
|
"answer": None |
|
} |
|
|
|
|
|
state = analyze_question(state) |
|
state = create_plan(state) |
|
|
|
|
|
while should_continue(state) == "continue": |
|
state = execute_tool(state) |
|
|
|
state = formulate_answer(state) |
|
|
|
logger.info("Agent graph completed successfully") |
|
|
|
|
|
final_result = { |
|
"answer": state.get("answer", "I couldn't find an answer to your question."), |
|
"reasoning": state.get("reasoning", ""), |
|
"plan": state.get("plan", []), |
|
"tool_results": state.get("tool_results", []) |
|
} |
|
|
|
logger.info(f"Final result: {final_result['answer'][:100]}...") |
|
|
|
return final_result |
|
|
|
except Exception as e: |
|
logger.error(f"Error running agent graph: {str(e)}") |
|
logger.debug(traceback.format_exc()) |
|
|
|
return { |
|
"answer": f"I encountered an error while processing your question: {str(e)}", |
|
"reasoning": f"An error occurred: {str(e)}", |
|
"plan": [], |
|
"tool_results": [] |
|
} |