from collections.abc import Callable from functools import wraps from time import sleep import asyncio def async_tasks(*tasks): async def gather(*t): t = [await _ for _ in t] return await asyncio.gather(*t) loop = asyncio.new_event_loop() results = loop.run_until_complete(gather(*tasks)) loop.stop() loop.close() return results def retry_on_status( num_retries: int = 3, backoff_factor: float = 0.5, max_backoff: float | None = None, retry_statuses: tuple[int, ...] = (501, 503) ): """ Retry decorator for functions making httpx requests. Retries on specific HTTP status codes with exponential backoff. Args: num_retries (int): Max number of retries. backoff_factor (float): Multiplier for delay (e.g., 0.5, 1, etc.). max_backoff (float, optional): Cap on the backoff delay in seconds. retry_statuses (tuple): HTTP status codes to retry on. """ def decorator(func: Callable): if asyncio.iscoroutinefunction(func): # Async version @wraps(func) async def async_wrapper(*args, **kwargs): for attempt in range(num_retries + 1): response = await func(*args, **kwargs) if response.status_code not in retry_statuses: return response if attempt < num_retries: delay = min(backoff_factor * (2 ** attempt), max_backoff or float('inf')) await asyncio.sleep(delay) return response return async_wrapper # Sync version @wraps(func) def sync_wrapper(*args, **kwargs): for attempt in range(num_retries + 1): response = func(*args, **kwargs) if response.status_code not in retry_statuses: return response if attempt < num_retries: delay = min(backoff_factor * (2 ** attempt), max_backoff or float('inf')) sleep(delay) return response return sync_wrapper return decorator