File size: 14,734 Bytes
3e8c06e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
import json
import logging
from builtins import anext
from datetime import datetime
from typing import Any, Dict, Generator, AsyncGenerator, Optional

from pydantic import Field, BaseModel, model_validator

from aworld.models.model_response import ModelResponse, ToolCall


class OutputPart(BaseModel):
    """One fragment of an ``Output``: arbitrary content plus a metadata dict."""

    content: Any
    metadata: Optional[Dict[str, Any]] = Field(default_factory=dict, description="metadata")

    @model_validator(mode='after')
    def setup_metadata(self):
        """Swap an explicitly supplied ``None`` metadata for an empty dict."""
        if self.metadata is not None:
            # Nothing to repair; leave the field untouched.
            return self
        self.metadata = {}
        return self
    

class Output(BaseModel):
    """
    Base output record.

    ``metadata`` is the only field included when the model is dumped;
    ``parts`` and ``data`` are declared with ``exclude=True``.
    """

    metadata: Optional[Dict[str, Any]] = Field(default_factory=dict, description="metadata")
    parts: Any = Field(default_factory=list, exclude=True, description="parts of Output")
    data: Any = Field(default=None, exclude=True, description="Output Data")

    @model_validator(mode='after')
    def setup_defaults(self):
        """Replace ``None`` in ``metadata`` / ``parts`` with an empty container."""
        for field_name, make_empty in (("metadata", dict), ("parts", list)):
            if getattr(self, field_name) is None:
                setattr(self, field_name, make_empty())
        return self

    def add_part(self, content: Any):
        """Wrap *content* in an ``OutputPart`` and append it to ``parts``."""
        part = OutputPart(content=content)
        if self.parts is None:
            # ``parts`` may have been reset to None after validation.
            self.parts = [part]
        else:
            self.parts.append(part)

    def output_type(self):
        """Return the type tag of this output; subclasses override it."""
        return "default"


class ToolCallOutput(Output):
    """Output whose ``data`` holds a ``ToolCall``."""

    @classmethod
    def from_tool_call(cls, tool_call: ToolCall):
        """Create an instance with *tool_call* stored in ``data``."""
        return cls(data=tool_call)

    def output_type(self):
        """Return the type tag of this output."""
        return "tool_call"

class ToolResultOutput(Output):
    """
    Output describing the result of a tool call.

    The string fields default to ``None`` and are therefore declared
    ``Optional``; with a plain ``str`` annotation an explicit ``None``
    (the very value used as the default) would be rejected by validation.
    """

    # Excluded from serialization (``exclude=True``).
    origin_tool_call: Optional[ToolCall] = Field(default=None, description="origin tool call", exclude=True)

    image: Optional[str] = Field(default=None)

    images: list[str] = Field(default_factory=list)

    tool_type: Optional[str] = Field(default=None)

    tool_name: Optional[str] = Field(default=None)

    def output_type(self):
        """Return the type tag of this output."""
        return "tool_call_result"

class RunFinishedSignal(Output):
    """Output subclass tagged ``"finished_signal"``."""

    def output_type(self):
        """Return the type tag of this output."""
        return "finished_signal"


# Module-level instance of the signal, created once at import time.
RUN_FINISHED_SIGNAL = RunFinishedSignal()


