Duibonduil's picture
Upload 4 files
47aff61 verified
# coding: utf-8
# Copyright (c) 2025 inclusionAI.
import asyncio
from typing import List, Dict, Union
from aworld.config import RunConfig
from aworld.config.conf import TaskConfig
from aworld.agents.llm_agent import Agent
from aworld.core.agent.swarm import Swarm
from aworld.core.common import Config
from aworld.core.task import Task, TaskResponse, Runner
from aworld.output import StreamingOutputs
from aworld.runners.utils import choose_runners, execute_runner
from aworld.utils.common import sync_exec
class Runners:
"""Unified entrance to the utility class of the runnable task of execution."""
@staticmethod
def streamed_run_task(task: Task) -> StreamingOutputs:
"""Run the task in stream output."""
if not task.conf:
task.conf = TaskConfig()
streamed_result = StreamingOutputs(
input=task.input,
usage={},
is_complete=False
)
task.outputs = streamed_result
streamed_result._run_impl_task = asyncio.create_task(
Runners.run_task(task)
)
return streamed_result
@staticmethod
async def run_task(task: Union[Task, List[Task]], run_conf: RunConfig = None) -> Dict[str, TaskResponse]:
"""Run tasks for some complex scenarios where agents cannot be directly used.
Args:
task: User task define.
run_conf:
"""
if isinstance(task, Task):
task = [task]
runners: List[Runner] = await choose_runners(task)
return await execute_runner(runners, run_conf)
@staticmethod
def sync_run_task(task: Union[Task, List[Task]], run_conf: Config = None) -> Dict[str, TaskResponse]:
return sync_exec(Runners.run_task, task=task, run_conf=run_conf)
@staticmethod
def sync_run(
input: str,
agent: Agent = None,
swarm: Swarm = None,
tool_names: List[str] = [],
session_id: str = None
) -> TaskResponse:
return sync_exec(
Runners.run,
input=input,
agent=agent,
swarm=swarm,
tool_names=tool_names,
session_id=session_id
)
@staticmethod
async def run(
input: str,
agent: Agent = None,
swarm: Swarm = None,
tool_names: List[str] = [],
session_id: str = None
) -> TaskResponse:
"""Run agent directly with input and tool names.
Args:
input: User query.
agent: An agent with AI model configured, prompts, tools, mcp servers and other agents.
swarm: Multi-agent topo.
tool_names: Tool name list.
session_id: Session id.
Returns:
TaskResponse: Task response.
"""
if agent and swarm:
raise ValueError("`agent` and `swarm` only choose one.")
if not input:
raise ValueError('`input` is empty.')
if agent:
agent.task = input
swarm = Swarm(agent)
task = Task(input=input, swarm=swarm, tool_names=tool_names,
event_driven=swarm.event_driven, session_id=session_id)
res = await Runners.run_task(task)
return res.get(task.id)