File size: 9,494 Bytes
3e11f9b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
"""
Youtube Download MCP Server

This module provides MCP server functionality for downloading files from Youtube URLs.
It handles various download scenarios with proper validation, error handling,
and progress tracking.

Key features:
- File downloading from Youtube HTTP/HTTPS URLs
- Download progress tracking
- File validation
- Safe file saving

Main functions:
- mcpyoutubedownload: Downloads files from URLs of Youtube to local filesystem
"""

import os
import sys
import time
import traceback
import urllib.parse
from datetime import datetime
from pathlib import Path
from typing import Optional

from dotenv import load_dotenv
from mcp.server.fastmcp import FastMCP
from pydantic import BaseModel, Field
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By

from aworld.logs.util import logger

# FastMCP server instance on which this module's tools are registered.
mcp = FastMCP("youtube-server")

# chromedriver executable: CHROME_DRIVER_PATH when set in the environment at
# import time, otherwise the chromedriver-mac-arm64 folder under ~/Downloads.
_default_driver_path = os.getenv(
    "CHROME_DRIVER_PATH",
    os.path.expanduser("~/Downloads/chromedriver-mac-arm64/chromedriver"),
)


class YoutubeDownloadResults(BaseModel):
    """Download result model with file information.

    Instances are serialized to JSON and returned by ``download_youtube_files``.
    """

    # Path of the downloaded (or already existing) file; "" on failure
    file_path: str
    # Base name of that file; "" on failure
    file_name: str
    # Size as reported by os.path.getsize; 0 on failure
    file_size: int
    # Set to "mp4" on success and None on failure
    content_type: Optional[str] = None
    # True when a file was downloaded or an existing one was found
    success: bool
    # Text of the caught exception on failure, otherwise None
    error: Optional[str] = None


