import json
import logging
import pdb
import traceback
from typing import Any, Awaitable, Callable, Dict, Generic, List, Optional, Type, TypeVar, Union
from PIL import Image, ImageDraw, ImageFont
import os
import base64
import io
import asyncio
import time
import platform

from browser_use.agent.prompts import SystemPrompt, AgentMessagePrompt
from browser_use.agent.service import Agent
from browser_use.agent.message_manager.utils import convert_input_messages, extract_json_from_model_output, \
    save_conversation
from browser_use.agent.views import (
    ActionResult,
    AgentError,
    AgentHistory,
    AgentHistoryList,
    AgentOutput,
    AgentSettings,
    AgentStepInfo,
    StepMetadata,
    ToolCallingMethod,
)
from browser_use.agent.gif import create_history_gif
from browser_use.browser.browser import Browser
from browser_use.browser.context import BrowserContext
from browser_use.browser.views import BrowserStateHistory
from browser_use.controller.service import Controller
from browser_use.telemetry.views import (
    AgentEndTelemetryEvent,
    AgentRunTelemetryEvent,
    AgentStepTelemetryEvent,
)
from browser_use.utils import time_execution_async
from langchain_core.language_models.chat_models import BaseChatModel
from langchain_core.messages import (
    BaseMessage,
    HumanMessage,
    AIMessage,
)
from browser_use.browser.views import BrowserState
from browser_use.agent.prompts import PlannerPrompt
from pydantic import BaseModel
from json_repair import repair_json

from src.utils.agent_state import AgentState
from src.utils.replayer import TraceReplayer, load_trace, Drift
from src.utils.recorder import Recorder
from .custom_message_manager import CustomMessageManager, CustomMessageManagerSettings
from .custom_views import CustomAgentOutput, CustomAgentStepInfo, CustomAgentState as CustomAgentStateType, CustomAgentBrain
logger = logging.getLogger(__name__)

Context = TypeVar('Context')


# Define a simple structure for replay task details for clarity
class ReplayTaskDetails:
    def __init__(self, mode: str, trace_path: str, speed: float = 1.0, trace_save_path: Optional[str] = None):
        self.mode = mode
        self.trace_path = trace_path
        self.speed = speed
        self.trace_save_path = trace_save_path  # For saving new traces if needed during an operation that might also record
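

# Illustrative sketch (not part of the original module): a ReplayTaskDetails instance is
# passed to CustomAgent.run() to trigger replay mode instead of an autonomous run.
# The `agent` variable and the trace path below are placeholders.
#
#   replay_task = ReplayTaskDetails(mode="replay", trace_path="traces/session.jsonl", speed=2.0)
#   await agent.run(replay_task)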
class CustomAgent(Agent):
    def __init__(
            self,
            task: str,
            llm: BaseChatModel,
            add_infos: str = "",
            # Optional parameters
            browser: Browser | None = None,
            browser_context: BrowserContext | None = None,
            controller: Controller[Context] = Controller(),
            # Initial agent run parameters
            sensitive_data: Optional[Dict[str, str]] = None,
            initial_actions: Optional[List[Dict[str, Dict[str, Any]]]] = None,
            # Cloud Callbacks
            register_new_step_callback: Callable[['BrowserState', 'AgentOutput', int], Awaitable[None]] | None = None,
            register_done_callback: Callable[['AgentHistoryList'], Awaitable[None]] | None = None,
            register_external_agent_status_raise_error_callback: Callable[[], Awaitable[bool]] | None = None,
            # Agent settings
            use_vision: bool = True,
            use_vision_for_planner: bool = False,
            save_conversation_path: Optional[str] = None,
            save_conversation_path_encoding: Optional[str] = 'utf-8',
            max_failures: int = 3,
            retry_delay: int = 10,
            system_prompt_class: Type[SystemPrompt] = SystemPrompt,
            agent_prompt_class: Type[AgentMessagePrompt] = AgentMessagePrompt,
            max_input_tokens: int = 128000,
            validate_output: bool = False,
            message_context: Optional[str] = None,
            generate_gif: bool | str = False,
            available_file_paths: Optional[list[str]] = None,
            include_attributes: list[str] = [
                'title',
                'type',
                'name',
                'role',
                'aria-label',
                'placeholder',
                'value',
                'alt',
                'aria-expanded',
                'data-date-format',
            ],
            max_actions_per_step: int = 10,
            tool_calling_method: Optional[ToolCallingMethod] = 'auto',
            page_extraction_llm: Optional[BaseChatModel] = None,
            planner_llm: Optional[BaseChatModel] = None,
            planner_interval: int = 1,  # Run planner every N steps
            # Inject state
            injected_agent_state: Optional[CustomAgentStateType] = None,
            context: Context | None = None,
    ):
        super().__init__(
            task=task,
            llm=llm,
            browser=browser,
            browser_context=browser_context,
            controller=controller,
            sensitive_data=sensitive_data,
            initial_actions=initial_actions,
            register_new_step_callback=register_new_step_callback,
            register_done_callback=register_done_callback,
            register_external_agent_status_raise_error_callback=register_external_agent_status_raise_error_callback,
            use_vision=use_vision,
            use_vision_for_planner=use_vision_for_planner,
            save_conversation_path=save_conversation_path,
            save_conversation_path_encoding=save_conversation_path_encoding,
            max_failures=max_failures,
            retry_delay=retry_delay,
            system_prompt_class=system_prompt_class,
            max_input_tokens=max_input_tokens,
            validate_output=validate_output,
            message_context=message_context,
            generate_gif=generate_gif,
            available_file_paths=available_file_paths,
            include_attributes=include_attributes,
            max_actions_per_step=max_actions_per_step,
            tool_calling_method=tool_calling_method,
            page_extraction_llm=page_extraction_llm,
            planner_llm=planner_llm,
            planner_interval=planner_interval,
            injected_agent_state=None,
            context=context,
        )
        # Initialize or restore CustomAgentState
        if injected_agent_state is not None and isinstance(injected_agent_state, CustomAgentStateType):
            self.state: CustomAgentStateType = injected_agent_state
        else:
            self.state: CustomAgentStateType = CustomAgentStateType()
            if injected_agent_state is not None:  # Was provided but wrong type
                logger.warning("injected_agent_state was provided but is not of type CustomAgentState. Initializing default CustomAgentState.")

        self.add_infos = add_infos
        # self.replay_event_file is removed, handled by task_input in run()
        self.current_task_memory: str = ""  # Initialize custom memory
        self._message_manager: CustomMessageManager = CustomMessageManager(
            task=self.task,  # self.task is set by super().__init__
            system_message=self.settings.system_prompt_class(
                self.available_actions,
                max_actions_per_step=self.settings.max_actions_per_step,
            ).get_system_message(),
            settings=CustomMessageManagerSettings(
                max_input_tokens=self.settings.max_input_tokens,
                include_attributes=self.settings.include_attributes,
                message_context=self.settings.message_context,
                sensitive_data=sensitive_data,
                available_file_paths=self.settings.available_file_paths,
                agent_prompt_class=agent_prompt_class,
            ),
            state=self.state.message_manager_state,  # Use state from CustomAgentStateType
        )
    ## TODO: Eval the response from LLM
    def _log_response(self, response: CustomAgentOutput) -> None:
        """Log the model's response"""
        if "Success" in response.current_state.evaluation_previous_goal:
            emoji = "✅"
        elif "Failed" in response.current_state.evaluation_previous_goal:
            emoji = "❌"
        else:
            emoji = "🤷"

        logger.info(f"{emoji} Eval: {response.current_state.evaluation_previous_goal}")
        logger.info(f"🧠 New Memory: {response.current_state.important_contents}")
        logger.info(f"🤔 Thought: {response.current_state.thought}")
        logger.info(f"🎯 Next Goal: {response.current_state.next_goal}")
        for i, action in enumerate(response.action):
            logger.info(
                f"🛠️ Action {i + 1}/{len(response.action)}: {action.model_dump_json(exclude_unset=True)}"
            )
    def _setup_action_models(self) -> None:
        """Setup dynamic action models from controller's registry"""
        # Get the dynamic action model from controller's registry
        self.ActionModel = self.controller.registry.create_action_model()
        # Create output model with the dynamic actions
        self.AgentOutput = CustomAgentOutput.type_with_custom_actions(self.ActionModel)
    def update_step_info(
            self, model_output: CustomAgentOutput, step_info: Optional[CustomAgentStepInfo] = None
    ):
        """
        Update step info.

        @dev: New memory from the LLM is stored in important_contents.
        important_contents is used to:
            - Track progress in repetitive tasks (e.g., "for each", "for all", "x times")
            - Store important information found during the task
            - Keep track of status and subresults for long tasks
            - Store extracted content from pages
        """
        if step_info is None:
            return

        step_info.step_number += 1
        important_contents = model_output.current_state.important_contents
        if (
                important_contents
                and "None" not in important_contents
                and important_contents not in step_info.memory
        ):
            step_info.memory += important_contents + "\n"

        logger.info(f"🧠 All Memory: \n{step_info.memory}")
    # hint: get next action from LLM by calling llm.invoke in utils/llm.py
    async def get_next_action(self, input_messages: list[BaseMessage]) -> CustomAgentOutput:
        """Get next action from LLM based on current state"""
        # The _convert_input_messages and cleaned_messages logic seems to have been
        # for a specific format possibly expected by a previous _get_model_output method.
        # We will now directly use self.llm.ainvoke with input_messages (List[BaseMessage]).
        # The logic for removing image_urls, if still needed, would have to be
        # applied to input_messages before this call, or handled by the LLM itself.
        if not self.llm:
            logger.error("LLM not initialized in CustomAgent.")
            # Return an error structure that _parse_model_output can handle.
            # This assumes _parse_model_output can parse a JSON string error.
            # The actual error handling might need to be more robust based on _parse_model_output's capabilities.
            # Also, self.AgentOutput needs to be available here.
            if not hasattr(self, 'AgentOutput') or not self.AgentOutput:
                self._setup_action_models()  # Ensure AgentOutput is set up
            # Construct a raw string that _parse_model_output can work with to produce an AgentOutput.
            # This usually involves a JSON string that looks like what the LLM would output in an error case.
            # For now, an empty actions list and an error message in thought/state might be a way.
            # This is a placeholder for robust error generation.
            error_payload = {
                "current_state": {
                    "evaluation_previous_goal": "Error",
                    "important_contents": "LLM not initialized.",
                    "thought": "Critical error: LLM not initialized.",
                    "next_goal": "Cannot proceed."
                },
                "action": []
            }
            model_output_raw = json.dumps(error_payload)
            return self._parse_model_output(model_output_raw, self.ActionModel)

        try:
            llm_response = await self.llm.ainvoke(input_messages)
            # model_output_raw should be a string, typically the content from the LLM response.
            # The base class's _parse_model_output is expected to handle this string.
            if hasattr(llm_response, 'content') and llm_response.content is not None:
                model_output_raw = str(llm_response.content)
            elif isinstance(llm_response, AIMessage) and llm_response.tool_calls:
                # If content is None but there are tool_calls, the parser might expect
                # a specific string format (e.g., JSON of tool_calls) or to handle AIMessage directly.
                # Forcing it to string for now, assuming the parser can handle stringified tool_calls
                # or that the main information is in .content and tool_calls are metadata for the parser.
                # This part is sensitive to how the base Agent's parser works.
                # A common robust approach is for the LLM to put tool call JSON into the .content string.
                # If not, serializing tool_calls to JSON is a common fallback if the parser expects it.
                try:
                    # Attempt to create a JSON string that might represent the tool calls.
                    # ToolCall objects in LangChain are typically TypedDicts and directly serializable.
                    model_output_raw = json.dumps(llm_response.tool_calls)
                except Exception as serialization_error:
                    logger.warning(f"Could not serialize tool_calls for AIMessage: {serialization_error}. Falling back to str(AIMessage).")
                    model_output_raw = str(llm_response)  # Fallback to full string representation
            else:
                model_output_raw = str(llm_response)  # General fallback
        except Exception as e:
            logger.error(f"Error invoking LLM: {e}", exc_info=True)
            error_payload = {
                "current_state": {
                    "evaluation_previous_goal": "Error",
                    "important_contents": f"LLM invocation error: {str(e)}",
                    "thought": f"LLM invocation error: {str(e)}",
                    "next_goal": "Cannot proceed."
                },
                "action": []
            }
            model_output_raw = json.dumps(error_payload)

        # Parse the model output.
        # Ensure self.ActionModel is available for the parser.
        if not hasattr(self, 'ActionModel') or not self.ActionModel:
            self._setup_action_models()  # Ensure ActionModel is set up for parsing
        parsed_output = self._parse_model_output(model_output_raw, self.ActionModel)
        return parsed_output
    async def _run_planner(self) -> Optional[str]:
        """Run the planner to analyze state and suggest next steps"""
        # Skip planning if no planner_llm is set
        if not self.settings.planner_llm:
            return None

        # Create planner message history using full message history
        planner_messages = [
            PlannerPrompt(self.controller.registry.get_prompt_description()).get_system_message(),
            *self.message_manager.get_messages()[1:],  # Use full message history except the first
        ]

        if not self.settings.use_vision_for_planner and self.settings.use_vision:
            # Type hint for last_state_message was HumanMessage; ensure planner_messages[-1] is HumanMessage or check type
            last_planner_message = planner_messages[-1]
            new_msg_content: Union[str, List[Dict[str, Any]]] = ''  # type for new content
            new_msg = ''  # Initialized so the fallback below never references an undefined name
            if isinstance(last_planner_message, HumanMessage):
                if isinstance(last_planner_message.content, list):
                    processed_content_list = []
                    for item in last_planner_message.content:
                        if isinstance(item, dict):
                            if item.get('type') == 'text':
                                processed_content_list.append({'type': 'text', 'text': item.get('text', '')})
                            # Keep other dict types if necessary, or filter image_url
                            elif item.get('type') == 'image_url':
                                continue  # Skip image
                            else:
                                processed_content_list.append(item)  # Keep other dicts
                        elif isinstance(item, str):
                            processed_content_list.append({'type': 'text', 'text': item})  # Convert str to dict
                    new_msg_content = processed_content_list
                    # Reconstruct new_msg from processed_content_list if needed as a single string
                    temp_new_msg = ""
                    for item_content in new_msg_content:  # new_msg_content is List[Dict[str, Any]]
                        if isinstance(item_content, dict) and item_content.get('type') == 'text':
                            temp_new_msg += item_content.get('text', '')
                    new_msg = temp_new_msg
                elif isinstance(last_planner_message.content, str):
                    new_msg = last_planner_message.content
                planner_messages[-1] = HumanMessage(content=new_msg if new_msg else last_planner_message.content)

        # Get planner output
        response = await self.settings.planner_llm.ainvoke(planner_messages)
        plan = str(response.content)
        # console log plan
        print(f"plan: {plan}")

        last_message_from_manager = self.message_manager.get_messages()[-1]
        if isinstance(last_message_from_manager, HumanMessage):
            # Target last_message_from_manager (which is a HumanMessage) for modification
            if isinstance(last_message_from_manager.content, list):
                # Create a new list for content to avoid modifying immutable parts if any
                new_content_list = []
                modified = False
                for item in last_message_from_manager.content:
                    if isinstance(item, dict) and item.get('type') == 'text':
                        current_text = item.get('text', '')
                        # Create a new dict for the modified text item
                        new_content_list.append({'type': 'text', 'text': current_text + f"\nPlanning Agent outputs plans:\n {plan}\n"})
                        modified = True
                    else:
                        new_content_list.append(item)  # Keep other items as is
                if modified:
                    last_message_from_manager.content = new_content_list
                else:  # If no text item was found to append to, add a new one
                    new_content_list.append({'type': 'text', 'text': f"\nPlanning Agent outputs plans:\n {plan}\n"})
                    last_message_from_manager.content = new_content_list
            elif isinstance(last_message_from_manager.content, str):
                last_message_from_manager.content += f"\nPlanning Agent outputs plans:\n {plan}\n "
            # If no modification happened (e.g. content was not list or str, or list had no text part),
            # one might consider appending a new HumanMessage with the plan, but that changes history structure.

        try:
            plan_json = json.loads(plan.replace("```json", "").replace("```", ""))
            logger.info(f'📋 Plans:\n{json.dumps(plan_json, indent=4)}')

            reasoning_content = getattr(response, "reasoning_content", None)
            if reasoning_content:
                logger.info("🤯 Start Planning Deep Thinking: ")
                logger.info(reasoning_content)
                logger.info("🤯 End Planning Deep Thinking")
        except json.JSONDecodeError:
            logger.info(f'📋 Plans:\n{plan}')
        except Exception as e:
            logger.debug(f'Error parsing planning analysis: {e}')
            logger.info(f'📋 Plans: {plan}')

        return plan
    def _summarize_browsing_history(self, max_steps: int = 5, max_chars: int = 1500) -> str:
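        """
        Summarize the most recent browsing steps (step number, page title, URL, and
        actions taken) in chronological order, capped at max_steps entries and roughly
        max_chars characters, for inclusion in the next prompt.
        """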
        if not hasattr(self.state, 'history') or not self.state.history:
            return "No browsing history yet."

        summary_lines = []
        try:
            # Iterate backwards through history items
            for history_item in reversed(self.state.history.history):
                if len(summary_lines) >= max_steps:
                    break

                page_title = getattr(history_item.state, "page_title", "Unknown Page") if history_item.state else "Unknown Page"
                url = getattr(history_item.state, "url", "Unknown URL") if history_item.state else "Unknown URL"

                actions_summary = []
                current_actions = history_item.model_output.action if history_item.model_output and hasattr(history_item.model_output, 'action') else []
                if current_actions:
                    for act_model in current_actions:  # act_model is ActionModel
                        if hasattr(act_model, 'name'):
                            action_str = f"{act_model.name}"  # type: ignore[attr-defined]
                            args_str = json.dumps(act_model.arguments) if hasattr(act_model, 'arguments') and act_model.arguments else ""  # type: ignore[attr-defined]
                            if args_str and args_str != "{}":
                                action_str += f"({args_str})"
                            actions_summary.append(action_str)

                action_desc = "; ".join(actions_summary) if actions_summary else "No action taken"
                step_num_str = f"Step {history_item.metadata.step_number}" if history_item.metadata and hasattr(history_item.metadata, 'step_number') else "Step Unknown"
                summary_line = f"- {step_num_str}: [{page_title}]({url}) - Action: {action_desc}\n"

                if sum(len(s) for s in summary_lines) + len(summary_line) > max_chars and summary_lines:
                    summary_lines.append("... (history truncated due to length)")
                    break
                summary_lines.append(summary_line)
        except Exception as e:
            logger.error(f"Error summarizing browsing history: {e}")
            return "Error summarizing history."

        if not summary_lines:
            return "No actions recorded in recent history."

        return "Browsing History (Recent Steps):\n" + "".join(reversed(summary_lines))
    async def step(self, base_step_info: Optional[AgentStepInfo] = None) -> None:
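        """
        Execute one agent step: capture the browser state, query the LLM for the next
        actions, execute them, then update memory, telemetry, and history.
        """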
        # The base_step_info comes from the superclass Agent's run loop.
        # We need to create a CustomAgentStepInfo for our custom prompts.
        # if not base_step_info:  # This check might be too strict if super().run() doesn't always provide it.
        #     logger.error("base_step_info not provided to CustomAgent.step by superclass run loop.")
        #     # Decide how to handle this: error out, or create a default?
        #     # For now, let's assume it's provided or self.state is the source of truth for step numbers.
        #     # If super().run() manages step counts, base_step_info.step_number would be relevant.
        #     # If CustomAgent manages its own (self.state.n_steps), use that.
        #     # Let's use self.state for step counts as it seems to be incremented by CustomAgent.

        current_custom_step_info = CustomAgentStepInfo(
            step_number=self.state.n_steps,  # Use self.state.n_steps
            max_steps=self.state.max_steps if self.state.max_steps is not None else 100,  # Get from state or default
            task=self.task,
            add_infos=self.add_infos,
            memory=self.current_task_memory,
        )

        logger.info(f"CustomAgent - Step {current_custom_step_info.step_number}: Starting step.")  # AGENT_HEALTH_LOG
        model_output = None  # Initialize to ensure it's defined for finally
        state = None  # Initialize
        result = None  # Initialize
        tokens = 0  # Initialize
        step_start_time = time.time()

        try:
            logger.info(f"CustomAgent - Step {current_custom_step_info.step_number}: Attempting to get browser state.")  # NEW LOG
            state = await self.browser_context.get_state()
            logger.info(f"CustomAgent - Step {current_custom_step_info.step_number}: Browser state retrieval complete. State is None: {state is None}")  # NEW LOG
            if state:
                logger.debug(f"CustomAgent.step: self.browser_context.get_state() returned. URL: {state.url if state else 'N/A'}")

            # AGENT_HEALTH_LOG - Log raw observation
            if state:
                # Avoid logging full screenshots if they are part of the state.
                # loggable_state_dict = state.model_dump()  # This caused AttributeError
                # Manually construct a dictionary for logging from BrowserState attributes.
                loggable_state_dict = {
                    "url": getattr(state, 'url', 'N/A'),
                    "html_content": getattr(state, 'html_content', '')[:200] + "... (truncated)" if getattr(state, 'html_content', '') else 'N/A',
                    "interactive_elements": f"{len(getattr(state, 'interactive_elements', []))} elements",
                    "page_title": getattr(state, 'page_title', 'N/A'),
                    "focused_element_index": getattr(state, 'focused_element_index', None),
                    # Add other relevant attributes you want to log from BrowserState
                }
                screenshot_data = getattr(state, 'screenshot', None)
                if screenshot_data:
                    loggable_state_dict['screenshot'] = f"Screenshot data present (length: {len(screenshot_data)})"
                else:
                    loggable_state_dict['screenshot'] = "No screenshot data"
                logger.debug(f"CustomAgent - Step {current_custom_step_info.step_number}: Raw observation received: {json.dumps(loggable_state_dict, indent=2)}")
            else:
                logger.debug(f"CustomAgent - Step {current_custom_step_info.step_number}: No observation (state is None).")

            await self._raise_if_stopped_or_paused()

            history_summary_str = self._summarize_browsing_history(max_steps=5, max_chars=1500)
            self.message_manager.add_state_message(
                state=state,
                actions=self.state.last_action,  # type: ignore[call-arg]
                result=self.state.last_result,
                step_info=current_custom_step_info,  # Use the created CustomAgentStepInfo
                use_vision=self.settings.use_vision,
                history_summary=history_summary_str,  # type: ignore[call-arg]
            )

            if self.settings.planner_llm and self.state.n_steps % self.settings.planner_interval == 0:
                await self._run_planner()

            input_messages = self.message_manager.get_messages()
            tokens = self._message_manager.state.history.current_tokens

            # AGENT_HEALTH_LOG - Before LLM call
            logger.debug(f"CustomAgent - Step {current_custom_step_info.step_number}: Preparing to call LLM. Number of input messages: {len(input_messages)}. Tokens: {tokens}")
            # For very detailed debugging, you might log the messages themselves, but be mindful of size/sensitivity:
            # for msg_idx, msg_content in enumerate(input_messages):
            #     logger.trace(f"  Input Message {msg_idx}: Role: {msg_content.type}, Content: {msg_content.content[:200]}")

            try:
                model_output = await self.get_next_action(input_messages)

                # AGENT_HEALTH_LOG - After LLM call
                if model_output:
                    logger.debug(f"CustomAgent - Step {current_custom_step_info.step_number}: LLM response received: {model_output.model_dump_json(indent=2)}")
                else:
                    logger.warning(f"CustomAgent - Step {current_custom_step_info.step_number}: LLM call did not return a valid model_output.")

                self._log_response(model_output)

                # self.state.n_steps is incremented later, AFTER CustomAgentStepInfo was created with the *current* step number.
                # This is fine, as the prompt needs the current step, and n_steps tracks completed/next step.

                if self.register_new_step_callback:
                    await self.register_new_step_callback(state, model_output, self.state.n_steps + 1)  # n_steps will be for the *next* step
                if self.settings.save_conversation_path:
                    target = self.settings.save_conversation_path + f'_{self.state.n_steps + 1}.txt'
                    save_conversation(input_messages, model_output, target,
                                      self.settings.save_conversation_path_encoding)

                if self.model_name != "deepseek-reasoner":
                    self.message_manager._remove_state_message_by_index(-1)  # type: ignore[attr-defined]
                await self._raise_if_stopped_or_paused()
            except Exception as e:
                self.message_manager._remove_state_message_by_index(-1)  # type: ignore[attr-defined]
                raise e

            # AGENT_HEALTH_LOG - Before action execution
            if model_output and model_output.action:
                logger.info(f"CustomAgent - Step {current_custom_step_info.step_number}: Attempting actions: {model_output.action}")
            else:
                logger.info(f"CustomAgent - Step {current_custom_step_info.step_number}: No actions to perform based on LLM output.")

            # AGENT_HEALTH_LOG - Wrap action execution
            try:
                result = await self.multi_act(model_output.action)  # type: ignore
                logger.info(f"CustomAgent - Step {current_custom_step_info.step_number}: Actions executed. Result: {result}")
            except Exception as e_action:
                logger.error(f"CustomAgent - Step {current_custom_step_info.step_number}: Error during action execution (multi_act): {e_action}", exc_info=True)
                # Decide how to set result or if error handling in _handle_step_error is sufficient.
                # For now, this log will capture it, and the main exception handler will take over.
                raise  # Re-raise to be caught by the outer try-except

            # Update step_info's memory (which is current_custom_step_info) with model output
            self.update_step_info(model_output, current_custom_step_info)  # type: ignore
            # Persist the updated memory for the next step
            self.current_task_memory = current_custom_step_info.memory

            # Increment n_steps after all actions for the current step are done and memory is updated.
            self.state.n_steps += 1

            for ret_ in result:
                if ret_.extracted_content and "Extracted page" in ret_.extracted_content:
                    if ret_.extracted_content[:100] not in self.state.extracted_content:
                        self.state.extracted_content += ret_.extracted_content
            self.state.last_result = result
            self.state.last_action = model_output.action
            if len(result) > 0 and result[-1].is_done:
                if not self.state.extracted_content:
                    # If step_info's memory was used for CustomAgentStepInfo it might be outdated here.
                    # Use current_task_memory which should be the most up-to-date.
                    self.state.extracted_content = self.current_task_memory
                result[-1].extracted_content = self.state.extracted_content
                logger.info(f"📄 Result: {result[-1].extracted_content}")

            self.state.consecutive_failures = 0

        except InterruptedError:
            logger.debug('Agent paused')
            self.state.last_result = [
                ActionResult(
                    error='The agent was paused - now continuing actions might need to be repeated',
                    include_in_memory=True,
                )
            ]
            # AGENT_HEALTH_LOG - End of step (paused)
            logger.info(f"CustomAgent - Step {current_custom_step_info.step_number}: Paused.")
            return
        except Exception as e:
            result = await self._handle_step_error(e)
            self.state.last_result = result
            # AGENT_HEALTH_LOG - End of step (exception)
            logger.error(f"CustomAgent - Step {current_custom_step_info.step_number}: Ended with exception: {e}", exc_info=True)
        finally:
            logger.debug("Entering CustomAgent.step finally block.")  # DEBUG
            step_end_time = time.time()
            actions_telemetry = [a.model_dump(exclude_unset=True) for a in model_output.action] if model_output and hasattr(model_output, 'action') and model_output.action else []
            logger.debug("Attempting to capture telemetry.")  # DEBUG
            self.telemetry.capture(
                AgentStepTelemetryEvent(
                    agent_id=self.state.agent_id,
                    step=self.state.n_steps,  # Note: n_steps was already incremented
                    actions=actions_telemetry,
                    consecutive_failures=self.state.consecutive_failures,
                    step_error=[r.error for r in result if r.error] if result else ['No result after step execution'],  # Modified for clarity
                )
            )
            logger.debug("Telemetry captured.")  # DEBUG
            if not result:
                logger.debug("No result from multi_act, returning from step.")  # DEBUG
                return

            if state and model_output:
                logger.debug(f"Calling _make_history_item with model_output: {type(model_output)}, state: {type(state)}, result: {type(result)}")  # DEBUG
                metadata = StepMetadata(
                    step_number=self.state.n_steps,  # n_steps was already incremented
                    step_start_time=step_start_time,
                    step_end_time=step_end_time,
                    input_tokens=tokens,
                )
                self._make_history_item(model_output, state, result, metadata)
                logger.debug("_make_history_item finished.")  # DEBUG
            else:
                logger.debug("Skipping _make_history_item due to no state or model_output.")  # DEBUG

            # Log final state before returning from step
            logger.debug(f"CustomAgent.step state before return: n_steps={self.state.n_steps}, stopped={self.state.stopped}, paused={self.state.paused}, consecutive_failures={self.state.consecutive_failures}, last_result_count={len(self.state.last_result) if self.state.last_result else 0}")
            if self.state.last_result:
                for i, res_item in enumerate(self.state.last_result):
                    logger.debug(f"  last_result[{i}]: error='{res_item.error}', is_done={res_item.is_done}")
            logger.debug("Exiting CustomAgent.step finally block.")  # DEBUG
            # AGENT_HEALTH_LOG - End of step (finally block)
            logger.info(f"CustomAgent - Step {current_custom_step_info.step_number}: Finished step processing (finally block).")
    # New: modified to accept ReplayTaskDetails in replay mode
    async def run(self, task_input: Union[str, ReplayTaskDetails], max_steps: int = 100) -> Optional[AgentHistoryList]:
        """
        Run the agent to complete the task.
        If task_input is ReplayTaskDetails, it runs in replay mode.
        Otherwise, it runs in autonomous mode.
        """
        self.state.start_time = time.time()
        self.state.task_input = task_input
        self.state.max_steps = max_steps

        if isinstance(task_input, ReplayTaskDetails) and task_input.mode == "replay":
            logger.info(f"🚀 Starting agent in REPLAY mode for trace: {task_input.trace_path}")
            if not self.browser_context:
                logger.error("Replay mode: Browser context is not available.")
                return None

            # Ensure there is a page to replay on
            if not self.page or self.page.is_closed():
                logger.info("Replay mode: self.page is not valid. Attempting to get/create a page.")
                playwright_context = getattr(self.browser_context, "playwright_context", None)
                if playwright_context and playwright_context.pages:
                    self.page = playwright_context.pages[0]
                    await self.page.bring_to_front()
                    logger.info(f"Replay mode: Using existing page: {self.page.url}")
                elif playwright_context:
                    self.page = await playwright_context.new_page()
                    logger.info(f"Replay mode: Created new page: {self.page.url}")
                else:
                    logger.error("Replay mode: playwright_context is None, cannot create or get a page.")
                    return None

            try:
                trace_events = load_trace(task_input.trace_path)
                if not trace_events:
                    logger.warning(f"Replay mode: No events found in trace file: {task_input.trace_path}")
                    return None
                replayer = TraceReplayer(self.page, trace_events)
                logger.info(f"Replayer initialized. Starting playback at speed: {task_input.speed}x")
                await replayer.play(speed=task_input.speed)
                logger.info(f"🏁 Replay finished for trace: {task_input.trace_path}")
            except Drift as d:
                drift_message = getattr(d, "message", str(d))
                logger.error(f"💣 DRIFT DETECTED during replay of {task_input.trace_path}: {drift_message}")
                if d.event:
                    logger.error(f"  Drift occurred at event: {json.dumps(d.event)}")
            except FileNotFoundError:
                logger.error(f"Replay mode: Trace file not found at {task_input.trace_path}")
            except Exception as e:
                logger.exception(f"Replay mode: An unexpected error occurred during replay of {task_input.trace_path}")
            finally:
                # Decide if browser/context should be closed after replay based on agent settings (e.g., keep_browser_open).
                # For now, let's assume it follows the general agent cleanup logic if applicable, or stays open.
                pass
            return None  # Replay mode doesn't return standard agent history

        # Autonomous mode logic continues below
        elif isinstance(task_input, str):
            if task_input != self.task:
                logger.info(f"Autonomous run: Task updated from '{self.task}' to '{task_input}'")
                self.task = task_input
                # self._message_manager.task = self.task  # add_new_task will set this

                # Clear existing messages from the history
                if hasattr(self._message_manager.state, 'history') and hasattr(self._message_manager.state.history, 'messages') and isinstance(self._message_manager.state.history.messages, list):
                    logger.debug("Clearing message history list as task has changed.")
                    self._message_manager.state.history.messages.clear()
                    # Also reset token count if possible/necessary, assuming it's managed alongside messages
                    if hasattr(self._message_manager.state.history, 'current_tokens'):
                        self._message_manager.state.history.current_tokens = 0
                else:
                    logger.warning("Could not clear message history messages list for new task.")

                # Inform the message manager about the new task
                if hasattr(self._message_manager, "add_new_task") and callable(self._message_manager.add_new_task):
                    logger.debug(f"Calling message_manager.add_new_task() with new task: {self.task[:70]}...")
                    self._message_manager.add_new_task(self.task)
                    # add_infos is not directly used by add_new_task, but could be part of the task string construction if needed earlier.
                    # For now, we assume self.task (already updated from task_input) contains all necessary info.
                else:
                    logger.warning("CustomMessageManager does not have a callable 'add_new_task' method. New task may not be properly set in message manager.")

            logger.info(f"Starting autonomous agent run for task: '{self.task}', max_steps: {max_steps}")
            logger.debug(f"CustomAgent: About to call super().run('{self.task}', {max_steps}, {self.controller})")
            history: Optional[AgentHistoryList] = await super().run(self.task, max_steps=max_steps, controller=self.controller)
            logger.debug(f"CustomAgent: super().run() returned. History is None: {history is None}")
            if history and hasattr(history, 'history'):
                logger.debug(f"CustomAgent: History length: {len(history.history) if history.history else 0}")  # DEBUG

            # AGENT_HEALTH_LOG - After super().run()
            logger.info(f"CustomAgent - Autonomous run finished. Result from super().run(): {'History object received' if history else 'No history object (None)'}")

            # After an autonomous run, Recorder history persistence is handled by the UI's explicit stop recording.
            # The agent itself, when run with a string task, should not be responsible for this.
            # Removing the block that attempted to save Recorder traces here.
            return history
    def _convert_input_messages(self, messages: List[BaseMessage]) -> List[Dict[str, Any]]:
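        """Convert LangChain message objects into plain role/content dicts, preserving tool_calls on AIMessages."""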
        converted_messages = []
        for msg in messages:
            msg_item = {}
            if isinstance(msg, HumanMessage):
                msg_item["role"] = "user"
                msg_item["content"] = msg.content
            elif isinstance(msg, AIMessage):
                msg_item["role"] = "assistant"
                # Handle tool calls if present
                if msg.tool_calls:
                    msg_item["content"] = None  # Standard AIMessage content is None if tool_calls are present
                    msg_item["tool_calls"] = msg.tool_calls
                else:
                    msg_item["content"] = msg.content
            elif hasattr(msg, 'role') and hasattr(msg, 'content'):  # For generic BaseMessage with role and content
                msg_item["role"] = getattr(msg, "role", "unknown")
                msg_item["content"] = getattr(msg, "content", "")
            else:
                # Fallback or skip if message type is not directly convertible
                logger.warning(f"Skipping message of unhandled type: {type(msg)}")
                continue

            # Add reasoning_content for tool_code type messages if available
            if msg_item.get("type") == "tool_code" and isinstance(msg, AIMessage) and hasattr(msg, 'reasoning_content'):
                reasoning_content = getattr(msg, "reasoning_content", None)
                if reasoning_content:
                    msg_item["reasoning_content"] = reasoning_content

            converted_messages.append(msg_item)
        return converted_messages
    def _parse_model_output(self, output: str, ActionModel: Type[BaseModel]) -> CustomAgentOutput:
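        """
        Parse the raw LLM output string into a CustomAgentOutput, repairing malformed
        JSON where possible and falling back to an error "brain" with an empty action
        list when parsing fails.
        """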
        try:
            if not hasattr(self, 'AgentOutput') or not self.AgentOutput:
                self._setup_action_models()  # Sets self.AgentOutput

            extracted_output: Union[str, Dict[Any, Any]] = extract_json_from_model_output(output)

            parsed_data: CustomAgentOutput
            if isinstance(extracted_output, dict):
                # If it's already a dict, assume it's valid JSON and Pydantic can handle it
                parsed_data = self.AgentOutput.model_validate(extracted_output)
            elif isinstance(extracted_output, str):
                # If it's a string, try to repair it then parse
                repaired_json_string = repair_json(extracted_output, return_objects=False)
                if not isinstance(repaired_json_string, str):
                    logger.error(f"repair_json with return_objects=False did not return a string. Got: {type(repaired_json_string)}. Falling back to original extracted string.")
                    # Fallback or raise error. Forcing to string for now.
                    repaired_json_string = str(extracted_output)  # Fall back to the original extracted string if repair fails badly
                parsed_data = self.AgentOutput.model_validate_json(repaired_json_string)
            else:
                raise ValueError(f"Unexpected output type from extract_json_from_model_output: {type(extracted_output)}")

            # Ensure the final parsed_data is indeed CustomAgentOutput
            if not isinstance(parsed_data, CustomAgentOutput):
                logger.warning(f"Parsed data is type {type(parsed_data)}, not CustomAgentOutput. Attempting conversion or default.")
                # This might happen if self.AgentOutput.model_validate/model_validate_json doesn't return the precise
                # CustomAgentOutput type but a compatible one (e.g. base AgentOutput).
                # We need to ensure it has the CustomAgentBrain structure.
                action_list = getattr(parsed_data, 'action', [])
                current_state_data = getattr(parsed_data, 'current_state', None)

                if isinstance(current_state_data, CustomAgentBrain):
                    parsed_data = self.AgentOutput(action=action_list, current_state=current_state_data)
                elif isinstance(current_state_data, dict):
                    try:
                        brain = CustomAgentBrain(**current_state_data)
                        parsed_data = self.AgentOutput(action=action_list, current_state=brain)
                    except Exception as brain_ex:
                        logger.error(f"Could not construct CustomAgentBrain from dict: {brain_ex}. Falling back to error brain.")
                        error_brain = CustomAgentBrain(
                            evaluation_previous_goal="Error",
                            important_contents="Failed to reconstruct agent brain during parsing.",
                            thought="Critical error in parsing agent state.",
                            next_goal="Retry or report error."
                        )
                        parsed_data = self.AgentOutput(action=action_list, current_state=error_brain)
                else:
                    logger.error("current_state is missing or not CustomAgentBrain/dict. Falling back to error brain.")
                    error_brain = CustomAgentBrain(
                        evaluation_previous_goal="Error",
                        important_contents="Missing or invalid agent brain during parsing.",
                        thought="Critical error in parsing agent state.",
                        next_goal="Retry or report error."
                    )
                    # Ensure action_list is compatible if it came from a different model type.
                    # For simplicity, if we have to create an error brain, we might also want to clear actions
                    # or ensure they are valid ActionModel instances. For now, passing them as is.
                    parsed_data = self.AgentOutput(action=action_list, current_state=error_brain)

            return parsed_data
        except Exception as e:
            logger.error(f"Error parsing model output: {e}\nRaw output:\n{output}", exc_info=True)
            if not hasattr(self, 'AgentOutput') or not self.AgentOutput:
                self._setup_action_models()  # Ensure self.AgentOutput is set up for fallback
            error_brain = CustomAgentBrain(
                evaluation_previous_goal="Error",
                important_contents=f"Parsing error: {str(e)}",
                thought=f"Failed to parse LLM output. Error: {str(e)}",
                next_goal="Retry or report error."
            )
            return self.AgentOutput(action=[], current_state=error_brain)