Comet00 / comet /debrid /torbox.py
Mythus's picture
Upload 26 files
01d9265 verified
import aiohttp
import asyncio
from RTN import parse
from comet.utils.general import is_video
from comet.utils.logger import logger
class TorBox:
def __init__(self, session: aiohttp.ClientSession, debrid_api_key: str):
session.headers["Authorization"] = f"Bearer {debrid_api_key}"
self.session = session
self.proxy = None
self.api_url = "https://api.torbox.app/v1/api"
self.debrid_api_key = debrid_api_key
async def check_premium(self):
try:
check_premium = await self.session.get(
f"{self.api_url}/user/me?settings=false"
)
check_premium = await check_premium.text()
if '"success":true' in check_premium and '"plan":0' not in check_premium:
return True
except Exception as e:
logger.warning(f"Exception while checking premium status on TorBox: {e}")
return False
async def get_instant(self, chunk: list):
try:
response = await self.session.get(
f"{self.api_url}/torrents/checkcached?hash={','.join(chunk)}&format=list&list_files=true"
)
return await response.json()
except Exception as e:
logger.warning(
f"Exception while checking hash instant availability on TorBox: {e}"
)
async def get_files(
self, torrent_hashes: list, type: str, season: str, episode: str, kitsu: bool
):
chunk_size = 100
chunks = [
torrent_hashes[i : i + chunk_size]
for i in range(0, len(torrent_hashes), chunk_size)
]
tasks = []
for chunk in chunks:
tasks.append(self.get_instant(chunk))
responses = await asyncio.gather(*tasks)
availability = [response for response in responses if response is not None]
files = {}
if type == "series":
for result in availability:
if not result["success"] or not result["data"]:
continue
for torrent in result["data"]:
torrent_files = torrent["files"]
for file in torrent_files:
filename = file["name"].split("/")[1]
if not is_video(filename):
continue
if "sample" in filename:
continue
filename_parsed = parse(filename)
if episode not in filename_parsed.episodes:
continue
if kitsu:
if filename_parsed.seasons:
continue
else:
if season not in filename_parsed.seasons:
continue
files[torrent["hash"]] = {
"index": torrent_files.index(file),
"title": filename,
"size": file["size"],
}
break
else:
for result in availability:
if not result["success"] or not result["data"]:
continue
for torrent in result["data"]:
torrent_files = torrent["files"]
for file in torrent_files:
filename = file["name"].split("/")[1]
if not is_video(filename):
continue
if "sample" in filename:
continue
files[torrent["hash"]] = {
"index": torrent_files.index(file),
"title": filename,
"size": file["size"],
}
break
return files
async def generate_download_link(self, hash: str, index: str):
try:
get_torrents = await self.session.get(
f"{self.api_url}/torrents/mylist?bypass_cache=true"
)
get_torrents = await get_torrents.json()
exists = False
for torrent in get_torrents["data"]:
if torrent["hash"] == hash:
torrent_id = torrent["id"]
exists = True
break
if not exists:
create_torrent = await self.session.post(
f"{self.api_url}/torrents/createtorrent",
data={"magnet": f"magnet:?xt=urn:btih:{hash}"},
)
create_torrent = await create_torrent.json()
torrent_id = create_torrent["data"]["torrent_id"]
# get_torrents = await self.session.get(
# f"{self.api_url}/torrents/mylist?bypass_cache=true"
# )
# get_torrents = await get_torrents.json()
# for torrent in get_torrents["data"]:
# if torrent["id"] == torrent_id:
# file_id = torrent["files"][int(index)]["id"]
# Useless, we already have file index
get_download_link = await self.session.get(
f"{self.api_url}/torrents/requestdl?token={self.debrid_api_key}&torrent_id={torrent_id}&file_id={index}&zip=false",
)
get_download_link = await get_download_link.json()
return get_download_link["data"]
except Exception as e:
logger.warning(
f"Exception while getting download link from TorBox for {hash}|{index}: {e}"
)