"""Asynchronous video processing API.

A client POSTs a video URL to ``/api/process`` and immediately receives a task
ID. A background worker then asks an external Info API (located via the
``BASE_URL`` environment variable) for the available formats, downloads one
video-only and one audio-only stream with yt-dlp, merges them with FFmpeg, and
publishes the result under ``/static/<task_id>/video.mp4``. Clients poll
``/api/status/{task_id}`` for progress.
"""
import asyncio
import logging
import os
import shutil
import subprocess
import uuid
from pathlib import Path
from typing import Dict

import httpx
from fastapi import FastAPI, HTTPException, Request, BackgroundTasks, status
from fastapi.responses import JSONResponse
from fastapi.staticfiles import StaticFiles
from yt_dlp import YoutubeDL

# --- Basic Configuration ---
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

BASE_URL = os.getenv("BASE_URL")
TEMP_DIR = Path("/tmp/downloads")
STATIC_DIR = Path("static")
TEMP_DIR.mkdir(parents=True, exist_ok=True)
STATIC_DIR.mkdir(parents=True, exist_ok=True)
FILE_LIFETIME_SECONDS = 1800  # 30 minutes

# --- In-memory store for task statuses ---
# In a real-world, scalable application, you'd use a database like Redis or a
# message queue. For a Hugging Face Space, this simple dictionary is sufficient.
task_statuses: Dict[str, Dict] = {}

# --- FastAPI App Initialization ---
app = FastAPI(
    title="Async Video Processor",
    description="An API to process videos asynchronously without timeouts."
)
app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")


# --- Helper Functions and Background Worker ---
async def cleanup_file(filepath: Path) -> None:
    """Delete the directory containing *filepath* after the file lifetime.

    Sleeps for ``FILE_LIFETIME_SECONDS`` and then removes ``filepath.parent``
    recursively. Failures are logged rather than raised.
    """
    await asyncio.sleep(FILE_LIFETIME_SECONDS)
    try:
        if filepath.parent.exists():
            shutil.rmtree(filepath.parent)
            logging.info(f"Cleaned up directory: {filepath.parent}")
    except Exception as e:
        logging.error(f"Error during cleanup of {filepath.parent}: {e}")


def get_best_formats_with_fallback(data: dict, requested_quality: int) -> tuple:
    """Pick the video-only and audio-only stream URLs from an Info API response.

    Video: the tallest video-only format whose height is at or below
    ``requested_quality``; if none qualifies, the tallest format available.
    Audio: the audio-only format with the highest ``abr``.

    Args:
        data: Parsed Info API response; must contain a ``"formats"`` list.
        requested_quality: Maximum desired video height in pixels.

    Returns:
        A ``(video_url, audio_url)`` tuple.

    Raises:
        ValueError: If ``"formats"`` is missing, no video-only stream exists,
            or either selected stream has no usable URL.
    """
    if "formats" not in data:
        raise ValueError("The 'formats' key is missing from the Info API response.")

    formats = data.get("formats", [])

    # --- Video Selection ---
    # Any video-only stream that carries resolution info, best first.
    video_formats = [
        f for f in formats
        if f.get("vcodec") not in (None, "none")
        and f.get("acodec") == "none"
        and f.get("height")
    ]
    video_formats.sort(key=lambda f: f["height"], reverse=True)

    if not video_formats:
        raise ValueError("Could not find any suitable video-only streams in the API response.")

    # The list is sorted descending, so the first match is the best one that
    # does not exceed the requested quality.
    selected_format = next(
        (f for f in video_formats if f["height"] <= requested_quality),
        None,
    )

    # If no match was found (e.g., user requested 144p but only 360p+ is
    # available), default to the absolute best quality available.
    if selected_format is None:
        selected_format = video_formats[0]
        logging.warning(
            f"Requested quality ({requested_quality}p) is not available. "
            f"Falling back to best available quality: {selected_format.get('height')}p."
        )

    video_url = selected_format.get("url")
    logging.info(
        f"Selected video: {selected_format.get('height')}p "
        f"(format_id: {selected_format.get('format_id')}, ext: {selected_format.get('ext')})"
    )

    # --- Audio Selection ---
    audio_url = None
    audio_formats = [
        f for f in formats
        if f.get("acodec") not in (None, "none") and f.get("vcodec") == "none"
    ]
    if audio_formats:
        # "abr" may be absent or None; treat both as 0 for ordering.
        audio_formats.sort(key=lambda f: f.get("abr", 0) or 0, reverse=True)
        selected_audio = audio_formats[0]
        audio_url = selected_audio.get("url")
        logging.info(
            f"Selected best audio: format_id {selected_audio.get('format_id')}, "
            f"bitrate {selected_audio.get('abr', 'N/A')}k, "
            f"codec {selected_audio.get('acodec')}"
        )

    # --- Final Check ---
    if not video_url or not audio_url:
        raise ValueError("Could not find a suitable video and/or audio stream.")

    return video_url, audio_url


def run_ytdlp_download(url: str, out_path: Path) -> None:
    """Download *url* to *out_path* with yt-dlp, delegating to ``aria2c``.

    This call blocks until the download finishes, so async callers should run
    it in a worker thread.
    """
    ydl_opts = {
        'outtmpl': str(out_path),
        'quiet': True,
        'noprogress': True,
        # --- SPEED OPTIMIZATION ---
        # Use the aria2c external downloader for multi-connection downloads
        'external_downloader': 'aria2c',
        # Arguments to pass to aria2c for maximum speed
        'external_downloader_args': [
            '--min-split-size=1M',
            '--max-connection-per-server=16',
            '--max-concurrent-downloads=16',
            '--split=16'
        ],
    }
    with YoutubeDL(ydl_opts) as ydl:
        ydl.download([url])


