|
import os |
|
import shutil |
|
import subprocess |
|
import uuid |
|
import logging |
|
import asyncio |
|
from pathlib import Path |
|
from typing import Dict |
|
|
|
import httpx |
|
from fastapi import FastAPI, HTTPException, Request, BackgroundTasks, status |
|
from fastapi.responses import JSONResponse |
|
from fastapi.staticfiles import StaticFiles |
|
from yt_dlp import YoutubeDL |
|
|
|
|
|
# Configure root logging once at import time; all module logs use this format.
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

# Base URL of the upstream "Info API" service used by process_in_background.
# NOTE(review): may be None if the env var is unset — the worker would then
# crash on BASE_URL.rstrip(); confirm deployments always set BASE_URL.
BASE_URL = os.getenv("BASE_URL")

# Scratch space for intermediate (pre-merge) video/audio downloads.
TEMP_DIR = Path("/tmp/downloads")

# Finished files live here; served publicly via the /static mount below.
STATIC_DIR = Path("static")

TEMP_DIR.mkdir(exist_ok=True)

STATIC_DIR.mkdir(exist_ok=True)

# How long a finished file remains downloadable before cleanup (30 minutes).
FILE_LIFETIME_SECONDS = 1800

# In-memory job registry: task_id -> status payload returned by /api/status.
# Not persisted and not shared across processes — single-worker deployments only.
task_statuses: Dict[str, Dict] = {}

app = FastAPI(
    title="Async Video Processor",
    description="An API to process videos asynchronously without timeouts."
)

# Expose finished downloads at /static/<task_id>/video.mp4.
app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")
|
|
|
|
|
|
|
async def cleanup_file(filepath: Path):
    """Wait out the file's lifetime, then delete its containing directory.

    Fire-and-forget janitor: every failure is logged, never raised, so the
    caller's event loop is unaffected.
    """
    await asyncio.sleep(FILE_LIFETIME_SECONDS)
    target_dir = filepath.parent
    try:
        if target_dir.exists():
            shutil.rmtree(target_dir)
            logging.info(f"Cleaned up directory: {target_dir}")
    except Exception as e:
        logging.error(f"Error during cleanup of {target_dir}: {e}")
|
|
|
def get_best_formats_with_fallback(data: dict, requested_quality: int):
    """
    Parses the Info API response to find the best matching video format
    with a robust fallback, and the best audio format.

    Args:
        data: Decoded JSON from the Info API; must contain a "formats" list
              of yt-dlp-style format dicts.
        requested_quality: Desired maximum video height in pixels (e.g. 1080).

    Returns:
        Tuple of (video_url, audio_url) — direct stream URLs for the selected
        video-only and audio-only formats.

    Raises:
        ValueError: if "formats" is missing, no video-only stream exists, or
                    no usable video/audio URL could be resolved.
    """
    if "formats" not in data:
        raise ValueError("The 'formats' key is missing from the Info API response.")

    # FIX: the key was just verified, so read it once instead of repeating
    # data.get("formats", []) for each filter; also dropped the dead
    # `video_url = None` pre-initialization.
    formats = data["formats"]

    # Video-only candidates: a real vcodec, no audio track, and a known height.
    video_formats = [
        f for f in formats
        if f.get("vcodec") not in (None, "none") and f.get("acodec") == "none" and f.get("height")
    ]
    if not video_formats:
        raise ValueError("Could not find any suitable video-only streams in the API response.")

    # Best-first: the first entry with height <= requested is the closest
    # match, and index 0 is the overall best for the fallback case.
    video_formats.sort(key=lambda f: f["height"], reverse=True)

    selected_format = next(
        (f for f in video_formats if f["height"] <= requested_quality), None
    )
    if selected_format is None:
        # Nothing at or below the requested height: fall back to the best available.
        selected_format = video_formats[0]
        logging.warning(
            f"Requested quality ({requested_quality}p) is not available. "
            f"Falling back to best available quality: {selected_format.get('height')}p."
        )

    video_url = selected_format.get("url")
    logging.info(
        f"Selected video: {selected_format.get('height')}p "
        f"(format_id: {selected_format.get('format_id')}, ext: {selected_format.get('ext')})"
    )

    # Audio-only candidates, preferring the highest average bitrate (abr).
    audio_url = None
    audio_formats = [
        f for f in formats
        if f.get("acodec") not in (None, "none") and f.get("vcodec") == "none"
    ]
    if audio_formats:
        audio_formats.sort(key=lambda f: f.get("abr", 0) or 0, reverse=True)
        selected_audio = audio_formats[0]
        audio_url = selected_audio.get("url")
        logging.info(
            f"Selected best audio: format_id {selected_audio.get('format_id')}, "
            f"bitrate {selected_audio.get('abr', 'N/A')}k, "
            f"codec {selected_audio.get('acodec')}"
        )

    if not video_url or not audio_url:
        raise ValueError("Could not find a suitable video and/or audio stream.")

    return video_url, audio_url
|
|
|
|
|
|
|
def run_ytdlp_download(url: str, out_path: Path):
    """
    Runs a single yt-dlp download using the high-speed 'aria2c' downloader.

    Blocking call — intended to be dispatched via asyncio.to_thread.
    """
    # Tell aria2c to fetch the stream in 16 parallel segments.
    aria2c_args = [
        '--min-split-size=1M',
        '--max-connection-per-server=16',
        '--max-concurrent-downloads=16',
        '--split=16',
    ]
    options = {
        'outtmpl': str(out_path),
        'quiet': True,
        'noprogress': True,
        # Delegate the actual transfer to aria2c for segmented downloading.
        'external_downloader': 'aria2c',
        'external_downloader_args': aria2c_args,
    }
    with YoutubeDL(options) as downloader:
        downloader.download([url])