class MessageOutput(Output):
    """
    Output wrapping an LLM message, split into reasoning and response.

    ``source`` decides how the message is consumed:

    * ``str``: reasoning and response are extracted immediately.
    * sync / async generator: ``reason_generator`` and ``response_generator``
      are created; ``reasoning`` and ``response`` are filled in while those
      generators are consumed.
    * ``ModelResponse``: its ``tool_calls`` are wrapped into ``tool_calls`` and
      its ``content`` is handled according to the rules above.

    When ``has_reasoning`` is True the response generator first drains the
    reasoning generator (if no reasoning has been collected yet), so a caller
    that only wants the response can iterate ``response_generator`` directly.
    When ``has_reasoning`` is False the reasoning generator yields a single
    empty string and the response generator yields every chunk of the source.

    Reasoning detection works on chunk boundaries: the section starts with a
    chunk that begins with ``reasoning_format_start`` and ends with a chunk
    that ends with ``reasoning_format_end``.
    """

    source: Any = Field(default=None, exclude=True, description="Source of the message")

    reason_generator: Any = Field(default=None, exclude=True, description="reasoning generator of the message")
    response_generator: Any = Field(default=None, exclude=True, description="response generator of the message")

    # --- result ---
    reasoning: Optional[str] = Field(default=None, description="reasoning of the message")
    response: Any = Field(default=None, description="response of the message")
    tool_calls: list[ToolCallOutput] = Field(default_factory=list, description="tool_calls")

    # --- other config ---
    reasoning_format_start: str = Field(default="<think>", description="reasoning format start of the message")
    reasoning_format_end: str = Field(default="</think>", description="reasoning format end of the message")

    json_parse: bool = Field(default=False, description="json parse of the message", exclude=True)
    has_reasoning: bool = Field(default=False, description="has reasoning of the message")
    finished: bool = Field(default=False, description="finished of the message")

    @model_validator(mode='after')
    def setup_generators(self):
        """
        Set up the reasoning/response generators, or resolve a string source.

        Returns:
            The validated instance.
        """
        if isinstance(self.source, ModelResponse) and self.source.tool_calls:
            self.tool_calls.extend(
                ToolCallOutput.from_tool_call(tool_call)
                for tool_call in self.source.tool_calls
            )

        source = self._chunk_stream()
        if isinstance(source, AsyncGenerator):
            # Async generator functions do not run until first iterated.
            self.reason_generator = self.__aget_reasoning_generator()
            self.response_generator = self.__aget_response_generator()
        elif isinstance(source, Generator):
            self.reason_generator, self.response_generator = self.__split_reasoning_and_response__()
        elif isinstance(source, str):
            self.reasoning, self.response = self.__resolve_think__(source)
        return self

    def _chunk_stream(self):
        """
        Return the object chunks are read from.

        For a ``ModelResponse`` source this is its ``content`` (the value
        ``setup_generators`` dispatches on); otherwise it is ``source`` itself.
        """
        if isinstance(self.source, ModelResponse):
            return self.source.content
        return self.source

    def _closes_reasoning(self, chunk_content: str, opened_here: bool) -> bool:
        """
        Tell whether *chunk_content* ends the reasoning section.

        Args:
            chunk_content: Non-empty text of the current chunk.
            opened_here: True when this same chunk started the section.
        """
        if not chunk_content.endswith(self.reasoning_format_end):
            return False
        if not opened_here:
            return True
        # A chunk that also opened the section closes it only when it is long
        # enough to hold both markers; this keeps identical start/end markers
        # from terminating the section on the opening chunk.
        return len(chunk_content) >= len(self.reasoning_format_start) + len(self.reasoning_format_end)

    async def _drain(self, generator):
        """Exhaust a sync or async generator, discarding its items; ``None`` is a no-op."""
        if generator is None:
            return
        if hasattr(generator, "__aiter__"):
            async for _ in generator:
                pass
        else:
            for _ in generator:
                pass

    async def get_finished_reasoning(self):
        """
        Return the reasoning, consuming the reasoning generator first if needed.

        The generator is drained only when ``has_reasoning`` is True and no
        reasoning has been collected yet.
        """
        if self.has_reasoning and not self.reasoning:
            await self._drain(self.reason_generator)
        return self.reasoning

    async def get_finished_response(self):
        """
        Return the response, consuming the response generator first if needed.

        The generator is drained whenever one exists and the stream has not
        finished, so a partially consumed stream yields the full response.
        """
        if self.response_generator is not None and not self.finished:
            await self._drain(self.response_generator)
        return self.response

    async def __aget_reasoning_generator(self) -> AsyncGenerator[str, None]:
        """
        Yield reasoning chunks from an async source.

        Chunks before the opening marker are discarded. On completion (closing
        marker seen or source exhausted) the collected text, markers included,
        is stored in ``reasoning``.
        """
        if not self.has_reasoning:
            self.reasoning = ""
            yield ""
            return

        if self.reasoning:
            yield self.reasoning
            return

        reasoning_buffer = ""
        is_in_reasoning = False
        async for chunk in self._chunk_stream():
            chunk_content = self.get_chunk_content(chunk)
            if not chunk_content:
                continue
            opened_here = chunk_content.startswith(self.reasoning_format_start)
            if opened_here:
                is_in_reasoning = True
                reasoning_buffer = chunk_content
            elif is_in_reasoning:
                reasoning_buffer += chunk_content
            else:
                continue
            closed = self._closes_reasoning(chunk_content, opened_here)
            if closed:
                # Store before yielding so the value is available even if the
                # consumer stops right after the last chunk.
                self.reasoning = reasoning_buffer
            yield chunk_content
            if closed:
                break
        else:
            # Source ran out without a closing marker: keep what was collected.
            logging.info("StopAsyncIteration")
            self.reasoning = reasoning_buffer

    async def __aget_response_generator(self) -> AsyncGenerator[str, None]:
        """
        Yield response chunks from an async source.

        If ``has_reasoning`` is True and no reasoning has been collected, the
        reasoning generator is drained first. When the source is exhausted,
        ``finished`` is set and ``response`` holds the joined text
        (JSON-decoded when ``json_parse`` is True).
        """
        if self.response:
            yield self.response
            return

        if self.has_reasoning and not self.reasoning:
            async for _ in self.reason_generator:
                pass

        response_buffer = ""
        async for chunk in self._chunk_stream():
            chunk_content = self.get_chunk_content(chunk)
            if not chunk_content:
                continue
            response_buffer += chunk_content
            yield chunk_content

        self.finished = True
        self.response = self.__resolve_json__(response_buffer, self.json_parse)

    def get_chunk_content(self, chunk):
        """Return ``chunk.content`` for a ``ModelResponse`` chunk, else the chunk itself."""
        if isinstance(chunk, ModelResponse):
            return chunk.content
        return chunk

    def __split_reasoning_and_response__(self) -> tuple[Generator[str, None, None], Generator[str, None, None]]:  # type: ignore
        """
        Build the reasoning and response generators for a sync source.

        This method must not contain ``yield`` itself: that would turn it into
        a generator function and break the two-value unpacking done by the
        caller. All streaming logic lives in the inner generator functions.

        Returns:
            tuple: (reasoning_generator, response_generator)

        Raises:
            ValueError: If the chunk stream is not a sync generator.
        """
        stream = self._chunk_stream()
        if not isinstance(stream, Generator):
            raise ValueError("Source must be a Generator")

        def reasoning_generator():
            if not self.has_reasoning:
                self.reasoning = ""
                yield ""
                return

            if self.reasoning:
                yield self.reasoning
                return

            reasoning_buffer = ""
            is_in_reasoning = False
            for chunk in stream:
                chunk_content = self.get_chunk_content(chunk)
                if not chunk_content:
                    continue
                opened_here = chunk_content.startswith(self.reasoning_format_start)
                if opened_here:
                    is_in_reasoning = True
                    reasoning_buffer = chunk_content
                elif is_in_reasoning:
                    reasoning_buffer += chunk_content
                else:
                    continue
                closed = self._closes_reasoning(chunk_content, opened_here)
                if closed:
                    self.reasoning = reasoning_buffer
                yield chunk_content
                if closed:
                    break
            else:
                # Source ran out without a closing marker.
                logging.info("StopIteration")
                self.reasoning = reasoning_buffer

        def response_generator():
            if self.response:
                yield self.response
                return

            # Drain the reasoning first when the caller has not done so.
            if self.has_reasoning and not self.reasoning:
                for _ in self.reason_generator:
                    pass

            response_buffer = ""
            for chunk in stream:
                chunk_content = self.get_chunk_content(chunk)
                if not chunk_content:
                    continue
                response_buffer += chunk_content
                # Expose the partial response while streaming.
                self.response = response_buffer
                yield chunk_content

            self.finished = True
            self.response = self.__resolve_json__(response_buffer, self.json_parse)

        return reasoning_generator(), response_generator()

    def __resolve_think__(self, content):
        """
        Split a complete string into (reasoning, response).

        The first reasoning section is returned with the configured start/end
        markers removed; every reasoning section is removed from the response,
        which is JSON-decoded when ``json_parse`` is True.
        """
        import re
        start_tag = self.reasoning_format_start.replace("<", "").replace(">", "")
        end_tag = self.reasoning_format_end.replace("<", "").replace(">", "")
        # DOTALL lets ``.`` span newlines, so no explicit ``(.|\n)`` is needed.
        section = re.compile(
            rf"<{re.escape(start_tag)}(.*?)>.*?<{re.escape(end_tag)}>",
            flags=re.DOTALL,
        )

        llm_think = ""
        match = section.search(content)
        if match:
            # Strip the configured markers rather than hard-coded think tags.
            llm_think = (
                match.group(0)
                .replace(self.reasoning_format_start, "")
                .replace(self.reasoning_format_end, "")
            )
        llm_result = section.sub("", content)
        llm_result = self.__resolve_json__(llm_result, self.json_parse)

        return llm_think, llm_result

    def __resolve_json__(self, content, json_parse = False):
        """
        Return *content* unchanged, or JSON-decoded when *json_parse* is True.

        Markdown code fences are stripped before decoding when the content
        contains a ```json fence.

        Raises:
            json.JSONDecodeError: If decoding is requested and fails.
        """
        if not json_parse:
            return content
        if "```json" in content:
            content = content.replace("```json", "").replace("```", "")
        return json.loads(content)

    def output_type(self):
        """Return the type tag of this output."""
        return "message_output"

