import os import uuid import logging from pathlib import Path from typing import Optional, Literal, Union, Dict, Any # Import Union from urllib.parse import urlparse import requests import cloudscraper import re # --- FastAPI Imports --- from fastapi import FastAPI, Request, HTTPException, BackgroundTasks, Body from fastapi.responses import JSONResponse, FileResponse from fastapi.staticfiles import StaticFiles from pydantic import BaseModel, HttpUrl, Field, field_validator # Import field_validator # --- yt-dlp Import --- from yt_dlp import YoutubeDL # --- Logging Configuration --- logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # --- Constants --- DOWNLOAD_DIR = Path('downloads') # Use pathlib for paths COOKIE_FILE = 'www.youtube.com_cookies.txt' # Define cookie file path # --- Create Download Directory --- DOWNLOAD_DIR.mkdir(parents=True, exist_ok=True) # --- FastAPI App Initialization --- app = FastAPI( title="tesings", description="API to fetch info", version="1.4.0", # Incremented version ) # --- Mount Static Files Directory --- app.mount("/downloads", StaticFiles(directory=DOWNLOAD_DIR), name="downloads") # --- Pydantic Models for Request/Response Validation --- class UrlRequest(BaseModel): """Request model for endpoints needing just a URL.""" url: HttpUrl # Define allowed quality string literals (including numerical ones) AllowedQualityStr = Literal['best', '240', '480', '720', '1080', '1440', '2160'] class MaxDownloadRequest(BaseModel): """Request model for the /max endpoint.""" url: HttpUrl # Accept 'best' or specific numerical resolutions as strings quality: Optional[AllowedQualityStr] = 'best' class InfoResponse(BaseModel): """Response model for the /get-info endpoint.""" title: Optional[str] = None thumbnail: Optional[str] = None duration: Optional[float] = None channel: Optional[str] = None class DownloadResponse(BaseModel): """Response model for download endpoints.""" url: str filename: str message: Optional[str] = None class ErrorResponse(BaseModel): """Standard error response model.""" detail: str # --- Helper Function for Download --- def perform_download(ydl_opts: dict, url: str, file_path: Path): """Synchronously downloads using yt-dlp.""" try: logger.info(f"Starting download for URL: {url} with options: {ydl_opts}") ydl_opts['outtmpl'] = str(file_path.with_suffix('.%(ext)s')) with YoutubeDL(ydl_opts) as ydl: ydl.extract_info(url, download=True) logger.info(f"Download finished successfully for URL: {url}") downloaded_files = list(DOWNLOAD_DIR.glob(f"{file_path.stem}.*")) if not downloaded_files: logger.error(f"Download completed but no file found for stem: {file_path.stem}") part_files = list(DOWNLOAD_DIR.glob(f"{file_path.stem}.*.part")) for part_file in part_files: try: os.remove(part_file) logger.info(f"Removed leftover part file: {part_file}") except OSError as rm_err: logger.error(f"Error removing part file {part_file}: {rm_err}") raise RuntimeError(f"Could not find downloaded file for {url}") return downloaded_files[0] except Exception as e: logger.error(f"yt-dlp download failed for URL {url}: {e}", exc_info=True) possible_files = list(DOWNLOAD_DIR.glob(f"{file_path.stem}.*")) for f in possible_files: if f.is_file(): try: os.remove(f) logger.info(f"Removed potentially incomplete/failed file: {f}") except OSError as rm_err: logger.error(f"Error removing file {f}: {rm_err}") raise # --- API Endpoints --- @app.get("/") async def root(): """Root endpoint providing basic API info.""" return {"message": "Running in errors."} @app.post( "/get-info", response_model=InfoResponse, responses={500: {"model": ErrorResponse}} ) async def get_info(payload: UrlRequest = Body(...)): """ Extracts video information (title, thumbnail, duration, channel) from a given URL. """ logger.info(f"Received /get-info request for URL: {payload.url}") ydl_opts = {} if os.path.exists(COOKIE_FILE): ydl_opts['cookiefile'] = COOKIE_FILE logger.info("Using cookie file.") else: logger.warning(f"Cookie file '{COOKIE_FILE}' not found. Some videos might require login/cookies.") try: # Use str(payload.url) to pass the URL string to yt-dlp with YoutubeDL(ydl_opts) as ydl: info = ydl.extract_info(str(payload.url), download=False) return InfoResponse( title=info.get('title'), thumbnail=info.get('thumbnail'), duration=info.get('duration'), channel=info.get('channel') ) except Exception as e: logger.error(f"Error fetching info for {payload.url}: {e}", exc_info=True) raise HTTPException(status_code=500, detail=f"Failed to extract video info: {str(e)}") '''@app.post( "/download", response_model=DownloadResponse, responses={400: {"model": ErrorResponse}, 500: {"model": ErrorResponse}} ) async def download_audio(request: Request, payload: UrlRequest = Body(...)): """ Downloads the audio track of a video as an MP3 file (128kbps). """ logger.info(f"Received /download (audio) request for URL: {payload.url}") unique_id = str(uuid.uuid4()) file_path_stem = DOWNLOAD_DIR / unique_id ydl_opts = { 'format': '140/m4a/bestaudio/best', 'outtmpl': str(file_path_stem.with_suffix('.%(ext)s')), 'postprocessors': [{ 'key': 'FFmpegExtractAudio', 'preferredcodec': 'mp3', 'preferredquality': '128', }], 'noplaylist': True, 'quiet': False, 'progress_hooks': [lambda d: logger.debug(f"Download progress: {d['status']} - {d.get('_percent_str', '')}")], } if os.path.exists(COOKIE_FILE): ydl_opts['cookiefile'] = COOKIE_FILE logger.info("Using cookie file for audio download.") else: logger.warning(f"Cookie file '{COOKIE_FILE}' not found for audio download.") try: # Use str(payload.url) to pass the URL string to the helper final_file_path = perform_download(ydl_opts, str(payload.url), file_path_stem) final_filename = final_file_path.name download_url = f"{str(request.base_url).rstrip('/')}/downloads/{final_filename}" logger.info(f"Audio download complete for {payload.url}. URL: {download_url}") return DownloadResponse(url=download_url, filename=final_filename) except Exception as e: # Error logged in perform_download raise HTTPException(status_code=500, detail=f"Audio download failed: {str(e)}") ''' yt_api = os.getenv("yt_api") class ApiRotator: def __init__(self, apis): self.apis = apis self.last_successful_index = None def get_prioritized_apis(self): if self.last_successful_index is not None: # Move the last successful API to the front rotated_apis = ( [self.apis[self.last_successful_index]] + self.apis[:self.last_successful_index] + self.apis[self.last_successful_index+1:] ) return rotated_apis return self.apis def update_last_successful(self, index): self.last_successful_index = index # In your function: api_rotator = ApiRotator([ yt_api, "https://dwnld.nichind.dev", "https://chrunos-load.hf.space", "https://cobalt-api.kwiatekmiki.com" ]) async def get_track_download_url(video_url: str, quality: str) -> str: apis = api_rotator.get_prioritized_apis() session = cloudscraper.create_scraper() # Requires cloudscraper package headers = { "Accept": "application/json", "Content-Type": "application/json", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" } quality_lower = quality.lower() if quality_lower == "max": body_json = {"url": video_url, "videoQuality": "max", "filenameStyle": "pretty", "youtubeVideoCodec": "vp9"} else: body_json = {"url": video_url, "videoQuality": quality, "filenameStyle": "pretty", "youtubeVideoCodec": "h264"} for i, api_url in enumerate(apis): try: logger.info(f"Attempting to get download URL from: {api_url}") y_url = video_url response = session.post( api_url, timeout=20, json=body_json, headers=headers ) logger.info(f"Response status: {response.status_code}") logger.info(f"Response content: {response.content}") if response.headers.get('content-type', '').startswith('application/json'): json_response = response.json() error_code = json_response.get("error", {}).get("code", "") if error_code == "error.api.content.video.age": logger.warning(f"Video unavailable error from {api_url}") break # Only break for specific error if "url" in json_response or "picker" in json_response: api_rotator.update_last_successful(i) return json_response except Exception as e: logger.error(f"Failed with {api_url}: {str(e)}") continue logger.error(f"No download URL found") return {"error": "Download URL not found"} reads_api = os.getenv("reads_api") def call_extract_endpoint(post_url: str) -> Dict[str, Any]: try: response = requests.get(f"{reads_api}/extract", params={"url": post_url}) response.raise_for_status() return response.json() except requests.RequestException as e: raise requests.RequestException(f"Failed to call extract endpoint: {str(e)}") except ValueError as e: raise ValueError(f"Invalid response format: {str(e)}") def is_threads_url(url: str) -> bool: """Validate if URL is a valid Threads URL""" try: parsed = urlparse(url) logger.info(parsed) # Check if netloc matches known Threads domains if parsed.netloc not in ['threads.net', 'www.threads.net', 'threads.com', 'www.threads.com']: return False # Check if path contains /post/ or /t/ after a username if '/post/' in parsed.path or '/t/' in parsed.path: return True return False except Exception: return False @app.post("/multi") async def multi_download(request: Request): data = await request.json() video_url = data.get('url') quality = data.get('videoQuality') logger.info(f'{video_url}, {quality}') if not video_url: raise HTTPException( status_code=400, detail={"error": "Input 'url' is required."} ) # Basic URL validation: checks if it starts with http/https and has some content after. # For more robust validation, consider using a library like 'validators'. if not re.match(r'^https?://\S+', str(video_url)): # Ensure video_url is treated as string for regex raise HTTPException( status_code=400, detail={"error": f"Input 'url' ('{video_url}') is not a valid URL. It must start with http:// or https://."} ) if not quality: # Checks for None or empty string raise HTTPException( status_code=400, detail={"error": "This version of shortcut is outdated. Get the latest version of Chrunos Multi Downloader shortcut."} ) if quality == "mp3": parameter = 'type=audio' else: parameter = f'type=video&quality={quality}' if is_threads_url(video_url): return call_extract_endpoint(video_url) else: dl_url = await get_track_download_url(video_url, quality) if dl_url: return dl_url else: return { "error": "Failed to Fetch the video." } @app.post( "/max", response_model=DownloadResponse, responses={400: {"model": ErrorResponse}, 500: {"model": ErrorResponse}} ) async def download_video_max_quality(request: Request, payload: MaxDownloadRequest = Body(...)): """ Downloads the video in the specified quality or 'best' available, handling both landscape and portrait videos correctly. Attempts H.264 codec for 1080 and lower. Merges video and audio into MP4. Accepted qualities: 'best', '240', '480', '720', '1080', '1440', '2160'. Quality number (as string) refers to the maximum dimension (height or width). """ logger.info(f"Received /max (video) request for URL: {payload.url} with quality: {payload.quality}") unique_id = str(uuid.uuid4()) file_path_stem = DOWNLOAD_DIR / unique_id # --- Determine yt-dlp Format Selector based on Quality and Codec Preference --- quality_str = payload.quality # Quality is now guaranteed to be a string from AllowedQualityStr format_selector = None max_dim = 0 # Initialize max_dim if quality_str == 'best': format_selector = 'bestvideo+bestaudio/best' # Best video and audio, merged if possible logger.info("Using format selector for 'best' quality.") else: # Quality is a numerical string ('240', '480', etc.) try: # Convert the validated string quality to an integer for logic max_dim = int(quality_str) except ValueError: # This should not happen if Pydantic validation works, but good practice logger.error(f"Internal error: Could not convert validated quality string '{quality_str}' to int. Falling back to 'best'.") format_selector = 'bestvideo+bestaudio/best' # Set max_dim to a high value to skip specific logic below if format_selector is set max_dim = 99999 # Only proceed if format_selector wasn't set in the except block long_edge = int(max_dim * 1.8) if not format_selector: # --- Codec Preference Logic --- if max_dim <= 1080: # Prefer H.264 (avc1) for 1080 or lower max dimension logger.info(f"Attempting H.264 codec for requested quality (max dimension): {max_dim}") format_selector = f'bestvideo[vcodec^=avc][height<={long_edge}][width<={long_edge}]+bestaudio/best' #f'bestvideo[height<={long_edge}]/bestvideo[width<={long_edge}]+bestaudio/' #f'best[height<={long_edge}]/best[width<={long_edge}]' else: # For > 1080 max dimension, prioritize best available codec logger.info(f"Attempting best available codec for requested quality (max dimension): {max_dim}") format_selector = f'bestvideo[height<={long_edge}][width<={long_edge}]+bestaudio/best' logger.info(f"Using format selector: '{format_selector}'") # --- yt-dlp Options for Video Download --- ydl_opts = { 'format': format_selector, 'outtmpl': str(file_path_stem.with_suffix('.%(ext)s')), 'merge_output_format': 'mp4', # Merge into MP4 container 'noplaylist': True, 'quiet': True, 'noprogress': True } if os.path.exists(COOKIE_FILE): ydl_opts['cookiefile'] = COOKIE_FILE logger.info("Using cookie file for video download.") else: logger.warning(f"Cookie file '{COOKIE_FILE}' not found for video download.") try: # Use str(payload.url) to pass the URL string to the helper final_file_path = perform_download(ydl_opts, str(payload.url), file_path_stem) final_filename = final_file_path.name download_url = f"{str(request.base_url).rstrip('/')}/downloads/{final_filename}" logger.info(f"Video download complete for {payload.url}. URL: {download_url}") # Changed 'download_url=' to 'url=' return DownloadResponse(url=download_url, filename=final_filename) except Exception as e: # Error logged in perform_download raise HTTPException(status_code=500, detail=f"Video download failed: {str(e)}") # --- Optional: Cleanup Task --- async def cleanup_old_files(directory: Path, max_age_seconds: int): """Removes files older than max_age_seconds in the background.""" import time now = time.time() count = 0 try: for item in directory.iterdir(): if item.is_file(): try: if now - item.stat().st_mtime > max_age_seconds: os.remove(item) logger.info(f"Cleaned up old file: {item.name}") count += 1 except OSError as e: logger.error(f"Error removing file {item}: {e}") if count > 0: logger.info(f"Background cleanup finished. Removed {count} old files.") else: logger.info("Background cleanup finished. No old files found.") except Exception as e: logger.error(f"Error during background file cleanup: {e}", exc_info=True) @app.post("/trigger-cleanup") async def trigger_cleanup(background_tasks: BackgroundTasks): """Manually trigger a cleanup of files older than 1 day.""" logger.info("Triggering background cleanup of old download files.") background_tasks.add_task(cleanup_old_files, DOWNLOAD_DIR, 86400) # 1 day return {"message": "Background cleanup task scheduled."}