"""
Chat functionality handler for AI-Inferoxy AI Hub.
Handles chat completion requests with streaming responses.
"""
import os | |
import gradio as gr | |
import time | |
import threading | |
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeoutError | |
from huggingface_hub import InferenceClient | |
from huggingface_hub.errors import HfHubHTTPError | |
from requests.exceptions import ConnectionError, Timeout, RequestException | |
from hf_token_utils import get_proxy_token, report_token_status | |
from utils import ( | |
validate_proxy_key, | |
format_error_message, | |
render_with_reasoning_toggle | |
) | |
# Upper bound, in seconds, on how long chat_respond waits for the streaming
# inference request to be opened before it gives up with a timeout error.
INFERENCE_TIMEOUT = 2 * 60  # two minutes
# Longest silence, in seconds, tolerated between content tokens while streaming.
_TOKEN_STALL_TIMEOUT = 30

# Unique marker returned by next() once the response stream is exhausted.
_STREAM_END = object()

# User-facing (title, detail) pairs for HTTP statuses that deserve a tailored hint.
# Insertion order matters for the substring fallback in chat_respond.
_HF_STATUS_MESSAGES = {
    401: ("Authentication Error", "Invalid or expired API token. The proxy will provide a new token on retry."),
    402: ("Quota Exceeded", "API quota exceeded. The proxy will try alternative providers."),
    429: ("Rate Limited", "Too many requests. Please wait a moment and try again."),
}


def _result_within(future, timeout, timeout_message):
    """Return ``future.result()``; raise builtin TimeoutError if the wait expires.

    On Python 3.11+ ``concurrent.futures.TimeoutError`` is the builtin
    ``TimeoutError``, so a timeout-type error raised *by the task itself* is
    indistinguishable by class from an expired wait. ``future.done()`` tells
    the two apart: a finished future means the error came from the task and is
    re-raised untouched.
    """
    try:
        return future.result(timeout=timeout)
    except FutureTimeoutError:
        if future.done():
            raise
        raise TimeoutError(timeout_message) from None


