Spaces:
Sleeping
Sleeping
import json | |
import logging | |
from builtins import anext | |
from datetime import datetime | |
from typing import Any, Dict, Generator, AsyncGenerator, Optional | |
from pydantic import Field, BaseModel, model_validator | |
from aworld.models.model_response import ModelResponse, ToolCall | |
class OutputPart(BaseModel):
    """One piece of content attached to an ``Output`` via ``Output.add_part``."""

    content: Any
    metadata: Optional[Dict[str, Any]] = Field(default_factory=dict, description="metadata")

    # Without the decorator pydantic never calls this hook, so an explicit
    # ``metadata=None`` would survive construction.
    @model_validator(mode="after")
    def setup_metadata(self):
        """Replace an explicit ``metadata=None`` with an empty dict.

        Returns:
            The validated instance itself, as pydantic "after" validators must.
        """
        if self.metadata is None:
            self.metadata = {}
        return self
class Output(BaseModel):
    """Base model for outputs; subclasses override ``output_type``."""

    metadata: Optional[Dict[str, Any]] = Field(default_factory=dict, description="metadata")
    # ``parts`` and ``data`` are excluded from serialization (``exclude=True``).
    parts: Any = Field(default_factory=list, exclude=True, description="parts of Output")
    data: Any = Field(default=None, exclude=True, description="Output Data")

    # Without the decorator pydantic never calls this hook.
    @model_validator(mode="after")
    def setup_defaults(self):
        """Normalise explicit ``None`` values of ``metadata`` and ``parts``.

        Returns:
            The validated instance itself, as pydantic "after" validators must.
        """
        if self.metadata is None:
            self.metadata = {}
        if self.parts is None:
            self.parts = []
        return self

    def add_part(self, content: Any):
        """Wrap *content* in an ``OutputPart`` and append it to ``parts``."""
        if self.parts is None:
            self.parts = []
        self.parts.append(OutputPart(content=content))

    def output_type(self):
        """Return the type tag of this output (``"default"`` for the base class)."""
        return "default"
class ToolCallOutput(Output):
    """Output whose ``data`` holds a ``ToolCall``."""

    # Must be a classmethod: it takes ``cls`` and is invoked on the class
    # itself (``ToolCallOutput.from_tool_call(tool_call)``).
    @classmethod
    def from_tool_call(cls, tool_call: ToolCall):
        """Build an instance carrying *tool_call* in its ``data`` field.

        Args:
            tool_call: The tool call to wrap.

        Returns:
            A new instance of ``cls`` with ``data`` set to *tool_call*.
        """
        return cls(data=tool_call)

    def output_type(self):
        """Return the type tag ``"tool_call"``."""
        return "tool_call"
class ToolResultOutput(Output):
    """Output describing the result of a tool call."""

    origin_tool_call: Optional[ToolCall] = Field(default=None, description="origin tool call", exclude=True)
    # These fields default to None, so their annotations must admit None;
    # a bare ``str`` rejects an explicit ``None`` argument at validation time.
    image: Optional[str] = Field(default=None)
    images: list[str] = Field(default_factory=list)
    tool_type: Optional[str] = Field(default=None)
    tool_name: Optional[str] = Field(default=None)

    def output_type(self):
        """Return the type tag ``"tool_call_result"``."""
        return "tool_call_result"
class RunFinishedSignal(Output):
    """Output subclass whose type tag is ``"finished_signal"``."""

    def output_type(self):
        """Return the type tag of this output."""
        tag = "finished_signal"
        return tag


