File size: 5,789 Bytes
c09c0d6 5e23e4b c09c0d6 5e23e4b c09c0d6 5e23e4b c09c0d6 f630a4e c09c0d6 5e23e4b c09c0d6 5e23e4b c09c0d6 5e23e4b c09c0d6 5e23e4b c09c0d6 5e23e4b c09c0d6 5e23e4b c09c0d6 5e23e4b c09c0d6 5e23e4b c09c0d6 5e23e4b c09c0d6 5e23e4b c09c0d6 5e23e4b c09c0d6 5e23e4b c09c0d6 5e23e4b c09c0d6 5e23e4b c09c0d6 5e23e4b c09c0d6 5e23e4b c09c0d6 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 |
import os
import re
import threading
import uuid
import requests
import logging
from flask import Flask, request, jsonify
from urllib.parse import quote
# --- Logging Configuration ---
# This will print logs to the console, which you can view in your Space's "Logs" tab.
# Configures the root logger at INFO level with a timestamp/level/message layout.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# --- Flask App Initialization ---
# The single application object; the route decorators and app.logger calls in
# this file all hang off it.
app = Flask(__name__)
# --- Secret Management ---
# Get the base domain of your downloader service from Hugging Face secrets.
# The value is None when the variable is unset; callers in this file check for
# that before use. It is used as a URL prefix (f"{YTDLP_DOMAIN}/stream?..."),
# so it presumably includes the scheme, e.g. "https://host" -- TODO confirm.
YTDLP_DOMAIN = os.environ.get("YTDLP_DOMAIN")
# --- In-memory Task Storage ---
# Maps task_id (a UUID string) to a dict holding 'status' ('processing',
# 'completed' or 'error') plus 'result' on success or 'message' on failure.
# Entries live only in this process's memory and are never removed by the code
# in this file.
tasks = {}
# Held around the reads and writes of `tasks` made by the request handlers and
# the background worker threads.
tasks_lock = threading.Lock()
# Canonical quality labels and the yt-dlp format selector each one maps to.
# NOTE(review): the numeric selectors look like site-specific video+audio
# format IDs -- confirm they are valid for every site the service supports.
_QUALITY_FORMATS = {
    "480": "135+140",
    "720": "136+140",
    "1080": "137+140",
    "best": "bestvideo+bestaudio",
}

# Selector returned for any quality label that is not in the table above.
# NOTE(review): this is the literal "best", which is different from the value
# mapped to the "best" key -- kept as-is for compatibility; confirm intent.
_FALLBACK_FORMAT = "best"


def get_format_code(quality: str) -> str:
    """Map a user-friendly quality name to a yt-dlp format code.

    The lookup is tolerant of how clients spell the quality: the value is
    converted to text, surrounding whitespace is removed, case is ignored and
    a trailing "p" after digits is dropped, so ``720``, ``"720"``, ``"720p"``
    and ``" 720P "`` all select the same format.

    Args:
        quality: Requested quality label. Non-string values (for example an
            integer decoded from JSON) are accepted and stringified.

    Returns:
        The format code for a known label, or ``"best"`` for anything else.
    """
    # str() first: JSON bodies may carry ints, null or even lists, and an
    # unhashable value would make the dict lookup raise TypeError.
    label = str(quality).strip().lower()
    if label.endswith("p") and label[:-1].isdigit():
        label = label[:-1]
    return _QUALITY_FORMATS.get(label, _FALLBACK_FORMAT)
# Relative download link as it appears in the downloader service's output.
_DOWNLOAD_LINK_RE = re.compile(r'href="(/download/download/[^"]+)"')


def _set_task_state(task_id, **fields):
    """Merge ``fields`` into the record for ``task_id`` while holding the lock."""
    with tasks_lock:
        tasks.setdefault(task_id, {}).update(fields)


