File size: 5,740 Bytes
01d9265 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 |
import aiohttp
import asyncio
from RTN import parse
from comet.utils.general import is_video
from comet.utils.logger import logger
class TorBox:
    """Thin asynchronous client for the TorBox debrid HTTP API.

    All requests go through the ``aiohttp`` session handed to the constructor;
    every network helper logs a warning and returns a neutral value instead of
    raising, so a TorBox outage never propagates to the caller.
    """

    # Number of hashes sent per ``checkcached`` request.
    _CHUNK_SIZE = 100

    def __init__(self, session: "aiohttp.ClientSession", debrid_api_key: str):
        """Bind the client to *session* and authenticate it.

        Note: this sets the ``Authorization`` header on the session that was
        passed in, so the session object itself is modified.
        """
        session.headers["Authorization"] = f"Bearer {debrid_api_key}"

        self.session = session
        self.proxy = None

        self.api_url = "https://api.torbox.app/v1/api"
        self.debrid_api_key = debrid_api_key

    async def check_premium(self):
        """Return ``True`` when the account looks like a paid one.

        The raw ``/user/me`` body is searched for ``"success":true`` and for
        the absence of ``"plan":0`` (presumably the free plan -- confirm
        against the TorBox API documentation). Any exception is logged and
        reported as ``False``.
        """
        try:
            response = await self.session.get(
                f"{self.api_url}/user/me?settings=false"
            )
            body = await response.text()
            if '"success":true' in body and '"plan":0' not in body:
                return True
        except Exception as e:
            logger.warning(f"Exception while checking premium status on TorBox: {e}")

        return False

    async def get_instant(self, chunk: list):
        """Query cache availability for the hashes in *chunk*.

        Returns the decoded JSON payload, or ``None`` when the request or the
        decoding fails (the failure is logged).
        """
        try:
            response = await self.session.get(
                f"{self.api_url}/torrents/checkcached?hash={','.join(chunk)}&format=list&list_files=true"
            )
            return await response.json()
        except Exception as e:
            logger.warning(
                f"Exception while checking hash instant availability on TorBox: {e}"
            )

        return None

    @staticmethod
    def _file_title(path: str) -> str:
        """Return the last ``/``-separated component of *path*.

        Works for bare file names as well as arbitrarily nested paths.
        """
        return path.split("/")[-1]

    @staticmethod
    def _matches_episode(filename: str, season, episode, kitsu: bool) -> bool:
        """Tell whether *filename* parses to the requested episode.

        For Kitsu lookups the file must not carry any season marker; otherwise
        *season* has to be among the parsed seasons.
        """
        parsed = parse(filename)
        if episode not in parsed.episodes:
            return False

        if kitsu:
            return not parsed.seasons

        return season in parsed.seasons

    async def get_files(
        self, torrent_hashes: list, type: str, season: str, episode: str, kitsu: bool
    ):
        """Pick one playable file for every cached torrent.

        The hashes are checked in chunks of ``_CHUNK_SIZE`` concurrently. For
        each torrent reported by TorBox, the first file that is a video, is
        not a sample and (when ``type == "series"``) matches the requested
        episode is kept.

        *season* and *episode* are tested by membership against the values
        produced by ``parse``, so they must use the same type as those values.

        Returns a dict mapping torrent hash to
        ``{"index": <position in the file list>, "title": <file name>,
        "size": <size as reported>}``.
        """
        size = self._CHUNK_SIZE
        chunks = [
            torrent_hashes[i : i + size] for i in range(0, len(torrent_hashes), size)
        ]
        responses = await asyncio.gather(
            *(self.get_instant(chunk) for chunk in chunks)
        )

        is_series = type == "series"
        files = {}
        for result in responses:
            # Failed requests come back as None; unsuccessful or empty
            # payloads carry nothing to inspect either.
            if not result or not result.get("success") or not result.get("data"):
                continue

            for torrent in result["data"]:
                for index, file in enumerate(torrent.get("files") or []):
                    filename = self._file_title(file["name"])

                    if not is_video(filename):
                        continue

                    # Case-insensitive so "Sample.mkv" is rejected as well.
                    if "sample" in filename.lower():
                        continue

                    if is_series and not self._matches_episode(
                        filename, season, episode, kitsu
                    ):
                        continue

                    files[torrent["hash"]] = {
                        "index": index,
                        "title": filename,
                        "size": file["size"],
                    }
                    break

        return files

    async def generate_download_link(self, hash: str, index: str):
        """Return a download link for file *index* of torrent *hash*.

        The torrent is looked up in the account's list and created from a
        magnet link when it is not there yet. Returns the ``data`` field of
        the ``requestdl`` response, or ``None`` when anything fails (the
        failure is logged).
        """
        try:
            listing = await self.session.get(
                f"{self.api_url}/torrents/mylist?bypass_cache=true"
            )
            listing = await listing.json()

            # A null/missing "data" is treated as an empty list so the torrent
            # still gets created instead of aborting the whole request.
            for torrent in listing.get("data") or []:
                if torrent["hash"] == hash:
                    torrent_id = torrent["id"]
                    break
            else:
                created = await self.session.post(
                    f"{self.api_url}/torrents/createtorrent",
                    data={"magnet": f"magnet:?xt=urn:btih:{hash}"},
                )
                created = await created.json()
                torrent_id = created["data"]["torrent_id"]

            # The file index obtained from the availability check is passed
            # straight through as file_id, so no extra listing lookup is made.
            download = await self.session.get(
                f"{self.api_url}/torrents/requestdl?token={self.debrid_api_key}&torrent_id={torrent_id}&file_id={index}&zip=false",
            )
            download = await download.json()

            return download["data"]
        except Exception as e:
            logger.warning(
                f"Exception while getting download link from TorBox for {hash}|{index}: {e}"
            )

        return None
|