# Module-level instance, created once when the module is imported.
RUN_FINISHED_SIGNAL = RunFinishedSignal()
class MessageOutput(Output):
    """
    MessageOutput structure of LLM output.

    ``source`` may be a ``ModelResponse``, a plain string, or a sync/async
    generator of chunks. After validation the instance exposes:

    1. source: the original source of the message
    2. reason_generator: sync/async generator of reasoning chunks (generator sources only)
    3. response_generator: sync/async generator of response chunks (generator sources only)
    4. reasoning: reasoning text of the message, once known
    5. response: response of the message (parsed JSON when ``json_parse`` is set), once known
    6. tool_calls: tool calls copied from a ``ModelResponse`` source

    When ``has_reasoning`` is True, ``response_generator`` first drains
    ``reason_generator`` if no reasoning has been collected yet, so callers
    that only want the response can iterate ``response_generator`` directly.
    When ``has_reasoning`` is False the response contains everything the
    source emits, including any reasoning markup.
    """

    source: Any = Field(default=None, exclude=True, description="Source of the message")
    reason_generator: Any = Field(default=None, exclude=True, description="reasoning generator of the message")
    response_generator: Any = Field(default=None, exclude=True, description="response generator of the message")

    # --- result ---
    # Defaults to None, so the annotation must admit None.
    reasoning: Optional[str] = Field(default=None, description="reasoning of the message")
    response: Any = Field(default=None, description="response of the message")
    tool_calls: list[ToolCallOutput] = Field(default_factory=list, description="tool_calls")

    # --- other config ---
    reasoning_format_start: str = Field(default="<think>", description="reasoning format start of the message")
    reasoning_format_end: str = Field(default="</think>", description="reasoning format end of the message")
    json_parse: bool = Field(default=False, description="json parse of the message", exclude=True)
    has_reasoning: bool = Field(default=False, description="has reasoning of the message")
    finished: bool = Field(default=False, description="finished of the message")

    # Without the decorator pydantic never calls this hook and the generators
    # / parsed fields would stay at their defaults.
    @model_validator(mode="after")
    def setup_generators(self):
        """
        Setup generators for reasoning and response.

        Dispatches on the type of the source content: async generator, sync
        generator, or string. Tool calls of a ``ModelResponse`` source are
        wrapped into ``ToolCallOutput`` objects.

        Returns:
            The validated instance itself.
        """
        source = self.source
        if isinstance(self.source, ModelResponse):
            source = self.source.content
            if self.source.tool_calls:
                self.tool_calls.extend(
                    ToolCallOutput.from_tool_call(tool_call) for tool_call in self.source.tool_calls
                )

        if source is None:
            return self
        if isinstance(source, AsyncGenerator):
            # Generator objects are lazy: nothing is consumed until iterated.
            self.reason_generator = self.__aget_reasoning_generator()
            self.response_generator = self.__aget_response_generator()
        elif isinstance(source, Generator):
            self.reason_generator, self.response_generator = self.__split_reasoning_and_response__()
        elif isinstance(source, str):
            self.reasoning, self.response = self.__resolve_think__(source)
        return self

    async def _drain(self, stream):
        """Exhaust *stream*, which may be None, a sync iterator or an async iterator."""
        if stream is None:
            return
        if hasattr(stream, "__aiter__"):
            async for _ in stream:
                pass
        else:
            for _ in stream:
                pass

    async def get_finished_reasoning(self):
        """Return the reasoning, draining ``reason_generator`` first if needed."""
        if not self.reasoning and self.has_reasoning:
            await self._drain(self.reason_generator)
        return self.reasoning

    async def get_finished_response(self):
        """Return the response, draining ``response_generator`` first if needed.

        The sync response generator updates ``response`` after every chunk, so
        a truthy ``response`` alone does not mean the stream is complete;
        ``finished`` is checked as well.
        """
        if self.response_generator is not None and not self.finished:
            await self._drain(self.response_generator)
        return self.response

    def _advance_reasoning(self, text, buffer, in_reasoning):
        """Fold one non-empty chunk into the reasoning buffer.

        Args:
            text: Content of the current chunk.
            buffer: Reasoning collected so far.
            in_reasoning: Whether an opening tag has already been seen.

        Returns:
            ``(buffer, in_reasoning, emit, done)`` where ``emit`` tells the
            caller to yield *text* and ``done`` that the closing tag was seen.
        """
        start, end = self.reasoning_format_start, self.reasoning_format_end
        if text.startswith(start):
            # A single chunk may hold the whole reasoning block; require room
            # for both tags so an opening tag alone is never taken as a close.
            closes = len(text) >= len(start) + len(end) and text.endswith(end)
            return text, True, True, closes
        if in_reasoning:
            return buffer + text, True, True, text.endswith(end)
        # Content before any opening tag is not reasoning and is not emitted.
        return buffer, False, False, False

    async def __aget_reasoning_generator(self) -> AsyncGenerator[str, None]:
        """
        Get reasoning content as async generator.

        Yields reasoning chunks read from ``source`` up to and including the
        chunk that ends with ``reasoning_format_end``; the collected text is
        stored in ``reasoning``.
        """
        if not self.has_reasoning:
            yield ""
            self.reasoning = ""
            return
        if self.reasoning:
            yield self.reasoning
            return

        reasoning_buffer = ""
        is_in_reasoning = False
        # NOTE(review): chunks seen before an opening tag are consumed and
        # dropped, so a stream without reasoning markup yields no response.
        async for chunk in self.source:
            chunk_content = self.get_chunk_content(chunk)
            if not chunk_content:
                continue
            reasoning_buffer, is_in_reasoning, emit, done = self._advance_reasoning(
                chunk_content, reasoning_buffer, is_in_reasoning
            )
            if done:
                self.reasoning = reasoning_buffer
            if emit:
                yield chunk_content
            if done:
                return
        # Source exhausted without a closing tag: keep what was collected,
        # as the sync generator does.
        logging.info("StopAsyncIteration")
        self.reasoning = reasoning_buffer

    async def __aget_response_generator(self) -> AsyncGenerator[str, None]:
        """
        Get response content as async generator.

        If has_reasoning is True, the reasoning generator is drained first
        when no reasoning has been collected yet; otherwise the yielded
        content contains both reasoning and response.
        """
        if self.response:
            yield self.response
            return
        if self.has_reasoning and not self.reasoning:
            async for _ in self.reason_generator:
                pass

        pieces = []
        async for chunk in self.source:
            chunk_content = self.get_chunk_content(chunk)
            if not chunk_content:
                continue
            pieces.append(chunk_content)
            yield chunk_content
        self.finished = True
        self.response = self.__resolve_json__("".join(pieces), self.json_parse)

    def get_chunk_content(self, chunk):
        """Return ``chunk.content`` for a ``ModelResponse``, else *chunk* itself."""
        if isinstance(chunk, ModelResponse):
            return chunk.content
        return chunk

    def __split_reasoning_and_response__(self) -> tuple[Generator[str, None, None], Generator[str, None, None]]:  # type: ignore
        """
        Split source into reasoning and response generators for sync source.

        This must stay a plain function (no ``yield`` in its own body):
        a ``yield`` here would turn it into a generator function and the
        caller could no longer unpack the returned pair.

        Returns:
            tuple: (reasoning_generator, response_generator)

        Raises:
            ValueError: if ``source`` is not a sync generator.
        """
        if not isinstance(self.source, Generator):
            raise ValueError("Source must be a Generator")

        def reasoning_generator():
            if not self.has_reasoning:
                yield ""
                self.reasoning = ""
                return
            if self.reasoning:
                yield self.reasoning
                return

            reasoning_buffer = ""
            is_in_reasoning = False
            for chunk in self.source:
                chunk_content = self.get_chunk_content(chunk)
                if not chunk_content:
                    continue
                reasoning_buffer, is_in_reasoning, emit, done = self._advance_reasoning(
                    chunk_content, reasoning_buffer, is_in_reasoning
                )
                if done:
                    self.reasoning = reasoning_buffer
                if emit:
                    yield chunk_content
                if done:
                    return
            # Source exhausted without a closing tag: keep what was collected.
            logging.info("StopIteration")
            self.reasoning = reasoning_buffer

        def response_generator():
            if self.response:
                yield self.response
                return
            # If has_reasoning is True, drain the reasoning first when the
            # caller has not done so.
            if self.has_reasoning and not self.reasoning:
                for _ in self.reason_generator:
                    pass

            response_buffer = ""
            for chunk in self.source:
                chunk_content = self.get_chunk_content(chunk)
                if not chunk_content:
                    continue
                response_buffer += chunk_content
                # Expose the partial response while streaming.
                self.response = response_buffer
                yield chunk_content
            self.finished = True
            self.response = self.__resolve_json__(response_buffer, self.json_parse)

        return reasoning_generator(), response_generator()

    def __resolve_think__(self, content):
        """Split a complete message string into reasoning and response.

        Args:
            content: Full message text, possibly containing one reasoning
                block delimited by the configured start/end tags.

        Returns:
            ``(reasoning, response)``: the text inside the first reasoning
            block (``""`` if none) and *content* with all reasoning blocks
            removed, passed through ``__resolve_json__``.
        """
        import re

        start_tag = self.reasoning_format_start.replace("<", "").replace(">", "")
        end_tag = self.reasoning_format_end.replace("<", "").replace(">", "")
        # ``.*?`` under DOTALL already spans newlines; the former ``(.|\n)*?``
        # was ambiguous and could backtrack exponentially on unclosed tags.
        pattern = re.compile(
            rf"<{re.escape(start_tag)}(?:.*?)>(.*?)<{re.escape(end_tag)}>",
            flags=re.DOTALL,
        )
        match = pattern.search(content)
        # Take the captured body so custom tags are honoured, not only the
        # literal "<think>" / "</think>".
        llm_think = match.group(1) if match else ""
        llm_result = pattern.sub("", content)
        llm_result = self.__resolve_json__(llm_result, self.json_parse)
        return llm_think, llm_result

    def __resolve_json__(self, content, json_parse=False):
        """Return *content*, parsed as JSON when *json_parse* is true.

        A Markdown JSON code fence around the payload is removed before
        parsing.

        Raises:
            json.JSONDecodeError: if parsing is requested and *content* is
                not valid JSON.
        """
        if not json_parse:
            return content
        if "```json" in content:
            content = content.replace("```json", "").replace("```", "")
        return json.loads(content)

    def output_type(self):
        """Return the type tag ``"message_output"``."""
        return "message_output"
