File size: 5,789 Bytes
c09c0d6
 
 
 
 
5e23e4b
c09c0d6
 
 
5e23e4b
 
 
 
 
c09c0d6
 
 
 
 
 
5e23e4b
c09c0d6
 
 
 
 
 
f630a4e
 
 
 
c09c0d6
 
 
 
 
 
5e23e4b
 
c09c0d6
5e23e4b
 
c09c0d6
5e23e4b
c09c0d6
 
5e23e4b
c09c0d6
 
 
 
 
 
 
 
5e23e4b
 
 
 
c09c0d6
 
 
 
 
5e23e4b
c09c0d6
5e23e4b
 
 
 
 
 
 
 
 
 
 
 
 
 
c09c0d6
 
 
 
5e23e4b
c09c0d6
 
5e23e4b
 
 
 
 
c09c0d6
 
5e23e4b
c09c0d6
 
 
 
 
 
 
5e23e4b
c09c0d6
 
 
 
5e23e4b
 
 
c09c0d6
 
 
 
 
 
 
 
 
 
 
 
 
 
5e23e4b
 
c09c0d6
 
 
 
5e23e4b
c09c0d6
5e23e4b
c09c0d6
 
 
 
 
 
 
 
 
5e23e4b
c09c0d6
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import os
import re
import threading
import uuid
import requests
import logging
from flask import Flask, request, jsonify
from urllib.parse import quote

# --- Logging Configuration ---
# Configure the root logger: INFO level, "timestamp - level - message" format.
# This will print logs to the console, which you can view in your Space's "Logs" tab.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# --- Flask App Initialization ---
# Single application object; the route decorators below register on it.
app = Flask(__name__)

# --- Secret Management ---
# Get the base domain of your downloader service from Hugging Face secrets.
# os.environ.get returns None when the variable is unset; the code that uses
# this value checks for that and reports a configuration error.
YTDLP_DOMAIN = os.environ.get("YTDLP_DOMAIN")

# --- In-memory Task Storage ---
# `tasks` maps a task id (UUID string) to a dict holding at least 'status',
# plus 'result' or 'message' once the background worker has finished.
# It lives only in this process's memory, and nothing in this file removes
# entries from it.
# NOTE(review): with more than one server worker process each process would
# have its own copy of this dict -- confirm the deployment uses a single worker.
# `tasks_lock` guards `tasks`, which is shared between request handlers and
# background worker threads.
tasks = {}
tasks_lock = threading.Lock()

def get_format_code(quality: str) -> str:
    """Map a user-friendly quality name to a yt-dlp format code.

    The value is normalized before lookup so that common client variations
    resolve to the same entry: it is converted to ``str`` (JSON clients may
    send the number ``720``), surrounding whitespace is stripped, it is
    lower-cased, and one trailing "p" is dropped ("720p" -> "720").

    Args:
        quality: Requested quality, e.g. "480", "720", "1080" or "best".
            Any type is tolerated; it is stringified first, so unhashable
            JSON values such as lists cannot raise here.

    Returns:
        The mapped format selector, or the literal selector "best" when the
        quality is not recognized.
        NOTE(review): the fallback "best" differs from the selector mapped
        to the "best" key ("bestvideo+bestaudio") -- kept as in the original;
        confirm this is intended.
    """
    quality_map = {
        "480": "135+140",
        "720": "136+140",
        "1080": "137+140",
        "best": "bestvideo+bestaudio",
    }
    # str() keeps the key hashable and lets numeric JSON values match.
    key = str(quality).strip().lower()
    if key.endswith("p"):
        key = key[:-1]
    return quality_map.get(key, "best")

def _set_task_fields(task_id: str, **fields) -> None:
    """Merge *fields* into the stored record for *task_id* under the lock."""
    with tasks_lock:
        tasks[task_id].update(fields)


def process_video_task(task_id: str, video_url: str, quality: str):
    """
    Run one download job in a background thread and record its outcome.

    Builds the command "<video_url> -f <format_code>", sends it URL-encoded
    to ``{YTDLP_DOMAIN}/stream``, reads the streamed response to its end, and
    then looks for the first ``href="/download/download/..."`` link in it.

    The outcome is written to ``tasks[task_id]``:
      * 'status' = 'completed' and 'result' = absolute download URL, or
      * 'status' = 'error' and 'message' = a description of the failure.

    Args:
        task_id: Key of an entry that already exists in ``tasks``.
        video_url: Video URL forwarded to the downloader service.
        quality: User-facing quality name, mapped via ``get_format_code``.

    Failures while talking to the service or parsing its output are recorded
    on the task instead of being raised, so the task never stays in its
    initial state after this function returns.
    """
    app.logger.info(f"Task {task_id}: Starting for URL '{video_url}' with quality '{quality}'.")

    if not YTDLP_DOMAIN:
        app.logger.error(f"Task {task_id}: Failed. YTDLP_DOMAIN secret is not set in the Space settings.")
        _set_task_fields(
            task_id,
            status='error',
            message="Server configuration error: YTDLP_DOMAIN secret is not set.",
        )
        return

    # Tolerate a trailing slash in the configured domain so the URLs built
    # below do not contain "//stream" or "//download".
    base_url = YTDLP_DOMAIN.rstrip('/')

    try:
        format_code = get_format_code(quality)
        command = f"{video_url} -f {format_code}"
        encoded_command = quote(command)
        stream_api_url = f"{base_url}/stream?command={encoded_command}"

        app.logger.info(f"Task {task_id}: Calling stream API: {stream_api_url}")

        # Collect the full response from the stream. The stream is read to
        # its end before parsing; the timeout applies to the connection and
        # to each read, not to the total duration.
        full_response_lines = []
        with requests.get(stream_api_url, stream=True, timeout=300) as response:
            response.raise_for_status()
            for line_bytes in response.iter_lines():
                if line_bytes:
                    # errors='replace': stray non-UTF-8 bytes in the output
                    # must not abort the whole task.
                    full_response_lines.append(line_bytes.decode('utf-8', errors='replace'))

        app.logger.info(f"Task {task_id}: Stream completed. Searching for download link in the response.")

        # Now, parse the complete response to find the first link.
        found_url = None
        for line in full_response_lines:
            match = re.search(r'href="(/download/download/[^"]+)"', line)
            if match:
                found_url = f"{base_url}{match.group(1)}"
                app.logger.info(f"Task {task_id}: Found download link: {found_url}")
                break  # Found the link, no need to keep searching

        # Update the task first, then log, so the lock is held only briefly.
        if found_url:
            _set_task_fields(task_id, status='completed', result=found_url)
            app.logger.info(f"Task {task_id}: Marked as COMPLETED.")
        else:
            _set_task_fields(
                task_id,
                status='error',
                message="Command executed, but no download link was found in the API response.",
            )
            app.logger.warning(f"Task {task_id}: Marked as ERROR, no download link found.")
            # Log the last few lines for debugging, in case of an error message from the service
            app.logger.info(f"Task {task_id}: Last 5 lines of response: {full_response_lines[-5:]}")

    except requests.exceptions.RequestException as e:
        app.logger.error(f"Task {task_id}: A request exception occurred. Error: {e}")
        _set_task_fields(task_id, status='error', message=str(e))
    except Exception:
        # Thread boundary: an uncaught exception here would end the thread
        # silently and leave the task reported as 'processing' forever.
        app.logger.exception(f"Task {task_id}: Unexpected error while processing.")
        _set_task_fields(
            task_id,
            status='error',
            message="Unexpected internal error while processing the task.",
        )