class StepOutput(Output):
    """
    Output describing one named, numbered step and its status.

    ``status`` defaults to ``"START"``; both timestamps default to the
    ISO-formatted time at which the instance is created.
    """

    name: str
    step_num: int
    alias_name: Optional[str] = Field(default=None, description="alias_name of the step")
    status: Optional[str] = Field(default="START", description="step_status")
    started_at: str = Field(default_factory=lambda: datetime.now().isoformat(), description="started at")
    finished_at: str = Field(default_factory=lambda: datetime.now().isoformat(), description="finished at")

    @classmethod
    def build_start_output(cls, name, step_num, alias_name=None, data=None):
        """Create a step output that keeps the default status."""
        common = dict(name=name, step_num=step_num, alias_name=alias_name, data=data)
        return cls(**common)

    @classmethod
    def build_finished_output(cls, name, step_num, alias_name=None, data=None):
        """Create a step output with status ``'FINISHED'``."""
        common = dict(name=name, step_num=step_num, alias_name=alias_name, data=data)
        return cls(status='FINISHED', **common)

    @classmethod
    def build_failed_output(cls, name, step_num, alias_name=None, data=None):
        """Create a step output with status ``'FAILED'``."""
        common = dict(name=name, step_num=step_num, alias_name=alias_name, data=data)
        return cls(status='FAILED', **common)

    def output_type(self):
        """Return the type tag of this output."""
        return "step_output"

    @property
    def show_name(self):
        """Display name: ``alias_name`` when it is truthy, otherwise ``name``."""
        return self.alias_name or self.name



