# testing-y / main.py
# (Uploaded via Hugging Face — "Upload 4 files", commit 3394e4a verified.)
import os
import uuid
import logging
from pathlib import Path
from typing import Optional, Literal, Union, Dict, Any # Import Union
from urllib.parse import urlparse
import requests
import cloudscraper
import re
# --- FastAPI Imports ---
from fastapi import FastAPI, Request, HTTPException, BackgroundTasks, Body
from fastapi.responses import JSONResponse, FileResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel, HttpUrl, Field, field_validator # Import field_validator
# --- yt-dlp Import ---
from yt_dlp import YoutubeDL
# --- Logging Configuration ---
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# --- Constants ---
# Directory finished downloads are written to; also served statically below at /downloads.
DOWNLOAD_DIR = Path('downloads') # Use pathlib for paths
# Netscape-format cookie jar handed to yt-dlp when the file exists (optional at runtime).
COOKIE_FILE = 'www.youtube.com_cookies.txt' # Define cookie file path
# --- Create Download Directory ---
DOWNLOAD_DIR.mkdir(parents=True, exist_ok=True)
# --- FastAPI App Initialization ---
app = FastAPI(
    title="tesings",
    description="API to fetch info",
    version="1.4.0", # Incremented version
)
# --- Mount Static Files Directory ---
# Exposes downloaded files at GET /downloads/<filename>, matching the URLs
# built by the download endpoints from request.base_url.
app.mount("/downloads", StaticFiles(directory=DOWNLOAD_DIR), name="downloads")
# --- Pydantic Models for Request/Response Validation ---
class UrlRequest(BaseModel):
    """Request model for endpoints needing just a URL."""
    # Pydantic validates this as an absolute http(s) URL; convert with str() before use.
    url: HttpUrl
# Define allowed quality string literals (including numerical ones).
# The numeric values are strings on purpose: clients send them as JSON strings.
AllowedQualityStr = Literal['best', '240', '480', '720', '1080', '1440', '2160']
class MaxDownloadRequest(BaseModel):
    """Request model for the /max endpoint."""
    url: HttpUrl
    # Accept 'best' or specific numerical resolutions as strings.
    # NOTE(review): Optional means an explicit JSON null yields None here,
    # not the default 'best' — downstream code must tolerate None.
    quality: Optional[AllowedQualityStr] = 'best'
class InfoResponse(BaseModel):
    """Response model for the /get-info endpoint."""
    # All fields optional: yt-dlp may omit any of them for some extractors.
    title: Optional[str] = None
    thumbnail: Optional[str] = None
    duration: Optional[float] = None  # presumably seconds (yt-dlp convention) — TODO confirm
    channel: Optional[str] = None
class DownloadResponse(BaseModel):
    """Response model for download endpoints."""
    # Plain str (not HttpUrl): constructed at runtime from request.base_url.
    url: str
    filename: str
    message: Optional[str] = None
class ErrorResponse(BaseModel):
    """Standard error response model."""
    # Human-readable error description returned in the HTTP error body.
    detail: str
