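# Spaces: Running
# (The line above appears to be a hosting-page status banner captured together with this file; it is not part of the program.)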
#
# SPDX-FileCopyrightText: Hadad <hadad@linuxmail.org>
# SPDX-License-Identifier: Apache-2.0
#
import random # Import the random module to enable random selection of elements from a list, which helps in load balancing host assignments | |
from datetime import datetime # Import the datetime class to work with timestamps, particularly to check and compare current UTC time against host busy status | |
from typing import Dict, List # Import type hinting classes for dictionaries and lists to improve code readability and static analysis, though not explicitly used here | |
from config import auth # Import the 'auth' configuration, which is expected to be a list of host dictionaries containing credentials and identifiers for available hosts | |
from src.utils.helper import busy, mark # Import 'busy', a dictionary tracking host busy states with expiration timestamps, and 'mark', a function to update the busy status of hosts when assigned | |
# Session-affinity table shared by every call in this module: each key is a
# session ID and each value is the host dictionary that was picked for it, so a
# returning session can be handed the same host again.
mapping: Dict[str, dict] = dict()
# Pause (in seconds) between polls while every configured host is still reserved.
_RETRY_DELAY_SECONDS = 0.1


def _host_is_free(host_id, now: datetime) -> bool:
    """Return True when ``host_id`` has no entry in ``busy`` or its entry is not later than ``now``."""
    try:
        # Single lookup instead of "in" followed by "[]", so a concurrent removal cannot raise.
        return busy[host_id] <= now
    except KeyError:
        return True


def get_host(session_id: str, exclude_hosts: List[str] = None) -> dict:
    """Return a host from ``auth`` for ``session_id``, waiting until one is free.

    Selection order:

    1. If ``mapping`` already holds a host for this session and that host is
       neither excluded nor busy, it is re-marked via ``mark`` and returned
       (session affinity). Otherwise the stale mapping entry is dropped.
    2. Otherwise a host is picked at random among the hosts in ``auth`` that
       are free and not excluded.
    3. If the exclusions leave nothing to pick, the exclusion list is cleared
       and every free host becomes eligible again.
    4. If every host is busy, the function sleeps briefly and polls again
       instead of spinning.

    Args:
        session_id: Identifier of the session requesting a host.
        exclude_hosts: Host identifiers (values of each host's ``"jarvis"``
            key) to avoid. Defaults to no exclusions.

    Returns:
        The selected host dictionary from ``auth``.

    Raises:
        RuntimeError: If ``auth`` is empty, since no host could ever be
            returned and waiting would never end.

    Side effects:
        * ``mapping[session_id]`` is set to the returned host.
        * ``mark`` is called with the returned host's identifier.
          NOTE(review): ``mark`` lives in ``src.utils.helper`` and is
          presumably what writes the expiry timestamps into ``busy`` -- confirm.
        * The caller's ``exclude_hosts`` list is cleared **in place** when the
          exclusions leave no candidate. This matches the previous behaviour
          and is kept for compatibility; pass a copy if the list must survive.

    Note:
        ``busy`` timestamps are compared directly with ``datetime.utcnow()``,
        so this function keeps using naive UTC datetimes; switching to
        timezone-aware values would have to be done together with whatever
        populates ``busy``.
    """
    import time  # Function-scope import: only the wait path below needs it.

    if exclude_hosts is None:
        exclude_hosts = []

    while True:
        now = datetime.utcnow()

        # Session affinity: reuse the previously assigned host when it is still usable.
        assigned = mapping.get(session_id)
        if assigned is not None:
            assigned_id = assigned["jarvis"]
            if assigned_id not in exclude_hosts and _host_is_free(assigned_id, now):
                mark(assigned_id)
                return assigned
            # Excluded or busy: forget it so a different host can be chosen below.
            mapping.pop(session_id, None)

        free_hosts = [h for h in auth if _host_is_free(h["jarvis"], now)]
        candidates = [h for h in free_hosts if h["jarvis"] not in exclude_hosts]

        if not candidates and exclude_hosts:
            # Every host is either busy or excluded: drop the exclusions so the
            # free-but-excluded hosts (if any) can be used right away.
            exclude_hosts.clear()
            candidates = free_hosts

        if candidates:
            selected = random.choice(candidates)
            mapping[session_id] = selected
            mark(selected["jarvis"])
            return selected

        if not auth:
            # Nothing is configured, so polling could never succeed.
            raise RuntimeError("get_host: no hosts are configured in 'auth'")

        # All hosts are busy; wait a moment before polling again rather than
        # re-running the loop at full speed.
        time.sleep(_RETRY_DELAY_SECONDS)