import asyncio
import hashlib
import json
import time
import uuid

import aiohttp
import httpx
import orjson

from fastapi import APIRouter, Request
from fastapi.responses import (
    RedirectResponse,
    StreamingResponse,
    FileResponse,
    Response,
)
from starlette.background import BackgroundTask
from RTN import Torrent, sort_torrents

from comet.debrid.manager import getDebrid
from comet.utils.general import (
    config_check,
    get_debrid_extension,
    get_indexer_manager,
    get_zilean,
    get_torrentio,
    filter,
    get_torrent_hash,
    translate,
    get_balanced_hashes,
    format_title,
    get_client_ip,
)
from comet.utils.logger import logger
from comet.utils.models import database, rtn, settings

streams = APIRouter()


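# Stremio stream endpoint: resolves a media id to a list of debrid-cached
# streams, searching the configured torrent sources on cache misses.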
@streams.get("/stream/{type}/{id}.json") |
|
@streams.get("/{b64config}/stream/{type}/{id}.json") |
|
async def stream(request: Request, b64config: str, type: str, id: str): |
|
config = config_check(b64config) |
|
if not config: |
|
return { |
|
"streams": [ |
|
{ |
|
"name": "[⚠️] Comet", |
|
"title": "Invalid Comet config.", |
|
"url": "https://comet.fast", |
|
} |
|
] |
|
} |
    connector = aiohttp.TCPConnector(limit=0)
    async with aiohttp.ClientSession(connector=connector) as session:
        full_id = id
        season = None
        episode = None

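        # Series ids arrive as "<id>:<season>:<episode>" (e.g. "tt0903747:1:5").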
        if type == "series":
            info = id.split(":")
            id = info[0]
            season = int(info[1])
            episode = int(info[2])

        try:
            kitsu = False
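            # Kitsu ids look like "kitsu:<anime_id>:<episode>", so after the
            # split above the Kitsu anime id sits in the season variable.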
if id == "kitsu": |
|
kitsu = True |
|
get_metadata = await session.get( |
|
f"https://kitsu.io/api/edge/anime/{season}" |
|
) |
|
metadata = await get_metadata.json() |
|
name = metadata["data"]["attributes"]["canonicalTitle"] |
|
season = 1 |
|
year = None |
            else:
                get_metadata = await session.get(
                    f"https://v3.sg.media-imdb.com/suggestion/a/{id}.json"
                )
                metadata = await get_metadata.json()

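                # Keep the first suggestion that is an actual title; editorial
                # entries (e.g. "/imdbpicks/...") carry a "/" in their id.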
                for element in metadata["d"]:
                    if "/" not in element["id"]:
                        break

                name = element["l"]
                year = element["y"]

        except Exception as e:
            logger.warning(f"Exception while getting metadata for {id}: {e}")

            return {
                "streams": [
                    {
                        "name": "[⚠️] Comet",
                        "title": f"Can't get metadata for {id}",
                        "url": "https://comet.fast",
                    }
                ]
            }

        name = translate(name)
        log_name = name
        if type == "series":
            log_name = f"{name} S{season:02d}E{episode:02d}"

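        # Results are cached per (debrid service, title, episode, indexer set),
        # so identical configurations share a single search.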
        cache_key = hashlib.md5(
            json.dumps(
                {
                    "debridService": config["debridService"],
                    "name": name,
                    "season": season,
                    "episode": episode,
                    "indexers": config["indexers"],
                }
            ).encode("utf-8")
        ).hexdigest()
        cached = await database.fetch_one(
            "SELECT EXISTS (SELECT 1 FROM cache WHERE cacheKey = :cache_key)",
            {"cache_key": cache_key},
        )
        if cached[0] != 0:
            logger.info(f"Cache found for {log_name}")

            timestamp = await database.fetch_one(
                "SELECT timestamp FROM cache WHERE cacheKey = :cache_key",
                {"cache_key": cache_key},
            )
            if timestamp[0] + settings.CACHE_TTL < time.time():
                await database.execute(
                    "DELETE FROM cache WHERE cacheKey = :cache_key",
                    {"cache_key": cache_key},
                )

                logger.info(f"Cache expired for {log_name}")
            else:
                sorted_ranked_files = await database.fetch_one(
                    "SELECT results FROM cache WHERE cacheKey = :cache_key",
                    {"cache_key": cache_key},
                )
                sorted_ranked_files = json.loads(sorted_ranked_files[0])

                debrid_extension = get_debrid_extension(config["debridService"])

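                # Spread the cached hashes across resolutions according to the
                # user's result-limit settings.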
                balanced_hashes = get_balanced_hashes(sorted_ranked_files, config)

                results = []
                if (
                    config["debridStreamProxyPassword"] != ""
                    and settings.PROXY_DEBRID_STREAM
                    and settings.PROXY_DEBRID_STREAM_PASSWORD
                    != config["debridStreamProxyPassword"]
                ):
                    results.append(
                        {
                            "name": "[⚠️] Comet",
                            "title": "Debrid Stream Proxy Password incorrect.\nStreams will not be proxied.",
                            "url": "https://comet.fast",
                        }
                    )

                for hash, hash_data in sorted_ranked_files.items():
                    for resolution, hash_list in balanced_hashes.items():
                        if hash in hash_list:
                            data = hash_data["data"]
                            results.append(
                                {
                                    "name": f"[{debrid_extension}⚡] Comet {data['resolution']}",
                                    "title": format_title(data, config),
                                    "torrentTitle": data.get("torrent_title"),
                                    "torrentSize": data.get("torrent_size"),
                                    "url": f"{request.url.scheme}://{request.url.netloc}/{b64config}/playback/{hash}/{data['index']}",
                                    "behaviorHints": {
                                        "filename": data["raw_title"],
                                        "bingeGroup": "comet|" + hash,
                                    },
                                }
                            )

                return {"streams": results}
        else:
            logger.info(f"No cache found for {log_name} with user configuration")

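        # If stream proxying is enabled and the user brought no debrid key,
        # fall back to the server's default debrid account.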
        if (
            settings.PROXY_DEBRID_STREAM
            and settings.PROXY_DEBRID_STREAM_PASSWORD
            == config["debridStreamProxyPassword"]
            and config["debridApiKey"] == ""
        ):
            config["debridService"] = (
                settings.PROXY_DEBRID_STREAM_DEBRID_DEFAULT_SERVICE
            )
            config["debridApiKey"] = settings.PROXY_DEBRID_STREAM_DEBRID_DEFAULT_APIKEY

        debrid = getDebrid(session, config, get_client_ip(request))

        check_premium = await debrid.check_premium()
        if not check_premium:
            additional_info = ""
            if config["debridService"] == "alldebrid":
                additional_info = "\nCheck your email!"

            return {
                "streams": [
                    {
                        "name": "[⚠️] Comet",
                        "title": f"Invalid {config['debridService']} account.{additional_info}",
                        "url": "https://comet.fast",
                    }
                ]
            }

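        # Build the search task list; the indexer manager (e.g. Jackett or
        # Prowlarr), Zilean and Torrentio are all optional sources.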
        indexer_manager_type = settings.INDEXER_MANAGER_TYPE

        search_indexer = len(config["indexers"]) != 0
        torrents = []
        tasks = []
        if indexer_manager_type and search_indexer:
            logger.info(
                f"Start of {indexer_manager_type} search for {log_name} with indexers {config['indexers']}"
            )

            search_terms = [name]
            if type == "series":
                if not kitsu:
                    search_terms.append(f"{name} S{season:02d}E{episode:02d}")
                else:
                    search_terms.append(f"{name} {episode}")
            tasks.extend(
                get_indexer_manager(
                    session, indexer_manager_type, config["indexers"], term
                )
                for term in search_terms
            )
        else:
            logger.info(
                f"No indexer {'manager ' if not indexer_manager_type else ''}{'selected by user' if indexer_manager_type else 'defined'} for {log_name}"
            )

        if settings.ZILEAN_URL:
            tasks.append(get_zilean(session, name, log_name, season, episode))

        if settings.SCRAPE_TORRENTIO:
            tasks.append(get_torrentio(log_name, type, full_id))

        search_response = await asyncio.gather(*tasks)
        for results in search_response:
            torrents.extend(results)

        logger.info(
            f"{len(torrents)} torrents found for {log_name}"
            + (
                " with "
                + ", ".join(
                    part
                    for part in [
                        indexer_manager_type,
                        "Zilean" if settings.ZILEAN_URL else None,
                        "Torrentio" if settings.SCRAPE_TORRENTIO else None,
                    ]
                    if part
                )
                if any(
                    [
                        indexer_manager_type,
                        settings.ZILEAN_URL,
                        settings.SCRAPE_TORRENTIO,
                    ]
                )
                else ""
            )
        )

        if len(torrents) == 0:
            return {"streams": []}

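        # Title match check: compare each torrent's name against the requested
        # title in chunks of 50 so the checks run concurrently, dropping mismatches.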
        if settings.TITLE_MATCH_CHECK:
            indexed_torrents = [(i, torrents[i]["Title"]) for i in range(len(torrents))]
            chunk_size = 50
            chunks = [
                indexed_torrents[i : i + chunk_size]
                for i in range(0, len(indexed_torrents), chunk_size)
            ]

            tasks = []
            for chunk in chunks:
                tasks.append(filter(chunk, name, year))

            filtered_torrents = await asyncio.gather(*tasks)
            index_less = 0
            for result in filtered_torrents:
                for filtered in result:
                    if not filtered[1]:
                        del torrents[filtered[0] - index_less]
                        index_less += 1

            logger.info(
                f"{len(torrents)} torrents passed title match check for {log_name}"
            )

        if len(torrents) == 0:
            return {"streams": []}

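        # Resolve an info hash for every remaining torrent; entries whose hash
        # cannot be determined are dropped.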
        tasks = []
        for i in range(len(torrents)):
            tasks.append(get_torrent_hash(session, (i, torrents[i])))

        torrent_hashes = await asyncio.gather(*tasks)
        index_less = 0
        for hash in torrent_hashes:
            if not hash[1]:
                del torrents[hash[0] - index_less]
                index_less += 1
                continue

            torrents[hash[0] - index_less]["InfoHash"] = hash[1]

        logger.info(f"{len(torrents)} info hashes found for {log_name}")

        if len(torrents) == 0:
            return {"streams": []}

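        # Ask the debrid service which hashes are already cached and which file
        # inside each torrent matches the requested episode.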
        files = await debrid.get_files(
            [hash[1] for hash in torrent_hashes if hash[1] is not None],
            type,
            season,
            episode,
            kitsu,
        )

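        # Rank every cached file with RTN so the results can be sorted and
        # balanced by quality.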
        ranked_files = set()
        for hash in files:
            try:
                ranked_file = rtn.rank(
                    files[hash]["title"],
                    hash,
                )

                ranked_files.add(ranked_file)
            except Exception:
                pass

        sorted_ranked_files = sort_torrents(ranked_files)

        len_sorted_ranked_files = len(sorted_ranked_files)
        logger.info(
            f"{len_sorted_ranked_files} cached files found on {config['debridService']} for {log_name}"
        )

        if len_sorted_ranked_files == 0:
            return {"streams": []}

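        # Flatten the Torrent models to plain dicts and attach per-file
        # metadata before persisting the results as JSON.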
        sorted_ranked_files = {
            key: (value.model_dump() if isinstance(value, Torrent) else value)
            for key, value in sorted_ranked_files.items()
        }
        torrents_by_hash = {torrent["InfoHash"]: torrent for torrent in torrents}
        for hash in sorted_ranked_files:
            sorted_ranked_files[hash]["data"]["title"] = files[hash]["title"]
            sorted_ranked_files[hash]["data"]["torrent_title"] = torrents_by_hash[hash]["Title"]
            sorted_ranked_files[hash]["data"]["tracker"] = torrents_by_hash[hash]["Tracker"]
            sorted_ranked_files[hash]["data"]["size"] = files[hash]["size"]
            torrent_size = torrents_by_hash[hash]["Size"]
            sorted_ranked_files[hash]["data"]["torrent_size"] = (
                torrent_size if torrent_size else files[hash]["size"]
            )
            sorted_ranked_files[hash]["data"]["index"] = files[hash]["index"]

        json_data = json.dumps(sorted_ranked_files)
        await database.execute(
            f"INSERT {'OR IGNORE ' if settings.DATABASE_TYPE == 'sqlite' else ''}INTO cache (cacheKey, results, timestamp) VALUES (:cache_key, :json_data, :timestamp){' ON CONFLICT DO NOTHING' if settings.DATABASE_TYPE == 'postgresql' else ''}",
            {"cache_key": cache_key, "json_data": json_data, "timestamp": time.time()},
        )
        logger.info(f"Results have been cached for {log_name}")

        debrid_extension = get_debrid_extension(config["debridService"])

        balanced_hashes = get_balanced_hashes(sorted_ranked_files, config)

        results = []
        if (
            config["debridStreamProxyPassword"] != ""
            and settings.PROXY_DEBRID_STREAM
            and settings.PROXY_DEBRID_STREAM_PASSWORD
            != config["debridStreamProxyPassword"]
        ):
            results.append(
                {
                    "name": "[⚠️] Comet",
                    "title": "Debrid Stream Proxy Password incorrect.\nStreams will not be proxied.",
                    "url": "https://comet.fast",
                }
            )

        for hash, hash_data in sorted_ranked_files.items():
            for resolution, hash_list in balanced_hashes.items():
                if hash in hash_list:
                    data = hash_data["data"]
                    results.append(
                        {
                            "name": f"[{debrid_extension}⚡] Comet {data['resolution']}",
                            "title": format_title(data, config),
                            "torrentTitle": data["torrent_title"],
                            "torrentSize": data["torrent_size"],
                            "url": f"{request.url.scheme}://{request.url.netloc}/{b64config}/playback/{hash}/{data['index']}",
                            "behaviorHints": {
                                "filename": data["raw_title"],
                                "bingeGroup": "comet|" + hash,
                            },
                        }
                    )

        return {"streams": results}


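# Players often probe the playback URL with a HEAD request first; answer it
# cheaply instead of resolving a download link.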
@streams.head("/{b64config}/playback/{hash}/{index}")
async def playback_head(b64config: str, hash: str, index: str):
    return RedirectResponse("https://stremio.fast", status_code=302)


class CustomORJSONResponse(Response):
    media_type = "application/json"

    def render(self, content) -> bytes:
        assert orjson is not None, "orjson must be installed"
        return orjson.dumps(content, option=orjson.OPT_INDENT_2)


@streams.get("/active-connections", response_class=CustomORJSONResponse) |
|
async def active_connections(request: Request, password: str): |
|
if password != settings.DASHBOARD_ADMIN_PASSWORD: |
|
return "Invalid Password" |
|
|
|
active_connections = await database.fetch_all("SELECT * FROM active_connections") |
|
|
|
return { |
|
"total_connections": len(active_connections), |
|
"active_connections": active_connections, |
|
} |
|
|
|
|
|
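# Playback endpoint: resolve (and cache) the debrid download link for a hash,
# then either redirect the player to it or proxy the bytes through this server.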
@streams.get("/{b64config}/playback/{hash}/{index}") |
|
async def playback(request: Request, b64config: str, hash: str, index: str): |
|
config = config_check(b64config) |
|
if not config: |
|
return FileResponse("comet/assets/invalidconfig.mp4") |
|
|
|
if ( |
|
settings.PROXY_DEBRID_STREAM |
|
and settings.PROXY_DEBRID_STREAM_PASSWORD == config["debridStreamProxyPassword"] |
|
and config["debridApiKey"] == "" |
|
): |
|
config["debridService"] = settings.PROXY_DEBRID_STREAM_DEBRID_DEFAULT_SERVICE |
|
config["debridApiKey"] = settings.PROXY_DEBRID_STREAM_DEBRID_DEFAULT_APIKEY |
|
|
|
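    # Debrid download links expire, so they are cached for an hour and
    # regenerated afterwards.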
    async with aiohttp.ClientSession() as session:
        cached_link = await database.fetch_one(
            "SELECT link, timestamp FROM download_links WHERE debrid_key = :debrid_key AND hash = :hash AND file_index = :file_index",
            {"debrid_key": config["debridApiKey"], "hash": hash, "file_index": index},
        )

        current_time = time.time()
        download_link = None
        if cached_link:
            link = cached_link["link"]
            timestamp = cached_link["timestamp"]

            if current_time - timestamp < 3600:
                download_link = link
            else:
                await database.execute(
                    "DELETE FROM download_links WHERE debrid_key = :debrid_key AND hash = :hash AND file_index = :file_index",
                    {"debrid_key": config["debridApiKey"], "hash": hash, "file_index": index},
                )

        ip = get_client_ip(request)
        if not download_link:
            debrid = getDebrid(session, config, ip)
            download_link = await debrid.generate_download_link(hash, index)
            if not download_link:
                return FileResponse("comet/assets/uncached.mp4")

            await database.execute(
                f"INSERT {'OR IGNORE ' if settings.DATABASE_TYPE == 'sqlite' else ''}INTO download_links (debrid_key, hash, file_index, link, timestamp) VALUES (:debrid_key, :hash, :file_index, :link, :timestamp){' ON CONFLICT DO NOTHING' if settings.DATABASE_TYPE == 'postgresql' else ''}",
                {
                    "debrid_key": config["debridApiKey"],
                    "hash": hash,
                    "file_index": index,
                    "link": download_link,
                    "timestamp": current_time,
                },
            )

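        # Proxy mode: stream the file through this server while enforcing the
        # per-IP connection limit.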
        if (
            settings.PROXY_DEBRID_STREAM
            and settings.PROXY_DEBRID_STREAM_PASSWORD
            == config["debridStreamProxyPassword"]
        ):
            active_ip_connections = await database.fetch_all(
                "SELECT ip, COUNT(*) as connections FROM active_connections GROUP BY ip"
            )
            if any(
                connection["ip"] == ip
                and connection["connections"]
                >= settings.PROXY_DEBRID_STREAM_MAX_CONNECTIONS
                for connection in active_ip_connections
            ):
                return FileResponse("comet/assets/proxylimit.mp4")

            proxy = None

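            # Streamer forwards the remote file through httpx and removes its
            # active_connections row once the client disconnects.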
            class Streamer:
                def __init__(self, id: str):
                    self.id = id

                    self.client = httpx.AsyncClient(proxy=proxy)
                    self.response = None

                async def stream_content(self, headers: dict):
                    async with self.client.stream(
                        "GET", download_link, headers=headers
                    ) as self.response:
                        async for chunk in self.response.aiter_raw():
                            yield chunk

                async def close(self):
                    await database.execute(
                        "DELETE FROM active_connections WHERE id = :id",
                        {"id": self.id},
                    )

                    if self.response is not None:
                        await self.response.aclose()
                    if self.client is not None:
                        await self.client.aclose()

            range_header = request.headers.get("range", "bytes=0-")

            response = await session.head(
                download_link, headers={"Range": range_header}
            )
            if response.status == 503 and config["debridService"] == "alldebrid":
                proxy = settings.DEBRID_PROXY_URL

                response = await session.head(
                    download_link, headers={"Range": range_header}, proxy=proxy
                )

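            # 206 means the debrid host honored the Range request; register the
            # connection and relay the partial content to the player.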
            if response.status == 206:
                id = str(uuid.uuid4())
                await database.execute(
                    f"INSERT {'OR IGNORE ' if settings.DATABASE_TYPE == 'sqlite' else ''}INTO active_connections (id, ip, content, timestamp) VALUES (:id, :ip, :content, :timestamp){' ON CONFLICT DO NOTHING' if settings.DATABASE_TYPE == 'postgresql' else ''}",
                    {
                        "id": id,
                        "ip": ip,
                        "content": str(response.url),
                        "timestamp": current_time,
                    },
                )

                streamer = Streamer(id)

                return StreamingResponse(
                    streamer.stream_content({"Range": range_header}),
                    status_code=206,
                    headers={
                        "Content-Range": response.headers["Content-Range"],
                        "Content-Length": response.headers["Content-Length"],
                        "Accept-Ranges": "bytes",
                    },
                    background=BackgroundTask(streamer.close),
                )

            return FileResponse("comet/assets/uncached.mp4")

        return RedirectResponse(download_link, status_code=302)