# Comet00 / comet / api / stream.py
# Mythus's picture
# Upload 26 files
# 01d9265 verified
import asyncio
import hashlib
import json
import time
import aiohttp
import httpx
import uuid
import orjson
from fastapi import APIRouter, Request
from fastapi.responses import (
RedirectResponse,
StreamingResponse,
FileResponse,
Response,
)
from starlette.background import BackgroundTask
from RTN import Torrent, sort_torrents
from comet.debrid.manager import getDebrid
from comet.utils.general import (
config_check,
get_debrid_extension,
get_indexer_manager,
get_zilean,
get_torrentio,
filter,
get_torrent_hash,
translate,
get_balanced_hashes,
format_title,
get_client_ip,
)
from comet.utils.logger import logger
from comet.utils.models import database, rtn, settings
# Router on which the stream, playback and active-connections endpoints
# defined in this module are registered through the decorators below.
streams = APIRouter()
def _warning_stream(title: str) -> dict:
    """Build the placeholder stream entry used to show a message to the user."""
    return {
        "name": "[⚠️] Comet",
        "title": title,
        "url": "https://comet.fast",
    }


def _build_stream_results(
    request: Request, b64config: str, config: dict, sorted_ranked_files: dict
) -> list:
    """Turn ranked files into the list of stream entries returned to the client.

    Shared by the cache-hit path and the fresh-scrape path so both emit the
    same entry layout. Only hashes selected by ``get_balanced_hashes`` are
    emitted, in the iteration order of ``sorted_ranked_files``.
    """
    debrid_extension = get_debrid_extension(config["debridService"])
    balanced_hashes = get_balanced_hashes(sorted_ranked_files, config)

    results = []
    if (
        config["debridStreamProxyPassword"] != ""
        and settings.PROXY_DEBRID_STREAM
        and settings.PROXY_DEBRID_STREAM_PASSWORD
        != config["debridStreamProxyPassword"]
    ):
        results.append(
            _warning_stream(
                "Debrid Stream Proxy Password incorrect.\nStreams will not be proxied."
            )
        )

    playback_base = (
        f"{request.url.scheme}://{request.url.netloc}/{b64config}/playback"
    )
    for info_hash, hash_data in sorted_ranked_files.items():
        for hash_list in balanced_hashes.values():
            if info_hash not in hash_list:
                continue

            data = hash_data["data"]
            results.append(
                {
                    "name": f"[{debrid_extension}⚡] Comet {data['resolution']}",
                    "title": format_title(data, config),
                    # Entries read back from the cache may lack these keys.
                    "torrentTitle": data.get("torrent_title"),
                    "torrentSize": data.get("torrent_size"),
                    "url": f"{playback_base}/{info_hash}/{data['index']}",
                    "behaviorHints": {
                        "filename": data["raw_title"],
                        "bingeGroup": "comet|" + info_hash,
                    },
                }
            )

    return results


