#
# SPDX-FileCopyrightText: Hadad <hadad@linuxmail.org>
# SPDX-License-Identifier: Apache-2.0
#
import os # Used for accessing environment variables such as API credentials | |
import json # Used for encoding and decoding JSON data | |
import httpx # Used to make asynchronous HTTP requests with support for HTTP/2 | |
import gradio as gr # Used to create the user interface for the chatbot | |
import random # Used to randomly shuffle available server hosts | |
from datetime import datetime, timedelta # Used to manage time for session expiry | |
from src.assets.css.reasoning import styles # Function to apply CSS styling to reasoning output | |
# Host credentials loaded from the "auth" environment variable: a JSON array
# of host dicts (each carrying the keys used below: "token", "model",
# "endpoint", "jarvis" session name, and the expected busy "error" code).
# NOTE(review): if the variable is unset, json.loads(None) raises TypeError
# at import time — presumably intentional fail-fast; confirm with deployment.
auth = json.loads(os.getenv("auth"))
# Tracks which server sessions are currently marked as busy.
# Key: session name ("jarvis"); value: datetime when the busy state expires.
busy = {}
def server():
    """
    Sweep expired entries out of the module-level 'busy' dictionary.

    Each entry maps a session name to the moment its busy flag lapses.
    Every session whose expiry time is at or before now is removed,
    making that server available for new requests again.
    """
    cutoff = datetime.now()
    # Collect names first so we never mutate the dict while iterating it
    expired = [name for name, until in busy.items() if until <= cutoff]
    for name in expired:
        busy.pop(name, None)
def setup(user_message, history): | |
""" | |
Append the current user message to the conversation history. | |
Parameters: | |
- user_message: The new message entered by the user. | |
- history: A list of dictionaries containing previous conversation turns. | |
Returns: | |
- A new message list with the latest user message added. | |
""" | |
messages = history.copy() | |
messages.append({"role": "user", "content": user_message}) | |
return messages | |
def connection(host, messages):
    """
    Send the conversation to a specific server and stream back the reply.

    Opens an HTTP/2 streaming POST to the host's endpoint and parses the
    server-sent-event lines incrementally, separating "reasoning" deltas
    from ordinary "content" deltas.

    Parameters:
    - host: A dictionary with host settings ("token", "model", "endpoint").
    - messages: The conversation history in role/content dict format.

    Yields:
    - Incremental reasoning (styled) and/or accumulated content strings
      as they arrive from the stream.

    Raises:
    - httpx.HTTPStatusError: If the server responds with an error status.
    """
    headers = {
        "Authorization": f"Bearer {host['token']}",  # Bearer token auth
        "Content-Type": "application/json"  # JSON request body
    }
    payload = {
        "model": host["model"],  # Which AI model to use
        "messages": messages,  # Chat history including the new user input
        "stream": True  # Receive the response incrementally
    }
    reasoning = ""  # Accumulated intermediate reasoning text
    content = ""  # Accumulated final response text
    # Streaming HTTP client with HTTP/2 support
    with httpx.Client(http2=True) as client:
        with client.stream("POST", host["endpoint"], headers=headers, json=payload) as resp:
            resp.raise_for_status()  # Surface HTTP errors to the caller
            for line in resp.iter_lines():
                if not line:
                    continue  # Skip SSE keep-alive blank lines
                # Strip the SSE "data: " prefix when present
                raw = line[6:] if line.startswith("data: ") else line
                # FIX: OpenAI-style streams terminate with a "[DONE]"
                # sentinel, which is not JSON — stop cleanly instead of
                # falling into the JSON-decode failure path every time.
                if raw.strip() == "[DONE]":
                    return
                try:
                    data = json.loads(raw)  # Parse the event as JSON
                    delta = data["choices"][0]["delta"]
                # FIX: a well-formed JSON event without "choices"/"delta"
                # (e.g. an error or metadata frame) used to raise an
                # uncaught KeyError/IndexError and kill the stream.
                except (json.JSONDecodeError, KeyError, IndexError):
                    continue  # Skip malformed or non-delta lines
                reasoning_response = delta.get("reasoning")
                if reasoning_response:
                    reasoning += reasoning_response
                    # Yield the accumulated reasoning so far
                    yield styles(reasoning=reasoning, expanded=True)
                content_response = delta.get("content")
                if content_response:
                    content += content_response
                    # If reasoning exists, show it collapsed above the answer
                    if reasoning:
                        yield styles(reasoning=reasoning, expanded=False) + content
                    else:
                        yield content
                # Stop once the server signals completion
                if data["choices"][0].get("finish_reason") == "stop":
                    return
def request(user_message, history):
    """
    Route a user's message to an available server and stream the reply.

    Validates the input, refreshes the busy-session list, builds the
    message history, then tries non-busy hosts in random order until one
    streams a response. A host failing with its configured status code is
    benched for an hour; any other failure moves on to the next host.

    Parameters:
    - user_message: The latest message input by the user.
    - history: The chat history containing all prior messages.

    Yields:
    - The streamed reply, or a busy notice if every host is unavailable.
    """
    # Guard: ignore empty or whitespace-only input
    if not user_message or not user_message.strip():
        yield []
        return
    # Drop busy markers whose timeout has lapsed
    server()
    # Conversation history plus the new user turn
    messages = setup(user_message, history)
    # Candidate hosts are those not currently benched, in random order
    candidates = [entry for entry in auth if entry["jarvis"] not in busy]
    random.shuffle(candidates)
    for candidate in candidates:
        try:
            # Stream the response straight through to the caller
            yield from connection(candidate, messages)
        except httpx.HTTPStatusError as error:
            # The host's configured status code means "saturated":
            # bench it for an hour before retrying
            if error.response.status_code == candidate.get("error"):
                busy[candidate["jarvis"]] = datetime.now() + timedelta(hours=1)
        except Exception:
            pass  # Any other failure: silently try the next host
        else:
            return  # Streaming completed without errors
    # Every host failed or is benched
    yield "The server is currently busy. Please wait a moment or try again later."
# Assemble the Gradio chat application around the request handler
app = gr.ChatInterface(
    fn=request,  # Handler invoked for each user message
    type="messages",  # OpenAI-style message dicts
    chatbot=gr.Chatbot(
        type="messages",  # Repeated here to silence the warning log
        show_copy_button=True,  # Let users copy responses
        scale=1  # Standard display scaling
    ),
    # Sample prompts shown to users
    examples=[
        ["Please introduce yourself."],
        ["Give me a short introduction to large language model."],
        ["Please generate a highly complex code snippet on any topic."],
        ["Explain about quantum computers."]
    ],
    cache_examples=False,  # Always generate fresh responses
    concurrency_limit=2  # Queue limit
)
# Start the app and open the interface
app.launch()