async def process_in_background(
    task_id: str,
    video_url: str,
    quality: int,
    base_url_for_links: str,
    background_tasks: BackgroundTasks
) -> None:
    """Run one processing job end to end and record its outcome.

    Steps: fetch format info from the Info API, download the chosen video and
    audio streams concurrently, merge them with FFmpeg, then publish the
    download URL in ``task_statuses`` and schedule deletion of the output.
    Any exception is caught and stored as a ``"failed"`` status; nothing is
    raised to the caller.

    Args:
        task_id: Key into ``task_statuses``; also names the output directory.
        video_url: Source URL forwarded to the Info API.
        quality: Maximum desired video height in pixels.
        base_url_for_links: Public base URL used to build the download link.
        background_tasks: Task collection used to schedule ``cleanup_file``.
    """
    task_statuses[task_id] = {"status": "processing", "message": "Fetching video info..."}

    # Computed up front so the cleanup paths below never have to guess
    # whether these names were bound before a failure.
    video_path = TEMP_DIR / f"{task_id}_video.mp4"
    audio_path = TEMP_DIR / f"{task_id}_audio.m4a"
    final_output_dir = STATIC_DIR / task_id

    try:
        if not BASE_URL:
            raise RuntimeError("The BASE_URL environment variable is not set.")

        # Step 1: Get video info
        async with httpx.AsyncClient(follow_redirects=True) as client:
            info_api_url = f"{BASE_URL.rstrip('/')}/api/info"
            response = await client.get(
                info_api_url,
                params={"url": video_url, "playlist": "false"},
                timeout=30.0,
            )
            response.raise_for_status()
            video_data = response.json()

        video_stream_url, audio_stream_url = get_best_formats_with_fallback(video_data, quality)

        # Step 2: Download files
        task_statuses[task_id]["message"] = "Downloading video and audio streams..."
        final_output_dir.mkdir()

        # yt-dlp is blocking, so each download runs in its own worker thread.
        await asyncio.gather(
            asyncio.to_thread(run_ytdlp_download, video_stream_url, video_path),
            asyncio.to_thread(run_ytdlp_download, audio_stream_url, audio_path),
        )

        # Step 3: Merge files
        task_statuses[task_id]["message"] = "Merging files with FFmpeg..."
        final_output_path = final_output_dir / "video.mp4"
        # subprocess.run blocks until FFmpeg exits; run it in a thread so the
        # event loop keeps serving status polls during the merge.
        await asyncio.to_thread(
            subprocess.run,
            ['ffmpeg', '-i', str(video_path), '-i', str(audio_path), '-c', 'copy', str(final_output_path)],
            check=True,
            capture_output=True,
            text=True,
        )

        # Step 4: Finalize and set status to complete
        download_url = f"{base_url_for_links.rstrip('/')}/static/{task_id}/video.mp4"
        task_statuses[task_id] = {
            "status": "complete",
            "download_url": download_url,
            "expires_in": f"{FILE_LIFETIME_SECONDS} seconds"
        }
        background_tasks.add_task(cleanup_file, final_output_path)

    except Exception as e:
        logging.error(f"Task {task_id} failed: {e}")
        # CalledProcessError's message omits FFmpeg's own diagnostics.
        if isinstance(e, subprocess.CalledProcessError) and e.stderr:
            logging.error(f"FFmpeg stderr for task {task_id}: {e.stderr}")
        task_statuses[task_id] = {"status": "failed", "error": str(e)}
        # No cleanup_file is scheduled on failure, so remove the output
        # directory (and any partial merge result) right away.
        shutil.rmtree(final_output_dir, ignore_errors=True)
    finally:
        video_path.unlink(missing_ok=True)
        audio_path.unlink(missing_ok=True)


# --- API Endpoints ---
@app.post("/api/process", status_code=status.HTTP_202_ACCEPTED)
async def start_processing_job(request: Request, background_tasks: BackgroundTasks):
    """Accept a job, start it in the background, and return a task ID.

    Expects a JSON object with a required ``url`` and an optional ``quality``
    (default ``"1080"``), which must be convertible to ``int``.

    Raises:
        HTTPException: 400 if the body is not a JSON object, ``url`` is
            missing or empty, or ``quality`` is not an integer.
    """
    try:
        body = await request.json()
    except ValueError:
        # Covers both malformed JSON and undecodable request bytes.
        raise HTTPException(status_code=400, detail="Request body must be valid JSON.") from None

    if not isinstance(body, dict):
        raise HTTPException(status_code=400, detail="Request body must be a JSON object.")

    video_url = body.get("url")
    if not video_url:
        raise HTTPException(status_code=400, detail="A 'url' is required.")

    try:
        quality = int(body.get("quality", "1080"))
    except (TypeError, ValueError):
        raise HTTPException(status_code=400, detail="'quality' must be an integer.") from None

    task_id = str(uuid.uuid4())
    task_statuses[task_id] = {"status": "queued"}

    # We need the base URL of our own app to construct the final download link
    base_url_for_links = str(request.base_url)

    background_tasks.add_task(
        process_in_background,
        task_id,
        video_url,
        quality,
        base_url_for_links,
        background_tasks
    )

    status_url = request.url_for('get_job_status', task_id=task_id)
    return {"task_id": task_id, "status_url": str(status_url)}


@app.get("/api/status/{task_id}")
async def get_job_status(task_id: str):
    """Return the stored status of a background job so clients can poll it.

    Raises:
        HTTPException: 404 if no status is stored for ``task_id``.
    """
    status_info = task_statuses.get(task_id)
    if not status_info:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Task not found")
    return status_info