ai / src /core /transport /aiohttp.py
hadadrjt's picture
ai: Enable API for Next-Gen!
5d9ca4f
#
# SPDX-FileCopyrightText: Hadad <hadad@linuxmail.org>
# SPDX-License-Identifier: Apache-2.0
#
import json # Import json module to parse JSON formatted strings from server response lines
import aiohttp # Import aiohttp library to perform asynchronous HTTP requests and handle streaming responses
# Import helper functions to add opening and closing reasoning tags around reasoning text
from src.utils.reasoning import reasoning_tag_open, reasoning_tag_close # Functions to wrap reasoning with tags
# Define an asynchronous function to send a POST request and stream the response from the server
async def aiohttp_transport(host: str, headers: dict, payload: dict, mode: str):
"""
This asynchronous function establishes a streaming HTTP POST connection to the specified server endpoint
using the aiohttp library. It sends a JSON payload containing the request parameters and headers, and
processes the server's streamed response line by line in real time.
The function is designed to handle responses that include two types of data chunks: reasoning text and
content text. Reasoning text represents intermediate thought processes or explanations generated by the AI,
while content text represents the final output or answer.
The function maintains several internal state variables to manage the streaming process:
- 'reasoning' accumulates the reasoning text segments as they arrive incrementally from the server.
- 'reasoning_tag' is a boolean flag that ensures the opening reasoning tag (<think>) is inserted only once.
- 'reasoning_check' is used to detect if the reasoning field is present in the initial streamed data chunk,
which determines whether reasoning processing should occur.
- 'reasoning_done' indicates when the reasoning phase has completed and the function should switch to
accumulating content text.
- 'content' accumulates the main content text after reasoning finishes.
The function reads the response stream asynchronously, decoding each line from bytes to UTF-8 strings,
and filters out any lines that do not start with the expected "data:" prefix. For valid data lines, it
parses the JSON payload to extract incremental updates contained within the 'delta' field of the first
choice in the response.
Upon detecting reasoning text in the delta, and if the current mode allows reasoning output (i.e., mode is
not "/no_think"), the function inserts an opening <think> tag once and appends subsequent reasoning chunks,
carefully removing any duplicate tags to maintain clean formatting. It yields these reasoning segments
progressively to the caller, enabling real-time display of the AI's intermediate thoughts.
When the response transitions from reasoning to content (indicated by the presence of 'content' in the delta),
the function closes the reasoning block with a closing </think> tag if it was opened, yields the final reasoning
block, and then begins accumulating and yielding content chunks. An empty string is yielded as a separator
between reasoning and content for clarity.
If reasoning is absent, completed, or disabled by mode, the function directly accumulates and yields content
chunks as they arrive.
The function includes robust error handling to gracefully skip over any malformed JSON chunks or transient
connection issues without interrupting the streaming process. This ensures continuous and reliable streaming
of AI responses even in the face of occasional data irregularities.
Overall, this function provides a comprehensive and efficient mechanism to stream, parse, and yield AI-generated
reasoning and content in real time, supporting interactive and dynamic user experiences.
"""
# Initialize an empty string to accumulate streamed reasoning text segments from the response
reasoning = "" # This will hold the reasoning text as it is received incrementally
# Boolean flag to track if the opening <think> tag has been inserted to avoid duplicates
reasoning_tag = False # Ensures the reasoning opening tag is added only once
# Variable to check presence of reasoning field in the first chunk of streamed data
reasoning_check = None # Used to determine if reasoning should be processed for this response
# Flag to indicate that reasoning section has finished and content streaming should start
reasoning_done = False # Marks when reasoning is complete and content output begins
# Initialize an empty string to accumulate the main content text from the response
content = "" # Will hold the actual content output after reasoning is finished
# Create an aiohttp client session with no timeout to allow indefinite streaming
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=None)) as session:
# Send a POST request to the given host with specified headers and JSON payload
async with session.post(host, headers=headers, json=payload) as resp:
resp.raise_for_status() # Raise an exception if HTTP response status is not successful (2xx)
# Iterate asynchronously over each line of bytes in the streamed response content
async for line_bytes in resp.content:
line = line_bytes.decode("utf-8").strip() # Decode bytes to UTF-8 string and strip whitespace
# Skip processing for lines that do not start with the expected "data:" prefix
if not line.startswith("data:"):
continue # Ignore lines without data prefix and continue to next streamed line
try:
# Parse the JSON object from the line after removing the "data:" prefix
data = json.loads(line[5:]) # Convert JSON string to Python dictionary
# Extract the 'delta' dictionary which contains incremental update fields
choice = data["choices"][0]["delta"] # Access the partial update from the streamed response
# Perform a one-time check on the first chunk to detect if reasoning field exists and is non-empty
if reasoning_check is None: # Only check once on the initial chunk received
# Set reasoning_check to empty string if reasoning key exists and has content, else None
reasoning_check = "" if ("reasoning" in choice and choice["reasoning"]) else None
# If reasoning is present, mode allows thinking, reasoning not done, and reasoning text exists
if (
reasoning_check == "" # Reasoning field detected in first chunk
and mode != "/no_think" # Mode does not disable reasoning output
and not reasoning_done # Reasoning section is still in progress
and "reasoning" in choice # Current chunk contains reasoning text
and choice["reasoning"] # Reasoning text is non-empty
):
# Insert opening reasoning tag once and append the first reasoning chunk
if not reasoning_tag: # Only add opening tag once at the start of reasoning
reasoning_tag = True # Mark that opening tag has been inserted
reasoning = reasoning_tag_open(reasoning) # Add opening <think> tag to reasoning string
reasoning += choice["reasoning"] # Append initial reasoning text chunk
else:
# Remove any duplicate opening tags and append subsequent reasoning chunks
reasoning_content = choice["reasoning"].replace("<think>", "") # Clean redundant tags
reasoning += reasoning_content # Append next reasoning segment to accumulated text
yield reasoning # Yield the intermediate reasoning text chunk to the caller
continue # Continue to next streamed line without further processing
# If reasoning is done and content starts arriving, finalize reasoning output
if (
reasoning_check == "" # Reasoning was detected initially
and mode != "/no_think" # Mode allows reasoning output
and not reasoning_done # Reasoning not yet marked as done
and "content" in choice # Current chunk contains content field
and choice["content"] # Content text is non-empty
):
reasoning_done = True # Mark reasoning section as complete
# If reasoning tag was opened, close it properly before yielding final reasoning block
if reasoning_tag: # Only close tag if it was previously opened
reasoning = reasoning_tag_close(reasoning) # Append closing </think> tag
yield reasoning # Yield the complete reasoning text block
yield "" # Yield an empty string as a separator between reasoning and content
content += choice["content"] # Start accumulating content text from this chunk
yield content # Yield the first chunk of content text to the caller
continue # Proceed to next line in the stream
# Handle cases where reasoning is absent, finished, or mode disables reasoning, but content is present
if (
(reasoning_check is None or reasoning_done or mode == "/no_think") # No reasoning or reasoning done or disabled mode
and "content" in choice # Current chunk contains content field
and choice["content"] # Content text is non-empty
):
content += choice["content"] # Append the content chunk to accumulated content string
yield content # Yield the updated content string so far
# Catch any exceptions from JSON parsing errors or connection issues to prevent stream break
except Exception:
continue # Ignore malformed chunks or transient errors and continue processing next lines