File size: 8,015 Bytes
0a38286
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
#
# SPDX-FileCopyrightText: Hadad <hadad@linuxmail.org>
# SPDX-License-Identifier: Apache-2.0
#

import os  # Used for accessing environment variables such as API credentials
import json  # Used for encoding and decoding JSON data
import httpx  # Used to make asynchronous HTTP requests with support for HTTP/2
import gradio as gr  # Used to create the user interface for the chatbot
import random  # Used to randomly shuffle available server hosts
from datetime import datetime, timedelta  # Used to manage time for session expiry
from src.assets.css.reasoning import styles  # Function to apply CSS styling to reasoning output

# Load authentication information stored as a JSON string from an environment variable
auth = json.loads(os.getenv("auth"))

# This dictionary is used to track which server sessions are currently marked as busy
# The key is the session name and the value is the expiration time of the busy state
busy = {}

def server():
    """
    Clean up any expired server sessions from the 'busy' tracking dictionary.
    
    This function checks if any server marked as busy has passed its timeout period.
    If so, it removes them from the busy list, making them available again for use.
    """
    now = datetime.now()
    for session, expiry in list(busy.items()):
        if expiry <= now:
            del busy[session]

def setup(user_message, history):
    """
    Append the current user message to the conversation history.
    
    Parameters:
    - user_message: The new message entered by the user.
    - history: A list of dictionaries containing previous conversation turns.
    
    Returns:
    - A new message list with the latest user message added.
    """
    messages = history.copy()
    messages.append({"role": "user", "content": user_message})
    return messages

def connection(host, messages):
    """
    Send the conversation to a specific server and receive a streamed response.
    
    This function prepares the headers and request payload, then opens a stream
    to the target host using the HTTP/2 protocol. It handles incremental data
    and separates reasoning content from main response content.
    
    Parameters:
    - host: A dictionary containing host settings such as token, model, and endpoint.
    - messages: A list of conversation history messages in proper format.
    
    Yields:
    - Incremental reasoning or content responses as they are received from the stream.
    """
    headers = {
        "Authorization": f"Bearer {host['token']}",  # Use bearer token for authentication
        "Content-Type": "application/json"  # Set the request body to JSON format
    }
    payload = {
        "model": host["model"],  # Specify the AI model to use
        "messages": messages,  # Provide the chat history including the user input
        "stream": True,  # Enable streaming mode to receive response in real time
        "temperature": 0.7,  # Control randomness of output, higher values = more random
        "top_p": 0.8,  # Control diversity via nucleus sampling
        "top_k": 20,  # Consider the top-k most likely tokens at each step
        "repetition_penalty": 1.05  # Penalize repeated tokens to reduce redundancy
    }

    reasoning = ""  # This variable stores any intermediate explanation or reasoning
    content = ""  # This variable accumulates the final response content

    # Use a streaming HTTP client with support for HTTP/2
    with httpx.Client(http2=True) as client:
        with client.stream("POST", host["endpoint"], headers=headers, json=payload) as resp:
            resp.raise_for_status()  # Raise an exception if the response indicates an error
            # Process each line of the streaming response
            for line in resp.iter_lines():
                if not line:
                    continue  # Skip empty lines
                # Remove the "data: " prefix if it exists
                raw = line[6:] if line.startswith("data: ") else line
                try:
                    data = json.loads(raw)  # Parse the line as JSON
                    # Extract incremental reasoning if available
                    delta = data["choices"][0]["delta"]
                    reasoning_response = delta.get("reasoning")
                    if reasoning_response:
                        reasoning += reasoning_response
                        # Yield current accumulated reasoning
                        yield styles(reasoning=reasoning, expanded=True)
                    content_response = delta.get("content")
                    # Extract incremental content response if available
                    if content_response:
                        content += content_response
                        # If reasoning exists, yield both reasoning and content
                        if reasoning:
                            yield styles(reasoning=reasoning, expanded=False) + content
                        else:
                            yield content
                    # Stop streaming if the finish condition is met
                    if data["choices"][0].get("finish_reason") == "stop":
                        return
                except json.JSONDecodeError:
                    continue  # Skip malformed JSON lines

def request(user_message, history):
    """
    Main request handler that distributes the user's question to an available server.
    
    This function validates the input, prepares the message history, rotates through
    available hosts, and forwards the message to one that is not currently busy.
    If a server fails due to a known error, it is temporarily marked as unavailable.
    
    Parameters:
    - user_message: The latest message input by the user.
    - history: The chat history containing all prior messages.
    
    Yields:
    - Either the generated reply or a busy message if all hosts are unavailable.
    """
    # Ignore empty or whitespace-only input
    if not user_message or not user_message.strip():
        yield []
        return
    # Clean up expired server sessions before handling the new request
    server()
    # Append the current message to the conversation
    messages = setup(user_message, history)
    # Identify servers that are not currently marked as busy
    available = [h for h in auth if h["jarvis"] not in busy]
    # Shuffle the available list to randomly balance load across servers
    random.shuffle(available)
    # Try each available host one by one until one succeeds
    for host in available:
        try:
            # Attempt to connect and stream a response
            yield from connection(host, messages)
            return  # Stop after successful response
        except httpx.HTTPStatusError as e:
            # If the failure matches the expected error code, mark the host as busy
            if e.response.status_code == host.get("error"):
                busy[host["jarvis"]] = datetime.now() + timedelta(hours=1)
        except Exception:
            continue  # Ignore all other errors and try the next server
    # If all hosts fail or are busy, notify the user
    yield "The server is currently busy. Please wait a moment or try again later."

# Create a Gradio application
gr.ChatInterface(
    fn=request,  # The function that handles user messages
    type="messages",  # OpenAI style
    chatbot=gr.Chatbot(
        type="messages",  # OpenAI style (duplicate to silence warning log)
        show_copy_button=True,  # Enable a button for users to copy responses
        scale=1  # Use standard display scaling
    ),
    examples=[
        ["Please introduce yourself."],
        ["Give me a short introduction to large language model."],
        ["Please generate a highly complex code snippet on any topic."],
        ["Explain about quantum computers."]
    ],  # Provide sample inputs for users to try
    cache_examples=False,  # Disable caching to ensure responses are always fresh
    concurrency_limit=2  # Queue limit
).launch()  # Start the app and open the interface