class StepOutput(Output):
    """Output describing the status of a named, numbered step."""

    name: str
    step_num: int
    alias_name: Optional[str] = Field(default=None, description="alias_name of the step")
    status: Optional[str] = Field(default="START", description="step_status")
    # Both timestamps default to the moment the instance is created.
    started_at: str = Field(default_factory=lambda: datetime.now().isoformat(), description="started at")
    finished_at: str = Field(default_factory=lambda: datetime.now().isoformat(), description="finished at")

    # The three factories take ``cls`` and therefore must be classmethods;
    # without the decorator ``StepOutput.build_start_output(name, n)`` would
    # bind ``name`` to ``cls`` and fail.
    @classmethod
    def build_start_output(cls, name, step_num, alias_name=None, data=None):
        """Create an output with the default status ``"START"``."""
        return cls(name=name, step_num=step_num, alias_name=alias_name, data=data)

    @classmethod
    def build_finished_output(cls, name, step_num, alias_name=None, data=None):
        """Create an output with status ``"FINISHED"``."""
        return cls(name=name, step_num=step_num, alias_name=alias_name, status='FINISHED', data=data)

    @classmethod
    def build_failed_output(cls, name, step_num, alias_name=None, data=None):
        """Create an output with status ``"FAILED"``."""
        return cls(name=name, step_num=step_num, alias_name=alias_name, status='FAILED', data=data)

    def output_type(self):
        """Return the type tag ``"step_output"``."""
        return "step_output"

    def show_name(self):
        """Return ``alias_name`` when it is set and non-empty, else ``name``."""
        return self.alias_name if self.alias_name else self.name