# --- Helper Function for Download ---
def perform_download(ydl_opts: dict, url: str, file_path: Path) -> Path:
    """Synchronously download *url* with yt-dlp and return the finished file's path.

    Args:
        ydl_opts: Base yt-dlp options. The 'outtmpl' key is overwritten so the
            output lands at *file_path* with whatever extension yt-dlp picks.
        url: Media URL to download.
        file_path: Extension-less target path (DOWNLOAD_DIR / unique id).

    Returns:
        Path of the completed download.

    Raises:
        RuntimeError: Download reported success but no completed file was found.
        Exception: Whatever yt-dlp raised; leftover files are cleaned up first.
    """
    try:
        logger.info(f"Starting download for URL: {url} with options: {ydl_opts}")
        ydl_opts['outtmpl'] = str(file_path.with_suffix('.%(ext)s'))
        with YoutubeDL(ydl_opts) as ydl:
            ydl.extract_info(url, download=True)
        logger.info(f"Download finished successfully for URL: {url}")
        # BUGFIX: exclude in-progress '.part' files here — the original glob
        # matched them too, so a leftover partial download could be returned
        # as the "finished" file.
        downloaded_files = [
            f for f in DOWNLOAD_DIR.glob(f"{file_path.stem}.*")
            if f.suffix != '.part'
        ]
        if not downloaded_files:
            logger.error(f"Download completed but no file found for stem: {file_path.stem}")
            # Best-effort removal of stale partial files before reporting failure.
            part_files = list(DOWNLOAD_DIR.glob(f"{file_path.stem}.*.part"))
            for part_file in part_files:
                try:
                    os.remove(part_file)
                    logger.info(f"Removed leftover part file: {part_file}")
                except OSError as rm_err:
                    logger.error(f"Error removing part file {part_file}: {rm_err}")
            raise RuntimeError(f"Could not find downloaded file for {url}")
        return downloaded_files[0]
    except Exception as e:
        logger.error(f"yt-dlp download failed for URL {url}: {e}", exc_info=True)
        # Best-effort cleanup of anything (partial or complete) left behind,
        # then re-raise so the caller can translate the failure into an HTTP error.
        possible_files = list(DOWNLOAD_DIR.glob(f"{file_path.stem}.*"))
        for f in possible_files:
            if f.is_file():
                try:
                    os.remove(f)
                    logger.info(f"Removed potentially incomplete/failed file: {f}")
                except OSError as rm_err:
                    logger.error(f"Error removing file {f}: {rm_err}")
        raise
# --- API Endpoints ---
@app.get("/")
async def root():
    """Root endpoint providing basic API info."""
    payload = dict(message="Running in errors.")
    return payload
@app.post(
    "/get-info",
    response_model=InfoResponse,
    responses={500: {"model": ErrorResponse}}
)
async def get_info(payload: UrlRequest = Body(...)):
    """
    Extracts video information (title, thumbnail, duration, channel) from a given URL.
    """
    logger.info(f"Received /get-info request for URL: {payload.url}")
    # Build extractor options; only the cookie file is conditional.
    opts = {}
    if os.path.exists(COOKIE_FILE):
        opts['cookiefile'] = COOKIE_FILE
        logger.info("Using cookie file.")
    else:
        logger.warning(f"Cookie file '{COOKIE_FILE}' not found. Some videos might require login/cookies.")
    try:
        # HttpUrl must be stringified before handing it to yt-dlp.
        with YoutubeDL(opts) as ydl:
            meta = ydl.extract_info(str(payload.url), download=False)
        return InfoResponse(
            title=meta.get('title'),
            thumbnail=meta.get('thumbnail'),
            duration=meta.get('duration'),
            channel=meta.get('channel'),
        )
    except Exception as e:
        logger.error(f"Error fetching info for {payload.url}: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Failed to extract video info: {str(e)}")
# NOTE(review): the triple-quoted string below is a *disabled* copy of a former
# /download (audio) endpoint. It is evaluated as a bare string literal at import
# time and has no runtime effect; kept here as dead code for reference.
'''@app.post(
"/download",
response_model=DownloadResponse,
responses={400: {"model": ErrorResponse}, 500: {"model": ErrorResponse}}
)
async def download_audio(request: Request, payload: UrlRequest = Body(...)):
"""
Downloads the audio track of a video as an MP3 file (128kbps).
"""
logger.info(f"Received /download (audio) request for URL: {payload.url}")
unique_id = str(uuid.uuid4())
file_path_stem = DOWNLOAD_DIR / unique_id
ydl_opts = {
'format': '140/m4a/bestaudio/best',
'outtmpl': str(file_path_stem.with_suffix('.%(ext)s')),
'postprocessors': [{
'key': 'FFmpegExtractAudio',
'preferredcodec': 'mp3',
'preferredquality': '128',
}],
'noplaylist': True,
'quiet': False,
'progress_hooks': [lambda d: logger.debug(f"Download progress: {d['status']} - {d.get('_percent_str', '')}")],
}
if os.path.exists(COOKIE_FILE):
ydl_opts['cookiefile'] = COOKIE_FILE
logger.info("Using cookie file for audio download.")
else:
logger.warning(f"Cookie file '{COOKIE_FILE}' not found for audio download.")
try:
# Use str(payload.url) to pass the URL string to the helper
final_file_path = perform_download(ydl_opts, str(payload.url), file_path_stem)
final_filename = final_file_path.name
download_url = f"{str(request.base_url).rstrip('/')}/downloads/{final_filename}"
logger.info(f"Audio download complete for {payload.url}. URL: {download_url}")
return DownloadResponse(url=download_url, filename=final_filename)
except Exception as e:
# Error logged in perform_download
raise HTTPException(status_code=500, detail=f"Audio download failed: {str(e)}")
'''
yt_api = os.getenv("yt_api")
class ApiRotator:
    """Cycles through candidate API endpoints, remembering which one last worked."""

    def __init__(self, apis):
        # Ordered list of endpoint URLs; index of the most recent success (None = none yet).
        self.apis = apis
        self.last_successful_index = None

    def get_prioritized_apis(self):
        """Return the API list with the last successful endpoint promoted to the front."""
        idx = self.last_successful_index
        if idx is None:
            return self.apis
        remaining = [api for pos, api in enumerate(self.apis) if pos != idx]
        return [self.apis[idx]] + remaining

    def update_last_successful(self, index):
        """Record the index of the API that most recently succeeded."""
        self.last_successful_index = index