class SearchItem(BaseModel):
    """
    One search result entry.

    All fields have defaults, so an empty item can be created. ``content``
    and ``raw_content`` are declared with ``exclude=True`` and are therefore
    left out when the model is dumped.
    """

    title: str = Field(default="", description="search result title")
    url: str = Field(default="", description="search result url")
    snippet: str = Field(default="", description="search result snippet")
    content: str = Field(default="", description="search content", exclude=True)
    raw_content: Optional[str] = Field(default="", description="search raw content", exclude=True)
    metadata: Optional[Dict[str, Any]] = Field(default_factory=dict, description="metadata")


class SearchOutput(ToolResultOutput):
    """Tool result holding a search query and its list of ``SearchItem`` results."""

    query: str = Field(..., description="Search query string")
    results: list[SearchItem] = Field(default_factory=list, description="List of search results")

    @classmethod
    def from_dict(cls, data: dict) -> "SearchOutput":
        """
        Build a ``SearchOutput`` from a plain dict.

        Only the ``"query"`` and ``"results"`` keys are read. Each result may
        be a ``SearchItem`` or a dict of ``SearchItem`` fields.

        Args:
            data: Mapping with a required ``"query"`` and optional
                ``"results"``. A non-dict value is treated as an empty dict.

        Raises:
            ValueError: If ``"query"`` is missing/None, or a result is neither
                a ``SearchItem`` nor a dict.
        """
        if not isinstance(data, dict):
            data = {}

        query = data.get("query")
        if query is None:
            raise ValueError("query is required")

        # ``or []`` covers a missing key as well as an explicit None, which
        # would otherwise fail with a TypeError when iterated.
        results_data = data.get("results") or []

        search_items = []
        for result in results_data:
            if isinstance(result, SearchItem):
                search_items.append(result)
            elif isinstance(result, dict):
                search_items.append(SearchItem(**result))
            else:
                raise ValueError(f"Invalid result type: {type(result)}")

        return cls(query=query, results=search_items)

    def output_type(self):
        """Return the type tag of this output."""
        return "search_output"