Spaces:
Sleeping
Sleeping
# coding: utf-8
# Copyright (c) 2025 inclusionAI.
from typing import List, Dict | |
from aworld.config import RunConfig | |
from aworld.core.agent.swarm import GraphBuildType | |
from aworld.core.common import Config | |
from aworld.core.task import Task, TaskResponse, Runner | |
from aworld.logs.util import logger | |
from aworld.utils.common import new_instance, snake_to_camel | |
async def choose_runners(tasks: List[Task]) -> List[Runner]:
    """Choose the correct runner for every task.

    A task that supplies its own ``runner_cls`` gets an instance of that
    class. Otherwise a framework runner is selected: the event runner when
    ``task.event_driven`` is truthy, else the call-driven runner whose class
    name is derived from the swarm build type (workflow when the task has no
    swarm).

    Args:
        tasks: Tasks that contain agents, tools and datas.

    Returns:
        A list with one runner per task, in the same order as ``tasks``.
        Errors raised by ``new_instance`` are not caught here.
    """
    runners = []
    for task in tasks:
        runner_cls = task.runner_cls
        if runner_cls:
            # User custom runner class. Collect it instead of returning it
            # directly, so the result is always a list and the remaining
            # tasks still get a runner.
            runner = new_instance(runner_cls, task)
        else:
            # Use a runner class shipped with the framework.
            if task.swarm:
                # Keep the swarm's event mode in sync with the task's.
                task.swarm.event_driven = task.event_driven
                execute_type = task.swarm.build_type
            else:
                execute_type = GraphBuildType.WORKFLOW.value

            if task.event_driven:
                runner = new_instance("aworld.runners.event_runner.TaskEventRunner", task)
            else:
                runner = new_instance(
                    f"aworld.runners.call_driven_runner.{snake_to_camel(execute_type)}Runner",
                    task
                )
        runners.append(runner)
    return runners
async def execute_runner(runners: List[Runner], run_conf: RunConfig) -> Dict[str, TaskResponse]:
    """Run the given runners on the configured runtime engine.

    Args:
        runners: The task processing flow; each runner's ``run`` is handed to the engine.
        run_conf: Runtime config that selects the computing engine. A falsy value
            is replaced by a default ``RunConfig()``.

    Returns:
        Whatever the runtime engine's ``execute`` call resolves to.
    """
    config = run_conf if run_conf else RunConfig()
    engine_name = config.name

    if config.cls:
        # An explicit backend class takes precedence over the name-based lookup.
        backend_target = config.cls
    else:
        backend_target = f"aworld.core.runtime_engine.{snake_to_camel(engine_name)}Runtime"

    backend = new_instance(backend_target, config)
    engine = backend.build_engine()
    submit = engine.execute
    return await submit([runner.run for runner in runners])
def endless_detect(records: List[str], endless_threshold: int, root_agent_name: str):
    """A very simple implementation of endless loop detection.

    Two checks run over the call sequence, newest entries first:

    1. The same agent name repeated back to back. The scan is seeded with
       ``root_agent_name`` and starts at the second-to-last record.
    2. Only when there are more than 6 records: the same adjacent pair of
       names repeated, stepping two records at a time, tried from the last
       record and then from the second-to-last.

    Args:
        records: Call sequence of agent.
        endless_threshold: Threshold for the number of repetitions.
        root_agent_name: Name of the entrance agent.

    Returns:
        True as soon as a repetition count reaches the threshold, else False.
    """
    if not records:
        return False

    limit = endless_threshold
    total = len(records)

    # Check 1: consecutive repeats of a single agent name.
    current = root_agent_name
    streak = 1
    for pos in range(total - 2, -1, -1):
        if current == records[pos]:
            streak += 1
        else:
            current = records[pos]
            streak = 1

        if streak >= limit:
            logger.warning("detect loop, will exit the loop.")
            return True

    if total <= 6:
        return False

    # Check 2: repeats of an adjacent (later, earlier) pair of names.
    # ``streak`` and ``pair`` are deliberately not reset between offsets;
    # they carry over when the first offset's scan finishes without a mismatch.
    pair = None
    for offset in (1, 2):
        for pos in range(total - offset, 0, -2):
            candidate = (records[pos], records[pos - 1])
            if pair and pair == candidate:
                streak += 1
            elif pair is None:
                pair = candidate
                streak = 1
            else:
                # Pattern broken: drop the pair and try the next offset.
                pair = None
                break

            if streak >= limit:
                logger.warning(f"detect loop: {pair}, will exit the loop.")
                return True

    return False