# Module-level rotator shared by all requests; the first candidate comes from
# the `yt_api` environment variable.
# NOTE(review): if `yt_api` is unset this list starts with None — presumably
# the POST to it just fails and the rotator moves on; verify.
api_rotator = ApiRotator([
    yt_api,
    "https://dwnld.nichind.dev",
    "https://chrunos-load.hf.space",
    "https://cobalt-api.kwiatekmiki.com"
])
async def get_track_download_url(video_url: str, quality: str) -> Dict[str, Any]:
    """Ask each candidate API (last-successful first) to resolve a download URL.

    Args:
        video_url: Page URL of the video to resolve.
        quality: Requested quality; "max" (case-insensitive) selects maximum
            quality with the vp9 codec, anything else is passed through with h264.

    Returns:
        The raw JSON dict from the first API whose response contains a "url"
        or "picker" key, or ``{"error": "Download URL not found"}`` when every
        candidate fails. (Return annotation fixed: the original claimed ``str``
        but always returned a dict.)
    """
    apis = api_rotator.get_prioritized_apis()
    session = cloudscraper.create_scraper()  # Requires cloudscraper package
    headers = {
        "Accept": "application/json",
        "Content-Type": "application/json",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
    }
    quality_lower = quality.lower()
    if quality_lower == "max":
        body_json = {"url": video_url, "videoQuality": "max", "filenameStyle": "pretty", "youtubeVideoCodec": "vp9"}
    else:
        body_json = {"url": video_url, "videoQuality": quality, "filenameStyle": "pretty", "youtubeVideoCodec": "h264"}
    for i, api_url in enumerate(apis):
        try:
            logger.info(f"Attempting to get download URL from: {api_url}")
            response = session.post(
                api_url,
                timeout=20,
                json=body_json,
                headers=headers
            )
            logger.info(f"Response status: {response.status_code}")
            logger.info(f"Response content: {response.content}")
            if response.headers.get('content-type', '').startswith('application/json'):
                json_response = response.json()
                error_code = json_response.get("error", {}).get("code", "")
                if error_code == "error.api.content.video.age":
                    logger.warning(f"Video unavailable error from {api_url}")
                    break  # Only break for this specific error: no other API will succeed either
                if "url" in json_response or "picker" in json_response:
                    # Remember this endpoint so it is tried first next time.
                    api_rotator.update_last_successful(i)
                    return json_response
        except Exception as e:
            logger.error(f"Failed with {api_url}: {str(e)}")
            continue
    logger.error("No download URL found")
    return {"error": "Download URL not found"}
