# coding: utf-8 # Copyright (c) 2025 inclusionAI. import json import time import traceback from typing import Dict, Any, Optional, List, Union from langchain_core.messages import HumanMessage, BaseMessage, SystemMessage from examples.android.prompts import SYSTEM_PROMPT, LAST_STEP_PROMPT from examples.android.utils import ( AgentState, AgentHistory, AgentHistoryList, ActionResult, PolicyMetadata, AgentBrain, Trajectory ) from examples.browsers.common import AgentStepInfo from aworld.config.conf import AgentConfig, ConfigDict from aworld.core.agent.base import AgentResult from aworld.agents.llm_agent import Agent from aworld.core.common import Observation, ActionModel, ToolActionInfo from aworld.logs.util import logger from examples.tools.tool_action import AndroidAction class AndroidAgent(Agent): def __init__(self, conf: Union[Dict[str, Any], ConfigDict, AgentConfig], **kwargs): super(AndroidAgent, self).__init__(conf, **kwargs) provider = self.conf.llm_config.llm_provider if self.conf.llm_config.llm_provider else self.conf.llm_provider if self.conf.llm_config.llm_provider: self.conf.llm_config.llm_provider = "chat" + provider else: self.conf.llm_provider = "chat" + provider self.available_actions_desc = self._build_action_prompt() # Settings self.settings = self.conf def reset(self, options: Dict[str, Any]): super(AndroidAgent, self).__init__(options) # State self.state = AgentState() # History self.history = AgentHistoryList(history=[]) self.trajectory = Trajectory(history=[]) def _build_action_prompt(self) -> str: def _prompt(info: ToolActionInfo) -> str: s = f'{info.desc}:\n' s += '{' + str(info.name) + ': ' if info.input_params: s += str({k: {"title": k, "type": v} for k, v in info.input_params.items()}) s += '}' return s # Iterate over all android actions val = "\n".join([_prompt(v.value) for k, v in AndroidAction.__members__.items()]) return val def policy(self, observation: Observation, info: Dict[str, Any] = None, **kwargs) -> Union[List[ActionModel], None]: self._finished = False step_info = AgentStepInfo(number=self.state.n_steps, max_steps=self.conf.max_steps) last_step_msg = None if step_info and step_info.is_last_step(): # Add last step warning if needed last_step_msg = HumanMessage( content=LAST_STEP_PROMPT) logger.info('Last step finishing up') logger.info(f'[agent] 📍 Step {self.state.n_steps}') step_start_time = time.time() try: xml_content, base64_img = observation.dom_tree, observation.image if xml_content is None: logger.error("[agent] ⚠ Failed to get UI state, stopping task") self.stop() return None self.state.last_result = (xml_content, base64_img if base64_img else "") logger.info("[agent] 🤖 Analyzing current state with LLM...") a_step_msg = HumanMessage(content=[ { "type": "text", "text": f""" Task: {self.task} Current Step: {self.state.n_steps} Please analyze the current interface and decide the next action. Please directly return the response in JSON format without any other text or code block markers. """ }, { "type": "image_url", "image_url": f"data:image/jpeg;base64,{self.state.image}" } ]) messages = [SystemMessage(content=SYSTEM_PROMPT)] if last_step_msg: messages.append(last_step_msg) messages.append(a_step_msg) logger.info(f"[agent] VLM Input last message: {messages[-1]}") llm_result = None try: llm_result = self._do_policy(messages) if self.state.stopped or self.state.paused: logger.info('Android agent paused after getting state') return [ActionModel(tool_name='android', action_name="stop")] tool_action = llm_result.actions step_metadata = PolicyMetadata( start_time=step_start_time, end_time=time.time(), number=self.state.n_steps, input_tokens=1 ) history_item = AgentHistory( result=[ActionResult(success=True)], metadata=step_metadata, content=xml_content, base64_img=base64_img ) self.history.history.append(history_item) if self.settings.save_history and self.settings.history_path: self.history.save_to_file(self.settings.history_path) logger.info(f'📍 Step {self.state.n_steps} starts to execute') self.state.n_steps += 1 self.state.consecutive_failures = 0 return tool_action except Exception as e: logger.warning(traceback.format_exc()) raise RuntimeError("Android agent encountered exception while making the policy.", e) finally: if llm_result: self.trajectory.add_step(observation, info, llm_result) metadata = PolicyMetadata( number=self.state.n_steps, start_time=step_start_time, end_time=time.time(), input_tokens=1 ) self._make_history_item(llm_result, observation, metadata) else: logger.warning("no result to record!") except json.JSONDecodeError as e: logger.error("[agent] ❌ JSON parsing error") raise except Exception as e: logger.error(f"[agent] ❌ Action execution error: {str(e)}") raise def _do_policy(self, input_messages: list[BaseMessage]) -> AgentResult: response = self.llm.invoke(input_messages) content = response.content if content.startswith("```json"): content = content[7:] if content.startswith("```"): content = content[3:] if content.endswith("```"): content = content[:-3] content = content.strip() action_data = json.loads(content) brain_state = AgentBrain(**action_data["current_state"]) logger.info(f"[agent] ⚠ Eval: {brain_state.evaluation_previous_goal}") logger.info(f"[agent] 🧠 Memory: {brain_state.memory}") logger.info(f"[agent] đŸŽ¯ Next goal: {brain_state.next_goal}") actions = action_data.get('action') result = [] if not actions: actions = action_data.get("actions") # print actions logger.info(f"[agent] VLM Output actions: {actions}") for action in actions: action_type = action.get('type') if not action_type: logger.warning(f"Action missing type: {action}") continue params = {} if 'type' == action_type: action_type = 'input_text' if 'params' in action: params = action['params'] if 'index' in action: params['index'] = action['index'] if 'type' in action: params['type'] = action['type'] if 'text' in action: params['text'] = action['text'] action_model = ActionModel( tool_name='android', action_name=action_type, params=params ) result.append(action_model) return AgentResult(current_state=brain_state, actions=result) def _make_history_item(self, model_output: AgentResult | None, state: Observation, metadata: Optional[PolicyMetadata] = None) -> None: if isinstance(state, dict): state = Observation(**state) history_item = AgentHistory( model_output=model_output, result=state.action_result, metadata=metadata, content=state.dom_tree, base64_img=state.image ) self.state.history.history.append(history_item) def pause(self) -> None: """Pause the agent""" logger.info('🔄 Pausing Agent') self.state.paused = True def resume(self) -> None: """Resume the agent""" logger.info('â–ļī¸ Agent resuming') self.state.paused = False def stop(self) -> None: """Stop the agent""" logger.info('âšī¸ Agent stopping') self.state.stopped = True