#
# SPDX-FileCopyrightText: Hadad <hadad@linuxmail.org>
# SPDX-License-Identifier: Apache-2.0
#

import json  # Import json module to decode JSON formatted strings from server responses
import httpx  # Import httpx library to perform asynchronous HTTP requests with HTTP/2 support
# Import functions to add opening and closing tags around reasoning text for proper formatting
from src.utils.reasoning import reasoning_tag_open, reasoning_tag_close  # Functions to wrap reasoning with tags
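
# Illustrative sketch (hypothetical helper; httpx_transport below does not call it):
# one way to turn a single Server-Sent Events line into the "delta" dict that the
# streaming loop works with. It assumes an OpenAI-style stream in which each event
# is a "data: {...}" line and the stream may end with a "data: [DONE]" sentinel.
# Non-data lines, the sentinel, and malformed or unexpected JSON all return None.
def _example_parse_sse_delta(line: str):
    stripped = line.strip()  # Normalize whitespace before checking the prefix
    if not stripped.startswith("data:"):
        return None  # Comments, "event:" fields, and blank keep-alive lines carry no payload
    body = stripped[len("data:"):].strip()  # Drop the prefix and the optional space after it
    if body == "[DONE]":
        return None  # Assumed end-of-stream sentinel, not JSON
    try:
        return json.loads(body)["choices"][0]["delta"]  # Incremental update of the first choice
    except (ValueError, KeyError, IndexError, TypeError):
        return None  # Invalid JSON (ValueError) or a payload without the expected structure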

# Define an asynchronous generator that sends a POST request and streams the server's response
async def httpx_transport(host: str, headers: dict, payload: dict, mode: str):
    """
    This asynchronous generator sends a streaming POST request to the specified server endpoint using the httpx library with HTTP/2 enabled.
    It is designed to handle incremental server responses in which reasoning and content arrive as separate chunks.
    The function processes each line of the streamed response, parses the JSON that follows the "data:" prefix, and yields the text accumulated so far to the caller in real time.

    The function maintains internal state to keep the reasoning text separate from the main content. It detects whether the response includes a reasoning section
    by checking incoming chunks until one carries a non-empty 'reasoning' field. If reasoning is present and the mode allows it (i.e., mode is not "/no_think"), it wraps the reasoning text
    in tags (<think> ... </think>, added by reasoning_tag_open and reasoning_tag_close) to clearly demarcate this part of the output. The opening tag is inserted once at the start
    of reasoning, and each subsequent chunk is appended after any redundant "<think>" tags are removed.

    Once the reasoning section is complete and the content part begins, the function closes the reasoning tag and yields the final reasoning block. It then yields
    an empty string as a separator, followed by the accumulated content. If reasoning is absent or disabled, the function accumulates content chunks and yields the accumulated content directly.

    Chunks that cannot be parsed, or that lack the expected fields, are skipped without interrupting the stream. Errors raised by the connection itself (for example, a network
    failure while reading the response) are not caught and propagate to the caller. Each line is read as decoded text, stripped of surrounding whitespace, and processed only if it
    starts with the "data:" prefix.

    Parameters:
    - host (str): The URL of the server endpoint to which the POST request is sent.
    - headers (dict): HTTP headers to include in the request, such as authorization and content type.
    - payload (dict): The JSON payload containing the request data, including model, messages, and generation parameters.
    - mode (str): A string controlling behavior such as enabling or disabling reasoning output (e.g., "/no_think" disables reasoning).

    Yields:
    - str: The reasoning or content text accumulated so far (not only the newest piece), emitted as each chunk arrives, plus one empty string between the reasoning block and the content when reasoning is shown.

    Workflow:
    1. Initializes empty strings and flags to track reasoning text, content text, and reasoning state.
    2. Opens an asynchronous HTTP client session with HTTP/2 enabled (which requires the optional h2 package, installed with httpx[http2]) and no timeout to allow indefinite streaming.
    3. Sends a POST request to the specified host with provided headers and JSON payload, initiating a streaming response.
    4. Iterates asynchronously over each line of the streamed response.
       - Skips any lines that, after surrounding whitespace is stripped, do not start with the "data:" prefix.
       - Parses the JSON content after the "data:" prefix into a Python dictionary.
       - Extracts the 'delta' field from the first choice in the response, which contains incremental updates.
    5. Until a chunk with a non-empty 'reasoning' field has been seen, checks each chunk for one to determine whether reasoning should be processed.
    6. If reasoning is present and allowed by mode, and reasoning is not yet complete:
       - Inserts the opening <think> tag once.
       - Appends reasoning text chunks, removing redundant opening tags if necessary.
       - Yields the accumulated reasoning text for real-time consumption.
    7. When reasoning ends and content begins:
       - Marks reasoning as done.
       - Closes the reasoning tag properly if it was opened.
       - Yields the finalized reasoning block.
       - Yields an empty string as a separator.
       - Starts accumulating content text and yields the first content chunk.
    8. If reasoning is absent, finished, or disabled, accumulates content chunks and yields the accumulated content.
    9. Skips chunks whose JSON cannot be parsed or lacks the expected fields, so a malformed chunk does not end the stream; errors raised by the connection itself propagate to the caller.

    This design allows clients to receive partial reasoning and content outputs as they are generated by the server, enabling interactive and responsive user experiences.
    """

    # Initialize an empty string to accumulate streamed reasoning text from the response
    reasoning = ""  # Holds reasoning text segments as they are received incrementally

    # Boolean flag to track whether the opening <think> tag has been inserted to avoid duplicates
    reasoning_tag = False  # Ensures the reasoning opening tag is added only once during streaming

    # Tracks whether a non-empty reasoning field has been seen yet: None until then, "" afterwards
    reasoning_check = None  # Used to determine if reasoning should be processed for this response

    # Flag to indicate that reasoning section has finished and content streaming should start
    reasoning_done = False  # Marks when reasoning is complete and content output begins

    # Initialize an empty string to accumulate the main content text from the response
    content = ""  # Will hold the actual content output after reasoning is finished

    # Create an asynchronous HTTP client session with HTTP/2 enabled and no timeout to allow indefinite streaming
    async with httpx.AsyncClient(timeout=None, http2=True) as client:  # Establish persistent HTTP/2 connection
        # Send a POST request to the given host with specified headers and JSON payload, and start streaming response
        async with client.stream("POST", host, headers=headers, json=payload) as response:  # Initiate streaming POST request
            # Iterate asynchronously over each line of text in the streamed response content
            async for chunk in response.aiter_lines():  # Read response line by line as it arrives from the server
                # Strip surrounding whitespace once so the prefix check and the slice below see the same text
                line = chunk.strip()  # Normalized copy of the raw line

                # Skip processing for lines that do not start with the expected "data:" prefix
                if not line.startswith("data:"):  # Only process lines that contain data payloads
                    continue  # Ignore non-data lines and continue to next streamed line

                try:
                    # Parse the JSON object from the line after removing the "data:" prefix
                    data = json.loads(line[5:])  # Convert JSON string to Python dictionary

                    # Extract the 'delta' dictionary which contains incremental update fields
                    choice = data["choices"][0]["delta"]  # Access the partial update from the streamed response

                    # Until reasoning has been detected, check each chunk for a non-empty reasoning field
                    if reasoning_check is None:  # Stops re-checking once reasoning_check becomes ""
                        # Set reasoning_check to empty string if reasoning key exists and has content, else keep None
                        reasoning_check = "" if ("reasoning" in choice and choice["reasoning"]) else None

                    # If reasoning is present, mode allows thinking, reasoning not done, and reasoning text exists
                    if (
                        reasoning_check == ""  # Reasoning field detected in first chunk
                        and mode != "/no_think"  # Mode does not disable reasoning output
                        and not reasoning_done  # Reasoning section is still in progress
                        and "reasoning" in choice  # Current chunk contains reasoning text
                        and choice["reasoning"]  # Reasoning text is non-empty
                    ):
                        # Insert opening reasoning tag once and append the first reasoning chunk
                        if not reasoning_tag:  # Only add opening tag once at the start of reasoning
                            reasoning_tag = True  # Mark that opening tag has been inserted
                            reasoning = reasoning_tag_open(reasoning)  # Add opening <think> tag to reasoning string
                            reasoning += choice["reasoning"]  # Append initial reasoning text chunk
                        else:
                            # Remove any duplicate opening tags and append subsequent reasoning chunks
                            reasoning_content = choice["reasoning"].replace("<think>", "")  # Clean redundant tags
                            reasoning += reasoning_content  # Append next reasoning segment to accumulated text

                        yield reasoning  # Yield the reasoning text accumulated so far to the caller
                        continue  # Continue to next streamed line without further processing

                    # If reasoning is done and content starts arriving, finalize reasoning output
                    if (
                        reasoning_check == ""  # Reasoning was detected initially
                        and mode != "/no_think"  # Mode allows reasoning output
                        and not reasoning_done  # Reasoning not yet marked as done
                        and "content" in choice  # Current chunk contains content field
                        and choice["content"]  # Content text is non-empty
                    ):
                        reasoning_done = True  # Mark reasoning section as complete

                        # If reasoning tag was opened, close it properly before yielding final reasoning block
                        if reasoning_tag:  # Only close tag if it was previously opened
                            reasoning = reasoning_tag_close(reasoning)  # Append closing </think> tag
                            yield reasoning  # Yield the complete reasoning text block

                        yield ""  # Yield an empty string as a separator between reasoning and content
                        content += choice["content"]  # Start accumulating content text from this chunk
                        yield content  # Yield the first chunk of content text to the caller
                        continue  # Proceed to next line in the stream

                    # Handle cases where reasoning is absent, finished, or mode disables reasoning, but content is present
                    if (
                        (reasoning_check is None or reasoning_done or mode == "/no_think")  # No reasoning or reasoning done or disabled mode
                        and "content" in choice  # Current chunk contains content field
                        and choice["content"]  # Content text is non-empty
                    ):
                        content += choice["content"]  # Append the content chunk to accumulated content string
                        yield content  # Yield the updated content string so far

                # Skip chunks that cannot be parsed or lack the expected fields (e.g. an OpenAI-style "[DONE]" end marker)
                except Exception:  # Covers errors raised while processing this line, not errors raised by the stream itself
                    continue  # Ignore the malformed chunk and continue with the next line
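
# Illustrative sketch (hypothetical consumer, not part of the original module):
# httpx_transport yields the text accumulated so far rather than only the newest
# piece, and yields "" once between the finished reasoning block and the first
# content snapshot. A caller that wants just the newly arrived text can diff each
# snapshot against the previous one and treat "" as the start of a new buffer.
# This assumes each snapshot extends the previous one (i.e. that
# reasoning_tag_close only appends a closing tag). The fake stream stands in for
# httpx_transport so the sketch runs without a server; its "<think>" tags are
# assumed formats.
async def _example_fake_stream():
    for snapshot in ["<think>a", "<think>ab", "<think>ab</think>", "", "Hel", "Hello"]:
        yield snapshot  # Mimics the cumulative snapshots httpx_transport emits

async def _example_collect_deltas(stream):
    deltas = []  # Newly arrived text, one entry per non-separator snapshot
    previous = ""  # Last snapshot seen in the current buffer
    async for snapshot in stream:
        if snapshot == "":
            previous = ""  # Separator: the next snapshot starts the content buffer
            continue
        deltas.append(snapshot[len(previous):])  # Keep only the text appended since the last snapshot
        previous = snapshot
    return deltas

# Usage (offline): asyncio.run(_example_collect_deltas(_example_fake_stream()))
# returns ["<think>a", "b", "</think>", "Hel", "lo"].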