reads_api = os.getenv("reads_api")
def call_extract_endpoint(post_url: str) -> Dict[str, Any]:
    """Proxy *post_url* to the external reads API's /extract endpoint.

    Args:
        post_url: The Threads post URL to extract.

    Returns:
        The decoded JSON payload from the extract service.

    Raises:
        requests.RequestException: HTTP call failed, timed out, or returned an
            error status.
        ValueError: Response body was not valid JSON.
    """
    try:
        # Timeout added: the original call had none and could hang indefinitely.
        # requests.Timeout is a RequestException subclass, so callers see the
        # same exception type as before.
        response = requests.get(f"{reads_api}/extract", params={"url": post_url}, timeout=30)
        response.raise_for_status()
        return response.json()
    except requests.RequestException as e:
        # Chain the original exception so the root cause is preserved.
        raise requests.RequestException(f"Failed to call extract endpoint: {str(e)}") from e
    except ValueError as e:
        raise ValueError(f"Invalid response format: {str(e)}") from e
def is_threads_url(url: str) -> bool:
    """Validate if URL is a valid Threads URL"""
    try:
        parsed = urlparse(url)
        logger.info(parsed)
        # Accept only known Threads hosts, and only post-style paths.
        known_hosts = ['threads.net', 'www.threads.net', 'threads.com', 'www.threads.com']
        host_ok = parsed.netloc in known_hosts
        path_ok = '/post/' in parsed.path or '/t/' in parsed.path
        return host_ok and path_ok
    except Exception:
        # Anything unparseable is simply not a Threads URL.
        return False
@app.post("/multi")
async def multi_download(request: Request):
    """Resolve a download URL for a generic video/audio link.

    Expects a JSON body with 'url' and 'videoQuality'. Threads posts are
    delegated to the external extract service; everything else goes through
    the rotating download APIs.

    Raises:
        HTTPException: 400 when 'url' is missing/malformed or 'videoQuality'
            is absent.
    """
    data = await request.json()
    video_url = data.get('url')
    quality = data.get('videoQuality')
    logger.info(f'{video_url}, {quality}')
    if not video_url:
        raise HTTPException(
            status_code=400,
            detail={"error": "Input 'url' is required."}
        )
    # Basic URL validation: checks if it starts with http/https and has some content after.
    # For more robust validation, consider using a library like 'validators'.
    if not re.match(r'^https?://\S+', str(video_url)):  # Ensure video_url is treated as string for regex
        raise HTTPException(
            status_code=400,
            detail={"error": f"Input 'url' ('{video_url}') is not a valid URL. It must start with http:// or https://."}
        )
    if not quality:  # Checks for None or empty string
        raise HTTPException(
            status_code=400,
            detail={"error": "This version of shortcut is outdated. Get the latest version of Chrunos Multi Downloader shortcut."}
        )
    # (Removed dead code: a 'parameter' query-string was built here but never used.)
    if is_threads_url(video_url):
        return call_extract_endpoint(video_url)
    dl_url = await get_track_download_url(video_url, quality)
    # NOTE: get_track_download_url always returns a dict (possibly
    # {"error": ...}), which is truthy — the fallback below is only a
    # defensive guard and is normally unreachable.
    if dl_url:
        return dl_url
    return {
        "error": "Failed to Fetch the video."
    }