def chat_respond(
    message,
    history: list[dict[str, str]],
    system_message,
    model_name,
    provider_override,
    max_tokens,
    temperature,
    top_p,
    client_name: str | None = None,
):
    """
    Chat completion function using AI-Inferoxy token management.

    Generator: yields the accumulated assistant response after every streamed
    chunk. On failure it yields a single formatted error message instead of
    raising, and reports the token as failed to the proxy.

    Args:
        message: The new user message.
        history: Prior conversation as a list of role/content dicts; it is
            placed between the system message and the new user message.
        system_message: Content of the leading system message.
        model_name: Model identifier passed to the inference client.
        provider_override: Provider name; falls back to "auto" when falsy.
        max_tokens: Forwarded to the chat completion request.
        temperature: Forwarded to the chat completion request.
        top_p: Forwarded to the chat completion request.
        client_name: Optional client identifier forwarded with token status reports.

    Yields:
        str: The response text so far, or an error message.
    """
    # Validate proxy API key
    is_valid, error_msg = validate_proxy_key()
    if not is_valid:
        yield error_msg
        return

    proxy_api_key = os.getenv("PROXY_KEY")
    token_id = None

    def report_failure(detail):
        # token_id is looked up at call time, so this is a no-op until a token
        # has actually been obtained from the proxy.
        if token_id:
            report_token_status(token_id, "error", detail, api_key=proxy_api_key, client_name=client_name)

    try:
        # Get token from AI-Inferoxy proxy server
        print("🔑 Chat: Requesting token from proxy...")
        token, token_id = get_proxy_token(api_key=proxy_api_key)
        print(f"✅ Chat: Got token: {token_id}")

        # Use the selected provider, or let the client pick one automatically
        provider = provider_override or "auto"
        print(f"🤖 Chat: Using model='{model_name}', provider='{provider}'")

        # Prepare messages: system prompt, prior turns, then the new user turn
        messages = [{"role": "system", "content": system_message}]
        messages.extend(history)
        messages.append({"role": "user", "content": message})
        print(f"💬 Chat: Prepared {len(messages)} messages, creating client...")

        client = InferenceClient(provider=provider, api_key=token)
        print("🚀 Chat: Client created, starting inference with timeout...")

        request_kwargs = {
            "model": model_name,
            "messages": messages,
            "max_tokens": max_tokens,
            "stream": True,
            "temperature": temperature,
            "top_p": top_p,
        }
        response = ""
        print(f"📡 Chat: Making streaming request with {INFERENCE_TIMEOUT}s timeout...")

        # The executor is managed by hand rather than with `with`: leaving a
        # `with ThreadPoolExecutor` block joins the worker thread, which would
        # keep this generator blocked on the very call that just timed out.
        executor = ThreadPoolExecutor(max_workers=1)
        try:
            opening = executor.submit(client.chat_completion, **request_kwargs)
            stream = _result_within(
                opening,
                INFERENCE_TIMEOUT,
                f"Chat request timed out after {INFERENCE_TIMEOUT} seconds",
            )
            print("🔄 Chat: Got stream, starting to iterate...")

            # Each chunk is fetched on the worker thread so that a stalled
            # stream can be abandoned instead of blocking forever. Only one
            # fetch is ever in flight, so the stream is never read concurrently.
            chunks = iter(stream)
            last_content_at = time.monotonic()
            while True:
                remaining = _TOKEN_STALL_TIMEOUT - (time.monotonic() - last_content_at)
                pending = executor.submit(next, chunks, _STREAM_END)
                chunk = _result_within(
                    pending,
                    max(remaining, 0.0),
                    f"No response received for {_TOKEN_STALL_TIMEOUT} seconds during streaming",
                )
                if chunk is _STREAM_END:
                    break
                choices = chunk.choices
                if choices and choices[0].delta.content:
                    response += choices[0].delta.content
                    # Only real content resets the stall timer
                    last_content_at = time.monotonic()
                yield response
        finally:
            # Never wait for a possibly hung worker; drop anything still queued.
            executor.shutdown(wait=False, cancel_futures=True)

        # Report successful token usage
        if token_id:
            report_token_status(token_id, "success", api_key=proxy_api_key, client_name=client_name)
    except ConnectionError as e:
        # Handle proxy connection errors
        error_msg = f"Cannot connect to AI-Inferoxy server: {str(e)}"
        print(f"🔌 Chat connection error: {error_msg}")
        report_failure(error_msg)
        yield format_error_message("Connection Error", "Unable to connect to the proxy server. Please check if it's running.")
    except (TimeoutError, Timeout) as e:
        # Handle our own deadlines as well as timeouts raised by the HTTP layer
        error_msg = f"Request timed out: {str(e)}"
        print(f"⏰ Chat timeout: {error_msg}")
        report_failure(error_msg)
        yield format_error_message("Timeout Error", "The request took too long. The server may be overloaded. Please try again.")
    except HfHubHTTPError as e:
        # Handle HuggingFace API errors
        error_msg = str(e)
        print(f"🤗 Chat HF error: {error_msg}")
        report_failure(error_msg)
        # Prefer the real HTTP status; searching the message text can match
        # unrelated digits. Fall back to the text only when no status is available.
        status_code = getattr(getattr(e, "response", None), "status_code", None)
        if status_code is not None:
            friendly = _HF_STATUS_MESSAGES.get(status_code)
        else:
            friendly = next(
                (text for code, text in _HF_STATUS_MESSAGES.items() if str(code) in error_msg),
                None,
            )
        if friendly is not None:
            yield format_error_message(*friendly)
        else:
            yield format_error_message("HuggingFace API Error", error_msg)
    except Exception as e:
        # Last-resort boundary: surface the error in the chat instead of crashing the UI
        error_msg = str(e)
        print(f"❌ Chat unexpected error: {error_msg}")
        report_failure(error_msg)
        yield format_error_message("Unexpected Error", f"An unexpected error occurred: {error_msg}")
