# coding: utf-8 # Copyright (c) 2025 inclusionAI. import subprocess import time import re import traceback from time import sleep from typing import Optional, Tuple, List import base64 import xml.etree.ElementTree as ET import os from aworld.logs.util import logger, color_log, Color from aworld.utils import import_package configs = {"MIN_DIST": 30} class AndroidElement: def __init__(self, uid, bbox, attrib): self.uid = uid self.bbox = bbox self.attrib = attrib import_package('cv2', install_name='opencv-python') import_package('pyshine') def get_id_from_element(elem): bounds = elem.attrib["bounds"][1:-1].split("][") x1, y1 = map(int, bounds[0].split(",")) x2, y2 = map(int, bounds[1].split(",")) elem_w, elem_h = x2 - x1, y2 - y1 if "resource-id" in elem.attrib and elem.attrib["resource-id"]: elem_id = elem.attrib["resource-id"].replace(":", ".").replace("/", "_") else: elem_id = f"{elem.attrib['class']}_{elem_w}_{elem_h}" if "content-desc" in elem.attrib and elem.attrib["content-desc"] and len(elem.attrib["content-desc"]) < 20: content_desc = elem.attrib['content-desc'].replace("/", "_").replace(" ", "").replace(":", "_") elem_id += f"_{content_desc}" return elem_id def traverse_tree(xml_path, elem_list, attrib, add_index=False): path = [] for event, elem in ET.iterparse(xml_path, ['start', 'end']): if event == 'start': path.append(elem) if attrib in elem.attrib and elem.attrib[attrib] == "true": parent_prefix = "" if len(path) > 1: parent_elem = path[-2] # Checks if the parent element has the required attributes has_bounds = "bounds" in parent_elem.attrib has_rid_or_class = "resource-id" in parent_elem.attrib or "class" in parent_elem.attrib if has_bounds and has_rid_or_class: parent_prefix = get_id_from_element(parent_elem) bounds = elem.attrib["bounds"][1:-1].split("][") x1, y1 = map(int, bounds[0].split(",")) x2, y2 = map(int, bounds[1].split(",")) center = (x1 + x2) // 2, (y1 + y2) // 2 elem_id = get_id_from_element(elem) if parent_prefix: elem_id = parent_prefix + "_" + elem_id if add_index: elem_id += f"_{elem.attrib['index']}" close = False for e in elem_list: bbox = e.bbox center_ = (bbox[0][0] + bbox[1][0]) // 2, (bbox[0][1] + bbox[1][1]) // 2 dist = (abs(center[0] - center_[0]) ** 2 + abs(center[1] - center_[1]) ** 2) ** 0.5 if dist <= configs["MIN_DIST"]: close = True break if not close: elem_list.append(AndroidElement(elem_id, ((x1, y1), (x2, y2)), attrib)) if event == 'end': path.pop() def create_directory_for_file(file_path): # Extract the directory from the file path directory = os.path.dirname(file_path) # Check if the directory exists if not os.path.exists(directory): # Create the directory os.makedirs(directory) # Print the absolute path of the directory absolute_directory_path = os.path.abspath(directory) logger.info(f"Directory absolute path: {absolute_directory_path}") def draw_bbox_multi(img_path, output_path, elem_list): import cv2 import pyshine as ps imgcv = cv2.imread(img_path) count = 1 for elem in elem_list: try: top_left = elem.bbox[0] bottom_right = elem.bbox[1] left, top = top_left[0], top_left[1] right, bottom = bottom_right[0], bottom_right[1] # draw rectangle cv2.rectangle(imgcv, (left, top), (right, bottom), (0, 0, 221), 3) label = str(count) imgcv = ps.putBText(imgcv, label, text_offset_x=(left + right) // 2 + 10, text_offset_y=(top + bottom) // 2 + 10, vspace=10, hspace=10, font_scale=1, thickness=2, background_RGB=(221, 0, 0), text_RGB=(255, 255, 255), alpha=0.0) except Exception as e: color_log(f"ERROR: An exception occurs while labeling the image\n{e}", Color.red) logger.info(traceback.print_exc()) count += 1 cv2.imwrite(output_path, imgcv) return imgcv def draw_grid(img_path, output_path): import cv2 def get_unit_len(n): for i in range(1, n + 1): if n % i == 0 and 120 <= i <= 180: return i return -1 image = cv2.imread(img_path) height, width, _ = image.shape color = (255, 116, 113) unit_height = get_unit_len(height) if unit_height < 0: unit_height = 120 unit_width = get_unit_len(width) if unit_width < 0: unit_width = 120 thick = int(unit_width // 50) rows = height // unit_height cols = width // unit_width for i in range(rows): for j in range(cols): label = i * cols + j + 1 left = int(j * unit_width) top = int(i * unit_height) right = int((j + 1) * unit_width) bottom = int((i + 1) * unit_height) cv2.rectangle(image, (left, top), (right, bottom), color, thick // 2) cv2.putText(image, str(label), (left + int(unit_width * 0.05) + 3, top + int(unit_height * 0.3) + 3), 0, int(0.01 * unit_width), (0, 0, 0), thick) cv2.putText(image, str(label), (left + int(unit_width * 0.05), top + int(unit_height * 0.3)), 0, int(0.01 * unit_width), color, thick) cv2.imwrite(output_path, image) return rows, cols def encode_image(image_path): with open(image_path, "rb") as image_file: return base64.b64encode(image_file.read()).decode('utf-8') class ADBController: def __init__(self, avd_name: str = None, adb_path: str = os.path.expanduser('~') + "/Library/Android/sdk/platform-tools/adb", emulator_path: str = os.path.expanduser('~') + "/Library/Android/sdk/emulator/emulator", timeout: int = 30): self.avd_name = avd_name self.adb_path = adb_path self.emulator_path = emulator_path self.timeout = timeout self.emulator_process = None self.device_serial = "emulator-5554" # default self.current_elem_list = [] self.width, self.height = 0, 0 def start_emulator(self, avd_name: str = None, headless: bool = False, max_retry: int = 2) -> bool: avd = avd_name or self.avd_name if not avd: raise ValueError("AVD name must be specified") for attempt in range(max_retry + 1): if self._start_emulator_process(avd, headless): if self._wait_for_device(): logger.info(f"start success,attempt count:{attempt + 1}") self.width, self.height = self.get_screen_size() return True self.stop_emulator() return False def _start_emulator_process(self, avd: str, headless: bool) -> bool: try: cmd = [ self.emulator_path, f"@{avd}", "-no-snapshot", "-no-audio", "-gpu", "swiftshader", "-wipe-data" ] if headless: cmd.append("-no-window") self.emulator_process = subprocess.Popen( cmd, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT ) return True except Exception as e: logger.warning(f"adb start fail: {str(e)}") return False def stop_emulator(self) -> bool: try: result = subprocess.run( [self.adb_path, "-s", self.device_serial, "emu", "kill"], timeout=self.timeout, capture_output=True, text=True ) return "OK" in result.stdout except subprocess.TimeoutExpired: return False finally: if self.emulator_process: self.emulator_process.terminate() def execute_adb(self, command: list, device_serial: str = None) -> Tuple[bool, str]: """execute adb command""" device = device_serial or self.device_serial full_cmd = [self.adb_path, "-s", device] + command try: result = subprocess.run( full_cmd, timeout=self.timeout, check=True, capture_output=True, text=True ) return True, result.stdout.strip() except subprocess.CalledProcessError as e: return False, f"Command failed: {e.stderr}" except Exception as e: return False, str(e) def execute_adb_with_stdout(self, command: List[str]) -> Tuple[bool, Optional[str]]: try: result = subprocess.run( ["adb", "-s", self.device_serial] + command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, timeout=10 ) if result.returncode == 0: return True, result.stdout.strip() else: return False, None except subprocess.TimeoutExpired: return False, None except Exception as e: return False, None # ---------- device operate ---------- def screenshot(self, save_path: str) -> bool: timestamp = int(time.time()) remote_path = f"/sdcard/screenshot_{timestamp}.png" success, _ = self.execute_adb(["shell", "screencap", "-p", remote_path]) if not success: return False return self._pull_file(remote_path, save_path) def dump_ui_xml(self, save_path: str) -> Optional[str]: remote_path = "/sdcard/ui_dump.xml" success, _ = self.execute_adb(["shell", "uiautomator", "dump", remote_path]) if not success: logger.info("dump ui xml fail") return None success = self._pull_file(remote_path, save_path) if not success: logger.info("pull ui xml fail") return None with open(save_path, 'r', encoding='utf-8') as f: xml_content = f.read() return xml_content def tap(self, element: int): x, y = self.__get_element_center(element) self.__tap_coordinate(x, y) def text(self, text: str): """ Input text, automatically replacing spaces with %s for proper ADB text input. Parameters: text: The text to input """ # Replace spaces with %s for proper handling in ADB formatted_text = text.replace(" ", "%s") success, _ = self.execute_adb(["shell", "input", "text", formatted_text]) return success def long_press(self, element: int): x, y = self.__get_element_center(element) self.__swipe_coordinate(x, y, x, y, 2000) def swipe(self, element: int, direction: str, dist: str = "medium"): """ Perform swipe operations based on screen element labels Parameters: element_tag: digital label displayed on the interface (1-based) direction: swipe direction ["up", "down", "left", "right"] dist: swipe distance ["short", "medium", "long"] """ # 获取元素坐标 x, y = self.__get_element_center(element) unit_dist = int(self.width / 10) if dist == "long": unit_dist *= 3 elif dist == "medium": unit_dist *= 2 if direction == "up": offset = 0, -2 * unit_dist elif direction == "down": offset = 0, 2 * unit_dist elif direction == "left": offset = -1 * unit_dist, 0 elif direction == "right": offset = unit_dist, 0 else: return False self.__swipe_coordinate(x, y, x + offset[0], y + offset[1]) def screenshot_and_annotate(self, name_prefix=None, return_base64=True): import cv2 """Collect screen information and mark interactive elements, and return data containing Base64 images""" sleep(3) if name_prefix is None: name_prefix = str(time.time()) tmp_files_dir = os.path.join(os.path.dirname(__file__), "tmp_files") os.makedirs(tmp_files_dir, exist_ok=True) screenshot_path = os.path.join(tmp_files_dir, f"{name_prefix}_origin.png") screenshot_res = self.screenshot(screenshot_path) xml_path = os.path.join(tmp_files_dir, f"{name_prefix}.xml") xml_res = self.dump_ui_xml(xml_path) if screenshot_res == "ERROR" or xml_res is None: logger.warning(f"Failed to take screenshot or read XML") return None, None # Parsing interactive elements clickable_list = [] focusable_list = [] traverse_tree(xml_path, clickable_list, "clickable", True) traverse_tree(xml_path, focusable_list, "focusable", True) # Merge a list of duplicate elements elem_list = clickable_list.copy() for elem in focusable_list: bbox = elem.bbox center = (bbox[0][0] + bbox[1][0]) // 2, (bbox[0][1] + bbox[1][1]) // 2 if not any( ((center[0] - ((e.bbox[0][0] + e.bbox[1][0]) // 2)) ** 2 + (center[1] - ((e.bbox[0][1] + e.bbox[1][1]) // 2)) ** 2) ** 0.5 <= configs["MIN_DIST"] for e in clickable_list ): elem_list.append(elem) # Generate annotated images labeled_path = os.path.join(tmp_files_dir, f"{name_prefix}_labeled.png") labeled_img = draw_bbox_multi(screenshot_path, labeled_path, elem_list) # Show Image Window # cv2.imshow("image", labeled_img) # cv2.waitKey(0) # cv2.destroyAllWindows() # Base64 encoding base64_str = None if return_base64: # Convert color space BGR->RGB rgb_image = cv2.cvtColor(labeled_img, cv2.COLOR_BGR2RGB) # Compress to JPEG format (with adjustable quality parameters) success, buffer = cv2.imencode(".jpg", rgb_image, [int(cv2.IMWRITE_JPEG_QUALITY), 85]) if success: base64_str = base64.b64encode(buffer).decode("utf-8") self.current_elem_list = elem_list.copy() logger.info(f"Current elem size{len(self.current_elem_list)}") return xml_res, base64_str def setup_connection(self) -> bool: """Intelligent initialization device connection""" # Prioritize physical equipment testing if self.__connect_physical_device(): return True # Try connecting to the simulator if self.avd_name and self.start_emulator(): return True raise ConnectionError("No available device found, please connect your phone or configure the simulator") # ---------- Helper Methods ---------- def __connect_physical_device(self) -> bool: """Connect an authorized USB device""" devices = self.__get_authorized_devices() if not devices: return False self.device = devices[0] logger.info(f"Connected physical device: {self.device}") self.device_serial = self.device self.width, self.height = self.get_screen_size() return True def __get_authorized_devices(self) -> list: """Get a list of authorized devices""" success, output = self.execute_adb(["devices"]) if not success: return [] return [ line.split("\t")[0] for line in output.splitlines() if "\tdevice" in line and "emulator" not in line ] def __tap_coordinate(self, x: int, y: int) -> bool: """Click screen coordinates""" success, _ = self.execute_adb(["shell", "input", "tap", str(x), str(y)]) return success def __get_element_center(self, elem_idx: int) -> tuple: """Calculate the coordinates of the center of the element""" tl, br = self.current_elem_list[int(elem_idx) - 1].bbox return (tl[0] + br[0]) // 2, (tl[1] + br[1]) // 2 def __swipe_coordinate(self, x1: int, y1: int, x2: int, y2: int, duration: int = 300) -> bool: """Slide Operation""" success, _ = self.execute_adb([ "shell", "input", "swipe", str(x1), str(y1), str(x2), str(y2), str(duration) ]) return success def _wait_for_device(self, timeout: int = 300) -> bool: """Three-level waiting detection strategy""" start_time = time.time() stages = { "adb_connected": False, "boot_completed": False, "services_ready": False } while time.time() - start_time < timeout: # Step 1: Detect adb connection if not stages["adb_connected"]: _, devices = self.execute_adb(["devices"]) if self.device_serial in devices: stages["adb_connected"] = True # Step 2: Detection system boot completed if stages["adb_connected"] and not stages["boot_completed"]: _, output = self.execute_adb([ "shell", "getprop", "sys.boot_completed" ]) if output.strip() == "1": stages["boot_completed"] = True # Step 3: Detecting Graphics Service Readiness if stages["boot_completed"] and not stages["services_ready"]: _, output = self.execute_adb([ "shell", "service check SurfaceFlinger" ]) if "found" in output.lower(): return True return False def _pull_file(self, remote: str, local: str) -> bool: """Pull device files to local""" create_directory_for_file(local) success, _ = self.execute_adb(["pull", remote, local]) if success: self.execute_adb(["shell", "rm", remote]) # 清理临时文件 return success def get_screen_size(self) -> Optional[Tuple[int, int]]: """Get screen resolution""" success, output = self.execute_adb(["shell", "wm", "size"]) if not success: return None match = re.search(r"(\d+)x(\d+)", output) if match: return int(match.group(1)), int(match.group(2)) return None if __name__ == "__main__": # Examples controller = ADBController(avd_name="Medium_Phone_API_35") # controller.stop_emulator() if controller.setup_connection(): logger.info("Simulator started successfully") width, height = controller.get_screen_size() logger.info(f"Get the screen size{width},{height}") # Take screenshots and annotate them controller.screenshot_and_annotate() controller.swipe(6, "up") # controller.screenshot_and_annotate() # controller.tap(6) xml_txt, base64_txt = controller.screenshot_and_annotate() logger.info(xml_txt) # controller.stop_emulator() logger.info("Close the simulator")