class SearchItem(BaseModel):
    """A single search result entry.

    ``content`` and ``raw_content`` are declared with ``exclude=True`` and are
    therefore left out of serialized output.
    """

    # Fields included in serialized output.
    title: str = Field(default="", description="search result title")
    url: str = Field(default="", description="search result url")
    snippet: str = Field(default="", description="search result snippet")

    # Fields excluded from serialized output.
    content: str = Field(default="", description="search content", exclude=True)
    raw_content: Optional[str] = Field(default="", description="search raw content", exclude=True)

    metadata: Optional[Dict[str, Any]] = Field(default_factory=dict, description="metadata")
class SearchOutput(ToolResultOutput):
    """Tool result carrying a search query and its result items."""

    query: str = Field(..., description="Search query string")
    results: list[SearchItem] = Field(default_factory=list, description="List of search results")

    # Alternate constructor: takes ``cls``, so it must be a classmethod to be
    # callable as ``SearchOutput.from_dict(data)``.
    @classmethod
    def from_dict(cls, data: dict) -> "SearchOutput":
        """Build a ``SearchOutput`` from a plain dictionary.

        Args:
            data: Mapping with a required ``"query"`` entry and an optional
                ``"results"`` list whose items are ``SearchItem`` objects or
                dicts of ``SearchItem`` fields. A non-dict argument is treated
                as an empty mapping.

        Returns:
            A new instance of ``cls``.

        Raises:
            ValueError: if ``"query"`` is missing or ``None``, or a result
                item is neither a ``SearchItem`` nor a dict.
        """
        if not isinstance(data, dict):
            data = {}

        query = data.get("query")
        if query is None:
            raise ValueError("query is required")

        search_items = []
        # ``or []`` also covers an explicit ``"results": None``.
        for result in data.get("results") or []:
            if isinstance(result, SearchItem):
                search_items.append(result)
            elif isinstance(result, dict):
                search_items.append(SearchItem(**result))
            else:
                raise ValueError(f"Invalid result type: {type(result)}")

        return cls(query=query, results=search_items)

    def output_type(self):
        """Return the type tag ``"search_output"``."""
        return "search_output"