# kai / app.py
# Chrunos's picture
# Update app.py
# 27063a7 verified
import os
import tempfile
import urllib.parse
from datetime import datetime
from pathlib import Path
import logging
from fastapi import FastAPI, Request
from fastapi.concurrency import run_in_threadpool
from fastapi.staticfiles import StaticFiles
import yt_dlp
import cloudscraper
from dotenv import load_dotenv
# Logging: configure the root logger at INFO and create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# System temporary directory and the public base URL used to build download links.
tmp_dir = tempfile.gettempdir()
BASE_URL = "https://chrunos-kai.hf.space"

# Load the dotenv configuration before any environment variable is read below.
load_dotenv()

app = FastAPI()

# Freshly created temporary directory that receives every downloaded file.
global_download_dir = tempfile.mkdtemp()

# Extraction API endpoints taken from the environment (None when unset).
EXTRACT_API = os.environ.get("EXTRACT_API")
ALT_API = os.environ.get("ALT_API")
def extract_video_info(video_url: str) -> "list | dict":
    """Extract downloadable format entries for ``video_url`` using fallback APIs.

    The endpoints configured in ``ALT_API`` and ``EXTRACT_API`` are queried in
    that order, with the video URL passed as the ``url`` query parameter. An
    endpoint that answers with a non-200 status, or whose handling raises, is
    logged and the next endpoint is tried. The first endpoint that answers
    with HTTP 200 and a usable JSON body determines the return value.

    Args:
        video_url: Address of the video page to inspect.

    Returns:
        On success, a list of dicts:
          * when the response has a ``formats`` key: one
            ``{"url", "format_id", "cookies"}`` dict per format that has both
            a URL and a format id. If ``"ornhub.com"`` occurs in ``video_url``
            only entries whose ``format_id`` contains ``"hls"`` are returned;
            otherwise, when exactly one entry was found, a placeholder entry
            is appended.
          * when the response only has a ``url`` key: a ``"video"`` entry and
            a ``"thumbnail"`` entry.
        On failure, a dict with a single ``"error"`` key.
    """
    # Percent-encode the target so characters such as '&' or '#' inside it
    # cannot truncate or corrupt the query string sent to the API.
    query = urllib.parse.urlencode({"url": video_url})

    # Ignore endpoints whose environment variable is unset instead of
    # issuing a request to the literal string "None?url=...".
    api_urls = [f"{base}?{query}" for base in (ALT_API, EXTRACT_API) if base]
    if not api_urls:
        logger.warning("No extraction API endpoint is configured")

    for api_url in api_urls:
        logger.info(f"Trying API: {api_url}")
        try:
            # The scraper is a requests-style session; the context manager
            # closes its pooled connections once the response has been read.
            with cloudscraper.create_scraper() as session:
                response = session.get(api_url, timeout=20)
                if response.status_code != 200:
                    logger.warning(f"Request failed with status code {response.status_code}, API: {api_url}")
                    continue
                json_response = response.json()

            if 'formats' not in json_response:
                # Single-URL payload: expose the media URL and its thumbnail.
                if 'url' not in json_response:
                    return {"error": "No formats available"}
                return [
                    {"url": json_response.get('url'), "format_id": "video"},
                    {"url": json_response.get('thumbnail'), "format_id": "thumbnail"},
                ]

            result = []
            for format_item in json_response['formats']:
                format_url = format_item.get('url')
                format_id = format_item.get('format_id')
                # Entries missing either the URL or the id are dropped.
                if format_id and format_url:
                    result.append({
                        "url": format_url,
                        "format_id": format_id,
                        "cookies": format_item.get('cookies'),
                    })

            title = json_response.get('title')
            logger.info(f"Video title: {title}")

            if "ornhub.com" in video_url:
                # For this site only the HLS variants are returned.
                return [item for item in result if 'hls' in item['format_id']]

            if len(result) == 1:
                # A lone format is padded with an explicit placeholder entry.
                result.append({
                    "format_id": "This is Fake, Don't Choose This One",
                    "url": "none",
                })
            return result
        except Exception as e:
            # Any failure (network, JSON decoding, unexpected payload shape)
            # falls through to the next endpoint.
            logger.error(f"An error occurred with API {api_url}: {e}")

    return {"error": "Both APIs failed to provide valid results."}
@app.post("/pripper")
async def test_download(request: Request):
    """Test endpoint to extract video information.

    Expects a JSON body with a ``url`` key and returns whatever
    ``extract_video_info`` produces for it. A missing/empty ``url`` or any
    exception while handling the request yields a dict with an ``"error"`` key.
    """
    try:
        data = await request.json()
        video_url = data.get('url')
        if not video_url:
            return {"error": "URL parameter is required"}
        # extract_video_info performs blocking HTTP requests; run it in a
        # worker thread so the event loop keeps serving other requests.
        return await run_in_threadpool(extract_video_info, video_url)
    except Exception as e:
        logger.error(f"Error in test_download: {e}")
        return {"error": f"Failed to process request: {str(e)}"}
@app.post("/hls")
async def download_hls_video(request: Request):
    """Download HLS video and return download URL.

    Expects a JSON body with a ``url`` key. The stream is downloaded with
    yt-dlp into ``global_download_dir`` and, on success, ``{"url": ...}`` is
    returned with a link under ``BASE_URL``/file/. Every failure path returns
    a dict with an ``"error"`` key instead of raising.
    """
    try:
        data = await request.json()
        hls_url = data.get('url')
        if not hls_url:
            return {"error": "URL parameter is required"}

        # The timestamp is the only thing tying the output file to this
        # request (see the glob below), so include microseconds: with
        # one-second resolution two requests arriving in the same second
        # could pick up each other's file.
        timestamp = datetime.now().strftime('%Y%m%d%H%M%S%f')
        output_template = str(Path(global_download_dir) / f'%(title)s_{timestamp}.%(ext)s')
        ydl_opts = {
            'format': 'best',
            'outtmpl': output_template,
            'quiet': True,
            'no_warnings': True,
            'noprogress': True,
            'merge_output_format': 'mp4'
        }

        def _run_download() -> None:
            # Use the downloader as a context manager so it is closed once
            # the download finishes, whether it succeeds or raises.
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                ydl.download([hls_url])

        try:
            # The download blocks, so it runs in a worker thread.
            await run_in_threadpool(_run_download)
        except Exception as e:
            logger.error(f"yt-dlp download failed: {e}")
            return {"error": f"Download failed: {str(e)}"}

        # Locate the produced file through the per-request timestamp suffix.
        downloaded_files = list(Path(global_download_dir).glob(f"*_{timestamp}.mp4"))
        if not downloaded_files:
            return {"error": "Download failed - no files found"}

        downloaded_file = downloaded_files[0]
        # Percent-encode the file name so it is safe inside the URL path.
        encoded_filename = urllib.parse.quote(downloaded_file.name)
        download_url = f"{BASE_URL}/file/{encoded_filename}"
        logger.info(f"Download successful: {download_url}")
        return {"url": download_url}
    except Exception as e:
        logger.error(f"Error in download_hls_video: {e}")
        return {"error": f"Failed to process request: {str(e)}"}
@app.get("/health")
async def health_check():
    """Liveness probe: report that the service is up."""
    payload = dict(status="healthy", message="Service is running")
    return payload
# Serve everything inside the download directory under the /file URL prefix.
app.mount(
    "/file",
    StaticFiles(directory=global_download_dir),
    name="downloads",
)

if __name__ == "__main__":
    # uvicorn is only needed when this file is executed directly.
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)