# Duibonduil's picture — Upload 9 files — 3e8c06e verified
# (hosting-page header converted to a comment so the module parses)
import json
import logging
from builtins import anext
from datetime import datetime
from typing import Any, Dict, Generator, AsyncGenerator, Optional
from pydantic import Field, BaseModel, model_validator
from aworld.models.model_response import ModelResponse, ToolCall
class OutputPart(BaseModel):
    """One piece of an Output: arbitrary content plus optional metadata."""
    content: Any
    metadata: Optional[Dict[str, Any]] = Field(default_factory=dict, description="metadata")

    @model_validator(mode='after')
    def setup_metadata(self):
        """Normalize an explicitly-passed ``None`` metadata to an empty dict."""
        self.metadata = {} if self.metadata is None else self.metadata
        return self
class Output(BaseModel):
    """Base class for all output kinds; holds metadata, parts and raw data."""
    metadata: Optional[Dict[str, Any]] = Field(default_factory=dict, description="metadata")
    parts: Any = Field(default_factory=list, exclude=True, description="parts of Output")
    data: Any = Field(default=None, exclude=True, description="Output Data")

    @model_validator(mode='after')
    def setup_defaults(self):
        """Normalize explicitly-passed ``None`` metadata/parts to empty containers."""
        for attr, empty in (("metadata", {}), ("parts", [])):
            if getattr(self, attr) is None:
                setattr(self, attr, empty)
        return self

    def add_part(self, content: Any):
        """Wrap ``content`` in an OutputPart and append it to ``parts``."""
        parts = [] if self.parts is None else self.parts
        parts.append(OutputPart(content=content))
        self.parts = parts

    def output_type(self):
        """Identifier for this output kind; subclasses override."""
        return "default"
class ToolCallOutput(Output):
    """Output wrapping a single tool call emitted by the model."""

    @classmethod
    def from_tool_call(cls, tool_call: ToolCall):
        """Alternate constructor: the tool call becomes the output's data."""
        return cls(data=tool_call)

    def output_type(self):
        return "tool_call"
class ToolResultOutput(Output):
    """Output carrying the result of executing a tool."""
    origin_tool_call: Optional[ToolCall] = Field(default=None, description="origin tool call", exclude=True)
    # Fix: these fields default to None, so a bare `str` annotation was wrong —
    # in pydantic v2 an explicitly passed None would fail validation.
    image: Optional[str] = Field(default=None)
    images: list[str] = Field(default_factory=list)
    tool_type: Optional[str] = Field(default=None)
    tool_name: Optional[str] = Field(default=None)

    def output_type(self):
        return "tool_call_result"
class RunFinishedSignal(Output):
    """Marker output used to signal that a run has finished."""

    def output_type(self):
        return "finished_signal"
