import base64 import hashlib import json import re import aiohttp import bencodepy import PTT import asyncio from RTN import parse, title_match from curl_cffi import requests from fastapi import Request from comet.utils.logger import logger from comet.utils.models import settings, ConfigModel languages_emojis = { "multi": "๐ŸŒŽ", # Dubbed "en": "๐Ÿ‡ฌ๐Ÿ‡ง", # English "ja": "๐Ÿ‡ฏ๐Ÿ‡ต", # Japanese "zh": "๐Ÿ‡จ๐Ÿ‡ณ", # Chinese "ru": "๐Ÿ‡ท๐Ÿ‡บ", # Russian "ar": "๐Ÿ‡ธ๐Ÿ‡ฆ", # Arabic "pt": "๐Ÿ‡ต๐Ÿ‡น", # Portuguese "es": "๐Ÿ‡ช๐Ÿ‡ธ", # Spanish "fr": "๐Ÿ‡ซ๐Ÿ‡ท", # French "de": "๐Ÿ‡ฉ๐Ÿ‡ช", # German "it": "๐Ÿ‡ฎ๐Ÿ‡น", # Italian "ko": "๐Ÿ‡ฐ๐Ÿ‡ท", # Korean "hi": "๐Ÿ‡ฎ๐Ÿ‡ณ", # Hindi "bn": "๐Ÿ‡ง๐Ÿ‡ฉ", # Bengali "pa": "๐Ÿ‡ต๐Ÿ‡ฐ", # Punjabi "mr": "๐Ÿ‡ฎ๐Ÿ‡ณ", # Marathi "gu": "๐Ÿ‡ฎ๐Ÿ‡ณ", # Gujarati "ta": "๐Ÿ‡ฎ๐Ÿ‡ณ", # Tamil "te": "๐Ÿ‡ฎ๐Ÿ‡ณ", # Telugu "kn": "๐Ÿ‡ฎ๐Ÿ‡ณ", # Kannada "ml": "๐Ÿ‡ฎ๐Ÿ‡ณ", # Malayalam "th": "๐Ÿ‡น๐Ÿ‡ญ", # Thai "vi": "๐Ÿ‡ป๐Ÿ‡ณ", # Vietnamese "id": "๐Ÿ‡ฎ๐Ÿ‡ฉ", # Indonesian "tr": "๐Ÿ‡น๐Ÿ‡ท", # Turkish "he": "๐Ÿ‡ฎ๐Ÿ‡ฑ", # Hebrew "fa": "๐Ÿ‡ฎ๐Ÿ‡ท", # Persian "uk": "๐Ÿ‡บ๐Ÿ‡ฆ", # Ukrainian "el": "๐Ÿ‡ฌ๐Ÿ‡ท", # Greek "lt": "๐Ÿ‡ฑ๐Ÿ‡น", # Lithuanian "lv": "๐Ÿ‡ฑ๐Ÿ‡ป", # Latvian "et": "๐Ÿ‡ช๐Ÿ‡ช", # Estonian "pl": "๐Ÿ‡ต๐Ÿ‡ฑ", # Polish "cs": "๐Ÿ‡จ๐Ÿ‡ฟ", # Czech "sk": "๐Ÿ‡ธ๐Ÿ‡ฐ", # Slovak "hu": "๐Ÿ‡ญ๐Ÿ‡บ", # Hungarian "ro": "๐Ÿ‡ท๐Ÿ‡ด", # Romanian "bg": "๐Ÿ‡ง๐Ÿ‡ฌ", # Bulgarian "sr": "๐Ÿ‡ท๐Ÿ‡ธ", # Serbian "hr": "๐Ÿ‡ญ๐Ÿ‡ท", # Croatian "sl": "๐Ÿ‡ธ๐Ÿ‡ฎ", # Slovenian "nl": "๐Ÿ‡ณ๐Ÿ‡ฑ", # Dutch "da": "๐Ÿ‡ฉ๐Ÿ‡ฐ", # Danish "fi": "๐Ÿ‡ซ๐Ÿ‡ฎ", # Finnish "sv": "๐Ÿ‡ธ๐Ÿ‡ช", # Swedish "no": "๐Ÿ‡ณ๐Ÿ‡ด", # Norwegian "ms": "๐Ÿ‡ฒ๐Ÿ‡พ", # Malay "la": "๐Ÿ’ƒ๐Ÿป", # Latino } def get_language_emoji(language: str): language_formatted = language.lower() return ( languages_emojis[language_formatted] if language_formatted in languages_emojis else language ) translation_table = { "ฤ": "a", "ฤƒ": "a", "ฤ…": "a", "ฤ‡": "c", "ฤ": "c", "รง": "c", "ฤ‰": "c", "ฤ‹": "c", "ฤ": "d", "ฤ‘": "d", "รจ": "e", "รฉ": "e", "รช": "e", "รซ": "e", "ฤ“": "e", "ฤ•": "e", "ฤ™": "e", "ฤ›": "e", "ฤ": "g", "ฤŸ": "g", "ฤก": "g", "ฤฃ": "g", "ฤฅ": "h", "รฎ": "i", "รฏ": "i", "รฌ": "i", "รญ": "i", "ฤซ": "i", "ฤฉ": "i", "ฤญ": "i", "ฤฑ": "i", "ฤต": "j", "ฤท": "k", "ฤบ": "l", "ฤผ": "l", "ล‚": "l", "ล„": "n", "ลˆ": "n", "รฑ": "n", "ล†": "n", "ล‰": "n", "รณ": "o", "รด": "o", "รต": "o", "รถ": "o", "รธ": "o", "ล": "o", "ล‘": "o", "ล“": "oe", "ล•": "r", "ล™": "r", "ล—": "r", "ลก": "s", "ลŸ": "s", "ล›": "s", "ศ™": "s", "รŸ": "ss", "ลฅ": "t", "ลฃ": "t", "ลซ": "u", "ลญ": "u", "ลฉ": "u", "รป": "u", "รผ": "u", "รน": "u", "รบ": "u", "ลณ": "u", "ลฑ": "u", "ลต": "w", "รฝ": "y", "รฟ": "y", "ลท": "y", "ลพ": "z", "ลผ": "z", "ลบ": "z", "รฆ": "ae", "วŽ": "a", "วง": "g", "ษ™": "e", "ฦ’": "f", "ว": "i", "ว’": "o", "ว”": "u", "วš": "u", "วœ": "u", "วน": "n", "วป": "a", "วฝ": "ae", "วฟ": "o", } translation_table = str.maketrans(translation_table) info_hash_pattern = re.compile(r"\b([a-fA-F0-9]{40})\b") def translate(title: str): return title.translate(translation_table) def is_video(title: str): return title.endswith( tuple( [ ".mkv", ".mp4", ".avi", ".mov", ".flv", ".wmv", ".webm", ".mpg", ".mpeg", ".m4v", ".3gp", ".3g2", ".ogv", ".ogg", ".drc", ".gif", ".gifv", ".mng", ".avi", ".mov", ".qt", ".wmv", ".yuv", ".rm", ".rmvb", ".asf", ".amv", ".m4p", ".m4v", ".mpg", ".mp2", ".mpeg", ".mpe", ".mpv", ".mpg", ".mpeg", ".m2v", ".m4v", ".svi", ".3gp", ".3g2", ".mxf", ".roq", ".nsv", ".flv", ".f4v", ".f4p", ".f4a", ".f4b", ] ) ) def bytes_to_size(bytes: int): sizes = ["Bytes", "KB", "MB", "GB", "TB"] if bytes == 0: return "0 Byte" i = 0 while bytes >= 1024 and i < len(sizes) - 1: bytes /= 1024 i += 1 return f"{round(bytes, 2)} {sizes[i]}" def config_check(b64config: str): try: config = json.loads(base64.b64decode(b64config).decode()) validated_config = ConfigModel(**config) return validated_config.model_dump() except: return False def get_debrid_extension(debridService: str): debrid_extension = None if debridService == "realdebrid": debrid_extension = "RD" elif debridService == "alldebrid": debrid_extension = "AD" elif debridService == "premiumize": debrid_extension = "PM" elif debridService == "torbox": debrid_extension = "TB" elif debridService == "debridlink": debrid_extension = "DL" return debrid_extension async def get_indexer_manager( session: aiohttp.ClientSession, indexer_manager_type: str, indexers: list, query: str, ): results = [] try: indexers = [indexer.replace("_", " ") for indexer in indexers] if indexer_manager_type == "jackett": async def fetch_jackett_results( session: aiohttp.ClientSession, indexer: str, query: str ): try: async with session.get( f"{settings.INDEXER_MANAGER_URL}/api/v2.0/indexers/all/results?apikey={settings.INDEXER_MANAGER_API_KEY}&Query={query}&Tracker[]={indexer}", timeout=aiohttp.ClientTimeout( total=settings.INDEXER_MANAGER_TIMEOUT ), ) as response: response_json = await response.json() return response_json.get("Results", []) except Exception as e: logger.warning( f"Exception while fetching Jackett results for indexer {indexer}: {e}" ) return [] tasks = [ fetch_jackett_results(session, indexer, query) for indexer in indexers ] all_results = await asyncio.gather(*tasks) for result_set in all_results: results.extend(result_set) elif indexer_manager_type == "prowlarr": get_indexers = await session.get( f"{settings.INDEXER_MANAGER_URL}/api/v1/indexer", headers={"X-Api-Key": settings.INDEXER_MANAGER_API_KEY}, ) get_indexers = await get_indexers.json() indexers_id = [] for indexer in get_indexers: if ( indexer["name"].lower() in indexers or indexer["definitionName"].lower() in indexers ): indexers_id.append(indexer["id"]) response = await session.get( f"{settings.INDEXER_MANAGER_URL}/api/v1/search?query={query}&indexerIds={'&indexerIds='.join(str(indexer_id) for indexer_id in indexers_id)}&type=search", headers={"X-Api-Key": settings.INDEXER_MANAGER_API_KEY}, ) response = await response.json() for result in response: result["InfoHash"] = ( result["infoHash"] if "infoHash" in result else None ) result["Title"] = result["title"] result["Size"] = result["size"] result["Link"] = ( result["downloadUrl"] if "downloadUrl" in result else None ) result["Tracker"] = result["indexer"] results.append(result) except Exception as e: logger.warning( f"Exception while getting {indexer_manager_type} results for {query} with {indexers}: {e}" ) pass return results async def get_zilean( session: aiohttp.ClientSession, name: str, log_name: str, season: int, episode: int ): results = [] try: show = f"&season={season}&episode={episode}" get_dmm = await session.get( f"{settings.ZILEAN_URL}/dmm/filtered?query={name}{show if season else ''}" ) get_dmm = await get_dmm.json() if isinstance(get_dmm, list): take_first = get_dmm[: settings.ZILEAN_TAKE_FIRST] for result in take_first: object = { "Title": result["raw_title"], "InfoHash": result["info_hash"], "Size": result["size"], "Tracker": "DMM", } results.append(object) logger.info(f"{len(results)} torrents found for {log_name} with Zilean") except Exception as e: logger.warning( f"Exception while getting torrents for {log_name} with Zilean: {e}" ) pass return results async def get_torrentio(log_name: str, type: str, full_id: str): results = [] try: try: get_torrentio = requests.get( f"https://torrentio.strem.fun/stream/{type}/{full_id}.json" ).json() except: get_torrentio = requests.get( f"https://torrentio.strem.fun/stream/{type}/{full_id}.json", proxies={ "http": settings.DEBRID_PROXY_URL, "https": settings.DEBRID_PROXY_URL, }, ).json() for torrent in get_torrentio["streams"]: title = torrent["title"] title_full = title.split("\n๐Ÿ‘ค")[0] tracker = title.split("โš™๏ธ ")[1].split("\n")[0] results.append( { "Title": title_full, "InfoHash": torrent["infoHash"], "Size": None, "Tracker": f"Torrentio|{tracker}", } ) logger.info(f"{len(results)} torrents found for {log_name} with Torrentio") except Exception as e: logger.warning( f"Exception while getting torrents for {log_name} with Torrentio, your IP is most likely blacklisted (you should try proxying Comet): {e}" ) pass return results async def filter(torrents: list, name: str, year: int): results = [] for torrent in torrents: index = torrent[0] title = torrent[1] if "\n" in title: # Torrentio title parsing title = title.split("\n")[1] parsed = parse(title) if not title_match(name, parsed.parsed_title): results.append((index, False)) continue if year and parsed.year and year != parsed.year: results.append((index, False)) continue results.append((index, True)) return results async def get_torrent_hash(session: aiohttp.ClientSession, torrent: tuple): index = torrent[0] torrent = torrent[1] if "InfoHash" in torrent and torrent["InfoHash"] is not None: return (index, torrent["InfoHash"].lower()) url = torrent["Link"] try: timeout = aiohttp.ClientTimeout(total=settings.GET_TORRENT_TIMEOUT) response = await session.get(url, allow_redirects=False, timeout=timeout) if response.status == 200: torrent_data = await response.read() torrent_dict = bencodepy.decode(torrent_data) info = bencodepy.encode(torrent_dict[b"info"]) hash = hashlib.sha1(info).hexdigest() else: location = response.headers.get("Location", "") if not location: return (index, None) match = info_hash_pattern.search(location) if not match: return (index, None) hash = match.group(1).upper() return (index, hash.lower()) except Exception as e: logger.warning( f"Exception while getting torrent info hash for {torrent['indexer'] if 'indexer' in torrent else (torrent['Tracker'] if 'Tracker' in torrent else '')}|{url}: {e}" ) return (index, None) def get_balanced_hashes(hashes: dict, config: dict): max_results = config["maxResults"] max_size = config["maxSize"] config_resolutions = [resolution.lower() for resolution in config["resolutions"]] include_all_resolutions = "all" in config_resolutions languages = [language.lower() for language in config["languages"]] include_all_languages = "all" in languages if not include_all_languages: config_languages = [ code for code, name in PTT.parse.LANGUAGES_TRANSLATION_TABLE.items() if name.lower() in languages ] hashes_by_resolution = {} for hash, hash_data in hashes.items(): hash_info = hash_data["data"] if max_size != 0 and hash_info["size"] > max_size: continue if ( not include_all_languages and not any(lang in hash_info["languages"] for lang in config_languages) and ("multi" not in languages if hash_info["dubbed"] else True) ): continue resolution = hash_info["resolution"] if not include_all_resolutions and resolution not in config_resolutions: continue if resolution not in hashes_by_resolution: hashes_by_resolution[resolution] = [] hashes_by_resolution[resolution].append(hash) total_resolutions = len(hashes_by_resolution) if max_results == 0 or total_resolutions == 0: return hashes_by_resolution hashes_per_resolution = max_results // total_resolutions extra_hashes = max_results % total_resolutions balanced_hashes = {} for resolution, hash_list in hashes_by_resolution.items(): selected_count = hashes_per_resolution + (1 if extra_hashes > 0 else 0) balanced_hashes[resolution] = hash_list[:selected_count] if extra_hashes > 0: extra_hashes -= 1 selected_total = sum(len(hashes) for hashes in balanced_hashes.values()) if selected_total < max_results: missing_hashes = max_results - selected_total for resolution, hash_list in hashes_by_resolution.items(): if missing_hashes <= 0: break current_count = len(balanced_hashes[resolution]) available_hashes = hash_list[current_count : current_count + missing_hashes] balanced_hashes[resolution].extend(available_hashes) missing_hashes -= len(available_hashes) return balanced_hashes def format_metadata(data: dict): extras = [] if data["quality"]: extras.append(data["quality"]) if data["hdr"]: extras.extend(data["hdr"]) if data["codec"]: extras.append(data["codec"]) if data["audio"]: extras.extend(data["audio"]) if data["channels"]: extras.extend(data["channels"]) if data["bit_depth"]: extras.append(data["bit_depth"]) if data["network"]: extras.append(data["network"]) if data["group"]: extras.append(data["group"]) return "|".join(extras) def format_title(data: dict, config: dict): title = "" if "All" in config["resultFormat"] or "Title" in config["resultFormat"]: title += f"{data['title']}\n" if "All" in config["resultFormat"] or "Metadata" in config["resultFormat"]: metadata = format_metadata(data) if metadata != "": title += f"๐Ÿ’ฟ {metadata}\n" if "All" in config["resultFormat"] or "Size" in config["resultFormat"]: title += f"๐Ÿ’พ {bytes_to_size(data['size'])} " if "All" in config["resultFormat"] or "Tracker" in config["resultFormat"]: title += f"๐Ÿ”Ž {data['tracker'] if 'tracker' in data else '?'}" if "All" in config["resultFormat"] or "Languages" in config["resultFormat"]: languages = data["languages"] if data["dubbed"]: languages.insert(0, "multi") formatted_languages = ( "/".join(get_language_emoji(language) for language in languages) if languages else None ) languages_str = "\n" + formatted_languages if formatted_languages else "" title += f"{languages_str}" if title == "": # Without this, Streamio shows SD as the result, which is confusing title = "Empty result format configuration" return title def get_client_ip(request: Request): return ( request.headers["cf-connecting-ip"] if "cf-connecting-ip" in request.headers else request.client.host )