brainsqueeze's picture
v2 of public chat
ef088c2 verified
raw
history blame
2.17 kB
from collections.abc import Callable
from functools import wraps
from time import sleep
import asyncio
def async_tasks(*tasks):
    """
    Run awaitables to completion on a private event loop from synchronous code.

    Resolution happens in two stages: every item in ``tasks`` is awaited one
    after another, in the order given, and the values those awaits produce
    are then handed to ``asyncio.gather``. Each task is therefore expected to
    resolve to another awaitable; ``asyncio.gather`` does not accept plain
    (non-awaitable) values.

    Args:
        *tasks: Awaitables whose results are themselves awaitables.

    Returns:
        list: Results of the second-stage awaitables, ordered like ``tasks``.
    """
    async def gather(*pending):
        # Stage one: sequential awaits, so the outer awaitables never overlap.
        inner = [await item for item in pending]
        # Stage two: the awaitables produced above are run concurrently.
        return await asyncio.gather(*inner)

    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(gather(*tasks))
    finally:
        # Close the loop even when a task raises, so it is never leaked.
        loop.close()
def retry_on_status(
num_retries: int = 3,
backoff_factor: float = 0.5,
max_backoff: float | None = None,
retry_statuses: tuple[int, ...] = (501, 503)
):
"""
Retry decorator for functions making httpx requests.
Retries on specific HTTP status codes with exponential backoff.
Args:
num_retries (int): Max number of retries.
backoff_factor (float): Multiplier for delay (e.g., 0.5, 1, etc.).
max_backoff (float, optional): Cap on the backoff delay in seconds.
retry_statuses (tuple): HTTP status codes to retry on.
"""
def decorator(func: Callable):
if asyncio.iscoroutinefunction(func):
# Async version
@wraps(func)
async def async_wrapper(*args, **kwargs):
for attempt in range(num_retries + 1):
response = await func(*args, **kwargs)
if response.status_code not in retry_statuses:
return response
if attempt < num_retries:
delay = min(backoff_factor * (2 ** attempt), max_backoff or float('inf'))
await asyncio.sleep(delay)
return response
return async_wrapper
# Sync version
@wraps(func)
def sync_wrapper(*args, **kwargs):
for attempt in range(num_retries + 1):
response = func(*args, **kwargs)
if response.status_code not in retry_statuses:
return response
if attempt < num_retries:
delay = min(backoff_factor * (2 ** attempt), max_backoff or float('inf'))
sleep(delay)
return response
return sync_wrapper
return decorator