def _is_plain_http_url(candidate):
    """Return True if ``candidate`` is an http(s) URL string without whitespace."""
    return (
        isinstance(candidate, str)
        and candidate.lower().startswith(("http://", "https://"))
        and not any(ch.isspace() for ch in candidate)
    )


def process_video_task(task_id: str, video_url: str, quality: str):
    """Run one download job against the external service and record the outcome.

    Intended to run in a background thread. It calls the service's ``/stream``
    endpoint, reads the streamed response to the end, then searches the
    collected lines for the download link and updates ``tasks[task_id]``:

    * ``status='completed'`` and ``result=<absolute download URL>`` when a
      link is found;
    * ``status='error'`` and ``message=<reason>`` otherwise.

    Every exit path writes a final status, so a task never stays in
    ``'processing'`` because of an unexpected exception in this thread.

    Args:
        task_id: Key of the task record in ``tasks``.
        video_url: URL to download; must be an http(s) URL with no whitespace.
        quality: Quality label passed to ``get_format_code``.
    """
    app.logger.info(f"Task {task_id}: Starting for URL '{video_url}' with quality '{quality}'.")

    if not YTDLP_DOMAIN:
        app.logger.error(f"Task {task_id}: Failed. YTDLP_DOMAIN secret is not set in the Space settings.")
        _set_task_state(
            task_id,
            status='error',
            message="Server configuration error: YTDLP_DOMAIN secret is not set.",
        )
        return

    # The URL is pasted into a command string for the remote service, so any
    # whitespace in it would let a caller append extra command-line options.
    # NOTE(review): how the service tokenizes the command is not visible here;
    # this check only closes the whitespace-separated-argument route.
    if isinstance(video_url, str):
        video_url = video_url.strip()
    if not _is_plain_http_url(video_url):
        app.logger.error(f"Task {task_id}: Rejected invalid video URL {video_url!r}.")
        _set_task_state(
            task_id,
            status='error',
            message="Invalid 'video_url': expected an http(s) URL without whitespace.",
        )
        return

    # Avoid "//stream" and "//download" when the secret ends with a slash.
    base_url = YTDLP_DOMAIN.rstrip('/')

    try:
        format_code = get_format_code(quality)
        command = f"{video_url} -f {format_code}"
        encoded_command = quote(command)
        stream_api_url = f"{base_url}/stream?command={encoded_command}"
        app.logger.info(f"Task {task_id}: Calling stream API: {stream_api_url}")

        # Collect the full response from the stream.
        full_response_lines = []
        with requests.get(stream_api_url, stream=True, timeout=300) as response:
            response.raise_for_status()
            for line_bytes in response.iter_lines():
                if line_bytes:
                    # Tool output is not guaranteed to be valid UTF-8; a bad
                    # byte must not abort the whole task.
                    full_response_lines.append(line_bytes.decode('utf-8', errors='replace'))
        app.logger.info(f"Task {task_id}: Stream completed. Searching for download link in the response.")

        # Parse the complete response; the first link found wins.
        found_url = None
        for line in full_response_lines:
            match = _DOWNLOAD_LINK_RE.search(line)
            if match:
                found_url = f"{base_url}{match.group(1)}"
                app.logger.info(f"Task {task_id}: Found download link: {found_url}")
                break

        if found_url:
            _set_task_state(task_id, status='completed', result=found_url)
            app.logger.info(f"Task {task_id}: Marked as COMPLETED.")
        else:
            _set_task_state(
                task_id,
                status='error',
                message="Command executed, but no download link was found in the API response.",
            )
            app.logger.warning(f"Task {task_id}: Marked as ERROR, no download link found.")
            # Log the last few lines for debugging, in case the service
            # reported an error message of its own.
            app.logger.info(f"Task {task_id}: Last 5 lines of response: {full_response_lines[-5:]}")
    except requests.exceptions.RequestException as e:
        app.logger.error(f"Task {task_id}: A request exception occurred. Error: {e}")
        _set_task_state(task_id, status='error', message=str(e))
    except Exception as e:
        # Thread boundary: nothing above us will report this, and the client
        # would otherwise poll a task stuck in 'processing' forever.
        app.logger.exception(f"Task {task_id}: An unexpected error occurred.")
        _set_task_state(task_id, status='error', message=f"Unexpected error: {e}")
