# Page-header residue from the hosting site ("Spaces: Running"); not part of the module.
import asyncio
import inspect
from collections.abc import Callable
from functools import wraps
from time import sleep
def async_tasks(*tasks):
    """Run awaitables to completion from synchronous code and return their results.

    A private event loop is created for the call and is always closed
    afterwards, including when one of the awaitables raises.

    Resolution happens in two stages:

    1. Each item in ``tasks`` is awaited in order, one at a time.
    2. Any stage-1 result that is itself awaitable is then awaited; these
       second-stage awaitables are run together via ``asyncio.gather``.
       Results that are not awaitable are passed through unchanged.

    Args:
        *tasks: Awaitables (e.g. coroutine objects) to run.

    Returns:
        list: One result per input, in the same order as ``tasks``.

    Raises:
        Exception: Whatever an awaited item raises is propagated.
    """

    async def resolve(value):
        # Stage-1 results are only awaited again when they are awaitable;
        # handing a plain value straight to asyncio.gather raises TypeError.
        if inspect.isawaitable(value):
            return await value
        return value

    async def gather(*pending):
        # Stage 1: sequential, in argument order.
        first_pass = [await item for item in pending]
        # Stage 2: resolve any awaitable results together.
        return await asyncio.gather(*(resolve(value) for value in first_pass))

    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(gather(*tasks))
    finally:
        # Close the loop on failure too, so it is never leaked.
        loop.close()
def retry_on_status( | |
num_retries: int = 3, | |
backoff_factor: float = 0.5, | |
max_backoff: float | None = None, | |
retry_statuses: tuple[int, ...] = (501, 503) | |
): | |
""" | |
Retry decorator for functions making httpx requests. | |
Retries on specific HTTP status codes with exponential backoff. | |
Args: | |
num_retries (int): Max number of retries. | |
backoff_factor (float): Multiplier for delay (e.g., 0.5, 1, etc.). | |
max_backoff (float, optional): Cap on the backoff delay in seconds. | |
retry_statuses (tuple): HTTP status codes to retry on. | |
""" | |
def decorator(func: Callable): | |
if asyncio.iscoroutinefunction(func): | |
# Async version | |
async def async_wrapper(*args, **kwargs): | |
for attempt in range(num_retries + 1): | |
response = await func(*args, **kwargs) | |
if response.status_code not in retry_statuses: | |
return response | |
if attempt < num_retries: | |
delay = min(backoff_factor * (2 ** attempt), max_backoff or float('inf')) | |
await asyncio.sleep(delay) | |
return response | |
return async_wrapper | |
# Sync version | |
def sync_wrapper(*args, **kwargs): | |
for attempt in range(num_retries + 1): | |
response = func(*args, **kwargs) | |
if response.status_code not in retry_statuses: | |
return response | |
if attempt < num_retries: | |
delay = min(backoff_factor * (2 ** attempt), max_backoff or float('inf')) | |
sleep(delay) | |
return response | |
return sync_wrapper | |
return decorator | |