#
# SPDX-FileCopyrightText: Hadad <hadad@linuxmail.org>
# SPDX-License-Identifier: Apache-2.0
#

import json  # Import json module to parse JSON formatted strings from server response lines
import aiohttp  # Import aiohttp library to perform asynchronous HTTP requests and handle streaming responses
# Import helper functions to add opening and closing reasoning tags around reasoning text
from src.utils.reasoning import reasoning_tag_open, reasoning_tag_close  # Functions to wrap reasoning with tags

# Define an asynchronous function to send a POST request and stream the response from the server
async def aiohttp_transport(host: str, headers: dict, payload: dict, mode: str):
    """
    This asynchronous function establishes a streaming HTTP POST connection to the specified server endpoint
    using the aiohttp library. It sends a JSON payload containing the request parameters and headers, and
    processes the server's streamed response line by line in real time.

    The function is designed to handle responses that include two types of data chunks: reasoning text and
    content text. Reasoning text represents intermediate thought processes or explanations generated by the AI,
    while content text represents the final output or answer.

    The function maintains several internal state variables to manage the streaming process:

    - 'reasoning' accumulates the reasoning text segments as they arrive incrementally from the server.
    - 'reasoning_tag' is a boolean flag that ensures the opening reasoning tag (<think>) is inserted only once.
    - 'reasoning_check' stays None until a chunk with a non-empty 'reasoning' field arrives and is then set to
      an empty string, which determines whether reasoning processing should occur.
    - 'reasoning_done' indicates when the reasoning phase has completed and the function should switch to
      accumulating content text.
    - 'content' accumulates the main content text after reasoning finishes.

    The function reads the response stream asynchronously, decoding each line from bytes to UTF-8 strings,
    and filters out any lines that do not start with the expected "data:" prefix. For valid data lines, it
    parses the JSON payload to extract incremental updates contained within the 'delta' field of the first
    choice in the response.

    Upon detecting reasoning text in the delta, and if the current mode allows reasoning output (i.e., mode is
    not "/no_think"), the function inserts an opening <think> tag once and appends each reasoning chunk,
    stripping duplicate <think> tags from every chunk after the first. After each chunk it yields the reasoning
    text accumulated so far, enabling real-time display of the AI's intermediate thoughts.

    When the response transitions from reasoning to content (indicated by non-empty 'content' in the delta),
    the function closes the reasoning block with a closing </think> tag if it was opened, yields the final reasoning
    block, and then begins accumulating content, yielding the accumulated content after each chunk. An empty
    string is yielded as a separator between reasoning and content.

    If reasoning is absent, completed, or disabled by mode, the function directly accumulates content and yields
    the accumulated text as each chunk arrives.

    Lines that fail to parse as JSON or lack the expected fields are skipped without interrupting the stream.
    Errors raised while connecting or reading from the response, and HTTP error statuses, are not caught here
    and propagate to the caller.
    """

    # Initialize an empty string to accumulate streamed reasoning text segments from the response
    reasoning = ""  # This will hold the reasoning text as it is received incrementally

    # Boolean flag to track if the opening <think> tag has been inserted to avoid duplicates
    reasoning_tag = False  # Ensures the reasoning opening tag is added only once

    # Variable to check presence of reasoning field in the first chunk of streamed data
    reasoning_check = None  # Used to determine if reasoning should be processed for this response

    # Flag to indicate that reasoning section has finished and content streaming should start
    reasoning_done = False  # Marks when reasoning is complete and content output begins

    # Initialize an empty string to accumulate the main content text from the response
    content = ""  # Will hold the actual content output after reasoning is finished

    # Create an aiohttp client session with no timeout to allow indefinite streaming
    async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=None)) as session:
        # Send a POST request to the given host with specified headers and JSON payload
        async with session.post(host, headers=headers, json=payload) as resp:
            resp.raise_for_status()  # Raise aiohttp.ClientResponseError if the HTTP status is 400 or higher

            # Iterate asynchronously over each line of bytes in the streamed response content
            async for line_bytes in resp.content:
                line = line_bytes.decode("utf-8").strip()  # Decode bytes to UTF-8 string and strip whitespace

                # Skip processing for lines that do not start with the expected "data:" prefix
                if not line.startswith("data:"):
                    continue  # Ignore lines without data prefix and continue to next streamed line

                try:
                    # Parse the JSON object from the line after removing the "data:" prefix
                    data = json.loads(line[5:])  # Convert JSON string to Python dictionary

                    # Extract the 'delta' dictionary which contains incremental update fields
                    choice = data["choices"][0]["delta"]  # Access the partial update from the streamed response

                    # Detect whether the stream carries reasoning; this is re-checked on each chunk until one has non-empty reasoning
                    if reasoning_check is None:  # Reasoning has not been detected in any earlier chunk
                        # Set reasoning_check to empty string if reasoning key exists and has content, else keep None
                        reasoning_check = "" if ("reasoning" in choice and choice["reasoning"]) else None

                    # If reasoning is present, mode allows thinking, reasoning not done, and reasoning text exists
                    if (
                        reasoning_check == ""  # Reasoning field detected in this or an earlier chunk
                        and mode != "/no_think"  # Mode does not disable reasoning output
                        and not reasoning_done  # Reasoning section is still in progress
                        and "reasoning" in choice  # Current chunk contains reasoning text
                        and choice["reasoning"]  # Reasoning text is non-empty
                    ):
                        # Insert opening reasoning tag once and append the first reasoning chunk
                        if not reasoning_tag:  # Only add opening tag once at the start of reasoning
                            reasoning_tag = True  # Mark that opening tag has been inserted
                            reasoning = reasoning_tag_open(reasoning)  # Add opening <think> tag to reasoning string
                            reasoning += choice["reasoning"]  # Append initial reasoning text chunk
                        else:
                            # Remove any duplicate opening tags and append subsequent reasoning chunks
                            reasoning_content = choice["reasoning"].replace("<think>", "")  # Clean redundant tags
                            reasoning += reasoning_content  # Append next reasoning segment to accumulated text

                        yield reasoning  # Yield the reasoning text accumulated so far to the caller
                        continue  # Continue to next streamed line without further processing

                    # If reasoning is done and content starts arriving, finalize reasoning output
                    if (
                        reasoning_check == ""  # Reasoning was detected initially
                        and mode != "/no_think"  # Mode allows reasoning output
                        and not reasoning_done  # Reasoning not yet marked as done
                        and "content" in choice  # Current chunk contains content field
                        and choice["content"]  # Content text is non-empty
                    ):
                        reasoning_done = True  # Mark reasoning section as complete

                        # If reasoning tag was opened, close it properly before yielding final reasoning block
                        if reasoning_tag:  # Only close tag if it was previously opened
                            reasoning = reasoning_tag_close(reasoning)  # Append closing </think> tag
                            yield reasoning  # Yield the complete reasoning text block

                        yield ""  # Yield an empty string as a separator between reasoning and content
                        content += choice["content"]  # Start accumulating content text from this chunk
                        yield content  # Yield the first chunk of content text to the caller
                        continue  # Proceed to next line in the stream

                    # Handle cases where reasoning is absent, finished, or mode disables reasoning, but content is present
                    if (
                        (reasoning_check is None or reasoning_done or mode == "/no_think")  # No reasoning or reasoning done or disabled mode
                        and "content" in choice  # Current chunk contains content field
                        and choice["content"]  # Content text is non-empty
                    ):
                        content += choice["content"]  # Append the content chunk to accumulated content string
                        yield content  # Yield the updated content string so far

                # Catch errors from parsing or handling a single chunk (invalid JSON, missing keys) so one bad line
                # does not break the stream; connection errors raised by the read loop are not caught here
                except Exception:
                    continue  # Ignore the malformed chunk and continue processing the next line