# SPDX-FileCopyrightText: Hadad
# SPDX-License-Identifier: Apache-2.0

import os  # Access environment variables such as API credentials
import json  # Encode request payloads and decode streamed JSON chunks
import httpx  # HTTP client with HTTP/2 support (used synchronously here)
import gradio as gr  # User interface for the chatbot
import random  # Randomly shuffle available server hosts for load balancing
from datetime import datetime, timedelta  # Manage busy-session expiry times

from src.assets.css.reasoning import styles  # Apply CSS styling to reasoning output

# Host/authentication configuration stored as a JSON string in the "auth"
# environment variable. Presumably a list of host dicts with keys such as
# "jarvis" (session name), "token", "model", "endpoint" and "error" —
# confirm against the deployment configuration.
auth = json.loads(os.getenv("auth"))

# Tracks which server sessions are currently marked as busy.
# Key: session name (host["jarvis"]), value: datetime when the busy state expires.
busy = {}


def server():
    """
    Remove expired entries from the 'busy' tracking dictionary.

    Any host whose busy period has elapsed is deleted from 'busy',
    making it available again for new requests.
    """
    now = datetime.now()
    # Iterate over a snapshot because entries are deleted while scanning
    for session, expiry in list(busy.items()):
        if expiry <= now:
            del busy[session]


def setup(user_message, history):
    """
    Append the current user message to a copy of the conversation history.

    Parameters:
    - user_message: The new message entered by the user.
    - history: A list of dicts containing previous conversation turns.

    Returns:
    - A new message list (history is not mutated) ending with the user turn.
    """
    messages = history.copy()
    messages.append({"role": "user", "content": user_message})
    return messages


def connection(host, messages):
    """
    Stream a chat completion from a single server over HTTP/2.

    Opens a streaming POST to host["endpoint"] and yields incremental
    output, separating "reasoning" deltas from regular "content" deltas.

    Parameters:
    - host: Dict with host settings ("token", "model", "endpoint").
    - messages: Conversation history in OpenAI message format.

    Yields:
    - Styled reasoning and/or accumulated content strings as they arrive.

    Raises:
    - httpx.HTTPStatusError: If the server responds with an error status.
    """
    headers = {
        "Authorization": f"Bearer {host['token']}",  # Bearer token auth
        "Content-Type": "application/json"  # JSON request body
    }
    payload = {
        "model": host["model"],  # AI model to use
        "messages": messages,  # Chat history including the new user input
        "stream": True,  # Receive the response incrementally
        "temperature": 0.7,  # Higher values = more random output
        "top_p": 0.8,  # Nucleus sampling cutoff
        "top_k": 20,  # Consider only the top-k tokens at each step
        "repetition_penalty": 1.05  # Discourage repeated tokens
    }
    reasoning = ""  # Accumulated intermediate reasoning text
    content = ""  # Accumulated final answer text
    with httpx.Client(http2=True) as client:
        with client.stream("POST", host["endpoint"], headers=headers, json=payload) as resp:
            resp.raise_for_status()  # Fail fast on HTTP error statuses
            for line in resp.iter_lines():
                if not line:
                    continue  # Skip SSE keep-alive blank lines
                # Strip the SSE "data: " prefix when present
                raw = line[6:] if line.startswith("data: ") else line
                if raw.strip() == "[DONE]":
                    return  # Explicit end-of-stream sentinel
                try:
                    data = json.loads(raw)
                except json.JSONDecodeError:
                    continue  # Skip malformed JSON lines
                # Some providers emit chunks without choices (e.g. usage-only
                # chunks); the original indexed these unguarded and could
                # raise an uncaught KeyError/IndexError mid-stream.
                choices = data.get("choices")
                if not choices:
                    continue
                delta = choices[0].get("delta", {})
                reasoning_response = delta.get("reasoning")
                if reasoning_response:
                    reasoning += reasoning_response
                    # Show reasoning expanded while it is still streaming
                    yield styles(reasoning=reasoning, expanded=True)
                content_response = delta.get("content")
                if content_response:
                    content += content_response
                    if reasoning:
                        # Collapse reasoning once the final answer starts
                        yield styles(reasoning=reasoning, expanded=False) + content
                    else:
                        yield content
                # Stop streaming when the server signals completion
                if choices[0].get("finish_reason") == "stop":
                    return


def request(user_message, history):
    """
    Distribute the user's message to an available, non-busy server.

    Validates the input, builds the message list, shuffles the configured
    hosts to balance load, and streams the reply from the first host that
    succeeds. A host that fails with its configured error status code is
    marked busy for one hour.

    Parameters:
    - user_message: The latest message input by the user.
    - history: The chat history containing all prior messages.

    Yields:
    - The streamed reply, or a busy notice if every host is unavailable.
    """
    # Ignore empty or whitespace-only input
    if not user_message or not user_message.strip():
        yield []
        return
    server()  # Drop expired busy markers before selecting a host
    messages = setup(user_message, history)
    # Hosts not currently marked busy
    available = [h for h in auth if h["jarvis"] not in busy]
    random.shuffle(available)  # Randomize order to spread the load
    for host in available:
        try:
            yield from connection(host, messages)
            return  # Stop after the first successful stream
        except httpx.HTTPStatusError as e:
            # Expected "server saturated" code: cool this host down for 1 hour
            if e.response.status_code == host.get("error"):
                busy[host["jarvis"]] = datetime.now() + timedelta(hours=1)
        except Exception:
            continue  # Any other failure: try the next host
    # Every host failed or is busy: notify the user
    yield "The server is currently busy. Please wait a moment or try again later."


# Build and launch the Gradio chat application
gr.ChatInterface(
    fn=request,  # Handler that processes user messages
    type="messages",  # OpenAI-style message format
    chatbot=gr.Chatbot(
        type="messages",  # Repeated here to silence a warning log
        show_copy_button=True,  # Let users copy responses
        scale=1  # Standard display scaling
    ),
    examples=[
        ["Please introduce yourself."],
        ["Give me a short introduction to large language model."],
        ["Please generate a highly complex code snippet on any topic."],
        ["Explain about quantum computers."]
    ],  # Sample prompts users can try
    cache_examples=False,  # Always generate fresh responses
    concurrency_limit=2  # Queue limit
).launch()  # Start the app and open the interface