@app.route('/process', methods=['POST'])
def start_processing():
    """Endpoint to start a new video processing task.

    Expects a JSON object with the keys 'video_url' and 'quality'.

    Returns:
        202 with ``{"task_id": <uuid>}`` once the background thread has been
        started, or 400 with ``{"error": ...}`` when the body is not a JSON
        object, a required key is missing, or 'video_url' is not acceptable.
    """
    # silent=True yields None for a missing or malformed JSON body instead of
    # aborting, so every kind of bad input gets the same JSON 400 response.
    data = request.get_json(silent=True)
    # The isinstance check rejects JSON arrays/strings, which would pass the
    # `in` tests below and then fail on data['video_url'].
    if not isinstance(data, dict) or 'video_url' not in data or 'quality' not in data:
        return jsonify({"error": "Missing 'video_url' or 'quality' in request body"}), 400

    video_url = data['video_url']
    quality = data['quality']

    if not isinstance(video_url, str):
        return jsonify({"error": "'video_url' must be a string"}), 400
    video_url = video_url.strip()
    # The URL is interpolated into a space-separated command line for the
    # downloader service. Whitespace or a leading '-' would let a caller
    # smuggle in extra command-line options, so reject both.
    if not video_url or video_url.startswith('-') or any(ch.isspace() for ch in video_url):
        return jsonify({"error": "'video_url' must be a single URL without whitespace"}), 400

    task_id = str(uuid.uuid4())
    app.logger.info(f"Received new job. Assigning Task ID: {task_id}")

    # Register the task before starting the thread so a status check can
    # never observe a task id that is missing from the store.
    with tasks_lock:
        tasks[task_id] = {'status': 'processing'}

    # Daemon thread: it does not keep the process alive on shutdown.
    thread = threading.Thread(target=process_video_task, args=(task_id, video_url, quality))
    thread.daemon = True
    thread.start()

    return jsonify({'task_id': task_id}), 202

@app.route('/api/<string:task_id>', methods=['GET'])
def get_task_status(task_id: str):
    """Endpoint to check the status and result of a task.

    Args:
        task_id: Identifier returned by the /process endpoint.

    Returns:
        The task record as JSON ('status' plus 'result' or 'message' when
        present), or 404 with ``{"error": "Task not found"}`` for an
        unknown id.
    """
    app.logger.info(f"Status check for Task ID: {task_id}")
    with tasks_lock:
        task = tasks.get(task_id)
        # Copy while holding the lock: the worker thread mutates the same
        # dict, and serializing the live object after releasing the lock
        # could expose a half-updated record (e.g. 'completed' without
        # 'result').
        snapshot = None if task is None else dict(task)

    if snapshot is None:
        app.logger.warning(f"Status check for non-existent Task ID: {task_id}")
        return jsonify({'error': 'Task not found'}), 404

    return jsonify(snapshot)

@app.route('/')
def index():
    """Landing page: reports whether the API is configured and running.

    Responds with a short HTML banner. When the YTDLP_DOMAIN secret is
    missing, the banner describes the problem and the status code is 500.
    """
    if YTDLP_DOMAIN:
        return "<h1>Video Processing API is running!</h1><p>Use /process and /api/&lt;task_id&gt; endpoints.</p>"

    # Misconfigured deployment: surface it on the landing page.
    error_page = "<h1>Video Processing API Error</h1><p>Server is missing the YTDLP_DOMAIN secret.</p>"
    return error_page, 500

# This part is for local testing. Gunicorn runs the app in your Space.
if __name__ == '__main__':
    # The interactive Werkzeug debugger allows arbitrary code execution, so it
    # must not be on by default while the server binds to all interfaces
    # (0.0.0.0). Opt in explicitly with FLASK_DEBUG=1 when debugging locally.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in ("1", "true")
    app.run(debug=debug_enabled, host='0.0.0.0', port=int(os.environ.get("PORT", 7860)))