Spaces:
Sleeping
Sleeping
# coding: utf-8 | |
# Copyright (c) 2025 inclusionAI. | |
import subprocess | |
import time | |
import re | |
import traceback | |
from time import sleep | |
from typing import Optional, Tuple, List | |
import base64 | |
import xml.etree.ElementTree as ET | |
import os | |
from aworld.logs.util import logger, color_log, Color | |
from aworld.utils import import_package | |
# Module-wide tuning knobs.
# MIN_DIST: two UI elements whose bounding-box centres are at most this many
# pixels apart are treated as duplicates when element lists are built.
configs = dict(MIN_DIST=30)
class AndroidElement:
    """Plain record describing one UI element found in a UI hierarchy dump.

    Attributes:
        uid: Identifier string built for the element.
        bbox: Bounding box as ``((x1, y1), (x2, y2))``.
        attrib: Name of the attribute that selected the element
            (for example ``"clickable"`` or ``"focusable"``).
    """

    def __init__(self, uid, bbox, attrib):
        self.uid, self.bbox, self.attrib = uid, bbox, attrib
# Make the imaging dependencies available before any drawing helper runs.
# NOTE(review): import_package comes from aworld.utils; presumably it installs
# the package (using install_name when the pip name differs) if it is missing —
# confirm against that helper.
import_package('cv2', install_name='opencv-python')
import_package('pyshine')
def get_id_from_element(elem):
    """Build an identifier string for a UI hierarchy node.

    The identifier is the node's ``resource-id`` (with ``:`` turned into ``.``
    and ``/`` into ``_``) when that attribute is non-empty; otherwise it is
    ``"<class>_<width>_<height>"`` derived from the ``bounds`` attribute.
    A non-empty ``content-desc`` shorter than 20 characters is appended after
    an underscore, with ``/`` and ``:`` turned into ``_`` and spaces removed.

    Args:
        elem: Element exposing an ``attrib`` mapping with a ``bounds`` entry of
            the form ``"[x1,y1][x2,y2]"``.

    Returns:
        The identifier string.
    """
    # Bounds are parsed up front, so a missing or malformed "bounds" entry
    # fails here regardless of which identifier branch is taken below.
    top_left, bottom_right = elem.attrib["bounds"][1:-1].split("][")[:2]
    x1, y1 = (int(v) for v in top_left.split(","))
    x2, y2 = (int(v) for v in bottom_right.split(","))
    width = x2 - x1
    height = y2 - y1

    resource_id = elem.attrib.get("resource-id")
    if resource_id:
        identifier = resource_id.translate(str.maketrans({":": ".", "/": "_"}))
    else:
        identifier = "{}_{}_{}".format(elem.attrib["class"], width, height)

    description = elem.attrib.get("content-desc")
    if description and len(description) < 20:
        cleaned = description.translate(str.maketrans({"/": "_", " ": None, ":": "_"}))
        identifier = f"{identifier}_{cleaned}"
    return identifier
