File size: 2,166 Bytes
ef088c2
 
 
a0e37e2
 
 
 
 
 
 
 
 
 
 
 
 
 
ef088c2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
from collections.abc import Callable
from functools import wraps
from time import sleep
import asyncio


def async_tasks(*tasks):
    """
    Run awaitables concurrently on a private event loop and return their results.

    A new event loop is created for the call, used to drive all ``tasks`` to
    completion via ``asyncio.gather``, and closed afterwards (also when one of
    the tasks raises).

    If a task's result is itself awaitable (e.g. a coroutine that returns
    another coroutine), that result is awaited as well, so the returned list
    contains the final values.

    Args:
        *tasks: Awaitables (coroutine objects, futures, ...) to run.

    Returns:
        list: Results in the same order as ``tasks``; empty list if no tasks
        were given.

    Raises:
        Exception: The first exception raised by any task is propagated.
    """
    # Local import: keeps this block self-contained without touching the
    # module-level import section.
    import inspect

    async def gather(*awaitables):
        # Schedule everything at once so the tasks actually overlap, instead
        # of awaiting them one after another.
        results = await asyncio.gather(*awaitables)

        # Some tasks may hand back another awaitable; resolve those in a
        # second concurrent pass while leaving plain values untouched.
        nested = {
            index: value
            for index, value in enumerate(results)
            if inspect.isawaitable(value)
        }
        if nested:
            resolved = await asyncio.gather(*nested.values())
            for index, value in zip(nested, resolved):
                results[index] = value
        return results

    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(gather(*tasks))
    finally:
        # Always release the loop's resources, even if a task failed.
        loop.close()


def retry_on_status(
    num_retries: int = 3,
    backoff_factor: float = 0.5,
    max_backoff: float | None = None,
    retry_statuses: tuple[int, ...] = (501, 503)
):
    """
    Retry decorator for functions making httpx requests.
    Retries on specific HTTP status codes with exponential backoff.

    Args:
        num_retries (int): Max number of retries.
        backoff_factor (float): Multiplier for delay (e.g., 0.5, 1, etc.).
        max_backoff (float, optional): Cap on the backoff delay in seconds.
        retry_statuses (tuple): HTTP status codes to retry on.
    """

    def decorator(func: Callable):

        if asyncio.iscoroutinefunction(func):
            # Async version
            @wraps(func)
            async def async_wrapper(*args, **kwargs):
                for attempt in range(num_retries + 1):
                    response = await func(*args, **kwargs)
                    if response.status_code not in retry_statuses:
                        return response
                    if attempt < num_retries:
                        delay = min(backoff_factor * (2 ** attempt), max_backoff or float('inf'))
                        await asyncio.sleep(delay)
                return response
            return async_wrapper

        # Sync version
        @wraps(func)
        def sync_wrapper(*args, **kwargs):
            for attempt in range(num_retries + 1):
                response = func(*args, **kwargs)
                if response.status_code not in retry_statuses:
                    return response
                if attempt < num_retries:
                    delay = min(backoff_factor * (2 ** attempt), max_backoff or float('inf'))
                    sleep(delay)
            return response
        return sync_wrapper

    return decorator