@mcp.tool(
    description="Download the youtube file from the URL and save to the local filesystem."
)
def download_youtube_files(
    url: str = Field(
        description="The URL of youtube file to download. Must be a String."
    ),
    output_dir: str = Field(
        "/tmp/mcp_downloads",
        description="Directory to save the downloaded files (default: /tmp/mcp_downloads).",
    ),
    timeout: int = Field(
        180, description="Download timeout in seconds (default: 180)."
    ),
) -> str:
    """Download the youtube file from the URL and save to the local filesystem.

    The download is performed by driving Chrome (via Selenium) through
    https://cobalt.tools/ and lands in a fresh, timestamped sub-directory of
    ``output_dir``. Before downloading, the directory named by the
    ``FILESYSTEM_SERVER_WORKDIR`` environment variable (when set and present)
    is searched for a file whose name contains the video ID; a match is
    returned instead of downloading again.

    Args:
        url: The URL of youtube file to download, must be a String
        output_dir: Directory to save the downloaded files
        timeout: Download timeout in seconds

    Returns:
        JSON string with download results information
        (a serialized ``YoutubeDownloadResults``). Errors raised while
        downloading are caught and reported with ``success=False`` and the
        message in ``error``.
    """
    # Handle Field objects if they're passed directly
    if hasattr(url, "default") and not isinstance(url, str):
        url = url.default

    if hasattr(output_dir, "default") and not isinstance(output_dir, str):
        output_dir = output_dir.default

    if hasattr(timeout, "default") and not isinstance(timeout, int):
        timeout = timeout.default

    # ".crdownload" is the in-progress suffix Chrome uses while downloading;
    # ".tmp" is included defensively for temporary files.
    partial_suffixes = (".crdownload", ".tmp")
    # Seconds between two inspections of the download directory.
    poll_interval = 3

    def _completed_downloads(directory) -> list:
        """Return the sorted names of fully written regular files in ``directory``."""
        return sorted(
            name
            for name in os.listdir(directory)
            if not name.endswith(partial_suffixes)
            and os.path.isfile(os.path.join(directory, name))
        )

    def _download_finished(directory) -> bool:
        """Tell whether ``directory`` holds a finished file and no partial ones."""
        names = os.listdir(directory)
        if any(name.endswith(partial_suffixes) for name in names):
            return False
        return bool(_completed_downloads(directory))

    def _extract_video_id(video_url: str) -> str:
        """Return the video ID found in ``video_url``, or "" when there is none."""
        parsed = urllib.parse.urlparse(video_url)
        # Standard watch URLs: the ID is the "v" query parameter, wherever it
        # sits in the query string.
        from_query = urllib.parse.parse_qs(parsed.query).get("v")
        if from_query:
            return from_query[0]

        segments = [part for part in parsed.path.split("/") if part]
        host = (parsed.hostname or "").lower()
        # Short links: https://youtu.be/<id>
        if host in ("youtu.be", "www.youtu.be") and segments:
            return segments[0]
        # Path-style links: /shorts/<id> and /embed/<id>
        if len(segments) >= 2 and segments[0] in ("shorts", "embed"):
            return segments[1]
        return ""

    def _find_existing_video(search_dir, video_id):
        """Return the path of a finished file under ``search_dir`` whose name
        contains ``video_id``, or None when nothing matches."""
        if not video_id or not search_dir or not os.path.isdir(search_dir):
            return None

        # os.walk skips unreadable directories instead of raising, so a
        # permission problem cannot abort the download.
        for root, _subdirs, names in os.walk(search_dir):
            for name in names:
                if video_id not in name or name.endswith(partial_suffixes):
                    continue
                candidate = os.path.join(root, name)
                if os.path.isfile(candidate):
                    return candidate
        return None

    def _get_youtube_content(url: str, output_dir: str, timeout: int) -> None:
        """Use Selenium to download YouTube content via cobalt.tools

        Raises:
            TimeoutError: when no finished file appears in ``output_dir``
                within ``timeout`` seconds.
        """
        driver = None
        try:
            options = webdriver.ChromeOptions()
            options.add_argument("--disable-blink-features=AutomationControlled")
            # Set download file default path
            prefs = {
                "download.default_directory": output_dir,
                "download.prompt_for_download": False,
                "download.directory_upgrade": True,
                "safebrowsing.enabled": True,
            }
            options.add_experimental_option("prefs", prefs)
            # Read the driver path at call time: main() loads .env after this
            # module (and _default_driver_path) has already been evaluated.
            driver_path = os.environ.get("CHROME_DRIVER_PATH", _default_driver_path)
            # Create WebDriver object and launch Chrome browser
            service = Service(executable_path=driver_path)
            driver = webdriver.Chrome(service=service, options=options)

            logger.info(f"Opening cobalt.tools to download from {url}")
            # Open target webpage
            driver.get("https://cobalt.tools/")
            # Wait for page to load
            time.sleep(5)
            # Find input field and enter YouTube link
            input_field = driver.find_element(By.ID, "link-area")
            input_field.send_keys(url)
            time.sleep(5)
            # Find download button and click
            download_button = driver.find_element(By.ID, "download-button")
            download_button.click()
            time.sleep(5)

            try:
                # Handle bot detection popup (best effort: a failed lookup is
                # only logged, so the download proceeds without the click)
                driver.find_element(
                    By.CLASS_NAME,
                    "button.elevated.popup-button.undefined.svelte-nnawom.active",
                ).click()
            except Exception as e:
                logger.warning(f"Bot detection handling: {str(e)}")

            # Wait for download to complete; a timeout is an error so that a
            # partial file is never reported as a successful download.
            deadline = time.monotonic() + timeout
            while not _download_finished(output_dir):
                if time.monotonic() >= deadline:
                    raise TimeoutError(f"Download timeout after {timeout} seconds")
                time.sleep(poll_interval)

            logger.info("Download process completed")

        except Exception as e:
            logger.error(f"Error during YouTube content download: {str(e)}")
            raise
        finally:
            # Close browser
            if driver is not None:
                driver.quit()

    def _download_single_file(
        url: str, output_dir: str, filename: str, timeout: int
    ) -> str:
        """Download a single file from URL and save it to the local filesystem.

        Returns the JSON form of a ``YoutubeDownloadResults``; every exception
        is converted into a ``success=False`` result.
        """
        try:
            # Validate URL
            if not url.startswith(("http://", "https://")):
                raise ValueError(
                    "Invalid URL format. URL must start with http:// or https://"
                )

            # Reuse a previous download when one exists. This runs before any
            # directory is created so a cache hit leaves nothing behind.
            video_id = _extract_video_id(url)
            existing_file = _find_existing_video(
                os.getenv("FILESYSTEM_SERVER_WORKDIR"), video_id
            )
            if existing_file:
                result = YoutubeDownloadResults(
                    file_path=existing_file,
                    file_name=os.path.basename(existing_file),
                    file_size=os.path.getsize(existing_file),
                    content_type="mp4",
                    success=True,
                    error=None,
                )
                logger.info(
                    f"Found {video_id} is already downloaded in: {existing_file}"
                )
                return result.model_dump_json()

            # Determine filename if not provided
            if not filename:
                filename = os.path.basename(urllib.parse.urlparse(url).path)
                if not filename:
                    filename = "downloaded_file"
            filename += "_" + datetime.now().strftime("%Y%m%d_%H%M%S")

            # Each download gets its own directory. The path is made absolute
            # because a relative download.default_directory is not reliably
            # honoured by Chrome.
            file_path = Path(os.path.abspath(os.path.join(output_dir, filename)))
            file_path.mkdir(parents=True, exist_ok=True)
            logger.info(f"Output path: {file_path}")

            logger.info(f"Downloading file from {url} to {file_path}")

            _get_youtube_content(url, str(file_path), timeout)

            # Check if download was successful
            completed = _completed_downloads(file_path)
            if not completed:
                raise FileNotFoundError("No files were downloaded")

            file_name = completed[0]
            download_file = os.path.join(file_path, file_name)

            # Get actual file size
            actual_size = os.path.getsize(download_file)
            logger.success(f"File downloaded successfully to {download_file}")

            # Create result
            result = YoutubeDownloadResults(
                file_path=download_file,
                file_name=file_name,
                file_size=actual_size,
                content_type="mp4",
                success=True,
                error=None,
            )

            return result.model_dump_json()

        except Exception as e:
            error_msg = str(e)
            logger.error(f"Download error: {traceback.format_exc()}")

            result = YoutubeDownloadResults(
                file_path="",
                file_name="",
                file_size=0,
                content_type=None,
                success=False,
                error=error_msg,
            )

            return result.model_dump_json()

    return _download_single_file(url, output_dir, "", timeout)


def main():
    """Load settings from ``.env``, announce startup on stderr, and run the
    MCP server on the stdio transport."""
    load_dotenv()
    banner = "Starting YoutubeDownload MCP Server..."
    print(banner, file=sys.stderr)
    mcp.run(transport="stdio")


# Make the module callable
def __call__():
    """Start the server by delegating to ``main``.

    Intended as the hook through which uvx can invoke this module as a
    callable.
    """
    return main()


# Expose ``__call__`` as an attribute of this module object.
# NOTE(review): the module-level ``def __call__`` already binds this attribute,
# and Python looks ``__call__`` up on the type rather than the instance, so this
# presumably does not make ``module()`` itself callable. Confirm how uvx
# invokes the entry point.
sys.modules[__name__].__call__ = __call__

# Run the server when the script is executed directly (not when it is imported)
if __name__ == "__main__":
    main()