@app.post(
    "/max",
    response_model=DownloadResponse,
    responses={400: {"model": ErrorResponse}, 500: {"model": ErrorResponse}}
)
async def download_video_max_quality(request: Request, payload: MaxDownloadRequest = Body(...)):
    """
    Downloads the video in the specified quality or 'best' available, handling
    both landscape and portrait videos correctly. Attempts H.264 codec for 1080
    and lower. Merges video and audio into MP4.
    Accepted qualities: 'best', '240', '480', '720', '1080', '1440', '2160'.
    Quality number (as string) refers to the maximum dimension (height or width).
    """
    logger.info(f"Received /max (video) request for URL: {payload.url} with quality: {payload.quality}")
    unique_id = str(uuid.uuid4())
    file_path_stem = DOWNLOAD_DIR / unique_id
    # --- Determine yt-dlp Format Selector based on Quality and Codec Preference ---
    # BUGFIX: payload.quality is Optional, so a client sending an explicit JSON
    # null yields None here; the original int(None) raised an uncaught TypeError
    # (the except clause only caught ValueError). Treat None as 'best'.
    quality_str = payload.quality or 'best'
    format_selector = None
    max_dim = 0  # Maximum dimension (height or width) requested, 0 for 'best'
    if quality_str == 'best':
        format_selector = 'bestvideo+bestaudio/best'  # Best video and audio, merged if possible
        logger.info("Using format selector for 'best' quality.")
    else:
        # Quality is a numerical string ('240', '480', etc.)
        try:
            max_dim = int(quality_str)
        except (TypeError, ValueError):
            # Should not happen given Pydantic validation, but fall back safely.
            logger.error(f"Internal error: Could not convert validated quality string '{quality_str}' to int. Falling back to 'best'.")
            format_selector = 'bestvideo+bestaudio/best'
            # High value skips the codec-specific logic below.
            max_dim = 99999
    # Allow the long edge to exceed the requested dimension so portrait videos
    # are not rejected (e.g. a 1080 request accepts up to 1944 on either axis).
    long_edge = int(max_dim * 1.8)
    # Only build a selector here if one wasn't already chosen above.
    if not format_selector:
        if max_dim <= 1080:
            # Prefer H.264 (avc1) for 1080 or lower max dimension (widest device support).
            logger.info(f"Attempting H.264 codec for requested quality (max dimension): {max_dim}")
            format_selector = f'bestvideo[vcodec^=avc][height<={long_edge}][width<={long_edge}]+bestaudio/best'
        else:
            # For > 1080 max dimension, prioritize best available codec.
            logger.info(f"Attempting best available codec for requested quality (max dimension): {max_dim}")
            format_selector = f'bestvideo[height<={long_edge}][width<={long_edge}]+bestaudio/best'
    logger.info(f"Using format selector: '{format_selector}'")
    # --- yt-dlp Options for Video Download ---
    ydl_opts = {
        'format': format_selector,
        'outtmpl': str(file_path_stem.with_suffix('.%(ext)s')),
        'merge_output_format': 'mp4',  # Merge into MP4 container
        'noplaylist': True,
        'quiet': True,
        'noprogress': True
    }
    if os.path.exists(COOKIE_FILE):
        ydl_opts['cookiefile'] = COOKIE_FILE
        logger.info("Using cookie file for video download.")
    else:
        logger.warning(f"Cookie file '{COOKIE_FILE}' not found for video download.")
    try:
        # HttpUrl must be stringified before handing it to the helper.
        final_file_path = perform_download(ydl_opts, str(payload.url), file_path_stem)
        final_filename = final_file_path.name
        download_url = f"{str(request.base_url).rstrip('/')}/downloads/{final_filename}"
        logger.info(f"Video download complete for {payload.url}. URL: {download_url}")
        return DownloadResponse(url=download_url, filename=final_filename)
    except Exception as e:
        # Error already logged in perform_download.
        raise HTTPException(status_code=500, detail=f"Video download failed: {str(e)}")
# --- Optional: Cleanup Task ---
async def cleanup_old_files(directory: Path, max_age_seconds: int):
    """Removes files older than max_age_seconds in the background."""
    import time
    # Anything modified before this instant is considered stale.
    threshold = time.time() - max_age_seconds
    removed = 0
    try:
        for entry in directory.iterdir():
            if not entry.is_file():
                continue
            try:
                if entry.stat().st_mtime < threshold:
                    os.remove(entry)
                    logger.info(f"Cleaned up old file: {entry.name}")
                    removed += 1
            except OSError as e:
                logger.error(f"Error removing file {entry}: {e}")
        if removed > 0:
            logger.info(f"Background cleanup finished. Removed {removed} old files.")
        else:
            logger.info("Background cleanup finished. No old files found.")
    except Exception as e:
        logger.error(f"Error during background file cleanup: {e}", exc_info=True)
@app.post("/trigger-cleanup")
async def trigger_cleanup(background_tasks: BackgroundTasks):
    """Manually trigger a cleanup of files older than 1 day."""
    logger.info("Triggering background cleanup of old download files.")
    one_day = 86400  # seconds
    background_tasks.add_task(cleanup_old_files, DOWNLOAD_DIR, one_day)
    return {"message": "Background cleanup task scheduled."}