def traverse_tree(xml_path, elem_list, attrib, add_index=False):
    """Collect elements whose ``attrib`` attribute equals ``"true"``.

    The XML file is streamed; for every matching node an ``AndroidElement`` is
    appended to ``elem_list`` unless an element already in the list has its
    centre within ``configs["MIN_DIST"]`` pixels of the new one.

    Args:
        xml_path: Path of the UI hierarchy XML file.
        elem_list: List that receives the new elements (mutated in place).
        attrib: Attribute name to test, e.g. ``"clickable"``.
        add_index: When true, the node's ``index`` attribute is appended to
            the generated identifier.
    """
    ancestors = []
    for event, node in ET.iterparse(xml_path, ['start', 'end']):
        if event == 'end':
            ancestors.pop()
            continue
        if event != 'start':
            continue

        ancestors.append(node)
        if node.attrib.get(attrib) != "true":
            continue

        # Prefix with the parent's identifier when the parent carries enough
        # attributes to build one.
        prefix = ""
        if len(ancestors) > 1:
            parent = ancestors[-2]
            identifiable = "resource-id" in parent.attrib or "class" in parent.attrib
            if "bounds" in parent.attrib and identifiable:
                prefix = get_id_from_element(parent)

        first, second = node.attrib["bounds"][1:-1].split("][")[:2]
        x1, y1 = (int(v) for v in first.split(","))
        x2, y2 = (int(v) for v in second.split(","))
        cx, cy = (x1 + x2) // 2, (y1 + y2) // 2

        uid = get_id_from_element(node)
        if prefix:
            uid = f"{prefix}_{uid}"
        if add_index:
            uid = f"{uid}_{node.attrib['index']}"

        # Skip the node when an already collected element sits at (almost)
        # the same position.
        duplicate = any(
            (((existing.bbox[0][0] + existing.bbox[1][0]) // 2 - cx) ** 2
             + ((existing.bbox[0][1] + existing.bbox[1][1]) // 2 - cy) ** 2) ** 0.5
            <= configs["MIN_DIST"]
            for existing in elem_list
        )
        if duplicate:
            continue
        elem_list.append(AndroidElement(uid, ((x1, y1), (x2, y2)), attrib))
def create_directory_for_file(file_path):
    """Ensure the parent directory of ``file_path`` exists and log its path.

    Args:
        file_path: Path of a file that is about to be written.

    A bare filename has an empty directory component; in that case nothing is
    created (``os.makedirs("")`` would raise) and the logged absolute path is
    the current working directory.
    """
    directory = os.path.dirname(file_path)
    if directory:
        # exist_ok avoids the check-then-create race of exists() + makedirs().
        os.makedirs(directory, exist_ok=True)
    absolute_directory_path = os.path.abspath(directory)
    logger.info(f"Directory absolute path: {absolute_directory_path}")
def draw_bbox_multi(img_path, output_path, elem_list):
    """Draw a numbered bounding box for every element and save the image.

    Args:
        img_path: Path of the source screenshot.
        output_path: Path the annotated image is written to.
        elem_list: Elements exposing ``bbox`` as ``((x1, y1), (x2, y2))``.

    Returns:
        The annotated image array that was written to ``output_path``.

    Labels are 1-based positions in ``elem_list``. An element that fails to
    draw is logged and skipped, but it still consumes its label number so the
    remaining labels stay aligned with list positions.
    """
    import cv2
    import pyshine as ps

    imgcv = cv2.imread(img_path)
    for count, elem in enumerate(elem_list, start=1):
        try:
            (left, top), (right, bottom) = elem.bbox[0], elem.bbox[1]
            # draw rectangle
            cv2.rectangle(imgcv, (left, top), (right, bottom), (0, 0, 221), 3)
            imgcv = ps.putBText(
                imgcv,
                str(count),
                text_offset_x=(left + right) // 2 + 10,
                text_offset_y=(top + bottom) // 2 + 10,
                vspace=10,
                hspace=10,
                font_scale=1,
                thickness=2,
                background_RGB=(221, 0, 0),
                text_RGB=(255, 255, 255),
                alpha=0.0,
            )
        except Exception as e:
            color_log(f"ERROR: An exception occurs while labeling the image\n{e}", Color.red)
            # format_exc() returns the traceback text; print_exc() returns
            # None, which made the previous log entry read "None".
            logger.info(traceback.format_exc())
    cv2.imwrite(output_path, imgcv)
    return imgcv
def draw_grid(img_path, output_path):
    """Overlay a numbered grid on an image and save the result.

    The cell size along each axis is the smallest divisor of that dimension
    within 120..180 pixels, falling back to 120 when no such divisor exists.
    Cells are numbered row by row starting at 1.

    Args:
        img_path: Path of the source image.
        output_path: Path the gridded image is written to.

    Returns:
        ``(rows, cols)`` of the drawn grid.
    """
    import cv2

    def get_unit_len(n):
        # Smallest divisor of n inside [120, 180] (never larger than n), else -1.
        divisors = (d for d in range(120, min(n, 180) + 1) if n % d == 0)
        return next(divisors, -1)

    image = cv2.imread(img_path)
    height, width, _ = image.shape
    color = (255, 116, 113)

    unit_height = get_unit_len(height)
    unit_height = 120 if unit_height < 0 else unit_height
    unit_width = get_unit_len(width)
    unit_width = 120 if unit_width < 0 else unit_width

    thick = int(unit_width // 50)
    rows, cols = height // unit_height, width // unit_width
    font_scale = int(0.01 * unit_width)
    text_dx = int(unit_width * 0.05)
    text_dy = int(unit_height * 0.3)

    for cell in range(rows * cols):
        row, col = divmod(cell, cols)
        caption = str(cell + 1)
        left, top = int(col * unit_width), int(row * unit_height)
        right, bottom = int((col + 1) * unit_width), int((row + 1) * unit_height)
        cv2.rectangle(image, (left, top), (right, bottom), color, thick // 2)
        # Black copy offset by 3px first, then the coloured label on top.
        cv2.putText(image, caption, (left + text_dx + 3, top + text_dy + 3), 0,
                    font_scale, (0, 0, 0), thick)
        cv2.putText(image, caption, (left + text_dx, top + text_dy), 0,
                    font_scale, color, thick)

    cv2.imwrite(output_path, image)
    return rows, cols
def encode_image(image_path):
    """Return the contents of ``image_path`` as a Base64 text string.

    Args:
        image_path: Path of the file to read in binary mode.

    Returns:
        The Base64 encoding of the file's bytes, decoded as UTF-8 text.
    """
    with open(image_path, "rb") as handle:
        raw_bytes = handle.read()
    encoded = base64.b64encode(raw_bytes)
    return str(encoded, 'utf-8')
class ADBController:
    """Control an Android device or emulator through the ``adb`` command line.

    The controller can start and stop an emulator, run adb commands, capture
    screenshots and UI hierarchy dumps, and perform taps, swipes and text input
    addressed by the 1-based element labels produced by
    ``screenshot_and_annotate``.
    """

    def __init__(self, avd_name: str = None,
                 adb_path: str = os.path.expanduser('~') + "/Library/Android/sdk/platform-tools/adb",
                 emulator_path: str = os.path.expanduser('~') + "/Library/Android/sdk/emulator/emulator",
                 timeout: int = 30):
        """Store configuration; no process is started here.

        Args:
            avd_name: Name of the Android Virtual Device used by the emulator.
            adb_path: Path of the adb executable.
            emulator_path: Path of the emulator executable.
            timeout: Timeout in seconds applied to adb commands.
        """
        self.avd_name = avd_name
        self.adb_path = adb_path
        self.emulator_path = emulator_path
        self.timeout = timeout
        self.emulator_process = None
        self.device_serial = "emulator-5554"  # default
        # Elements labelled by the latest screenshot_and_annotate() call.
        self.current_elem_list = []
        self.width, self.height = 0, 0

    def start_emulator(self, avd_name: str = None, headless: bool = False,
                       max_retry: int = 2) -> bool:
        """Start the emulator and wait until the device is ready.

        Args:
            avd_name: AVD to boot; falls back to ``self.avd_name``.
            headless: When true the emulator is started without a window.
            max_retry: Number of additional attempts after the first one.

        Returns:
            True once an attempt succeeds, False when all attempts fail.

        Raises:
            ValueError: If no AVD name is available.
        """
        avd = avd_name or self.avd_name
        if not avd:
            raise ValueError("AVD name must be specified")
        for attempt in range(max_retry + 1):
            if self._start_emulator_process(avd, headless) and self._wait_for_device():
                logger.info(f"start success,attempt count:{attempt + 1}")
                self._refresh_screen_size()
                return True
            # Clean up the failed attempt before retrying.
            self.stop_emulator()
        return False

    def _refresh_screen_size(self) -> bool:
        """Update ``width``/``height`` from the device; keep old values on failure."""
        size = self.get_screen_size()
        if size is None:
            # get_screen_size() returns None on failure; unpacking it directly
            # would raise TypeError.
            logger.warning("Failed to read screen size; keeping previous width/height")
            return False
        self.width, self.height = size
        return True

    def _start_emulator_process(self, avd: str, headless: bool) -> bool:
        """Spawn the emulator process; return False if it cannot be launched."""
        try:
            cmd = [
                self.emulator_path,
                f"@{avd}",
                "-no-snapshot",
                "-no-audio",
                "-gpu", "swiftshader",
                "-wipe-data",
            ]
            if headless:
                cmd.append("-no-window")
            self.emulator_process = subprocess.Popen(
                cmd,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.STDOUT,
            )
            return True
        except Exception as e:
            logger.warning(f"adb start fail: {str(e)}")
            return False

    def stop_emulator(self) -> bool:
        """Ask the emulator to shut down and reap the spawned process.

        Returns:
            True when the ``emu kill`` command printed ``OK``; False when it
            did not, timed out, or adb could not be executed.
        """
        try:
            result = subprocess.run(
                [self.adb_path, "-s", self.device_serial, "emu", "kill"],
                timeout=self.timeout,
                capture_output=True,
                text=True,
            )
            return "OK" in result.stdout
        except (subprocess.TimeoutExpired, OSError):
            # OSError covers a missing or non-executable adb binary.
            return False
        finally:
            self._terminate_emulator_process()

    def _terminate_emulator_process(self) -> None:
        """Terminate the emulator process started by this controller, if any."""
        process = self.emulator_process
        if process is None:
            return
        process.terminate()
        try:
            # Wait so the child is reaped instead of being left as a zombie.
            process.wait(timeout=self.timeout)
        except subprocess.TimeoutExpired:
            process.kill()
            process.wait()
        self.emulator_process = None

    def execute_adb(self, command: list, device_serial: str = None) -> Tuple[bool, str]:
        """Execute an adb command against a device.

        Args:
            command: adb arguments that follow ``-s <serial>``.
            device_serial: Target device; defaults to ``self.device_serial``.

        Returns:
            ``(True, stdout)`` on success, otherwise ``(False, message)``.
            This method never raises.
        """
        device = device_serial or self.device_serial
        full_cmd = [self.adb_path, "-s", device] + command
        try:
            result = subprocess.run(
                full_cmd,
                timeout=self.timeout,
                check=True,
                capture_output=True,
                text=True,
            )
            return True, result.stdout.strip()
        except subprocess.CalledProcessError as e:
            return False, f"Command failed: {e.stderr}"
        except Exception as e:
            return False, str(e)

    def execute_adb_with_stdout(self, command: List[str]) -> Tuple[bool, Optional[str]]:
        """Execute an adb command and return its stdout, or None on failure.

        Uses the configured ``adb_path`` like every other method (previously
        a bare ``"adb"`` was looked up on PATH). It keeps its own fixed
        10-second timeout.

        Returns:
            ``(True, stdout)`` when the command exits with status 0, otherwise
            ``(False, None)``.
        """
        try:
            result = subprocess.run(
                [self.adb_path, "-s", self.device_serial] + command,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                timeout=10,
            )
        except Exception:
            return False, None
        if result.returncode == 0:
            return True, result.stdout.strip()
        return False, None

    # ---------- device operate ----------
    def screenshot(self, save_path: str) -> bool:
        """Capture the screen on the device and pull it to ``save_path``.

        Returns:
            True when the capture and the pull both succeed.
        """
        timestamp = int(time.time())
        remote_path = f"/sdcard/screenshot_{timestamp}.png"
        success, _ = self.execute_adb(["shell", "screencap", "-p", remote_path])
        if not success:
            return False
        return self._pull_file(remote_path, save_path)

    def dump_ui_xml(self, save_path: str) -> Optional[str]:
        """Dump the UI hierarchy, pull it to ``save_path`` and return its text.

        Returns:
            The XML content, or None when the dump or the pull fails.
        """
        remote_path = "/sdcard/ui_dump.xml"
        success, _ = self.execute_adb(["shell", "uiautomator", "dump", remote_path])
        if not success:
            logger.info("dump ui xml fail")
            return None
        if not self._pull_file(remote_path, save_path):
            logger.info("pull ui xml fail")
            return None
        with open(save_path, 'r', encoding='utf-8') as f:
            return f.read()

    def tap(self, element: int):
        """Tap the centre of the element with the given 1-based label.

        Returns:
            True when the adb command succeeds.

        Raises:
            IndexError: If the label is outside the current element list.
        """
        x, y = self.__get_element_center(element)
        return self.__tap_coordinate(x, y)

    def text(self, text: str):
        """
        Input text, automatically replacing spaces with %s for proper ADB text input.
        Parameters:
        text: The text to input
        Returns:
        True when the adb command succeeds.
        """
        # Replace spaces with %s for proper handling in ADB
        # NOTE(review): only spaces are rewritten; other shell metacharacters
        # are passed through unchanged - confirm callers sanitize input.
        formatted_text = text.replace(" ", "%s")
        success, _ = self.execute_adb(["shell", "input", "text", formatted_text])
        return success

    def long_press(self, element: int):
        """Press the centre of the labelled element for 2000 ms.

        Implemented as a swipe whose start and end points are identical.

        Returns:
            True when the adb command succeeds.

        Raises:
            IndexError: If the label is outside the current element list.
        """
        x, y = self.__get_element_center(element)
        return self.__swipe_coordinate(x, y, x, y, 2000)

    def swipe(self, element: int, direction: str, dist: str = "medium"):
        """
        Perform swipe operations based on screen element labels
        Parameters:
        element: digital label displayed on the interface (1-based)
        direction: swipe direction ["up", "down", "left", "right"]
        dist: swipe distance ["short", "medium", "long"]
        Returns:
        False for an unknown direction, otherwise whether the adb command
        succeeded.
        """
        # Get the element coordinates
        x, y = self.__get_element_center(element)
        unit_dist = int(self.width / 10)
        if dist == "long":
            unit_dist *= 3
        elif dist == "medium":
            unit_dist *= 2
        # Vertical swipes travel twice the unit distance, horizontal ones once.
        offsets = {
            "up": (0, -2 * unit_dist),
            "down": (0, 2 * unit_dist),
            "left": (-1 * unit_dist, 0),
            "right": (unit_dist, 0),
        }
        if direction not in offsets:
            return False
        dx, dy = offsets[direction]
        return self.__swipe_coordinate(x, y, x + dx, y + dy)

    @staticmethod
    def _bbox_center(bbox) -> Tuple[int, int]:
        """Return the integer centre of a ``((x1, y1), (x2, y2))`` box."""
        (x1, y1), (x2, y2) = bbox[0], bbox[1]
        return (x1 + x2) // 2, (y1 + y2) // 2

    def screenshot_and_annotate(self, name_prefix=None, return_base64=True):
        """Collect screen information and mark interactive elements, and return data containing Base64 images

        Args:
            name_prefix: Prefix of the files written to the ``tmp_files``
                directory next to this module; defaults to the current
                timestamp.
            return_base64: When true the labelled image is also returned as a
                Base64-encoded JPEG.

        Returns:
            ``(xml_text, base64_jpeg_or_None)``, or ``(None, None)`` when the
            screenshot or the UI dump fails.
        """
        import cv2

        sleep(3)
        if name_prefix is None:
            name_prefix = str(time.time())
        tmp_files_dir = os.path.join(os.path.dirname(__file__), "tmp_files")
        os.makedirs(tmp_files_dir, exist_ok=True)
        screenshot_path = os.path.join(tmp_files_dir, f"{name_prefix}_origin.png")
        screenshot_res = self.screenshot(screenshot_path)
        xml_path = os.path.join(tmp_files_dir, f"{name_prefix}.xml")
        xml_res = self.dump_ui_xml(xml_path)
        # screenshot() returns a bool; the previous comparison against the
        # string "ERROR" could never detect a failed capture.
        if not screenshot_res or xml_res is None:
            logger.warning(f"Failed to take screenshot or read XML")
            return None, None

        # Parsing interactive elements
        clickable_list = []
        focusable_list = []
        traverse_tree(xml_path, clickable_list, "clickable", True)
        traverse_tree(xml_path, focusable_list, "focusable", True)

        # Merge the lists, dropping focusable elements that sit on top of a
        # clickable one.
        elem_list = clickable_list.copy()
        clickable_centers = [self._bbox_center(e.bbox) for e in clickable_list]
        for elem in focusable_list:
            cx, cy = self._bbox_center(elem.bbox)
            overlaps = any(
                ((cx - ox) ** 2 + (cy - oy) ** 2) ** 0.5 <= configs["MIN_DIST"]
                for ox, oy in clickable_centers
            )
            if not overlaps:
                elem_list.append(elem)

        # Generate annotated images
        labeled_path = os.path.join(tmp_files_dir, f"{name_prefix}_labeled.png")
        labeled_img = draw_bbox_multi(screenshot_path, labeled_path, elem_list)

        # Base64 encoding
        base64_str = None
        if return_base64:
            # cv2.imencode expects the BGR layout that cv2.imread produced, so
            # the image is encoded as-is; converting to RGB first swapped the
            # red and blue channels in the resulting JPEG.
            success, buffer = cv2.imencode(".jpg", labeled_img, [int(cv2.IMWRITE_JPEG_QUALITY), 85])
            if success:
                base64_str = base64.b64encode(buffer).decode("utf-8")

        self.current_elem_list = elem_list.copy()
        logger.info(f"Current elem size{len(self.current_elem_list)}")
        return xml_res, base64_str

    def setup_connection(self) -> bool:
        """Intelligent initialization device connection

        Returns:
            True when a physical device or the emulator is connected.

        Raises:
            ConnectionError: If neither is available.
        """
        # Prioritize physical equipment testing
        if self.__connect_physical_device():
            return True
        # Try connecting to the simulator
        if self.avd_name and self.start_emulator():
            return True
        raise ConnectionError("No available device found, please connect your phone or configure the simulator")

    # ---------- Helper Methods ----------
    def __connect_physical_device(self) -> bool:
        """Connect an authorized USB device"""
        devices = self.__get_authorized_devices()
        if not devices:
            return False
        self.device = devices[0]
        logger.info(f"Connected physical device: {self.device}")
        self.device_serial = self.device
        self._refresh_screen_size()
        return True

    def __get_authorized_devices(self) -> list:
        """Get a list of authorized devices"""
        success, output = self.execute_adb(["devices"])
        if not success:
            return []
        # Keep serials whose line contains "<tab>device", skipping emulators.
        return [
            line.split("\t")[0]
            for line in output.splitlines()
            if "\tdevice" in line and "emulator" not in line
        ]

    def __tap_coordinate(self, x: int, y: int) -> bool:
        """Click screen coordinates"""
        success, _ = self.execute_adb(["shell", "input", "tap", str(x), str(y)])
        return success

    def __get_element_center(self, elem_idx: int) -> tuple:
        """Calculate the coordinates of the center of the element

        Args:
            elem_idx: 1-based label of the element (int or numeric string).

        Raises:
            IndexError: If the label is not within ``1..len(current_elem_list)``.
        """
        position = int(elem_idx) - 1
        # Reject 0 and negative labels explicitly: negative indexing would
        # otherwise silently select an element from the end of the list.
        if not 0 <= position < len(self.current_elem_list):
            raise IndexError(
                f"element label {elem_idx} out of range (1..{len(self.current_elem_list)})"
            )
        return self._bbox_center(self.current_elem_list[position].bbox)

    def __swipe_coordinate(self, x1: int, y1: int, x2: int, y2: int, duration: int = 300) -> bool:
        """Slide Operation"""
        success, _ = self.execute_adb([
            "shell", "input", "swipe",
            str(x1), str(y1), str(x2), str(y2),
            str(duration),
        ])
        return success

    def _wait_for_device(self, timeout: int = 300, poll_interval: float = 1.0) -> bool:
        """Three-level waiting detection strategy

        Polls until the device is listed by adb, reports
        ``sys.boot_completed == 1`` and the SurfaceFlinger service is found.

        Args:
            timeout: Maximum number of seconds to wait.
            poll_interval: Pause in seconds between polling rounds, so the
                loop does not spawn adb processes back to back.

        Returns:
            True when all three stages pass before the timeout, else False.
        """
        start_time = time.time()
        adb_connected = False
        boot_completed = False
        while time.time() - start_time < timeout:
            # Step 1: Detect adb connection
            if not adb_connected:
                success, devices = self.execute_adb(["devices"])
                # Only trust the listing when the command succeeded; on failure
                # the second value is an error message.
                if success and any(
                    line.split()[:1] == [self.device_serial]
                    for line in devices.splitlines()
                ):
                    adb_connected = True
            # Step 2: Detection system boot completed
            if adb_connected and not boot_completed:
                _, output = self.execute_adb([
                    "shell", "getprop", "sys.boot_completed"
                ])
                if output.strip() == "1":
                    boot_completed = True
            # Step 3: Detecting Graphics Service Readiness
            if boot_completed:
                _, output = self.execute_adb([
                    "shell", "service check SurfaceFlinger"
                ])
                lowered = output.lower()
                # "not found" also contains "found", so exclude it explicitly.
                if "found" in lowered and "not found" not in lowered:
                    return True
            time.sleep(poll_interval)
        return False

    def _pull_file(self, remote: str, local: str) -> bool:
        """Pull device files to local"""
        create_directory_for_file(local)
        success, _ = self.execute_adb(["pull", remote, local])
        if success:
            self.execute_adb(["shell", "rm", remote])  # clean up the temporary file
        return success

    @staticmethod
    def _parse_wm_size(output: str) -> Optional[Tuple[int, int]]:
        """Extract ``(width, height)`` from ``wm size`` output.

        An ``Override size`` entry wins over the first ``WxH`` pair found,
        because an override replaces the physical size as the active
        resolution. Returns None when no size is present.
        """
        match = (re.search(r"Override size:\s*(\d+)x(\d+)", output)
                 or re.search(r"(\d+)x(\d+)", output))
        if match is None:
            return None
        return int(match.group(1)), int(match.group(2))

    def get_screen_size(self) -> Optional[Tuple[int, int]]:
        """Get screen resolution

        Returns:
            ``(width, height)`` in pixels, or None when the command fails or
            its output contains no size.
        """
        success, output = self.execute_adb(["shell", "wm", "size"])
        if not success:
            return None
        return self._parse_wm_size(output)
def _run_demo():
    """Example session: connect, annotate the screen, swipe, annotate again."""
    demo = ADBController(avd_name="Medium_Phone_API_35")
    # demo.stop_emulator()
    if not demo.setup_connection():
        return
    logger.info("Simulator started successfully")
    screen_w, screen_h = demo.get_screen_size()
    logger.info(f"Get the screen size{screen_w},{screen_h}")
    # Take screenshots and annotate them
    demo.screenshot_and_annotate()
    demo.swipe(6, "up")
    # demo.screenshot_and_annotate()
    # demo.tap(6)
    xml_txt, _base64_txt = demo.screenshot_and_annotate()
    logger.info(xml_txt)
    # demo.stop_emulator()
    logger.info("Close the simulator")


if __name__ == "__main__":
    # Examples
    _run_demo()