#
# SPDX-FileCopyrightText: Hadad <hadad@linuxmail.org>
# SPDX-License-Identifier: Apache-2.0
#

import json  # Import json module to decode JSON formatted strings from server responses
import httpx  # Import httpx library to perform asynchronous HTTP requests with HTTP/2 support
# Import functions to add opening and closing tags around reasoning text for proper formatting
from src.utils.reasoning import reasoning_tag_open, reasoning_tag_close  # Functions to wrap reasoning with tags

# Define asynchronous function to send a POST request and stream the response from the server
async def httpx_transport(host: str, headers: dict, payload: dict, mode: str):
    """
    This asynchronous function establishes a streaming POST request to the specified server endpoint using the httpx library with HTTP/2 support.
    It is designed to handle incremental server responses that include both reasoning and content parts, which are streamed as separate chunks.
    The function processes each line of the streamed response, parsing JSON data prefixed by "data:", and yields the text accumulated so far to the caller in real time.

    The function maintains internal state to manage the reasoning text separately from the main content. It detects whether the response includes a reasoning section
    by looking for a non-empty 'reasoning' field in the incoming chunks. If reasoning is present and the mode allows it (i.e., mode is not "/no_think"), it wraps the reasoning text
    within custom tags (<think> ... </think>) to clearly demarcate this part of the output. The opening tag is inserted once at the start of reasoning, and subsequent chunks
    append reasoning text after redundant opening tags are removed.

    Once the reasoning section is complete and the content part begins, the function closes the reasoning tag before yielding the final reasoning block. It then yields
    an empty string as a separator, followed by the accumulated content. If reasoning is absent or disabled, the function directly accumulates and yields content.

    Lines that cannot be parsed or processed (for example, invalid JSON or a missing field) are skipped without interrupting the stream. Errors raised by the connection
    itself are not caught here and propagate to the caller. Each line is read as decoded text, stripped of whitespace, and processed only if it starts with the "data:" prefix.

    Parameters:
    - host (str): The URL of the server endpoint to which the POST request is sent.
    - headers (dict): HTTP headers to include in the request, such as authorization and content type.
    - payload (dict): The JSON payload containing the request data, including model, messages, and generation parameters.
    - mode (str): A string controlling behavior such as enabling or disabling reasoning output (e.g., "/no_think" disables reasoning).

    Yields:
    - str: The reasoning or content text accumulated so far (not just the newest chunk), so a caller should replace its displayed text on each yield rather than append to it.

    Workflow:
    1. Initializes empty strings and flags to track reasoning text, content text, and reasoning state.
    2. Opens an asynchronous HTTP client session with HTTP/2 enabled and no timeout to allow indefinite streaming.
    3. Sends a POST request to the specified host with provided headers and JSON payload, initiating a streaming response.
    4. Iterates asynchronously over each line of the streamed response.
       - Skips any lines that do not start with the "data:" prefix to filter valid data chunks.
       - Parses the JSON content after the "data:" prefix into a Python dictionary.
       - Extracts the 'delta' field from the first choice in the response, which contains incremental updates.
    5. Until reasoning has been detected, checks each chunk for a non-empty 'reasoning' field to determine if reasoning should be processed.
    6. If reasoning is present and allowed by mode, and reasoning is not yet complete:
       - Inserts the opening <think> tag once.
       - Appends reasoning text chunks, removing redundant opening tags if necessary.
       - Yields the accumulated reasoning text for real-time consumption.
    7. When reasoning ends and content begins:
       - Marks reasoning as done.
       - Closes the reasoning tag if it was opened.
       - Yields the finalized reasoning block.
       - Yields an empty string as a separator.
       - Starts accumulating content text and yields the first content chunk.
    8. If reasoning is absent, finished, or disabled, accumulates content chunks and yields the accumulated content directly.
    9. Skips any chunk that raises an exception during parsing or processing, so a malformed chunk does not end the stream.

    This design allows clients to receive partial reasoning and content outputs as they are generated by the server, enabling interactive and responsive user experiences.
    """
    # Initialize an empty string to accumulate streamed reasoning text from the response
    reasoning = ""  # Holds reasoning text segments as they are received incrementally
    # Boolean flag to track whether the opening <think> tag has been inserted to avoid duplicates
    reasoning_tag = False  # Ensures the reasoning opening tag is added only once during streaming
    # Tracks whether a non-empty reasoning field has been seen: None until detected, then ""
    reasoning_check = None  # Used to determine if reasoning should be processed for this response
    # Flag to indicate that reasoning section has finished and content streaming should start
    reasoning_done = False  # Marks when reasoning is complete and content output begins
    # Initialize an empty string to accumulate the main content text from the response
    content = ""  # Will hold the actual content output after reasoning is finished
    # Create an asynchronous HTTP client session with HTTP/2 enabled and no timeout to allow indefinite streaming
    async with httpx.AsyncClient(timeout=None, http2=True) as client:  # Establish persistent HTTP/2 connection
        # Send a POST request to the given host with specified headers and JSON payload, and start streaming response
        async with client.stream("POST", host, headers=headers, json=payload) as response:  # Initiate streaming POST request
            # Iterate asynchronously over each line of text in the streamed response content
            async for chunk in response.aiter_lines():  # Read response line by line as it arrives from the server
                # Skip processing for lines that do not start with the expected "data:" prefix
                if not chunk.strip().startswith("data:"):  # Only process lines that contain data payloads
                    continue  # Ignore non-data lines and continue to next streamed line
                try:
                    # Parse the JSON object from the line after removing the "data:" prefix
                    data = json.loads(chunk.strip()[5:])  # Strip first so the slice removes exactly the prefix
                    # Extract the 'delta' dictionary which contains incremental update fields
                    choice = data["choices"][0]["delta"]  # Access the partial update from the streamed response
                    # Check for a non-empty reasoning field; the check repeats on each chunk until one is found
                    if reasoning_check is None:  # Reasoning has not been detected yet
                        # Set reasoning_check to empty string if reasoning key exists and has content, else None
                        reasoning_check = "" if ("reasoning" in choice and choice["reasoning"]) else None
                    # If reasoning is present, mode allows thinking, reasoning not done, and reasoning text exists
                    if (
                        reasoning_check == ""  # Reasoning field has been detected
                        and mode != "/no_think"  # Mode does not disable reasoning output
                        and not reasoning_done  # Reasoning section is still in progress
                        and "reasoning" in choice  # Current chunk contains reasoning text
                        and choice["reasoning"]  # Reasoning text is non-empty
                    ):
                        # Insert opening reasoning tag once and append the first reasoning chunk
                        if not reasoning_tag:  # Only add opening tag once at the start of reasoning
                            reasoning_tag = True  # Mark that opening tag has been inserted
                            reasoning = reasoning_tag_open(reasoning)  # Add opening <think> tag to reasoning string
                            reasoning += choice["reasoning"]  # Append initial reasoning text chunk
                        else:
                            # Remove any duplicate opening tags and append subsequent reasoning chunks
                            reasoning_content = choice["reasoning"].replace("<think>", "")  # Clean redundant tags
                            reasoning += reasoning_content  # Append next reasoning segment to accumulated text
                        yield reasoning  # Yield the reasoning text accumulated so far to the caller
                        continue  # Continue to next streamed line without further processing
                    # If reasoning is done and content starts arriving, finalize reasoning output
                    if (
                        reasoning_check == ""  # Reasoning was detected earlier in the stream
                        and mode != "/no_think"  # Mode allows reasoning output
                        and not reasoning_done  # Reasoning not yet marked as done
                        and "content" in choice  # Current chunk contains content field
                        and choice["content"]  # Content text is non-empty
                    ):
                        reasoning_done = True  # Mark reasoning section as complete
                        # If reasoning tag was opened, close it properly before yielding final reasoning block
                        if reasoning_tag:  # Only close tag if it was previously opened
                            reasoning = reasoning_tag_close(reasoning)  # Append closing </think> tag
                        yield reasoning  # Yield the complete reasoning text block
                        yield ""  # Yield an empty string as a separator between reasoning and content
                        content += choice["content"]  # Start accumulating content text from this chunk
                        yield content  # Yield the first chunk of content text to the caller
                        continue  # Proceed to next line in the stream
                    # Handle cases where reasoning is absent, finished, or mode disables reasoning, but content is present
                    if (
                        (reasoning_check is None or reasoning_done or mode == "/no_think")  # No reasoning or reasoning done or disabled mode
                        and "content" in choice  # Current chunk contains content field
                        and choice["content"]  # Content text is non-empty
                    ):
                        content += choice["content"]  # Append the content chunk to accumulated content string
                        yield content  # Yield the updated content string so far
                # Catch errors raised while parsing or processing this line so one malformed chunk does not break the stream
                # Errors raised by the connection itself occur outside this try block and propagate to the caller
                except Exception:  # For example, invalid JSON or a missing key in the decoded payload
                    continue  # Skip the problematic chunk and continue processing the next lines
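

# Illustrative sketch, not part of the original module: the per-line handling described in
# workflow step 4 of httpx_transport could be isolated into a small helper that can be
# exercised without a running server. The helper name `parse_stream_line` is hypothetical
# and chosen only for this example; the "data:" prefix and the choices[0]["delta"] layout
# are taken from httpx_transport above. Unlike the transport, which skips bad chunks through
# a broad `except Exception`, this sketch returns None and catches only the errors it expects.
import json  # Repeated here so the sketch also runs on its own


def parse_stream_line(line: str):
    """Return the 'delta' dict carried by one streamed line, or None if the line is unusable."""
    text = line.strip()  # Tolerate leading and trailing whitespace around the line
    if not text.startswith("data:"):  # Lines without the prefix carry no data payload
        return None
    try:
        data = json.loads(text[len("data:"):])  # Decode the JSON that follows the prefix
        return data["choices"][0]["delta"]  # Same field the transport reads for each chunk
    except (json.JSONDecodeError, KeyError, IndexError, TypeError):
        return None  # Invalid JSON or an unexpected shape: tell the caller to skip this line


# Possible usage inside a streaming loop (again only a sketch):
#     delta = parse_stream_line(chunk)
#     if delta is None:
#         continue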