"""Flask front-end that queues video download jobs against an external service.

Clients POST a video URL and a quality to ``/process``, receive a task id, and
poll ``/api/<task_id>`` until the background worker has either extracted a
download link from the external service's streamed response or failed.
"""

import logging
import os
import re
import threading
import uuid
from collections import deque
from urllib.parse import quote, urlsplit

import requests
from flask import Flask, request, jsonify

# --- Logging Configuration ---
# This will print logs to the console, which you can view in your Space's "Logs" tab.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# --- Flask App Initialization ---
app = Flask(__name__)

# --- Secret Management ---
# Get the base domain of your downloader service from Hugging Face secrets.
YTDLP_DOMAIN = os.environ.get("YTDLP_DOMAIN")

# --- In-memory Task Storage ---
# Maps task id -> {'status': ..., 'result' | 'message': ...}. Every access goes
# through tasks_lock because worker threads and request handlers share it.
# NOTE(review): entries are never removed, so this grows for the lifetime of
# the process.
tasks = {}
tasks_lock = threading.Lock()

# Relative download link as it appears in the streamed response of the service.
_DOWNLOAD_LINK_RE = re.compile(r'href="(/download/download/[^"]+)"')

# Number of trailing response lines kept for diagnostics when no link is found.
_DEBUG_TAIL_LINES = 5


def get_format_code(quality: str) -> str:
    """Maps user-friendly quality names to yt-dlp format codes.

    Args:
        quality: One of "480", "720", "1080" or "best".

    Returns:
        The mapped format code, or the literal "best" when ``quality`` is not
        one of the known names.
    """
    quality_map = {
        "480": "135+140",
        "720": "136+140",
        "1080": "137+140",
        "best": "bestvideo+bestaudio",
    }
    return quality_map.get(quality, "best")


def _is_safe_video_url(video_url) -> bool:
    """Return True when ``video_url`` is an http(s) URL containing no whitespace.

    The URL is later embedded in a space-separated command string, so
    whitespace (or a value that is not a URL at all, e.g. one starting with
    "-") would let a caller append arbitrary options to that command.
    """
    if not isinstance(video_url, str) or not video_url:
        return False
    if any(ch.isspace() for ch in video_url):
        return False
    try:
        parts = urlsplit(video_url)
    except ValueError:
        # urlsplit rejects some malformed inputs, e.g. an unbalanced "[".
        return False
    return parts.scheme in ("http", "https") and bool(parts.netloc)


def _update_task(task_id: str, **fields) -> None:
    """Merge ``fields`` into the stored task record while holding the lock."""
    with tasks_lock:
        tasks[task_id].update(fields)


def process_video_task(task_id: str, video_url: str, quality: str):
    """
    This function runs in a background thread. It calls the external API,
    waits for the stream to complete, then records the download URL found in
    the response (or an error) on the task.

    Args:
        task_id: Key of an existing entry in ``tasks``.
        video_url: URL passed to the external service.
        quality: Quality name, translated with ``get_format_code``.

    The task always ends in status 'completed' (with 'result') or 'error'
    (with 'message'); no exception propagates out of the thread.
    """
    app.logger.info(f"Task {task_id}: Starting for URL '{video_url}' with quality '{quality}'.")

    if not YTDLP_DOMAIN:
        app.logger.error(f"Task {task_id}: Failed. YTDLP_DOMAIN secret is not set in the Space settings.")
        _update_task(
            task_id,
            status='error',
            message="Server configuration error: YTDLP_DOMAIN secret is not set.",
        )
        return

    try:
        format_code = get_format_code(quality)
        command = f"{video_url} -f {format_code}"
        encoded_command = quote(command)
        stream_api_url = f"{YTDLP_DOMAIN}/stream?command={encoded_command}"
        app.logger.info(f"Task {task_id}: Calling stream API: {stream_api_url}")

        # Read the stream to its end, remembering only the first link and the
        # last few lines instead of buffering the whole response.
        relative_url = None
        last_lines = deque(maxlen=_DEBUG_TAIL_LINES)
        with requests.get(stream_api_url, stream=True, timeout=300) as response:
            response.raise_for_status()
            for line_bytes in response.iter_lines():
                if not line_bytes:
                    continue
                # errors='replace' keeps one undecodable byte from aborting the task.
                line = line_bytes.decode('utf-8', errors='replace')
                last_lines.append(line)
                if relative_url is None:
                    match = _DOWNLOAD_LINK_RE.search(line)
                    if match:
                        relative_url = match.group(1)

        app.logger.info(f"Task {task_id}: Stream completed. Searching for download link in the response.")

        if relative_url is not None:
            final_stream_url = f"{YTDLP_DOMAIN}{relative_url}"
            app.logger.info(f"Task {task_id}: Found download link: {final_stream_url}")
            _update_task(task_id, status='completed', result=final_stream_url)
            app.logger.info(f"Task {task_id}: Marked as COMPLETED.")
        else:
            _update_task(
                task_id,
                status='error',
                message="Command executed, but no download link was found in the API response.",
            )
            app.logger.warning(f"Task {task_id}: Marked as ERROR, no download link found.")
            # Log the last few lines for debugging, in case of an error message from the service
            app.logger.info(f"Task {task_id}: Last 5 lines of response: {list(last_lines)}")

    except requests.exceptions.RequestException as e:
        app.logger.error(f"Task {task_id}: A request exception occurred. Error: {e}")
        _update_task(task_id, status='error', message=str(e))
    except Exception as e:
        # Thread boundary: without this the task would stay 'processing' forever.
        app.logger.exception(f"Task {task_id}: An unexpected error occurred.")
        _update_task(task_id, status='error', message=f"Unexpected error: {e}")