def handle_chat_submit(message, history, system_msg, model_name, provider, max_tokens, temperature, top_p, hf_token: gr.OAuthToken = None, hf_profile: gr.OAuthProfile = None):
    """
    Handle chat submission and manage conversation history with streaming.

    Generator: every yield is a ``(history, "")`` pair, i.e. the updated
    conversation plus an empty string for the second output.

    Args:
        message: Text submitted by the user; ``None`` or blank is ignored.
        history: Conversation so far as a list of role/content dicts; ``None``
            is treated as an empty conversation.
        system_msg, model_name, provider, max_tokens, temperature, top_p:
            Forwarded unchanged to ``chat_respond``.
        hf_token: OAuth token object; its ``token`` attribute must be set for
            the request to proceed.
        hf_profile: OAuth profile; its ``username`` is forwarded as the client name.
    """
    # Work on a private copy so a missing (None) history cannot break list concatenation
    history = list(history or [])

    if not message or not message.strip():
        yield history, ""
        return

    # Require sign-in: if no token present, prompt login.
    # getattr with a default already covers hf_token / hf_profile being None.
    access_token = getattr(hf_token, "token", None)
    username = getattr(hf_profile, "username", None)
    if not access_token:
        notice = format_error_message("Access Required", "Please sign in with Hugging Face (sidebar Login button).")
        yield history + [{"role": "assistant", "content": notice}], ""
        return

    # Conversation as displayed: prior turns plus the new user message
    conversation = history + [{"role": "user", "content": message}]

    # chat_respond receives the prior turns only; it appends the new message itself
    response_generator = chat_respond(
        message,
        history,
        system_msg,
        model_name,
        provider,
        max_tokens,
        temperature,
        top_p,
        client_name=username,
    )

    # Stream the assistant response, re-rendering the growing text on every update
    streamed = False
    for partial_response in response_generator:
        streamed = True
        assistant_response = render_with_reasoning_toggle(partial_response, True)
        yield conversation + [{"role": "assistant", "content": assistant_response}], ""

    if not streamed:
        # Nothing came back at all: still show the user's message and clear the input
        yield conversation, ""
def handle_chat_retry(history, system_msg, model_name, provider, max_tokens, temperature, top_p, hf_token: gr.OAuthToken = None, hf_profile: gr.OAuthProfile = None, retry_data=None):
    """
    Retry the assistant response for the selected message.

    Generator: yields the updated conversation history as the regenerated
    response streams in. Messages after the retried turn are dropped.

    Args:
        history: Conversation as a list of role/content dicts.
        system_msg, model_name, provider, max_tokens, temperature, top_p:
            Forwarded unchanged to ``chat_respond``.
        hf_token: OAuth token object; its ``token`` attribute must be set.
        hf_profile: OAuth profile; its ``username`` is forwarded as the client name.
        retry_data: Optional event payload whose ``index`` selects the message
            to retry. The index may point at the assistant reply or at the user
            message itself (NOTE(review): confirm which one the installed
            Gradio version sends); both are handled. Missing, malformed or
            out-of-range indices fall back to the last message.
    """
    # Require sign-in: if no token present, prompt login.
    # getattr with a default already covers hf_token / hf_profile being None.
    access_token = getattr(hf_token, "token", None)
    username = getattr(hf_profile, "username", None)
    if not access_token:
        notice = format_error_message("Access Required", "Please sign in with Hugging Face (sidebar Login button).")
        yield (history or []) + [{"role": "assistant", "content": notice}]
        return

    # Guard: empty history
    if not history:
        yield history
        return

    # Normalise the requested index; a tuple/list index contributes its first element
    raw_index = getattr(retry_data, "index", None)
    if isinstance(raw_index, (tuple, list)):
        raw_index = raw_index[0] if raw_index else None
    if isinstance(raw_index, int) and 0 <= raw_index < len(history):
        retry_index = raw_index
    else:
        # Fallback to the last message
        retry_index = len(history) - 1

    if history[retry_index].get("role") == "user":
        # The target is the user message itself: keep it and regenerate its reply
        user_idx = retry_index
        keep_upto = retry_index + 1
    else:
        # The target is a reply: drop it and find the user message that prompted it
        user_idx = next(
            (idx for idx in range(retry_index - 1, -1, -1) if history[idx].get("role") == "user"),
            None,
        )
        keep_upto = retry_index

    # Nothing to retry if no prior user message
    if user_idx is None:
        yield history
        return

    trimmed_history = list(history[:keep_upto])

    # Message to retry and prior conversation context (before that user msg)
    message = trimmed_history[user_idx].get("content", "")
    prior_history = trimmed_history[:user_idx]
    if not isinstance(message, str) or not message.strip():
        # Only non-empty text messages can be re-sent
        yield history
        return

    # Stream a new assistant response
    response_generator = chat_respond(
        message,
        prior_history,
        system_msg,
        model_name,
        provider,
        max_tokens,
        temperature,
        top_p,
        client_name=username,
    )
    for partial_response in response_generator:
        assistant_response = render_with_reasoning_toggle(partial_response, True)
        yield trimmed_history + [{"role": "assistant", "content": assistant_response}]