import os
import tempfile
import urllib.parse
from datetime import datetime
from pathlib import Path
import logging
from fastapi import FastAPI, Request
from fastapi.concurrency import run_in_threadpool
from fastapi.staticfiles import StaticFiles
import yt_dlp
import cloudscraper
from dotenv import load_dotenv
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
tmp_dir = tempfile.gettempdir()
BASE_URL = "https://chrunos-kai.hf.space"
load_dotenv()
app = FastAPI()
# Define a global temporary download directory
global_download_dir = tempfile.mkdtemp()
EXTRACT_API = os.getenv("EXTRACT_API")
ALT_API = os.getenv("ALT_API")
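
# The two extraction endpoints are read from the environment (loaded via .env).
# A minimal sketch of the expected .env file, with placeholder values rather
# than the real services:
#
#   EXTRACT_API=https://example.com/api/extract
#   ALT_API=https://example.com/api/alt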
def extract_video_info(video_url: str) -> dict | list:
    """Extract video information from the provided URL using fallback APIs."""
    # URL-encode the target so its own query string is passed through intact.
    encoded_url = urllib.parse.quote(video_url, safe='')
    # ALT_API is tried first, EXTRACT_API is the fallback.
    api_urls = [f'{ALT_API}?url={encoded_url}', f'{EXTRACT_API}?url={encoded_url}']
    for api_url in api_urls:
        logger.info(f"Trying API: {api_url}")
        session = cloudscraper.create_scraper()
        try:
            response = session.get(api_url, timeout=20)
            if response.status_code == 200:
                json_response = response.json()
                result = []
                if 'formats' in json_response:
                    for format_item in json_response['formats']:
                        format_url = format_item.get('url')
                        format_id = format_item.get('format_id')
                        p_cookies = format_item.get('cookies')
                        if format_id and format_url:
                            result.append({
                                "url": format_url,
                                "format_id": format_id,
                                "cookies": p_cookies
                            })
                    title = json_response.get('title')
                    logger.info(f"Video title: {title}")
                    if "ornhub.com" in video_url:
                        # Only keep HLS formats for this site.
                        p_result = [item for item in result if 'hls' in item['format_id']]
                        return p_result
                    else:
                        if len(result) == 1:
                            # Pad single-format results with a clearly-labelled dummy entry.
                            new_item = {
                                "format_id": "This is Fake, Don't Choose This One",
                                "url": "none"
                            }
                            result.append(new_item)
                        return result
                else:
                    if 'url' in json_response:
                        # Single-URL responses: expose the direct video link and thumbnail.
                        d_url = json_response.get('url')
                        t_url = json_response.get('thumbnail')
                        result.append({
                            "url": d_url,
                            "format_id": "video"
                        })
                        result.append({
                            "url": t_url,
                            "format_id": "thumbnail"
                        })
                        return result
                    else:
                        return {"error": "No formats available"}
            else:
                logger.warning(f"Request failed with status code {response.status_code}, API: {api_url}")
        except Exception as e:
            logger.error(f"An error occurred with API {api_url}: {e}")
    return {"error": "Both APIs failed to provide valid results."}
@app.post("/pripper")
async def test_download(request: Request):
"""Test endpoint to extract video information."""
try:
data = await request.json()
video_url = data.get('url')
if not video_url:
return {"error": "URL parameter is required"}
response = extract_video_info(video_url)
return response
except Exception as e:
logger.error(f"Error in test_download: {e}")
return {"error": f"Failed to process request: {str(e)}"}
@app.post("/hls")
async def download_hls_video(request: Request):
"""Download HLS video and return download URL."""
try:
data = await request.json()
hls_url = data.get('url')
if not hls_url:
return {"error": "URL parameter is required"}
timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
output_template = str(Path(global_download_dir) / f'%(title)s_{timestamp}.%(ext)s')
ydl_opts = {
'format': 'best',
'outtmpl': output_template,
'quiet': True,
'no_warnings': True,
'noprogress': True,
'merge_output_format': 'mp4'
}
try:
await run_in_threadpool(lambda: yt_dlp.YoutubeDL(ydl_opts).download([hls_url]))
except Exception as e:
logger.error(f"yt-dlp download failed: {e}")
return {"error": f"Download failed: {str(e)}"}
downloaded_files = list(Path(global_download_dir).glob(f"*_{timestamp}.mp4"))
if not downloaded_files:
return {"error": "Download failed - no files found"}
downloaded_file = downloaded_files[0]
encoded_filename = urllib.parse.quote(downloaded_file.name)
download_url = f"{BASE_URL}/file/{encoded_filename}"
logger.info(f"Download successful: {download_url}")
return {"url": download_url}
except Exception as e:
logger.error(f"Error in download_hls_video: {e}")
return {"error": f"Failed to process request: {str(e)}"}
@app.get("/health")
async def health_check():
"""Health check endpoint."""
return {"status": "healthy", "message": "Service is running"}
# Mount the static files directory
app.mount("/file", StaticFiles(directory=global_download_dir), name="downloads")
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)