#
# SPDX-FileCopyrightText: Hadad <hadad@linuxmail.org>
# SPDX-License-Identifier: Apache-2.0
#
import os # Used for accessing environment variables such as API credentials
import json # Used for encoding and decoding JSON data
import httpx # Used to make asynchronous HTTP requests with support for HTTP/2
import gradio as gr # Used to create the user interface for the chatbot
import random # Used to randomly shuffle available server hosts
from datetime import datetime, timedelta # Used to manage time for session expiry
from src.assets.css.reasoning import styles # Function to apply CSS styling to reasoning output
# Load the list of server host configurations (tokens, models, endpoints) stored as a JSON string in the "auth" environment variable
auth = json.loads(os.getenv("auth"))
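# Example of the expected value (illustrative only; the schema is inferred from
# the keys this script reads below: "jarvis", "token", "model", "endpoint", "error"):
# auth = [
#     {
#         "jarvis": "server-1", # Unique name used to track busy state
#         "token": "sk-...", # Bearer token for the endpoint
#         "model": "model-name", # Model identifier sent in the payload
#         "endpoint": "https://example.com/v1/chat/completions", # Placeholder URL
#         "error": 429 # Status code that marks this host as busy when returned
#     }
# ]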
# This dictionary is used to track which server sessions are currently marked as busy
# The key is the session name and the value is the expiration time of the busy state
busy = {}
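# Example entry (illustrative): {"server-1": datetime(2026, 1, 1, 13, 0, 0)}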
def server():
"""
Clean up any expired server sessions from the 'busy' tracking dictionary.
    This function checks whether any server marked as busy has passed its timeout
    period and, if so, removes it from the busy dictionary so it becomes available again.
"""
now = datetime.now()
for session, expiry in list(busy.items()):
if expiry <= now:
del busy[session]
def setup(user_message, history):
"""
Append the current user message to the conversation history.
Parameters:
- user_message: The new message entered by the user.
- history: A list of dictionaries containing previous conversation turns.
Returns:
- A new message list with the latest user message added.
"""
messages = history.copy()
messages.append({"role": "user", "content": user_message})
return messages
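# Example of the returned list (Gradio's "messages" format, which follows the
# OpenAI chat schema; contents are illustrative):
# [
#     {"role": "user", "content": "Hello"},
#     {"role": "assistant", "content": "Hi, how can I help?"},
#     {"role": "user", "content": "<latest user_message>"}
# ]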
def connection(host, messages):
"""
Send the conversation to a specific server and receive a streamed response.
This function prepares the headers and request payload, then opens a stream
to the target host using the HTTP/2 protocol. It handles incremental data
and separates reasoning content from main response content.
Parameters:
- host: A dictionary containing host settings such as token, model, and endpoint.
- messages: A list of conversation history messages in proper format.
Yields:
- Incremental reasoning or content responses as they are received from the stream.
"""
headers = {
"Authorization": f"Bearer {host['token']}", # Use bearer token for authentication
"Content-Type": "application/json" # Set the request body to JSON format
}
payload = {
"model": host["model"], # Specify the AI model to use
"messages": messages, # Provide the chat history including the user input
"stream": True, # Enable streaming mode to receive response in real time
"temperature": 0.7, # Control randomness of output, higher values = more random
"top_p": 0.8, # Control diversity via nucleus sampling
"top_k": 20, # Consider the top-k most likely tokens at each step
"repetition_penalty": 1.05 # Penalize repeated tokens to reduce redundancy
}
reasoning = "" # This variable stores any intermediate explanation or reasoning
content = "" # This variable accumulates the final response content
    # Use a streaming HTTP client with HTTP/2 support; disable the default
    # read timeout so long-running streams are not cut off mid-response
    with httpx.Client(http2=True, timeout=None) as client:
with client.stream("POST", host["endpoint"], headers=headers, json=payload) as resp:
resp.raise_for_status() # Raise an exception if the response indicates an error
# Process each line of the streaming response
for line in resp.iter_lines():
if not line:
continue # Skip empty lines
# Remove the "data: " prefix if it exists
raw = line[6:] if line.startswith("data: ") else line
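                # A typical line follows the OpenAI-compatible streaming format, e.g.
                # (illustrative; "reasoning" is a non-standard field this backend
                # emits alongside the usual "content" delta):
                # data: {"choices": [{"delta": {"reasoning": "...", "content": "..."}, "finish_reason": null}]}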
try:
data = json.loads(raw) # Parse the line as JSON
# Extract incremental reasoning if available
delta = data["choices"][0]["delta"]
reasoning_response = delta.get("reasoning")
if reasoning_response:
reasoning += reasoning_response
# Yield current accumulated reasoning
yield styles(reasoning=reasoning, expanded=True)
content_response = delta.get("content")
# Extract incremental content response if available
if content_response:
content += content_response
# If reasoning exists, yield both reasoning and content
if reasoning:
yield styles(reasoning=reasoning, expanded=False) + content
else:
yield content
# Stop streaming if the finish condition is met
if data["choices"][0].get("finish_reason") == "stop":
return
                except (json.JSONDecodeError, KeyError, IndexError):
                    continue # Skip malformed lines and chunks without a usable delta
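# Example usage (illustrative, assuming at least one host is configured in 'auth'):
# for partial in connection(auth[0], [{"role": "user", "content": "Hello"}]):
#     print(partial)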
def request(user_message, history):
"""
Main request handler that distributes the user's question to an available server.
This function validates the input, prepares the message history, rotates through
available hosts, and forwards the message to one that is not currently busy.
    If a host returns its configured 'error' status code, it is marked busy for one hour.
Parameters:
- user_message: The latest message input by the user.
- history: The chat history containing all prior messages.
Yields:
- Either the generated reply or a busy message if all hosts are unavailable.
"""
# Ignore empty or whitespace-only input
if not user_message or not user_message.strip():
yield []
return
# Clean up expired server sessions before handling the new request
server()
# Append the current message to the conversation
messages = setup(user_message, history)
# Identify servers that are not currently marked as busy
available = [h for h in auth if h["jarvis"] not in busy]
# Shuffle the available list to randomly balance load across servers
random.shuffle(available)
# Try each available host one by one until one succeeds
for host in available:
try:
# Attempt to connect and stream a response
yield from connection(host, messages)
return # Stop after successful response
except httpx.HTTPStatusError as e:
# If the failure matches the expected error code, mark the host as busy
if e.response.status_code == host.get("error"):
busy[host["jarvis"]] = datetime.now() + timedelta(hours=1)
except Exception:
continue # Ignore all other errors and try the next server
# If all hosts fail or are busy, notify the user
yield "The server is currently busy. Please wait a moment or try again later."
# Create a Gradio application
gr.ChatInterface(
fn=request, # The function that handles user messages
type="messages", # OpenAI style
chatbot=gr.Chatbot(
type="messages", # OpenAI style (duplicate to silence warning log)
show_copy_button=True, # Enable a button for users to copy responses
        scale=1 # Relative size of the chatbot within the layout
),
examples=[
["Please introduce yourself."],
["Give me a short introduction to large language model."],
["Please generate a highly complex code snippet on any topic."],
["Explain about quantum computers."]
], # Provide sample inputs for users to try
cache_examples=False, # Disable caching to ensure responses are always fresh
    concurrency_limit=2 # Process at most two requests concurrently; extra requests wait in the queue
).launch() # Start the app and open the interface
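# To run locally (illustrative; the values and the file name are placeholders,
# but the "auth" variable name matches the load at the top of this file):
#   export auth='[{"jarvis": "server-1", "token": "...", "model": "...", "endpoint": "...", "error": 429}]'
#   python app.py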