@streams.get("/stream/{type}/{id}.json")
@streams.get("/{b64config}/stream/{type}/{id}.json")
async def stream(request: Request, b64config: str, type: str, id: str):
    """Return the list of playable streams for a movie or a series episode.

    Steps: validate the config, fetch the title metadata, serve from the
    ``cache`` table when a fresh entry exists, otherwise scrape the configured
    sources, resolve info hashes, ask the debrid service which files are
    available, rank them, cache the result and build the stream list.

    Args:
        request: Incoming request; its scheme and host are used to build the
            playback URLs and its client IP is handed to the debrid client.
        b64config: Encoded user configuration, validated by ``config_check``.
        type: Content type from the route; ``"series"`` enables season/episode
            parsing of ``id``.
        id: Content id. For series it is ``"<id>:<season>:<episode>"``; an id
            of ``"kitsu"`` switches metadata lookup to the Kitsu API.

    Returns:
        A dict with a single ``"streams"`` key holding a list of stream
        entries (possibly empty, or a single warning entry on failure).
    """
    config = config_check(b64config)
    if not config:
        return {"streams": [_warning_stream("Invalid Comet config.")]}

    connector = aiohttp.TCPConnector(limit=0)
    async with aiohttp.ClientSession(connector=connector) as session:
        full_id = id
        season = None
        episode = None
        if type == "series":
            info = id.split(":")
            id = info[0]
            season = int(info[1])
            episode = int(info[2])

        try:
            kitsu = False
            if id == "kitsu":
                kitsu = True
                # For Kitsu ids the "season" slot of the id carries the anime id.
                get_metadata = await session.get(
                    f"https://kitsu.io/api/edge/anime/{season}"
                )
                metadata = await get_metadata.json()
                name = metadata["data"]["attributes"]["canonicalTitle"]
                season = 1
                year = None
            else:
                get_metadata = await session.get(
                    f"https://v3.sg.media-imdb.com/suggestion/a/{id}.json"
                )
                metadata = await get_metadata.json()
                # This initial pick is superseded by the loop below; it is kept
                # because its indexing makes an empty or promo-only suggestion
                # list raise and land in the except handler, as before.
                element = metadata["d"][
                    0
                    if metadata["d"][0]["id"]
                    not in ["/imdbpicks/summer-watch-guide", "/emmys"]
                    else 1
                ]
                # Take the first suggestion whose id contains no "/".
                for element in metadata["d"]:
                    if "/" not in element["id"]:
                        break

                name = element["l"]
                year = element["y"]
        except Exception as e:
            logger.warning(f"Exception while getting metadata for {id}: {e}")
            return {"streams": [_warning_stream(f"Can't get metadata for {id}")]}

        name = translate(name)
        log_name = name
        episode_tag = None
        if type == "series":
            # Zero-pad to two digits: the former "S0{season}E0{episode}" form
            # produced e.g. "S010E012" for two-digit seasons/episodes.
            episode_tag = f"S{season:02d}E{episode:02d}"
            log_name = f"{name} {episode_tag}"

        cache_key = hashlib.md5(
            json.dumps(
                {
                    "debridService": config["debridService"],
                    "name": name,
                    "season": season,
                    "episode": episode,
                    "indexers": config["indexers"],
                }
            ).encode("utf-8")
        ).hexdigest()

        # One parameterized lookup instead of three separate queries, so a row
        # deleted in between cannot leave a None result to be indexed.
        cached = await database.fetch_one(
            "SELECT results, timestamp FROM cache WHERE cacheKey = :cache_key",
            {"cache_key": cache_key},
        )
        if cached is not None:
            logger.info(f"Cache found for {log_name}")

            if cached["timestamp"] + settings.CACHE_TTL < time.time():
                await database.execute(
                    "DELETE FROM cache WHERE cacheKey = :cache_key",
                    {"cache_key": cache_key},
                )
                logger.info(f"Cache expired for {log_name}")
            else:
                sorted_ranked_files = json.loads(cached["results"])
                return {
                    "streams": _build_stream_results(
                        request, b64config, config, sorted_ranked_files
                    )
                }
        else:
            logger.info(f"No cache found for {log_name} with user configuration")

        # Fall back to the server's default debrid account when the user
        # supplied the proxy password but no API key of their own.
        if (
            settings.PROXY_DEBRID_STREAM
            and settings.PROXY_DEBRID_STREAM_PASSWORD
            == config["debridStreamProxyPassword"]
            and config["debridApiKey"] == ""
        ):
            config["debridService"] = (
                settings.PROXY_DEBRID_STREAM_DEBRID_DEFAULT_SERVICE
            )
            config["debridApiKey"] = settings.PROXY_DEBRID_STREAM_DEBRID_DEFAULT_APIKEY

        debrid = getDebrid(session, config, get_client_ip(request))

        check_premium = await debrid.check_premium()
        if not check_premium:
            additional_info = ""
            if config["debridService"] == "alldebrid":
                additional_info = "\nCheck your email!"

            return {
                "streams": [
                    _warning_stream(
                        f"Invalid {config['debridService']} account.{additional_info}"
                    )
                ]
            }

        indexer_manager_type = settings.INDEXER_MANAGER_TYPE

        search_indexer = len(config["indexers"]) != 0
        torrents = []
        tasks = []
        if indexer_manager_type and search_indexer:
            logger.info(
                f"Start of {indexer_manager_type} search for {log_name} with indexers {config['indexers']}"
            )

            search_terms = [name]
            if type == "series":
                if not kitsu:
                    search_terms.append(f"{name} {episode_tag}")
                else:
                    search_terms.append(f"{name} {episode}")
            tasks.extend(
                get_indexer_manager(
                    session, indexer_manager_type, config["indexers"], term
                )
                for term in search_terms
            )
        else:
            logger.info(
                f"No indexer {'manager ' if not indexer_manager_type else ''}{'selected by user' if indexer_manager_type else 'defined'} for {log_name}"
            )

        if settings.ZILEAN_URL:
            tasks.append(get_zilean(session, name, log_name, season, episode))

        if settings.SCRAPE_TORRENTIO:
            tasks.append(get_torrentio(log_name, type, full_id))

        search_response = await asyncio.gather(*tasks)
        for found in search_response:
            torrents.extend(found)

        sources = [
            part
            for part in [
                indexer_manager_type,
                "Zilean" if settings.ZILEAN_URL else None,
                "Torrentio" if settings.SCRAPE_TORRENTIO else None,
            ]
            if part
        ]
        logger.info(
            f"{len(torrents)} torrents found for {log_name}"
            + (" with " + ", ".join(sources) if sources else "")
        )

        if not torrents:
            return {"streams": []}

        if settings.TITLE_MATCH_CHECK:
            indexed_torrents = [
                (i, torrent["Title"]) for i, torrent in enumerate(torrents)
            ]
            chunk_size = 50
            chunks = [
                indexed_torrents[i : i + chunk_size]
                for i in range(0, len(indexed_torrents), chunk_size)
            ]

            filtered_torrents = await asyncio.gather(
                *(filter(chunk, name, year) for chunk in chunks)
            )
            # Collect rejected indices and rebuild the list; unlike deleting
            # with a running offset, this does not rely on result order.
            rejected = {
                entry[0]
                for chunk_result in filtered_torrents
                for entry in chunk_result
                if not entry[1]
            }
            torrents = [
                torrent for i, torrent in enumerate(torrents) if i not in rejected
            ]

            logger.info(
                f"{len(torrents)} torrents passed title match check for {log_name}"
            )

        if not torrents:
            return {"streams": []}

        torrent_hashes = await asyncio.gather(
            *(
                get_torrent_hash(session, (i, torrent))
                for i, torrent in enumerate(torrents)
            )
        )

        # Keep only torrents whose info hash could be resolved.
        resolved_torrents = []
        for entry in torrent_hashes:
            if not entry[1]:
                continue
            torrent = torrents[entry[0]]
            torrent["InfoHash"] = entry[1]
            resolved_torrents.append(torrent)
        torrents = resolved_torrents

        logger.info(f"{len(torrents)} info hashes found for {log_name}")

        if not torrents:
            return {"streams": []}

        files = await debrid.get_files(
            [entry[1] for entry in torrent_hashes if entry[1] is not None],
            type,
            season,
            episode,
            kitsu,
        )

        ranked_files = set()
        for info_hash in files:
            try:
                ranked_file = rtn.rank(
                    files[info_hash]["title"],
                    info_hash,  # , correct_title=name, remove_trash=True
                )
            except Exception:
                # Deliberate best effort: a title that cannot be ranked is
                # skipped rather than failing the whole request.
                continue
            ranked_files.add(ranked_file)

        sorted_ranked_files = sort_torrents(ranked_files)

        len_sorted_ranked_files = len(sorted_ranked_files)
        logger.info(
            f"{len_sorted_ranked_files} cached files found on {config['debridService']} for {log_name}"
        )

        if len_sorted_ranked_files == 0:
            return {"streams": []}

        sorted_ranked_files = {
            key: (value.model_dump() if isinstance(value, Torrent) else value)
            for key, value in sorted_ranked_files.items()
        }
        torrents_by_hash = {torrent["InfoHash"]: torrent for torrent in torrents}
        for info_hash in sorted_ranked_files:  # needed for caching
            data = sorted_ranked_files[info_hash]["data"]
            file_info = files[info_hash]
            torrent = torrents_by_hash[info_hash]

            data["title"] = file_info["title"]
            data["torrent_title"] = torrent["Title"]
            data["tracker"] = torrent["Tracker"]
            data["size"] = file_info["size"]
            torrent_size = torrent["Size"]
            data["torrent_size"] = (
                torrent_size if torrent_size else file_info["size"]
            )
            data["index"] = file_info["index"]

        # The value is bound as a query parameter, so it must be stored as-is:
        # doubling apostrophes here would corrupt the cached JSON.
        json_data = json.dumps(sorted_ranked_files)
        await database.execute(
            f"INSERT {'OR IGNORE ' if settings.DATABASE_TYPE == 'sqlite' else ''}INTO cache (cacheKey, results, timestamp) VALUES (:cache_key, :json_data, :timestamp){' ON CONFLICT DO NOTHING' if settings.DATABASE_TYPE == 'postgresql' else ''}",
            {"cache_key": cache_key, "json_data": json_data, "timestamp": time.time()},
        )
        logger.info(f"Results have been cached for {log_name}")

        return {
            "streams": _build_stream_results(
                request, b64config, config, sorted_ranked_files
            )
        }
