Spaces:
Running
Running
#
# SPDX-FileCopyrightText: Hadad <hadad@linuxmail.org>
# SPDX-License-Identifier: Apache-2.0
#
import json # Import json module to parse JSON formatted strings from server response lines | |
import aiohttp # Import aiohttp library to perform asynchronous HTTP requests and handle streaming responses | |
# Import helper functions to add opening and closing reasoning tags around reasoning text
from src.utils.reasoning import reasoning_tag_open, reasoning_tag_close # Functions to wrap reasoning with tags | |
async def aiohttp_transport(host: str, headers: dict, payload: dict, mode: str):
    """
    Stream a chat completion over HTTP and yield the accumulated text.

    POSTs ``payload`` as JSON to ``host`` and reads the response body line by
    line. Only lines that start with ``data:`` are considered; each one is
    parsed as JSON and ``choices[0]["delta"]`` is inspected for two optional
    string fields, ``reasoning`` and ``content``.

    Every yielded value is the full text accumulated so far for the current
    phase, not just the newest fragment:

    * Reasoning phase: entered on the first non-empty ``reasoning`` value,
      unless ``mode`` is ``"/no_think"``. The (empty) buffer is passed once
      through ``reasoning_tag_open`` before the first fragment is appended;
      later fragments have any literal ``<think>`` removed before being
      appended. A delta handled in this phase is consumed entirely, so a
      ``content`` value carried by the same delta is ignored.
    * Transition: on the first non-empty ``content`` value after reasoning
      started, the buffer is passed through ``reasoning_tag_close`` and
      yielded, followed by an empty string as a separator.
    * Content phase: each non-empty ``content`` value is appended to the
      content buffer and the buffer is yielded. This is also the only phase
      used when no reasoning was ever seen or when ``mode`` is
      ``"/no_think"`` (reasoning values are then dropped).

    Lines that cannot be decoded as UTF-8, are not valid JSON, or do not
    have the expected ``choices[0]["delta"]`` shape are skipped silently.
    Network and HTTP errors are NOT suppressed.

    Args:
        host: URL the request is POSTed to.
        headers: HTTP headers sent with the request.
        payload: Object serialised as the JSON request body.
        mode: Pass ``"/no_think"`` to suppress reasoning output; any other
            value leaves it enabled.

    Yields:
        str: The accumulated reasoning text, an empty separator string, or
        the accumulated content text, as described above.

    Raises:
        aiohttp.ClientResponseError: Raised by ``resp.raise_for_status()``
            when the server answers with an error status.
        aiohttp.ClientError: Any other client/connection failure raised
            while sending the request or reading the stream.
    """
    thinking_enabled = mode != "/no_think"

    reasoning = ""          # accumulated reasoning text (including tags)
    reasoning_tag = False   # True once reasoning_tag_open() has been applied
    reasoning_seen = False  # True once any delta carried non-empty reasoning
    reasoning_done = False  # True once the first content fragment arrived
    content = ""            # accumulated content text

    # total=None removes the overall deadline so long generations are not cut off.
    timeout = aiohttp.ClientTimeout(total=None)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        async with session.post(host, headers=headers, json=payload) as resp:
            resp.raise_for_status()

            async for line_bytes in resp.content:
                # Only decoding/parsing is guarded: a bad line is skipped, but
                # errors in the state machine below (or exceptions thrown into
                # the generator by the consumer) are no longer swallowed.
                try:
                    line = line_bytes.decode("utf-8").strip()
                    if not line.startswith("data:"):
                        continue
                    delta = json.loads(line[5:])["choices"][0]["delta"]
                    reasoning_text = delta.get("reasoning")
                    content_text = delta.get("content")
                except (ValueError, LookupError, TypeError, AttributeError):
                    # ValueError covers invalid UTF-8 and invalid JSON (for
                    # example a non-JSON sentinel line); the others cover a
                    # payload without the expected choices[0]["delta"] dict.
                    continue

                # Treat missing, null or non-string fields as "nothing to add".
                if not isinstance(reasoning_text, str):
                    reasoning_text = ""
                if not isinstance(content_text, str):
                    content_text = ""

                # Reasoning is not required to be in the very first delta: the
                # flag flips on whichever delta first carries it.
                if reasoning_text:
                    reasoning_seen = True
                in_reasoning = (
                    reasoning_seen and thinking_enabled and not reasoning_done
                )

                if in_reasoning and reasoning_text:
                    if not reasoning_tag:
                        reasoning_tag = True
                        reasoning = reasoning_tag_open(reasoning)
                        # NOTE(review): the first fragment is appended as-is,
                        # while later ones are stripped of "<think>". Kept
                        # unchanged; confirm whether the first fragment should
                        # be stripped as well.
                        reasoning += reasoning_text
                    else:
                        reasoning += reasoning_text.replace("<think>", "")
                    yield reasoning
                    continue

                if not content_text:
                    continue

                if in_reasoning:
                    # First content after reasoning: finalise the reasoning
                    # block and emit the separator before switching phases.
                    reasoning_done = True
                    if reasoning_tag:
                        reasoning = reasoning_tag_close(reasoning)
                    yield reasoning
                    yield ""

                content += content_text
                yield content

            # NOTE(review): if the stream ends while still in the reasoning
            # phase, reasoning_tag_close() is never applied. Behaviour kept
            # as in the original; confirm whether callers rely on it.