import gc
import logging
import os
import tempfile
import time
import urllib.parse
import uuid
from collections import defaultdict
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional, Union

import cloudscraper
import yt_dlp
from dotenv import load_dotenv
from fastapi import FastAPI, Request, HTTPException
from fastapi.concurrency import run_in_threadpool
from fastapi.staticfiles import StaticFiles

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

load_dotenv()

app = FastAPI()

# Define a global temporary download directory and expose it under /file.
# NOTE(review): nothing in this module deletes downloaded files, so the
# directory grows for the lifetime of the process.
global_download_dir = tempfile.mkdtemp()
app.mount("/file", StaticFiles(directory=global_download_dir), name="downloads")


class RateLimiter:
    """In-memory sliding-window rate limiter keyed by client IP.

    Each accepted request stores a timestamp; a client is limited once it
    has ``max_requests`` accepted requests inside ``time_window``.
    Rejected requests are not recorded, so a client that keeps retrying
    cannot grow its timestamp list without bound or extend its own lockout.
    """

    def __init__(self, max_requests: int, time_window: timedelta):
        self.max_requests = max_requests
        self.time_window = time_window
        # Maps client IP -> timestamps (time.time()) of accepted requests.
        self.requests: Dict[str, list] = defaultdict(list)

    def _cleanup_old_requests(self, user_ip: str) -> None:
        """Remove requests that are outside the time window."""
        cutoff = time.time() - self.time_window.total_seconds()
        recent = [
            timestamp
            for timestamp in self.requests.get(user_ip, ())
            if timestamp > cutoff
        ]
        if recent:
            self.requests[user_ip] = recent
        else:
            # Drop idle clients entirely so the mapping does not keep one
            # empty list per IP address ever seen.
            self.requests.pop(user_ip, None)

    def is_rate_limited(self, user_ip: str) -> bool:
        """Record a request for ``user_ip`` and report whether it is over quota.

        Returns True (and records nothing) when the client already has
        ``max_requests`` requests in the window; otherwise records the
        request and returns False.
        """
        self._cleanup_old_requests(user_ip)
        if len(self.requests.get(user_ip, ())) >= self.max_requests:
            return True
        self.requests[user_ip].append(time.time())
        return False

    def get_current_count(self, user_ip: str) -> int:
        """Get the current request count for an IP."""
        self._cleanup_old_requests(user_ip)
        # .get() avoids creating an empty defaultdict entry on a pure read.
        return len(self.requests.get(user_ip, ()))


# Initialize rate limiter with 12 requests per day
rate_limiter = RateLimiter(
    max_requests=12,
    time_window=timedelta(days=1)
)


def get_user_ip(request: Request) -> str:
    """Helper function to get user's IP address.

    Prefers the first entry of ``X-Forwarded-For`` and falls back to the
    socket peer address. Returns ``"unknown"`` when neither is available.

    NOTE(review): ``X-Forwarded-For`` is client-controlled unless a trusted
    proxy overwrites it, so this value can be spoofed to evade rate limits.
    """
    forwarded = request.headers.get("X-Forwarded-For")
    if forwarded:
        # Entries are comma-separated and usually followed by a space.
        first_hop = forwarded.split(",")[0].strip()
        if first_hop:
            return first_hop
    # request.client is None when the server gives no peer information.
    if request.client is None:
        return "unknown"
    return request.client.host


restricted_domain = "chrunos.com"
TRACT_API = os.getenv("EXTRACT_API")
ALT_API = os.getenv("ALT_API")

target_domains = [
    "pornhub.com",
    "xhamster.com",
    "eporner.com",
    "youporn.com"
]

_PREMIUM_URL = 'https://chrunos.com/premium-shortcuts/'
_PREMIUM_SUFFIX = ' - Chrunos Shortcuts Premium Only'
# NOTE: the "Qaulity" misspelling is kept on purpose; clients may match on
# this exact string.
_PREMIUM_PLACEHOLDER_ID = "Best Qaulity Video - Chrunos Shortcuts Premium Only"
_NO_FORMATS_ERROR = "No formats available. Report Error on Telegram"


def _mark_premium(item: Dict[str, Any]) -> None:
    """Relabel a format entry as premium-only and point it at the upsell page."""
    item["format_id"] = f'{item["format_id"]}{_PREMIUM_SUFFIX}'
    item["url"] = _PREMIUM_URL


