import logging
import os
import re
import time
import uuid
from pathlib import Path
from typing import Any, Dict, Literal, Optional
from urllib.parse import urlparse

import cloudscraper
import requests
from fastapi import BackgroundTasks, Body, FastAPI, HTTPException, Request
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel, HttpUrl
from yt_dlp import YoutubeDL

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

DOWNLOAD_DIR = Path('downloads')
COOKIE_FILE = 'www.youtube.com_cookies.txt'

DOWNLOAD_DIR.mkdir(parents=True, exist_ok=True)

app = FastAPI(
    title="testing",
    description="API to fetch video info and download media via yt-dlp.",
    version="1.4.0",
)

# Serve completed downloads as static files.
app.mount("/downloads", StaticFiles(directory=DOWNLOAD_DIR), name="downloads")


class UrlRequest(BaseModel):
    """Request model for endpoints needing just a URL."""
    url: HttpUrl


AllowedQualityStr = Literal['best', '240', '480', '720', '1080', '1440', '2160']


class MaxDownloadRequest(BaseModel):
    """Request model for the /max endpoint."""
    url: HttpUrl
    quality: Optional[AllowedQualityStr] = 'best'


class InfoResponse(BaseModel):
    """Response model for the /get-info endpoint."""
    title: Optional[str] = None
    thumbnail: Optional[str] = None
    duration: Optional[float] = None
    channel: Optional[str] = None


class DownloadResponse(BaseModel):
    """Response model for download endpoints."""
    url: str
    filename: str
    message: Optional[str] = None


class ErrorResponse(BaseModel):
    """Standard error response model."""
    detail: str


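# Example request bodies accepted by these models (hypothetical URLs, shown here
# only to illustrate the expected shape):
#
#   UrlRequest:         {"url": "https://www.youtube.com/watch?v=XXXXXXXXXXX"}
#   MaxDownloadRequest: {"url": "https://www.youtube.com/watch?v=XXXXXXXXXXX", "quality": "1080"}

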
def perform_download(ydl_opts: dict, url: str, file_path: Path):
    """Synchronously downloads using yt-dlp and returns the final file path."""
    try:
        logger.info(f"Starting download for URL: {url} with options: {ydl_opts}")
        # Let yt-dlp pick the extension; the file is located by its stem afterwards.
        ydl_opts['outtmpl'] = str(file_path.with_suffix('.%(ext)s'))

        with YoutubeDL(ydl_opts) as ydl:
            ydl.extract_info(url, download=True)
        logger.info(f"Download finished successfully for URL: {url}")

        downloaded_files = list(DOWNLOAD_DIR.glob(f"{file_path.stem}.*"))
        if not downloaded_files:
            logger.error(f"Download completed but no file found for stem: {file_path.stem}")
            part_files = list(DOWNLOAD_DIR.glob(f"{file_path.stem}.*.part"))
            for part_file in part_files:
                try:
                    os.remove(part_file)
                    logger.info(f"Removed leftover part file: {part_file}")
                except OSError as rm_err:
                    logger.error(f"Error removing part file {part_file}: {rm_err}")
            raise RuntimeError(f"Could not find downloaded file for {url}")
        return downloaded_files[0]

    except Exception as e:
        logger.error(f"yt-dlp download failed for URL {url}: {e}", exc_info=True)
        # Clean up any partial output before re-raising.
        possible_files = list(DOWNLOAD_DIR.glob(f"{file_path.stem}.*"))
        for f in possible_files:
            if f.is_file():
                try:
                    os.remove(f)
                    logger.info(f"Removed potentially incomplete/failed file: {f}")
                except OSError as rm_err:
                    logger.error(f"Error removing file {f}: {rm_err}")
        raise


@app.get("/")
async def root():
    """Root endpoint providing basic API info."""
    return {"message": "API is running."}


@app.post(
    "/get-info",
    response_model=InfoResponse,
    responses={500: {"model": ErrorResponse}}
)
async def get_info(payload: UrlRequest = Body(...)):
    """
    Extracts video information (title, thumbnail, duration, channel) from a given URL.
    """
    logger.info(f"Received /get-info request for URL: {payload.url}")
    ydl_opts = {}
    if os.path.exists(COOKIE_FILE):
        ydl_opts['cookiefile'] = COOKIE_FILE
        logger.info("Using cookie file.")
    else:
        logger.warning(f"Cookie file '{COOKIE_FILE}' not found. Some videos might require login/cookies.")

    try:
        with YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(str(payload.url), download=False)
        return InfoResponse(
            title=info.get('title'),
            thumbnail=info.get('thumbnail'),
            duration=info.get('duration'),
            channel=info.get('channel')
        )
    except Exception as e:
        logger.error(f"Error fetching info for {payload.url}: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Failed to extract video info: {str(e)}")


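# A minimal local check of /get-info using FastAPI's TestClient (hypothetical video
# URL; TestClient requires httpx to be installed):
#
#   from fastapi.testclient import TestClient
#   client = TestClient(app)
#   resp = client.post("/get-info", json={"url": "https://www.youtube.com/watch?v=XXXXXXXXXXX"})
#   print(resp.json())  # {"title": ..., "thumbnail": ..., "duration": ..., "channel": ...}

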
# The /download (audio) endpoint below is kept disabled inside a string literal.
'''@app.post(
    "/download",
    response_model=DownloadResponse,
    responses={400: {"model": ErrorResponse}, 500: {"model": ErrorResponse}}
)
async def download_audio(request: Request, payload: UrlRequest = Body(...)):
    """
    Downloads the audio track of a video as an MP3 file (128kbps).
    """
    logger.info(f"Received /download (audio) request for URL: {payload.url}")
    unique_id = str(uuid.uuid4())
    file_path_stem = DOWNLOAD_DIR / unique_id

    ydl_opts = {
        'format': '140/m4a/bestaudio/best',
        'outtmpl': str(file_path_stem.with_suffix('.%(ext)s')),
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'mp3',
            'preferredquality': '128',
        }],
        'noplaylist': True,
        'quiet': False,
        'progress_hooks': [lambda d: logger.debug(f"Download progress: {d['status']} - {d.get('_percent_str', '')}")],
    }
    if os.path.exists(COOKIE_FILE):
        ydl_opts['cookiefile'] = COOKIE_FILE
        logger.info("Using cookie file for audio download.")
    else:
        logger.warning(f"Cookie file '{COOKIE_FILE}' not found for audio download.")

    try:
        # Use str(payload.url) to pass the URL string to the helper
        final_file_path = perform_download(ydl_opts, str(payload.url), file_path_stem)
        final_filename = final_file_path.name
        download_url = f"{str(request.base_url).rstrip('/')}/downloads/{final_filename}"
        logger.info(f"Audio download complete for {payload.url}. URL: {download_url}")
        return DownloadResponse(url=download_url, filename=final_filename)

    except Exception as e:
        # Error logged in perform_download
        raise HTTPException(status_code=500, detail=f"Audio download failed: {str(e)}")
'''


# Optional first-choice API endpoint, supplied via environment variable.
yt_api = os.getenv("yt_api")


class ApiRotator:
    """Tries APIs in order, trying the last successful one first on later calls."""

    def __init__(self, apis):
        self.apis = apis
        self.last_successful_index = None

    def get_prioritized_apis(self):
        if self.last_successful_index is not None:
            # Move the last successful API to the front, keeping the rest in order.
            rotated_apis = (
                [self.apis[self.last_successful_index]] +
                self.apis[:self.last_successful_index] +
                self.apis[self.last_successful_index + 1:]
            )
            return rotated_apis
        return self.apis

    def update_last_successful(self, index):
        self.last_successful_index = index


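# A minimal sketch of the rotation behavior (illustrative values, not real endpoints):
#
#   rotator = ApiRotator(["a", "b", "c"])
#   rotator.get_prioritized_apis()      # -> ["a", "b", "c"]
#   rotator.update_last_successful(2)   # "c" worked last time
#   rotator.get_prioritized_apis()      # -> ["c", "a", "b"]

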
# Filter out None in case the yt_api environment variable is unset.
api_rotator = ApiRotator([api for api in [
    yt_api,
    "https://dwnld.nichind.dev",
    "https://chrunos-load.hf.space",
    "https://cobalt-api.kwiatekmiki.com"
] if api])


async def get_track_download_url(video_url: str, quality: str) -> Dict[str, Any]:
    """Queries the prioritized APIs in turn and returns the first usable JSON response."""
    apis = api_rotator.get_prioritized_apis()
    session = cloudscraper.create_scraper()
    headers = {
        "Accept": "application/json",
        "Content-Type": "application/json",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
    }
    quality_lower = quality.lower()
    if quality_lower == "max":
        body_json = {"url": video_url, "videoQuality": "max", "filenameStyle": "pretty", "youtubeVideoCodec": "vp9"}
    else:
        body_json = {"url": video_url, "videoQuality": quality, "filenameStyle": "pretty", "youtubeVideoCodec": "h264"}

    for i, api_url in enumerate(apis):
        try:
            logger.info(f"Attempting to get download URL from: {api_url}")
            response = session.post(
                api_url,
                timeout=20,
                json=body_json,
                headers=headers
            )
            logger.info(f"Response status: {response.status_code}")
            logger.info(f"Response content: {response.content}")

            if response.headers.get('content-type', '').startswith('application/json'):
                json_response = response.json()
                error_code = json_response.get("error", {}).get("code", "")

                if error_code == "error.api.content.video.age":
                    logger.warning(f"Video unavailable error from {api_url}")
                    break

                if "url" in json_response or "picker" in json_response:
                    api_rotator.update_last_successful(i)
                    return json_response

        except Exception as e:
            logger.error(f"Failed with {api_url}: {str(e)}")
            continue

    logger.error("No download URL found")
    return {"error": "Download URL not found"}


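# Illustrative response shapes, based only on the keys this function inspects
# (real payloads from these third-party APIs vary and may carry extra fields):
#
#   {"url": "https://example.com/file.mp4"}               # direct link -> returned to caller
#   {"picker": [...]}                                     # multi-item response -> returned to caller
#   {"error": {"code": "error.api.content.video.age"}}    # age-restricted -> rotation stops

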
# Base URL of the external extractor service, supplied via environment variable.
reads_api = os.getenv("reads_api")


def call_extract_endpoint(post_url: str) -> Dict[str, Any]:
    """Calls the external /extract endpoint for Threads posts and returns its JSON."""
    try:
        response = requests.get(f"{reads_api}/extract", params={"url": post_url}, timeout=20)
        response.raise_for_status()
        return response.json()
    except requests.RequestException as e:
        raise requests.RequestException(f"Failed to call extract endpoint: {str(e)}") from e
    except ValueError as e:
        raise ValueError(f"Invalid response format: {str(e)}") from e


def is_threads_url(url: str) -> bool:
    """Validate if URL is a valid Threads URL."""
    try:
        parsed = urlparse(url)
        logger.info(parsed)

        if parsed.netloc not in ['threads.net', 'www.threads.net', 'threads.com', 'www.threads.com']:
            return False

        return '/post/' in parsed.path or '/t/' in parsed.path
    except Exception:
        return False


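# Examples of what the check accepts and rejects (hypothetical post IDs):
#
#   is_threads_url("https://www.threads.net/@user/post/Cxyz123")  # True
#   is_threads_url("https://threads.com/t/Cxyz123")               # True
#   is_threads_url("https://www.youtube.com/watch?v=abc")         # False (wrong host)
#   is_threads_url("https://threads.net/@user")                   # False (no /post/ or /t/)

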
@app.post("/multi") |
|
async def multi_download(request: Request): |
|
data = await request.json() |
|
video_url = data.get('url') |
|
quality = data.get('videoQuality') |
|
logger.info(f'{video_url}, {quality}') |
|
if not video_url: |
|
raise HTTPException( |
|
status_code=400, |
|
detail={"error": "Input 'url' is required."} |
|
) |
|
|
|
|
|
|
|
if not re.match(r'^https?://\S+', str(video_url)): |
|
raise HTTPException( |
|
status_code=400, |
|
detail={"error": f"Input 'url' ('{video_url}') is not a valid URL. It must start with http:// or https://."} |
|
) |
|
|
|
if not quality: |
|
raise HTTPException( |
|
status_code=400, |
|
detail={"error": "This version of shortcut is outdated. Get the latest version of Chrunos Multi Downloader shortcut."} |
|
) |
|
if quality == "mp3": |
|
parameter = 'type=audio' |
|
else: |
|
parameter = f'type=video&quality={quality}' |
|
|
|
if is_threads_url(video_url): |
|
return call_extract_endpoint(video_url) |
|
else: |
|
dl_url = await get_track_download_url(video_url, quality) |
|
if dl_url: |
|
|
|
return dl_url |
|
else: |
|
return { |
|
"error": "Failed to Fetch the video." |
|
} |
|
|
|
|
|
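# Example /multi request body (hypothetical URL; "videoQuality" also accepts
# "mp3" for audio and "max" for the highest available quality):
#
#   {"url": "https://www.youtube.com/watch?v=XXXXXXXXXXX", "videoQuality": "720"}

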
@app.post(
    "/max",
    response_model=DownloadResponse,
    responses={400: {"model": ErrorResponse}, 500: {"model": ErrorResponse}}
)
async def download_video_max_quality(request: Request, payload: MaxDownloadRequest = Body(...)):
    """
    Downloads the video in the specified quality or 'best' available, handling
    both landscape and portrait videos correctly. Attempts H.264 codec for 1080p
    and lower. Merges video and audio into MP4.
    Accepted qualities: 'best', '240', '480', '720', '1080', '1440', '2160'.
    A numeric quality (as a string) refers to the maximum dimension (height or width).
    """
    logger.info(f"Received /max (video) request for URL: {payload.url} with quality: {payload.quality}")

    unique_id = str(uuid.uuid4())
    file_path_stem = DOWNLOAD_DIR / unique_id

    quality_str = payload.quality
    format_selector = None
    max_dim = 0

    if quality_str == 'best':
        format_selector = 'bestvideo+bestaudio/best'
        logger.info("Using format selector for 'best' quality.")
    else:
        try:
            max_dim = int(quality_str)
        except ValueError:
            # Should be unreachable: the Literal type already validates the value.
            logger.error(f"Internal error: Could not convert validated quality string '{quality_str}' to int. Falling back to 'best'.")
            format_selector = 'bestvideo+bestaudio/best'
            max_dim = 99999

    # Allow the long edge up to 1.8x the requested dimension so portrait videos
    # (where width, not height, is the limiting edge) are not filtered out.
    long_edge = int(max_dim * 1.8)
    if not format_selector:
        if max_dim <= 1080:
            logger.info(f"Attempting H.264 codec for requested quality (max dimension): {max_dim}")
            format_selector = f'bestvideo[vcodec^=avc][height<={long_edge}][width<={long_edge}]+bestaudio/best'
        else:
            logger.info(f"Attempting best available codec for requested quality (max dimension): {max_dim}")
            format_selector = f'bestvideo[height<={long_edge}][width<={long_edge}]+bestaudio/best'

    logger.info(f"Using format selector: '{format_selector}'")

    ydl_opts = {
        'format': format_selector,
        'outtmpl': str(file_path_stem.with_suffix('.%(ext)s')),
        'merge_output_format': 'mp4',
        'noplaylist': True,
        'quiet': True,
        'noprogress': True
    }
    if os.path.exists(COOKIE_FILE):
        ydl_opts['cookiefile'] = COOKIE_FILE
        logger.info("Using cookie file for video download.")
    else:
        logger.warning(f"Cookie file '{COOKIE_FILE}' not found for video download.")

    try:
        final_file_path = perform_download(ydl_opts, str(payload.url), file_path_stem)
        final_filename = final_file_path.name
        download_url = f"{str(request.base_url).rstrip('/')}/downloads/{final_filename}"

        logger.info(f"Video download complete for {payload.url}. URL: {download_url}")
        return DownloadResponse(url=download_url, filename=final_filename)

    except Exception as e:
        # perform_download already logged the details.
        raise HTTPException(status_code=500, detail=f"Video download failed: {str(e)}")


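# Worked example of the selector logic: quality '720' gives long_edge = int(720 * 1.8) = 1296,
# so the selector becomes (H.264 branch, since 720 <= 1080):
#
#   bestvideo[vcodec^=avc][height<=1296][width<=1296]+bestaudio/best
#
# A 720x1280 portrait video passes both caps, whereas a plain height<=720 filter
# would reject it.

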
async def cleanup_old_files(directory: Path, max_age_seconds: int):
    """Removes files older than max_age_seconds in the background."""
    now = time.time()
    count = 0
    try:
        for item in directory.iterdir():
            if item.is_file():
                try:
                    if now - item.stat().st_mtime > max_age_seconds:
                        os.remove(item)
                        logger.info(f"Cleaned up old file: {item.name}")
                        count += 1
                except OSError as e:
                    logger.error(f"Error removing file {item}: {e}")
        if count > 0:
            logger.info(f"Background cleanup finished. Removed {count} old files.")
        else:
            logger.info("Background cleanup finished. No old files found.")
    except Exception as e:
        logger.error(f"Error during background file cleanup: {e}", exc_info=True)


@app.post("/trigger-cleanup") |
|
async def trigger_cleanup(background_tasks: BackgroundTasks): |
|
"""Manually trigger a cleanup of files older than 1 day.""" |
|
logger.info("Triggering background cleanup of old download files.") |
|
background_tasks.add_task(cleanup_old_files, DOWNLOAD_DIR, 86400) |
|
return {"message": "Background cleanup task scheduled."} |