@streams.head("/{b64config}/playback/{hash}/{index}")
async def playback(b64config: str, hash: str, index: str):
    """Answer HEAD requests on a playback URL with a fixed 302 redirect.

    None of the path parameters are inspected; every HEAD request gets the
    same redirect target.
    """
    redirect_target = "https://stremio.fast"
    return RedirectResponse(redirect_target, status_code=302)
class CustomORJSONResponse(Response):
    """JSON response rendered with orjson using two-space indentation."""

    media_type = "application/json"

    def render(self, content) -> bytes:
        """Serialize ``content`` to indented JSON bytes.

        No runtime check for orjson is needed: this module imports it
        unconditionally, so the name is always bound by the time this runs.
        """
        return orjson.dumps(content, option=orjson.OPT_INDENT_2)
@streams.get("/active-connections", response_class=CustomORJSONResponse)
async def active_connections(request: Request, password: str):
    """List the rows of the ``active_connections`` table for an admin.

    Args:
        request: Incoming request (unused, kept for the route signature).
        password: Must equal ``settings.DASHBOARD_ADMIN_PASSWORD``.

    Returns:
        The string ``"Invalid Password"`` when the password does not match,
        otherwise a dict with the connection count and the rows themselves.
    """
    import secrets

    expected = settings.DASHBOARD_ADMIN_PASSWORD
    # Constant-time comparison so response timing does not leak how much of
    # the password matched. Bytes are compared because compare_digest rejects
    # non-ASCII str. A non-string setting can never equal the supplied string,
    # so it is rejected outright, exactly as a plain != would.
    if not isinstance(expected, str) or not secrets.compare_digest(
        password.encode("utf-8"), expected.encode("utf-8")
    ):
        return "Invalid Password"

    rows = await database.fetch_all("SELECT * FROM active_connections")

    return {
        "total_connections": len(rows),
        "active_connections": rows,
    }
