# coding: utf-8 # Copyright (c) 2025 inclusionAI. from typing import List, Dict from aworld.config import RunConfig from aworld.core.agent.swarm import GraphBuildType from aworld.core.common import Config from aworld.core.task import Task, TaskResponse, Runner from aworld.logs.util import logger from aworld.utils.common import new_instance, snake_to_camel async def choose_runners(tasks: List[Task]) -> List[Runner]: """Choose the correct runner to run the task. Args: task: A task that contains agents, tools and datas. Returns: Runner instance or exception. """ runners = [] for task in tasks: # user custom runner class runner_cls = task.runner_cls if runner_cls: return new_instance(runner_cls, task) else: # user runner class in the framework if task.swarm: task.swarm.event_driven = task.event_driven execute_type = task.swarm.build_type else: execute_type = GraphBuildType.WORKFLOW.value if task.event_driven: runner = new_instance("aworld.runners.event_runner.TaskEventRunner", task) else: runner = new_instance( f"aworld.runners.call_driven_runner.{snake_to_camel(execute_type)}Runner", task ) runners.append(runner) return runners async def execute_runner(runners: List[Runner], run_conf: RunConfig) -> Dict[str, TaskResponse]: """Execute runner in the runtime engine. Args: runners: The task processing flow. run_conf: Runtime config, can choose the special computing engine to execute the runner. """ if not run_conf: run_conf = RunConfig() name = run_conf.name if run_conf.cls: runtime_backend = new_instance(run_conf.cls, run_conf) else: runtime_backend = new_instance( f"aworld.core.runtime_engine.{snake_to_camel(name)}Runtime", run_conf) runtime_engine = runtime_backend.build_engine() return await runtime_engine.execute([runner.run for runner in runners]) def endless_detect(records: List[str], endless_threshold: int, root_agent_name: str): """A very simple implementation of endless loop detection. Args: records: Call sequence of agent. endless_threshold: Threshold for the number of repetitions. root_agent_name: Name of the entrance agent. """ if not records: return False threshold = endless_threshold last_agent_name = root_agent_name count = 1 for i in range(len(records) - 2, -1, -1): if last_agent_name == records[i]: count += 1 else: last_agent_name = records[i] count = 1 if count >= threshold: logger.warning("detect loop, will exit the loop.") return True if len(records) > 6: last_agent_name = None # latest for j in range(1, 3): for i in range(len(records) - j, 0, -2): if last_agent_name and last_agent_name == (records[i], records[i - 1]): count += 1 elif last_agent_name is None: last_agent_name = (records[i], records[i - 1]) count = 1 else: last_agent_name = None break if count >= threshold: logger.warning(f"detect loop: {last_agent_name}, will exit the loop.") return True return False