|
|
|
async def process_in_background(
    task_id: str,
    video_url: str,
    quality: int,
    base_url_for_links: str,
    background_tasks: BackgroundTasks
):
    """
    Main background worker: fetches stream info, downloads video+audio in
    parallel, merges them with FFmpeg, and publishes the result under /static.

    Progress and the final outcome are written to task_statuses[task_id] so
    clients can poll /api/status/{task_id}.

    Args:
        task_id: UUID identifying this job in task_statuses.
        video_url: Source page URL to process.
        quality: Desired maximum video height in pixels.
        base_url_for_links: Base URL used to build the public download link.
        background_tasks: Kept for backward-compatible signature; no longer
            used (see the cleanup scheduling note below).
    """
    task_statuses[task_id] = {"status": "processing", "message": "Fetching video info..."}
    # Define the temp paths up front so the finally-block can clean them
    # unconditionally, without inspecting locals().
    video_path = TEMP_DIR / f"{task_id}_video.mp4"
    audio_path = TEMP_DIR / f"{task_id}_audio.m4a"
    try:
        # Ask the Info API which formats are available for this URL.
        async with httpx.AsyncClient(follow_redirects=True) as client:
            info_api_url = f"{BASE_URL.rstrip('/')}/api/info"
            response = await client.get(info_api_url, params={"url": video_url, "playlist": "false"}, timeout=30.0)
            response.raise_for_status()
            video_data = response.json()
        video_stream_url, audio_stream_url = get_best_formats_with_fallback(video_data, quality)

        task_statuses[task_id]["message"] = "Downloading video and audio streams..."
        final_output_dir = STATIC_DIR / task_id
        final_output_dir.mkdir()

        # yt-dlp is blocking; run both downloads concurrently in worker threads.
        await asyncio.gather(
            asyncio.to_thread(run_ytdlp_download, video_stream_url, video_path),
            asyncio.to_thread(run_ytdlp_download, audio_stream_url, audio_path),
        )

        task_statuses[task_id]["message"] = "Merging files with FFmpeg..."
        final_output_path = final_output_dir / "video.mp4"
        # FIX: subprocess.run is blocking and used to stall the event loop
        # (freezing every other request) for the whole merge; run it in a
        # worker thread instead.
        await asyncio.to_thread(
            subprocess.run,
            ['ffmpeg', '-i', str(video_path), '-i', str(audio_path), '-c', 'copy', str(final_output_path)],
            check=True, capture_output=True, text=True
        )

        download_url = f"{base_url_for_links.rstrip('/')}/static/{task_id}/video.mp4"
        task_statuses[task_id] = {
            "status": "complete",
            "download_url": download_url,
            "expires_in": f"{FILE_LIFETIME_SECONDS} seconds"
        }
        # FIX: background_tasks.add_task() here never ran — the request that
        # created that BackgroundTasks instance finished (and drained its
        # tasks) as soon as the 202 response was sent, so finished files were
        # never cleaned up. Schedule the delayed cleanup directly on the
        # running event loop instead.
        # NOTE(review): the task reference is not retained; in principle an
        # un-referenced task can be garbage-collected before it fires —
        # consider keeping these in a module-level set.
        asyncio.create_task(cleanup_file(final_output_path))

    except Exception as e:
        logging.error(f"Task {task_id} failed: {e}")
        task_statuses[task_id] = {"status": "failed", "error": str(e)}
    finally:
        # The intermediate downloads are no longer needed once merged (or failed).
        video_path.unlink(missing_ok=True)
        audio_path.unlink(missing_ok=True)
|
|
|
|
|
|
|
@app.post("/api/process", status_code=status.HTTP_202_ACCEPTED)
async def start_processing_job(request: Request, background_tasks: BackgroundTasks):
    """
    Accepts a job and starts it in the background. Returns a task ID immediately.

    Body (JSON object): {"url": "<video page url>", "quality": "1080"}
    (quality is optional, defaults to 1080). Responds 202 with
    {"task_id", "status_url"}; poll status_url for progress.
    """
    # FIX: a malformed body used to escape as an unhandled exception (500);
    # reject bad client input explicitly with 400s instead.
    try:
        body = await request.json()
    except ValueError:
        raise HTTPException(status_code=400, detail="Request body must be valid JSON.")
    if not isinstance(body, dict):
        raise HTTPException(status_code=400, detail="Request body must be a JSON object.")

    video_url = body.get("url")
    if not video_url:
        raise HTTPException(status_code=400, detail="A 'url' is required.")

    # FIX: a non-numeric quality used to raise ValueError -> 500.
    try:
        quality = int(body.get("quality", "1080"))
    except (TypeError, ValueError):
        raise HTTPException(status_code=400, detail="'quality' must be an integer.")

    task_id = str(uuid.uuid4())
    task_statuses[task_id] = {"status": "queued"}

    # Build absolute download links from the URL the client actually used
    # to reach this server (honors host/scheme seen by the client).
    base_url_for_links = str(request.base_url)

    background_tasks.add_task(
        process_in_background, task_id, video_url, quality, base_url_for_links, background_tasks
    )

    status_url = request.url_for('get_job_status', task_id=task_id)
    return {"task_id": task_id, "status_url": str(status_url)}
|
|
|
|
|
@app.get("/api/status/{task_id}")
async def get_job_status(task_id: str):
    """
    Allows the client to poll for the status of a background job.
    """
    # Unknown (or falsy) task entries surface as a 404.
    if not (status_info := task_statuses.get(task_id)):
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Task not found")
    return status_info