@app.route('/process', methods=['POST'])
def start_processing():
    """Endpoint to start a new video processing task.

    Expects a JSON object with 'video_url' and 'quality'. Responds 202 with
    ``{'task_id': ...}`` on success, or 400 with ``{'error': ...}`` when the
    body is not such an object or the URL is not an http(s) URL.
    """
    # silent=True: a missing or malformed JSON body yields None and gets the
    # same JSON 400 below instead of a framework-generated error page.
    data = request.get_json(silent=True)
    if not isinstance(data, dict) or 'video_url' not in data or 'quality' not in data:
        return jsonify({"error": "Missing 'video_url' or 'quality' in request body"}), 400

    video_url = data['video_url']
    if not _is_safe_video_url(video_url):
        return jsonify({"error": "'video_url' must be an http(s) URL without whitespace"}), 400

    # Accept numeric qualities such as 720 as well as "720".
    quality = str(data['quality'])

    task_id = str(uuid.uuid4())
    app.logger.info(f"Received new job. Assigning Task ID: {task_id}")

    with tasks_lock:
        tasks[task_id] = {'status': 'processing'}

    thread = threading.Thread(target=process_video_task, args=(task_id, video_url, quality))
    thread.daemon = True
    thread.start()

    return jsonify({'task_id': task_id}), 202


@app.route('/api/<task_id>', methods=['GET'])
def get_task_status(task_id: str):
    """Endpoint to check the status and result of a task.

    Returns the stored task record as JSON, or 404 with ``{'error': ...}``
    when ``task_id`` is unknown.
    """
    app.logger.info(f"Status check for Task ID: {task_id}")
    with tasks_lock:
        task = tasks.get(task_id)
        # Copy under the lock so the worker cannot mutate it during serialisation.
        snapshot = dict(task) if task is not None else None

    if snapshot is None:
        app.logger.warning(f"Status check for non-existent Task ID: {task_id}")
        return jsonify({'error': 'Task not found'}), 404

    return jsonify(snapshot)


@app.route('/')
def index():
    """A simple index route to show the API is running.

    Returns an HTML error page with status 500 when YTDLP_DOMAIN is unset.
    """
    # NOTE(review): the heading/paragraph markup is reconstructed; confirm
    # against the deployed page.
    if not YTDLP_DOMAIN:
        return (
            "<h1>Video Processing API Error</h1>"
            "<p>Server is missing the YTDLP_DOMAIN secret.</p>"
        ), 500
    # The placeholder is entity-escaped so browsers display it literally.
    return (
        "<h1>Video Processing API is running!</h1>"
        "<p>Use /process and /api/&lt;task_id&gt; endpoints.</p>"
    )


# This part is for local testing. Gunicorn runs the app in your Space.
if __name__ == '__main__':
    # Debug mode enables the interactive debugger, which must not be reachable
    # on 0.0.0.0 by default; opt in explicitly with FLASK_DEBUG=1.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").lower() in ("1", "true")
    app.run(debug=debug_enabled, host='0.0.0.0', port=int(os.environ.get("PORT", 7860)))