# Shared sentinel instance consumers can compare against to detect run completion.
RUN_FINISHED_SIGNAL = RunFinishedSignal()
class MessageOutput(Output):
    """
    MessageOutput: structured wrapper around an LLM response.

    To get only the response text, either drain ``reason_generator`` first or
    let ``response_generator`` do it for you (it drains reasoning automatically
    when ``has_reasoning`` is True). If the model produces no reasoning channel,
    ``reason_generator``/``reasoning`` can be ignored.

    Fields:
        source: async/sync generator, ``str`` or ``ModelResponse`` of the message
        reason_generator: async/sync reasoning generator of the message
        response_generator: async/sync response generator of the message
        reasoning: accumulated reasoning text
        response: accumulated response (str, or parsed JSON when json_parse=True)
        tool_calls: tool calls extracted from a ModelResponse source
    """
    source: Any = Field(default=None, exclude=True, description="Source of the message")
    reason_generator: Any = Field(default=None, exclude=True, description="reasoning generator of the message")
    response_generator: Any = Field(default=None, exclude=True, description="response generator of the message")

    # Results, filled in as the generators are consumed.
    reasoning: Optional[str] = Field(default=None, description="reasoning of the message")
    response: Any = Field(default=None, description="response of the message")
    tool_calls: list[ToolCallOutput] = Field(default_factory=list, description="tool_calls")

    # Configuration.
    reasoning_format_start: str = Field(default="<think>", description="reasoning format start of the message")
    reasoning_format_end: str = Field(default="</think>", description="reasoning format end of the message")
    json_parse: bool = Field(default=False, description="json parse of the message", exclude=True)
    has_reasoning: bool = Field(default=False, description="has reasoning of the message")
    finished: bool = Field(default=False, description="finished of the message")

    @model_validator(mode='after')
    def setup_generators(self):
        """Wire up reasoning/response generators according to the source type."""
        source = self.source
        if isinstance(self.source, ModelResponse):
            source = self.source.content
            if self.source.tool_calls:
                # Plain loop instead of a side-effect list comprehension.
                for tool_call in self.source.tool_calls:
                    self.tool_calls.append(ToolCallOutput.from_tool_call(tool_call))
        # NOTE(review): the async/sync generator paths below iterate
        # ``self.source`` directly, so they assume the generator was passed as
        # ``source`` itself, not nested inside a ModelResponse — TODO confirm.
        if source is not None and isinstance(source, AsyncGenerator):
            self.reason_generator = self.__aget_reasoning_generator()
            self.response_generator = self.__aget_response_generator()
        elif source is not None and isinstance(source, Generator):
            self.reason_generator, self.response_generator = self.__split_reasoning_and_response__()
        elif source is not None and isinstance(source, str):
            self.reasoning, self.response = self.__resolve_think__(source)
        return self

    async def get_finished_reasoning(self):
        """Drain the reasoning generator if necessary and return the reasoning."""
        if not self.reasoning and self.has_reasoning:
            async for _ in self.reason_generator:
                pass
        return self.reasoning

    async def get_finished_response(self):
        """Drain the response generator if necessary and return the response."""
        if not self.response and self.response_generator:
            async for _ in self.response_generator:
                pass
        return self.response

    async def __aget_reasoning_generator(self) -> AsyncGenerator[str, None]:
        """
        Yield reasoning chunks from an async source until the end tag is seen.

        Chunks outside the start/end tags are skipped. The complete reasoning
        text is stored in ``self.reasoning``.
        """
        if not self.has_reasoning:
            yield ""
            self.reasoning = ""
            return
        if self.reasoning:
            # Already resolved: replay it as a single chunk.
            yield self.reasoning
            return
        reasoning_buffer = ""
        is_in_reasoning = False
        try:
            while True:
                chunk = await anext(self.source)
                chunk_content = self.get_chunk_content(chunk)
                if not chunk_content:
                    continue
                if chunk_content.startswith(self.reasoning_format_start):
                    is_in_reasoning = True
                    reasoning_buffer = chunk_content
                    yield chunk_content
                elif is_in_reasoning and chunk_content.endswith(self.reasoning_format_end):
                    reasoning_buffer += chunk_content
                    yield chunk_content
                    self.reasoning = reasoning_buffer
                    break
                elif is_in_reasoning:
                    reasoning_buffer += chunk_content
                    yield chunk_content
        except StopAsyncIteration:
            # Fix: stream ended before the end tag — keep what was collected
            # (the sync path already did this; the async path silently lost it).
            logging.info("StopAsyncIteration")
            self.reasoning = reasoning_buffer

    async def __aget_response_generator(self) -> AsyncGenerator[str, None]:
        """
        Yield response chunks from an async source.

        If ``has_reasoning`` is True and reasoning has not been consumed yet,
        the reasoning generator is drained first; otherwise the yielded content
        may contain both reasoning and response.
        """
        if self.response:
            yield self.response
            return
        if self.has_reasoning and not self.reasoning:
            async for _ in self.reason_generator:
                pass
        response_buffer = ""
        try:
            while True:
                chunk = await anext(self.source)
                chunk_content = self.get_chunk_content(chunk)
                if not chunk_content:
                    continue
                response_buffer += chunk_content
                yield chunk_content
        except StopAsyncIteration:
            self.finished = True
            self.response = self.__resolve_json__(response_buffer, self.json_parse)

    def get_chunk_content(self, chunk):
        """Extract the text content from a streamed chunk (ModelResponse or str)."""
        return chunk.content if isinstance(chunk, ModelResponse) else chunk

    def __split_reasoning_and_response__(self) -> tuple[Generator[str, None, None], Generator[str, None, None]]:  # type: ignore
        """
        Split a sync generator source into (reasoning_generator, response_generator).

        Bug fix: the original method body contained bare ``yield`` statements,
        which turned this method itself into a generator function — calling it
        returned a generator object instead of the documented tuple, so the
        tuple unpacking in ``setup_generators`` raised ValueError. The
        no-reasoning handling now lives inside the inner reasoning generator,
        mirroring the async path.

        Returns:
            tuple: (reasoning_generator, response_generator)
        """
        if not isinstance(self.source, Generator):
            raise ValueError("Source must be a Generator")

        def reasoning_generator():
            if not self.has_reasoning:
                # No reasoning channel: emit one empty chunk, like the async path.
                self.reasoning = ""
                yield ""
                return
            if self.reasoning:
                yield self.reasoning
                return
            reasoning_buffer = ""
            is_in_reasoning = False
            try:
                while True:
                    chunk = next(self.source)
                    chunk_content = self.get_chunk_content(chunk)
                    if not chunk_content:
                        # Fix: guard against empty/None chunks (async path had this).
                        continue
                    if chunk_content.startswith(self.reasoning_format_start):
                        is_in_reasoning = True
                        reasoning_buffer = chunk_content
                        yield chunk_content
                    elif is_in_reasoning and chunk_content.endswith(self.reasoning_format_end):
                        reasoning_buffer += chunk_content
                        self.reasoning = reasoning_buffer
                        yield chunk_content
                        break
                    elif is_in_reasoning:
                        reasoning_buffer += chunk_content
                        yield chunk_content
            except StopIteration:
                # Fix: log instead of print.
                logging.info("StopIteration")
                self.reasoning = reasoning_buffer

        def response_generator():
            if self.response:
                yield self.response
                return
            # Drain reasoning first so the response stream starts after it.
            if self.has_reasoning and not self.reasoning:
                for _ in self.reason_generator:
                    pass
            response_buffer = ""
            try:
                while True:
                    chunk = next(self.source)
                    chunk_content = self.get_chunk_content(chunk)
                    if not chunk_content:
                        continue
                    response_buffer += chunk_content
                    self.response = response_buffer
                    yield chunk_content
            except StopIteration:
                self.response = self.__resolve_json__(response_buffer, self.json_parse)
                self.finished = True

        return reasoning_generator(), response_generator()

    def __resolve_think__(self, content):
        """Split a complete string into (reasoning, response) using the configured tags."""
        import re
        start_tag = self.reasoning_format_start.replace("<", "").replace(">", "")
        end_tag = self.reasoning_format_end.replace("<", "").replace(">", "")
        pattern = rf"<{re.escape(start_tag)}(.*?)>(.|\n)*?<{re.escape(end_tag)}>"
        llm_think = ""
        match = re.search(pattern, content, flags=re.DOTALL)
        if match:
            # Fix: strip the configured tags rather than hard-coded
            # "<think>"/"</think>", so custom reasoning formats work too.
            llm_think = (match.group(0)
                         .replace(self.reasoning_format_start, "")
                         .replace(self.reasoning_format_end, ""))
        llm_result = re.sub(pattern, "", content, flags=re.DOTALL)
        llm_result = self.__resolve_json__(llm_result, self.json_parse)
        return llm_think, llm_result

    def __resolve_json__(self, content, json_parse=False):
        """Optionally strip a ```json fence and parse ``content`` as JSON."""
        if json_parse:
            if "```json" in content:
                content = content.replace("```json", "").replace("```", "")
            return json.loads(content)
        return content

    def output_type(self):
        return "message_output"
