|
import os |
|
import re |
|
import threading |
|
import uuid |
|
import requests |
|
import logging |
|
from flask import Flask, request, jsonify |
|
from urllib.parse import quote |
|
|
|
|
|
|
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') |
|
|
|
|
|
app = Flask(__name__) |
|
|
|
|
|
|
|
YTDLP_DOMAIN = os.environ.get("YTDLP_DOMAIN") |
|
|
|
|
|
tasks = {} |
|
tasks_lock = threading.Lock() |
|
|
|
def get_format_code(quality: str) -> str: |
|
"""Maps user-friendly quality names to yt-dlp format codes.""" |
|
quality_map = { |
|
"480": "135+140", |
|
"720": "136+140", |
|
"1080": "137+140", |
|
"best": "bestvideo+bestaudio", |
|
} |
|
return quality_map.get(quality, "best") |
|
|
|
def process_video_task(task_id: str, video_url: str, quality: str): |
|
""" |
|
This function runs in a background thread. It calls the external API, |
|
waits for the stream to complete, then parses the full response to find the |
|
download URL and updates the task status. |
|
""" |
|
app.logger.info(f"Task {task_id}: Starting for URL '{video_url}' with quality '{quality}'.") |
|
|
|
if not YTDLP_DOMAIN: |
|
app.logger.error(f"Task {task_id}: Failed. YTDLP_DOMAIN secret is not set in the Space settings.") |
|
with tasks_lock: |
|
tasks[task_id]['status'] = 'error' |
|
tasks[task_id]['message'] = "Server configuration error: YTDLP_DOMAIN secret is not set." |
|
return |
|
|
|
try: |
|
format_code = get_format_code(quality) |
|
command = f"{video_url} -f {format_code}" |
|
encoded_command = quote(command) |
|
stream_api_url = f"{YTDLP_DOMAIN}/stream?command={encoded_command}" |
|
|
|
app.logger.info(f"Task {task_id}: Calling stream API: {stream_api_url}") |
|
|
|
|
|
full_response_lines = [] |
|
with requests.get(stream_api_url, stream=True, timeout=300) as response: |
|
response.raise_for_status() |
|
for line_bytes in response.iter_lines(): |
|
if line_bytes: |
|
line = line_bytes.decode('utf-8') |
|
full_response_lines.append(line) |
|
|
|
app.logger.info(f"Task {task_id}: Stream completed. Searching for download link in the response.") |
|
|
|
|
|
found_url = None |
|
for line in full_response_lines: |
|
match = re.search(r'href="(/download/download/[^"]+)"', line) |
|
if match: |
|
relative_url = match.group(1) |
|
final_stream_url = f"{YTDLP_DOMAIN}{relative_url}" |
|
found_url = final_stream_url |
|
app.logger.info(f"Task {task_id}: Found download link: {final_stream_url}") |
|
break |
|
|
|
|
|
with tasks_lock: |
|
if found_url: |
|
tasks[task_id]['status'] = 'completed' |
|
tasks[task_id]['result'] = found_url |
|
app.logger.info(f"Task {task_id}: Marked as COMPLETED.") |
|
else: |
|
tasks[task_id]['status'] = 'error' |
|
tasks[task_id]['message'] = "Command executed, but no download link was found in the API response." |
|
app.logger.warning(f"Task {task_id}: Marked as ERROR, no download link found.") |
|
|
|
app.logger.info(f"Task {task_id}: Last 5 lines of response: {full_response_lines[-5:]}") |
|
|
|
|
|
except requests.exceptions.RequestException as e: |
|
app.logger.error(f"Task {task_id}: A request exception occurred. Error: {e}") |
|
with tasks_lock: |
|
tasks[task_id]['status'] = 'error' |
|
tasks[task_id]['message'] = str(e) |
|
|
|
|
|
@app.route('/process', methods=['POST']) |
|
def start_processing(): |
|
"""Endpoint to start a new video processing task.""" |
|
data = request.get_json() |
|
if not data or 'video_url' not in data or 'quality' not in data: |
|
return jsonify({"error": "Missing 'video_url' or 'quality' in request body"}), 400 |
|
|
|
task_id = str(uuid.uuid4()) |
|
app.logger.info(f"Received new job. Assigning Task ID: {task_id}") |
|
|
|
video_url = data['video_url'] |
|
quality = data.get('quality', 'best') |
|
|
|
with tasks_lock: |
|
tasks[task_id] = {'status': 'processing'} |
|
|
|
thread = threading.Thread(target=process_video_task, args=(task_id, video_url, quality)) |
|
thread.daemon = True |
|
thread.start() |
|
|
|
return jsonify({'task_id': task_id}), 202 |
|
|
|
@app.route('/api/<string:task_id>', methods=['GET']) |
|
def get_task_status(task_id: str): |
|
"""Endpoint to check the status and result of a task.""" |
|
app.logger.info(f"Status check for Task ID: {task_id}") |
|
with tasks_lock: |
|
task = tasks.get(task_id) |
|
|
|
if task is None: |
|
app.logger.warning(f"Status check for non-existent Task ID: {task_id}") |
|
return jsonify({'error': 'Task not found'}), 404 |
|
|
|
return jsonify(task) |
|
|
|
@app.route('/') |
|
def index(): |
|
"""A simple index route to show the API is running.""" |
|
if not YTDLP_DOMAIN: |
|
return "<h1>Video Processing API Error</h1><p>Server is missing the YTDLP_DOMAIN secret.</p>", 500 |
|
return "<h1>Video Processing API is running!</h1><p>Use /process and /api/<task_id> endpoints.</p>" |
|
|
|
|
|
if __name__ == '__main__': |
|
app.run(debug=True, host='0.0.0.0', port=int(os.environ.get("PORT", 7860))) |