online / app.py
Chrunos's picture
Update app.py
f630a4e verified
import os
import re
import threading
import uuid
import requests
import logging
from flask import Flask, request, jsonify
from urllib.parse import quote
# --- Logging Configuration ---
# Root logger writes timestamped records to the console; in a Space this
# output is what shows up under the "Logs" tab.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# --- Flask App Initialization ---
app = Flask(__name__)

# --- Secret Management ---
# Base domain of the downloader service, read from the environment (a
# Hugging Face secret). It is None when the variable is not set.
YTDLP_DOMAIN = os.getenv("YTDLP_DOMAIN")

# --- In-memory Task Storage ---
# Maps task id -> task state dict. The request handlers and the background
# worker both touch it, so accesses are wrapped in tasks_lock.
tasks = dict()
tasks_lock = threading.Lock()
def get_format_code(quality: str) -> str:
    """Map a user-friendly quality name to a yt-dlp format code.

    The value is normalized before the lookup so that common variants coming
    from a JSON body resolve to the same entry: integers (``720``), padded or
    upper-case strings (``" BEST "``) and a trailing ``p`` (``"720p"``).

    Args:
        quality: Requested quality, e.g. ``"480"``, ``"720"``, ``"1080"`` or
            ``"best"``.

    Returns:
        The matching format code, or ``"best"`` when the quality is unknown.
    """
    quality_map = {
        "480": "135+140",
        "720": "136+140",
        "1080": "137+140",
        "best": "bestvideo+bestaudio",
    }
    # str() keeps non-string JSON values (numbers, null) from silently
    # missing the string-keyed map.
    key = str(quality).strip().lower()
    if key.endswith("p"):
        key = key[:-1]
    return quality_map.get(key, "best")
def _fail_task(task_id: str, message: str) -> None:
    """Mark the task as failed and store the message shown to the client."""
    with tasks_lock:
        tasks[task_id]['status'] = 'error'
        tasks[task_id]['message'] = message


def process_video_task(task_id: str, video_url: str, quality: str):
    """Run one download job against the external stream API.

    Intended to run in a background thread. It builds the downloader command,
    calls ``{YTDLP_DOMAIN}/stream``, reads the streamed response to the end,
    then searches the collected lines for the download link. The outcome is
    written to ``tasks[task_id]``: ``status='completed'`` with ``result`` set
    to the absolute download URL, or ``status='error'`` with a ``message``.

    Every exit path records a terminal status, so a task never stays in
    ``'processing'`` after this function returns.

    Args:
        task_id: Key of an existing entry in ``tasks``.
        video_url: URL to hand to the downloader (untrusted client input).
        quality: Quality name, translated by ``get_format_code``.
    """
    app.logger.info(f"Task {task_id}: Starting for URL '{video_url}' with quality '{quality}'.")

    if not YTDLP_DOMAIN:
        app.logger.error(f"Task {task_id}: Failed. YTDLP_DOMAIN secret is not set in the Space settings.")
        _fail_task(task_id, "Server configuration error: YTDLP_DOMAIN secret is not set.")
        return

    # The URL is spliced into a command string, so embedded whitespace or a
    # leading '-' would let a client smuggle extra downloader options.
    # NOTE(review): how the remote service parses 'command' is not visible
    # here; confirm whether further escaping is needed on that side.
    cleaned_url = video_url.strip() if isinstance(video_url, str) else ""
    if not cleaned_url or cleaned_url.startswith("-") or re.search(r"\s", cleaned_url):
        app.logger.error(f"Task {task_id}: Rejected invalid video URL.")
        _fail_task(task_id, "Invalid 'video_url': it must be a single URL without whitespace.")
        return

    # Avoid '//' in the built URLs when the secret ends with a slash.
    base_url = YTDLP_DOMAIN.rstrip("/")

    try:
        format_code = get_format_code(quality)
        command = f"{cleaned_url} -f {format_code}"
        encoded_command = quote(command)
        stream_api_url = f"{base_url}/stream?command={encoded_command}"
        app.logger.info(f"Task {task_id}: Calling stream API: {stream_api_url}")

        # Collect the full response from the stream
        full_response_lines = []
        with requests.get(stream_api_url, stream=True, timeout=300) as response:
            response.raise_for_status()
            for line_bytes in response.iter_lines():
                if line_bytes:
                    # errors='replace': one undecodable byte in the tool
                    # output must not abort the whole task.
                    full_response_lines.append(line_bytes.decode('utf-8', errors='replace'))
        app.logger.info(f"Task {task_id}: Stream completed. Searching for download link in the response.")

        # Now, parse the complete response to find the first link
        link_pattern = re.compile(r'href="(/download/download/[^"]+)"')
        found_url = None
        for line in full_response_lines:
            match = link_pattern.search(line)
            if match:
                found_url = f"{base_url}{match.group(1)}"
                app.logger.info(f"Task {task_id}: Found download link: {found_url}")
                break  # Found the link, no need to keep searching

        # After checking the whole response, update the task status
        with tasks_lock:
            if found_url:
                tasks[task_id]['status'] = 'completed'
                tasks[task_id]['result'] = found_url
                app.logger.info(f"Task {task_id}: Marked as COMPLETED.")
            else:
                tasks[task_id]['status'] = 'error'
                tasks[task_id]['message'] = "Command executed, but no download link was found in the API response."
                app.logger.warning(f"Task {task_id}: Marked as ERROR, no download link found.")
                # Log the last few lines for debugging, in case of an error message from the service
                app.logger.info(f"Task {task_id}: Last 5 lines of response: {full_response_lines[-5:]}")
    except requests.exceptions.RequestException as e:
        app.logger.error(f"Task {task_id}: A request exception occurred. Error: {e}")
        _fail_task(task_id, str(e))
    except Exception as e:
        # Thread boundary: without this, an unexpected error would kill the
        # worker silently and leave the task in 'processing' forever.
        app.logger.exception(f"Task {task_id}: An unexpected error occurred.")
        _fail_task(task_id, f"Unexpected error: {e}")