@streams.get("/{b64config}/playback/{hash}/{index}")
async def playback(request: Request, b64config: str, hash: str, index: str):
    """Resolve a playback URL to a debrid download link and serve it.

    The download link is taken from the ``download_links`` table when an
    entry younger than one hour exists; otherwise it is generated through the
    debrid client and stored. When debrid stream proxying is enabled and the
    configured proxy password matches, the content is streamed through this
    server (subject to a per-IP connection limit); otherwise the client is
    redirected to the download link.

    Args:
        request: Incoming request; supplies the client IP and ``Range`` header.
        b64config: Encoded user configuration, validated by ``config_check``.
        hash: Info hash of the torrent, taken from the URL path.
        index: File index inside the torrent, taken from the URL path.

    Returns:
        A ``FileResponse`` with a placeholder video on failure, a
        ``StreamingResponse`` when proxying, or a 302 ``RedirectResponse``.
    """
    config = config_check(b64config)
    if not config:
        return FileResponse("comet/assets/invalidconfig.mp4")

    # Fall back to the server's default debrid account when the user supplied
    # the proxy password but no API key of their own.
    if (
        settings.PROXY_DEBRID_STREAM
        and settings.PROXY_DEBRID_STREAM_PASSWORD == config["debridStreamProxyPassword"]
        and config["debridApiKey"] == ""
    ):
        config["debridService"] = settings.PROXY_DEBRID_STREAM_DEBRID_DEFAULT_SERVICE
        config["debridApiKey"] = settings.PROXY_DEBRID_STREAM_DEBRID_DEFAULT_APIKEY

    # `hash` and `index` come straight from the URL and the API key from the
    # user-supplied config, so they are always bound as query parameters and
    # never interpolated into SQL text.
    link_key = {
        "debrid_key": config["debridApiKey"],
        "hash": hash,
        "file_index": index,
    }

    async with aiohttp.ClientSession() as session:
        # Check for cached download link
        cached_link = await database.fetch_one(
            "SELECT link, timestamp FROM download_links WHERE debrid_key = :debrid_key AND hash = :hash AND file_index = :file_index",
            link_key,
        )

        current_time = time.time()
        download_link = None
        if cached_link:
            if current_time - cached_link["timestamp"] < 3600:
                download_link = cached_link["link"]
            else:
                # Cache expired, remove old entry
                await database.execute(
                    "DELETE FROM download_links WHERE debrid_key = :debrid_key AND hash = :hash AND file_index = :file_index",
                    link_key,
                )

        ip = get_client_ip(request)

        if not download_link:
            debrid = getDebrid(session, config, ip)
            download_link = await debrid.generate_download_link(hash, index)
            if not download_link:
                return FileResponse("comet/assets/uncached.mp4")

            # Cache the new download link
            await database.execute(
                f"INSERT {'OR IGNORE ' if settings.DATABASE_TYPE == 'sqlite' else ''}INTO download_links (debrid_key, hash, file_index, link, timestamp) VALUES (:debrid_key, :hash, :file_index, :link, :timestamp){' ON CONFLICT DO NOTHING' if settings.DATABASE_TYPE == 'postgresql' else ''}",
                {
                    **link_key,
                    "link": download_link,
                    "timestamp": current_time,
                },
            )

        if (
            settings.PROXY_DEBRID_STREAM
            and settings.PROXY_DEBRID_STREAM_PASSWORD
            == config["debridStreamProxyPassword"]
        ):
            active_ip_connections = await database.fetch_all(
                "SELECT ip, COUNT(*) as connections FROM active_connections GROUP BY ip"
            )
            if any(
                connection["ip"] == ip
                and connection["connections"]
                >= settings.PROXY_DEBRID_STREAM_MAX_CONNECTIONS
                for connection in active_ip_connections
            ):
                return FileResponse("comet/assets/proxylimit.mp4")

            # Read by Streamer.__init__ through the closure, so the value set
            # by the 503 retry below is the one the streaming client uses.
            proxy = None

            class Streamer:
                """Streams the download link and cleans up its connection row."""

                def __init__(self, connection_id: str):
                    self.id = connection_id
                    self.client = httpx.AsyncClient(proxy=proxy)
                    self.response = None

                async def stream_content(self, headers: dict):
                    """Yield the raw body chunks of the upstream response."""
                    async with self.client.stream(
                        "GET", download_link, headers=headers
                    ) as self.response:
                        async for chunk in self.response.aiter_raw():
                            yield chunk

                async def close(self):
                    """Drop the active-connection row and release the client."""
                    await database.execute(
                        "DELETE FROM active_connections WHERE id = :id",
                        {"id": self.id},
                    )

                    if self.response is not None:
                        await self.response.aclose()
                    if self.client is not None:
                        await self.client.aclose()

            range_header = request.headers.get("range", "bytes=0-")

            response = await session.head(
                download_link, headers={"Range": range_header}
            )
            if response.status == 503 and config["debridService"] == "alldebrid":
                proxy = (
                    settings.DEBRID_PROXY_URL
                )  # proxy is not needed to proxy realdebrid stream

                response = await session.head(
                    download_link, headers={"Range": range_header}, proxy=proxy
                )

            if response.status == 206:
                connection_id = str(uuid.uuid4())
                await database.execute(
                    f"INSERT {'OR IGNORE ' if settings.DATABASE_TYPE == 'sqlite' else ''}INTO active_connections (id, ip, content, timestamp) VALUES (:id, :ip, :content, :timestamp){' ON CONFLICT DO NOTHING' if settings.DATABASE_TYPE == 'postgresql' else ''}",
                    {
                        "id": connection_id,
                        "ip": ip,
                        "content": str(response.url),
                        "timestamp": current_time,
                    },
                )

                streamer = Streamer(connection_id)

                return StreamingResponse(
                    streamer.stream_content({"Range": range_header}),
                    status_code=206,
                    headers={
                        "Content-Range": response.headers["Content-Range"],
                        "Content-Length": response.headers["Content-Length"],
                        "Accept-Ranges": "bytes",
                    },
                    # Runs after the response has been sent.
                    background=BackgroundTask(streamer.close),
                )

            return FileResponse("comet/assets/uncached.mp4")

        return RedirectResponse(download_link, status_code=302)