def _collect_formats(json_response: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Return the usable entries (having both url and format_id) of 'formats'."""
    return [
        {
            "url": format_item.get('url'),
            "format_id": format_item.get('format_id'),
            "cookies": format_item.get('cookies')
        }
        for format_item in json_response.get('formats') or []
        if format_item.get('format_id') and format_item.get('url')
    ]


def _gate_generic_formats(formats: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Apply the premium gating rules for non-pornhub sources (in place)."""
    count = len(formats)
    if count > 3:
        # Only the first three formats stay free.
        for item in formats[3:]:
            _mark_premium(item)
    elif count >= 2:
        _mark_premium(formats[-1])
    elif count == 1:
        formats.append({"url": _PREMIUM_URL, "format_id": _PREMIUM_PLACEHOLDER_ID})
    return formats


def extract_video_info(video_url: str) -> Union[List[Dict[str, Any]], Dict[str, str]]:
    """Query the extraction API for ``video_url`` and return its formats.

    The API endpoint is ``TRACT_API`` for URLs containing one of
    ``target_domains`` and ``ALT_API`` otherwise. Some of the returned
    formats are relabelled as premium-only and redirected to the upsell
    page.

    This function performs blocking network I/O; call it from a worker
    thread when used inside an async handler.

    Returns:
        A list of ``{"url", "format_id", ...}`` dicts on success, or a
        ``{"error": message}`` dict on any failure. It does not raise.
    """
    if not video_url or not isinstance(video_url, str):
        return {"error": "No URL provided"}

    extract_api = TRACT_API if any(domain in video_url for domain in target_domains) else ALT_API
    if not extract_api:
        logger.error("Extraction API endpoint is not configured")
        return {"error": "Extraction API is not configured"}

    # Percent-encode the target so '&', '#', '?' etc. inside it cannot be
    # interpreted as part of the API request's own query string.
    api_url = f'{extract_api}?url={urllib.parse.quote(video_url, safe="")}'
    logger.info(api_url)

    try:
        # The scraper is a requests.Session subclass; close it when done.
        with cloudscraper.create_scraper() as session:
            response = session.get(api_url, timeout=20)
            if response.status_code != 200:
                return {"error": f"Request failed with status code {response.status_code}, API: {api_url}"}
            json_response = response.json()

        # Use the formats list only when it exists and has usable entries;
        # otherwise fall through to the single-url fallback below.
        result = _collect_formats(json_response)
        if result:
            logger.info(json_response.get('title'))
            if "pornhub.com" in video_url:
                p_result = [item for item in result if 'hls' in item['format_id']]
                if len(p_result) < 2:
                    # The gating below needs at least two HLS variants.
                    return {"error": _NO_FORMATS_ERROR}
                for item in p_result[-2:]:
                    _mark_premium(item)
                return p_result
            return _gate_generic_formats(result)

        if 'url' in json_response:
            return [
                {"url": json_response.get('url'), "format_id": "Normal Quality Video"},
                {"url": json_response.get('thumbnail'), "format_id": "thumbnail"},
                {"url": _PREMIUM_URL, "format_id": _PREMIUM_PLACEHOLDER_ID}
            ]
        return {"error": _NO_FORMATS_ERROR}
    except Exception as e:
        # Boundary handler: callers expect an error dict, never an exception.
        logger.exception("An error occurred: %s", e)
        return {"error": str(e)}


async def _read_url_from_body(request: Request) -> Optional[str]:
    """Return the non-empty 'url' string from a JSON object body, else None."""
    try:
        data = await request.json()
    except ValueError:
        # Covers malformed JSON and undecodable bytes.
        return None
    if not isinstance(data, dict):
        return None
    url = data.get('url')
    if not isinstance(url, str) or not url.strip():
        return None
    return url.strip()


def _is_http_url(url: str) -> bool:
    """Tell whether ``url`` parses with an http or https scheme."""
    try:
        return urllib.parse.urlparse(url).scheme in ("http", "https")
    except ValueError:
        return False


@app.post("/test")
async def test_download(request: Request):
    """Rate-limited endpoint returning the available formats for a video URL.

    Expects a JSON body ``{"url": ...}``. Raises HTTP 429 once the caller
    exceeds the daily quota; other failures are reported as an
    ``{"error": ...}`` body.
    """
    user_ip = get_user_ip(request)

    if rate_limiter.is_rate_limited(user_ip):
        raise HTTPException(
            status_code=429,
            detail={
                "error": "You have exceeded the maximum number of requests per day. Please try again tomorrow.",
                "url": "https://t.me/chrunoss"
            }
        )

    video_url = await _read_url_from_body(request)
    if video_url is None:
        return {"error": 'Request body must be JSON with a non-empty "url" field'}

    # extract_video_info blocks on network I/O; keep it off the event loop.
    return await run_in_threadpool(extract_video_info, video_url)


@app.post("/hls")
async def download_hls_video(request: Request):
    """Download a stream with yt-dlp and return a URL to the saved file.

    Expects a JSON body ``{"url": ...}``. On success returns
    ``{"url": <link under /file>}``; on failure returns ``{"error": ...}``.
    """
    hls_url = await _read_url_from_body(request)
    if hls_url is None:
        return {"error": 'Request body must be JSON with a non-empty "url" field'}
    # The URL is untrusted input handed to yt-dlp; allow web URLs only.
    if not _is_http_url(hls_url):
        return {"error": "Only http(s) URLs are supported"}

    base_url = str(request.base_url)

    timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
    # The timestamp has one-second resolution; the random part keeps
    # concurrent requests from colliding or picking up each other's file.
    token = f"{timestamp}_{uuid.uuid4().hex[:8]}"
    output_template = str(Path(global_download_dir) / f'%(title)s_{token}.%(ext)s')

    ydl_opts = {
        'format': 'best',
        'outtmpl': output_template,
        'quiet': True,
        'no_warnings': True,
        'noprogress': True,
        'merge_output_format': 'mp4'
    }

    def _download() -> None:
        # Context manager releases yt-dlp's resources after the download.
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([hls_url])

    try:
        await run_in_threadpool(_download)
    except Exception as e:
        logger.exception("Download failed: %s", e)
        return {"error": f"Download failed: {str(e)}"}

    # 'merge_output_format' only applies when streams are merged, so the
    # extension is not guaranteed to be .mp4; skip yt-dlp's temporary files.
    candidates = sorted(
        path
        for path in Path(global_download_dir).glob(f"*_{token}.*")
        if path.is_file() and path.suffix not in (".part", ".ytdl")
    )
    if not candidates:
        return {"error": "Download failed"}

    downloaded_file = next(
        (path for path in candidates if path.suffix == ".mp4"),
        candidates[0],
    )
    encoded_filename = urllib.parse.quote(downloaded_file.name)
    download_url = f"{base_url}file/{encoded_filename}"

    gc.collect()

    return {"url": download_url}