ai / src /core /transport /httpx.py
hadadrjt's picture
ai: Enable API for Next-Gen!
5d9ca4f
#
# SPDX-FileCopyrightText: Hadad <hadad@linuxmail.org>
# SPDX-License-Identifier: Apache-2.0
#
import json # Import json module to decode JSON formatted strings from server responses
import httpx # Import httpx library to perform asynchronous HTTP requests with HTTP/2 support
# Import functions to add opening and closing tags around reasoning text for proper formatting
from src.utils.reasoning import reasoning_tag_open, reasoning_tag_close # Functions to wrap reasoning with tags
# Define asynchronous function to send a POST request and stream the response from the server
async def httpx_transport(host: str, headers: dict, payload: dict, mode: str):
"""
This asynchronous function establishes a streaming POST request to the specified server endpoint using the httpx library with HTTP/2 support.
It is designed to handle incremental server responses that include both reasoning and content parts, which are streamed as separate chunks.
The function processes each line of the streamed response, parsing JSON data prefixed by "data:", and yields partial outputs to the caller in real-time.
The function maintains internal state to manage the reasoning text separately from the main content. It detects whether the response includes a reasoning section
by inspecting the first chunk containing the 'reasoning' field. If reasoning is present and the mode allows it (i.e., mode is not "/no_think"), it wraps the reasoning text
within custom tags (<think> ... </think>) to clearly demarcate this part of the output. The opening tag is inserted once at the start of reasoning, and subsequent chunks
append reasoning text after cleansing redundant tags.
Once the reasoning section is complete and the content part begins, the function closes the reasoning tags properly before yielding the final reasoning block. It then yields
an empty string as a separator, followed by the streamed content chunks. If reasoning is absent or disabled, the function directly accumulates and yields content chunks.
The function is robust against malformed data or transient connection issues, gracefully skipping any problematic chunks without interrupting the stream. It reads each line
as a UTF-8 decoded string, strips whitespace, and only processes lines starting with the "data:" prefix to ensure valid data handling.
Parameters:
- host (str): The URL of the server endpoint to which the POST request is sent.
- headers (dict): HTTP headers to include in the request, such as authorization and content type.
- payload (dict): The JSON payload containing the request data, including model, messages, and generation parameters.
- mode (str): A string controlling behavior such as enabling or disabling reasoning output (e.g., "/no_think" disables reasoning).
Yields:
- str: Partial chunks of reasoning or content as they are received from the server, allowing real-time streaming output.
Workflow:
1. Initializes empty strings and flags to track reasoning text, content text, and reasoning state.
2. Opens an asynchronous HTTP client session with HTTP/2 enabled and no timeout to allow indefinite streaming.
3. Sends a POST request to the specified host with provided headers and JSON payload, initiating a streaming response.
4. Iterates asynchronously over each line of the streamed response.
- Skips any lines that do not start with the "data:" prefix to filter valid data chunks.
- Parses the JSON content after the "data:" prefix into a Python dictionary.
- Extracts the 'delta' field from the first choice in the response, which contains incremental updates.
5. On the first chunk, checks if the 'reasoning' field is present and non-empty to determine if reasoning should be processed.
6. If reasoning is present and allowed by mode, and reasoning is not yet complete:
- Inserts the opening <think> tag once.
- Appends reasoning text chunks, removing redundant opening tags if necessary.
- Yields the accumulated reasoning text for real-time consumption.
7. When reasoning ends and content begins:
- Marks reasoning as done.
- Closes the reasoning tag properly if it was opened.
- Yields the finalized reasoning block.
- Yields an empty string as a separator.
- Starts accumulating content text and yields the first content chunk.
8. If reasoning is absent, finished, or disabled, accumulates and yields content chunks directly.
9. Handles any exceptions during parsing or connection by skipping malformed chunks, ensuring the stream continues uninterrupted.
This design allows clients to receive partial reasoning and content outputs as they are generated by the server, enabling interactive and responsive user experiences.
"""
# Initialize an empty string to accumulate streamed reasoning text from the response
reasoning = "" # Holds reasoning text segments as they are received incrementally
# Boolean flag to track whether the opening <think> tag has been inserted to avoid duplicates
reasoning_tag = False # Ensures the reasoning opening tag is added only once during streaming
# Variable to check presence of reasoning field in the first chunk of streamed data
reasoning_check = None # Used to determine if reasoning should be processed for this response
# Flag to indicate that reasoning section has finished and content streaming should start
reasoning_done = False # Marks when reasoning is complete and content output begins
# Initialize an empty string to accumulate the main content text from the response
content = "" # Will hold the actual content output after reasoning is finished
# Create an asynchronous HTTP client session with HTTP/2 enabled and no timeout to allow indefinite streaming
async with httpx.AsyncClient(timeout=None, http2=True) as client: # Establish persistent HTTP/2 connection
# Send a POST request to the given host with specified headers and JSON payload, and start streaming response
async with client.stream("POST", host, headers=headers, json=payload) as response: # Initiate streaming POST request
# Iterate asynchronously over each line of text in the streamed response content
async for chunk in response.aiter_lines(): # Read response line by line as it arrives from the server
# Skip processing for lines that do not start with the expected "data:" prefix
if not chunk.strip().startswith("data:"): # Only process lines that contain data payloads
continue # Ignore non-data lines and continue to next streamed line
try:
# Parse the JSON object from the line after removing the "data:" prefix
data = json.loads(chunk[5:]) # Convert JSON string to Python dictionary
# Extract the 'delta' dictionary which contains incremental update fields
choice = data["choices"][0]["delta"] # Access the partial update from the streamed response
# Perform a one-time check on the first chunk to detect if reasoning field exists and is non-empty
if reasoning_check is None: # Only check once on the initial chunk received
# Set reasoning_check to empty string if reasoning key exists and has content, else None
reasoning_check = "" if ("reasoning" in choice and choice["reasoning"]) else None
# If reasoning is present, mode allows thinking, reasoning not done, and reasoning text exists
if (
reasoning_check == "" # Reasoning field detected in first chunk
and mode != "/no_think" # Mode does not disable reasoning output
and not reasoning_done # Reasoning section is still in progress
and "reasoning" in choice # Current chunk contains reasoning text
and choice["reasoning"] # Reasoning text is non-empty
):
# Insert opening reasoning tag once and append the first reasoning chunk
if not reasoning_tag: # Only add opening tag once at the start of reasoning
reasoning_tag = True # Mark that opening tag has been inserted
reasoning = reasoning_tag_open(reasoning) # Add opening <think> tag to reasoning string
reasoning += choice["reasoning"] # Append initial reasoning text chunk
else:
# Remove any duplicate opening tags and append subsequent reasoning chunks
reasoning_content = choice["reasoning"].replace("<think>", "") # Clean redundant tags
reasoning += reasoning_content # Append next reasoning segment to accumulated text
yield reasoning # Yield the intermediate reasoning text chunk to the caller
continue # Continue to next streamed line without further processing
# If reasoning is done and content starts arriving, finalize reasoning output
if (
reasoning_check == "" # Reasoning was detected initially
and mode != "/no_think" # Mode allows reasoning output
and not reasoning_done # Reasoning not yet marked as done
and "content" in choice # Current chunk contains content field
and choice["content"] # Content text is non-empty
):
reasoning_done = True # Mark reasoning section as complete
# If reasoning tag was opened, close it properly before yielding final reasoning block
if reasoning_tag: # Only close tag if it was previously opened
reasoning = reasoning_tag_close(reasoning) # Append closing </think> tag
yield reasoning # Yield the complete reasoning text block
yield "" # Yield an empty string as a separator between reasoning and content
content += choice["content"] # Start accumulating content text from this chunk
yield content # Yield the first chunk of content text to the caller
continue # Proceed to next line in the stream
# Handle cases where reasoning is absent, finished, or mode disables reasoning, but content is present
if (
(reasoning_check is None or reasoning_done or mode == "/no_think") # No reasoning or reasoning done or disabled mode
and "content" in choice # Current chunk contains content field
and choice["content"] # Content text is non-empty
):
content += choice["content"] # Append the content chunk to accumulated content string
yield content # Yield the updated content string so far
# Catch any exceptions from JSON parsing errors or connection issues to prevent stream break
except Exception: # Gracefully handle any error encountered during streaming or parsing
continue # Ignore malformed chunks or transient errors and continue processing next lines