@app.route('/process', methods=['POST'])
def start_processing():
    """Endpoint to start a new video processing task.

    Expects a JSON object containing ``video_url`` and ``quality``. Registers
    a new task in ``tasks`` with status ``'processing'``, starts
    ``process_video_task`` in a daemon thread and returns immediately.

    Returns:
        ``({'task_id': ...}, 202)`` on success, or a JSON error with status
        400 when the body is not a JSON object or lacks a required key.
    """
    # silent=True yields None for a missing/malformed JSON body, so the
    # client gets this endpoint's JSON error instead of Flask's error page.
    data = request.get_json(silent=True)
    # Require a real object: the 'in' checks below would otherwise do
    # substring/element matching on a JSON string or array.
    if not isinstance(data, dict) or 'video_url' not in data or 'quality' not in data:
        return jsonify({"error": "Missing 'video_url' or 'quality' in request body"}), 400

    task_id = str(uuid.uuid4())
    app.logger.info(f"Received new job. Assigning Task ID: {task_id}")

    video_url = data['video_url']
    quality = data['quality']

    # Register the task before the worker starts so it can update the entry.
    with tasks_lock:
        tasks[task_id] = {'status': 'processing'}

    thread = threading.Thread(
        target=process_video_task,
        args=(task_id, video_url, quality),
        daemon=True,
    )
    thread.start()
    return jsonify({'task_id': task_id}), 202
@app.route('/api/<string:task_id>', methods=['GET'])
def get_task_status(task_id: str):
    """Endpoint to check the status and result of a task.

    Args:
        task_id: Identifier previously returned by ``/process``.

    Returns:
        The task's state as JSON, or a JSON error with status 404 when no
        task with that id exists.
    """
    app.logger.info(f"Status check for Task ID: {task_id}")
    with tasks_lock:
        task = tasks.get(task_id)
        # Copy while holding the lock: the worker thread mutates this dict,
        # and serializing the live object outside the lock could observe a
        # half-updated state or fail mid-iteration.
        snapshot = dict(task) if task is not None else None

    if snapshot is None:
        app.logger.warning(f"Status check for non-existent Task ID: {task_id}")
        return jsonify({'error': 'Task not found'}), 404
    return jsonify(snapshot)
@app.route('/')
def index():
    """Landing page: reports whether the API is configured and running."""
    if YTDLP_DOMAIN:
        return "<h1>Video Processing API is running!</h1><p>Use /process and /api/&lt;task_id&gt; endpoints.</p>"

    # Without the downloader domain no task can succeed, so report a 500.
    error_page = "<h1>Video Processing API Error</h1><p>Server is missing the YTDLP_DOMAIN secret.</p>"
    return error_page, 500
# This part is for local testing. Gunicorn runs the app in your Space.
if __name__ == '__main__':
    # Debug mode enables the interactive debugger, which allows arbitrary code
    # execution; since the server binds to all interfaces it is opt-in only
    # (set FLASK_DEBUG=1 to enable it).
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in ("1", "true", "yes", "on")
    app.run(debug=debug_enabled, host='0.0.0.0', port=int(os.environ.get("PORT", 7860)))