|
import asyncio
|
|
import time
|
|
from typing import Optional
|
|
|
|
import aiohttp
|
|
|
|
import utils.config as config
|
|
import utils.helpers as helpers
|
|
|
|
|
|
class AiohttpManager:
    """
    Manage a single shared aiohttp session.

    The manager owns one ``aiohttp.ClientSession`` and a semaphore sized by
    ``config.max_api_load``; the semaphore is held around every GET issued
    through :meth:`read_api`, which caps the number of requests in flight.
    """

    def __init__(self):
        """
        Initialize the aiohttp manager.

        No session is opened here; call :meth:`refresh_session` before
        using :meth:`read_api`.
        """
        self._session = None
        self._semaphore = asyncio.Semaphore(config.max_api_load)

    async def refresh_session(self) -> None:
        """
        Refresh the aiohttp session, or
        create it if it doesn't exist yet.

        Any existing session is closed first.
        """
        await self.close_session()
        self._session = aiohttp.ClientSession()
        helpers.log("Aiohttp session created.")

    async def close_session(self) -> None:
        """
        Close the aiohttp session, if one is open.

        The session reference is dropped so that a later :meth:`read_api`
        call fails with the "Session not initialized" error instead of
        trying to use a closed session. Calling this with no open session
        does nothing.
        """
        # Detach the session before awaiting so no other caller can pick up
        # a session that is in the middle of closing.
        session, self._session = self._session, None
        if session is None:
            return

        await session.close()
        helpers.log("Aiohttp session closed.")

    async def read_api(self, url: str, api_key: Optional[str] = None) -> list[dict]:
        """
        Get the JSON contents of the api using aiohttp.

        Args:
            url: Address to send the GET request to.
            api_key: Optional token; when given (and non-empty) it is sent
                as an ``Authorization: Bearer <api_key>`` header.

        Returns:
            The decoded JSON body of the response. For a 401 or 404
            response the error is logged and an empty list is returned.

        Raises:
            ValueError: If no session has been created yet.
            APIRequestError: If any other ``aiohttp.ClientError`` occurs,
                including failure status codes other than 401 and 404.
        """
        if self._session is None:
            raise ValueError("Session not initialized")

        headers = {"Authorization": f"Bearer {api_key}"} if api_key else None

        try:
            async with self._semaphore:
                async with self._session.get(url, headers=headers) as response:
                    if response.status in {401, 404}:
                        # raise_for_status() is used only to build the
                        # descriptive error object that gets logged.
                        try:
                            response.raise_for_status()
                        except aiohttp.ClientResponseError as e:
                            helpers.log(e)
                        return []

                    response.raise_for_status()
                    return await response.json()
        except aiohttp.ClientError as e:
            raise APIRequestError("API request error (do not act):", e) from e
|
|
|
|
class APIRequestError(Exception):
    """Raised when an API request fails with an aiohttp client error."""