@app.route('/process', methods=['POST'])
def start_processing():
    """Endpoint to start a new video processing task.

    Expects a JSON object containing ``video_url`` and ``quality``. On success
    a task record is created with status ``'processing'``, a daemon thread is
    started to run ``process_video_task``, and the response is
    ``{"task_id": ...}`` with HTTP 202. Any malformed request gets a JSON
    error body with HTTP 400.
    """
    # silent=True yields None for a missing/invalid JSON body instead of
    # raising, so every bad request gets the same JSON error shape.
    data = request.get_json(silent=True)

    # A JSON body can be any value (list, string, number); only an object can
    # carry the two required keys.
    if not isinstance(data, dict) or 'video_url' not in data or 'quality' not in data:
        return jsonify({"error": "Missing 'video_url' or 'quality' in request body"}), 400

    video_url = data['video_url']
    if not isinstance(video_url, str) or not video_url.strip():
        return jsonify({"error": "'video_url' must be a non-empty string"}), 400

    quality = data['quality']
    # Containers are never valid quality labels and cannot be used as lookup
    # keys by the worker.
    if isinstance(quality, (dict, list)):
        return jsonify({"error": "'quality' must be a string or a number"}), 400

    task_id = str(uuid.uuid4())
    app.logger.info(f"Received new job. Assigning Task ID: {task_id}")

    # Register the task before the worker starts so that both the worker and
    # an immediate status poll find the record.
    with tasks_lock:
        tasks[task_id] = {'status': 'processing'}

    worker = threading.Thread(
        target=process_video_task,
        args=(task_id, video_url, quality),
        daemon=True,
    )
    worker.start()

    return jsonify({'task_id': task_id}), 202
@app.route('/api/<string:task_id>', methods=['GET'])
def get_task_status(task_id: str):
    """Endpoint to check the status and result of a task.

    Returns the task record as JSON, or ``{"error": "Task not found"}`` with
    HTTP 404 when ``task_id`` is unknown.
    """
    app.logger.info(f"Status check for Task ID: {task_id}")

    with tasks_lock:
        task = tasks.get(task_id)
        # Copy while holding the lock: the worker thread updates this same
        # dict, and serializing it after the lock is released could observe a
        # half-written record or a dict changing size mid-iteration.
        snapshot = dict(task) if task is not None else None

    if snapshot is None:
        app.logger.warning(f"Status check for non-existent Task ID: {task_id}")
        return jsonify({'error': 'Task not found'}), 404

    return jsonify(snapshot)
@app.route('/')
def index():
    """A simple index route to show the API is running.

    Returns an HTML error page with HTTP 500 when the YTDLP_DOMAIN secret is
    not configured, otherwise a short HTML page naming the API endpoints.
    """
    if not YTDLP_DOMAIN:
        return "<h1>Video Processing API Error</h1><p>Server is missing the YTDLP_DOMAIN secret.</p>", 500
    # The angle brackets around task_id are HTML-escaped; written raw, a
    # browser parses "<task_id>" as an unknown tag and the text is not shown.
    return "<h1>Video Processing API is running!</h1><p>Use /process and /api/&lt;task_id&gt; endpoints.</p>"
# This part is for local testing. Gunicorn runs the app in your Space.
if __name__ == '__main__':
    # Debug mode enables the Werkzeug interactive debugger, which allows
    # arbitrary code execution from the browser. Because the server binds to
    # all interfaces, it is opt-in: set FLASK_DEBUG=1 when you need it.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in ("1", "true", "yes", "on")
    app.run(debug=debug_enabled, host='0.0.0.0', port=int(os.environ.get("PORT", 7860)))