class StepOutput(Output):
    """Output describing the lifecycle of one run step (START/FINISHED/FAILED)."""
    name: str
    step_num: int
    alias_name: Optional[str] = Field(default=None, description="alias_name of the step")
    status: Optional[str] = Field(default="START", description="step_status")
    started_at: str = Field(default_factory=lambda: datetime.now().isoformat(), description="started at")
    finished_at: str = Field(default_factory=lambda: datetime.now().isoformat(), description="finished at")

    @classmethod
    def build_start_output(cls, name, step_num, alias_name=None, data=None):
        """Step entering the default START state."""
        return cls(name=name, step_num=step_num, alias_name=alias_name, data=data)

    @classmethod
    def build_finished_output(cls, name, step_num, alias_name=None, data=None):
        """Step that completed successfully."""
        return cls(name=name, step_num=step_num, alias_name=alias_name, status='FINISHED', data=data)

    @classmethod
    def build_failed_output(cls, name, step_num, alias_name=None, data=None):
        """Step that ended in failure."""
        return cls(name=name, step_num=step_num, alias_name=alias_name, status='FAILED', data=data)

    def output_type(self):
        return "step_output"

    @property
    def show_name(self):
        """Alias name when set, otherwise the raw step name."""
        return self.alias_name if self.alias_name else self.name
class SearchItem(BaseModel):
    """A single search result: title/url/snippet plus optional page bodies."""
    title: str = Field(default="", description="search result title")
    url: str = Field(default="", description="search result url")
    snippet: str = Field(default="", description="search result snippet")
    content: str = Field(default="", description="search content", exclude=True)
    raw_content: Optional[str] = Field(default="", description="search raw content", exclude=True)
    metadata: Optional[Dict[str, Any]] = Field(default_factory=dict, description="metadata")
class SearchOutput(ToolResultOutput):
    """Tool result output for a search tool: the query plus its result list."""
    query: str = Field(..., description="Search query string")
    results: list[SearchItem] = Field(default_factory=list, description="List of search results")

    @classmethod
    def from_dict(cls, data: dict) -> "SearchOutput":
        """
        Build a SearchOutput from a plain dict.

        Expects ``{"query": str, "results": [SearchItem | dict, ...]}``.
        Raises ValueError when the query is missing or a result entry is
        neither a SearchItem nor a dict.
        """
        if not isinstance(data, dict):
            data = {}
        query = data.get("query")
        if query is None:
            raise ValueError("query is required")
        search_items = []
        for result in data.get("results", []):
            if isinstance(result, SearchItem):
                search_items.append(result)
            elif isinstance(result, dict):
                search_items.append(SearchItem(**result))
            else:
                raise ValueError(f"Invalid result type: {type(result)}")
        return cls(query=query, results=search_items)

    def output_type(self):
        return "search_output"