from fastapi import FastAPI, HTTPException, Request, Query, Response from fastapi.responses import StreamingResponse, HTMLResponse from fastapi.templating import Jinja2Templates from pytubefix import YouTube from pytubefix.cli import on_progress import os import logging import httpx import hashlib from functools import lru_cache from encrypt import encrypt_video_id, decrypt_video_id app = FastAPI() CHUNK_SIZE = 1024 * 1024 # 1MB logger = logging.getLogger(__name__) def open_file_range(file_path: str, start: int, end: int): with open(file_path, "rb") as f: f.seek(start) bytes_to_read = end - start + 1 while bytes_to_read > 0: chunk = f.read(min(CHUNK_SIZE, bytes_to_read)) if not chunk: break bytes_to_read -= len(chunk) yield chunk def generate_etag(file_path): hasher = hashlib.md5() with open(file_path, 'rb') as f: while chunk := f.read(8192): hasher.update(chunk) return hasher.hexdigest() @lru_cache(maxsize=128) def get_video_metadata(video_id: str): yt = YouTube(f"https://www.youtube.com/watch?v={video_id}", client='WEB_EMBED') if yt.length >= 600: return { "title": yt.title, "description": yt.description, "author": yt.author, "duration": yt.length, "views": yt.views, "date": yt.publish_date, "video_url": yt.streams.get_highest_resolution().url, "audio_url": yt.streams.get_audio_only().url, } else: return { "title": yt.title, "description": yt.description, "author": yt.author, "duration": yt.length, "views": yt.views, "date": yt.publish_date, } @app.get("/api/video/{video_id}") def get_video_info(video_id: str, request: Request): try: metadata = get_video_metadata(video_id) encrypted_video_id = encrypt_video_id(video_id) BASE_URL = request.base_url if metadata['duration'] >= 600: return {**metadata} else: return { **metadata, "video_url": f"{BASE_URL}video/{encrypted_video_id}", "audio_url": f"{BASE_URL}audio/{encrypted_video_id}" } except Exception as e: raise HTTPException(status_code=500, detail=f"Error: {str(e)}") @app.get("/video/{video_id}") async def stream_video(video_id: str, request: Request, download: bool = Query(False)): try: decrypted_video_id = decrypt_video_id(video_id) yt = YouTube(f"https://www.youtube.com/watch?v={decrypted_video_id}") stream = yt.streams.get_highest_resolution() url = stream.url headers = {} if range_header := request.headers.get("range"): headers["Range"] = range_header async def proxy_stream(): try: async with httpx.AsyncClient() as client: async with client.stream("GET", url, headers=headers, timeout=60) as response: if response.status_code not in (200, 206): logger.error(f"Failed to stream: {response.status_code}") return async for chunk in response.aiter_bytes(CHUNK_SIZE): yield chunk except Exception as e: logger.error(f"Streaming error: {str(e)}") return response_headers = { "Accept-Ranges": "bytes", "Cache-Control": "public, max-age=3600" } # Handle filename safely title = yt.title.encode("utf-8", "ignore").decode("utf-8") if download: response_headers["Content-Disposition"] = f'attachment; filename="{title}.mp4"' else: response_headers["Content-Disposition"] = f'inline; filename="{title}.mp4"' return StreamingResponse( proxy_stream(), media_type="video/mp4", headers=response_headers ) except Exception as e: raise HTTPException(status_code=500, detail=f"Could not fetch video URL: {str(e)}") @app.get("/audio/{video_id}") async def stream_audio(video_id: str, request: Request, download: bool = Query(False)): try: decrypted_video_id = decrypt_video_id(video_id) yt = YouTube(f"https://www.youtube.com/watch?v={decrypted_video_id}") stream = yt.streams.get_audio_only() url = stream.url headers = { "User-Agent": request.headers.get("user-agent", "Mozilla/5.0"), } if range_header := request.headers.get("range"): headers["Range"] = range_header async def proxy_stream(): async with httpx.AsyncClient(follow_redirects=True) as client: async with client.stream("GET", url, headers=headers) as response: if response.status_code not in (200, 206): raise HTTPException(status_code=502, detail="Source stream error") async for chunk in response.aiter_bytes(CHUNK_SIZE): yield chunk response_headers = { "Accept-Ranges": "bytes", "Cache-Control": "public, max-age=3600" } # Handle filename safely title = yt.title.encode("utf-8", "ignore").decode("utf-8") if download: response_headers["Content-Disposition"] = f'attachment; filename="{title}.mp3"' else: response_headers["Content-Disposition"] = f'inline; filename="{title}.mp3"' return StreamingResponse( proxy_stream(), media_type=stream.mime_type or "audio/mp4", headers=response_headers ) except Exception as e: logger.error(f"Streaming error: {e}") raise HTTPException(status_code=500, detail=f"Error: {str(e)}")