Spaces:
Sleeping
Sleeping
""" | |
Youtube Download MCP Server | |
This module provides MCP server functionality for downloading files from Youtube URLs. | |
It handles various download scenarios with proper validation, error handling, | |
and progress tracking. | |
Key features: | |
- File downloading from Youtube HTTP/HTTPS URLs | |
- Download progress tracking | |
- File validation | |
- Safe file saving | |
Main functions: | |
- mcpyoutubedownload: Downloads files from URLs of Youtube to local filesystem | |
""" | |
import os | |
import sys | |
import time | |
import traceback | |
import urllib.parse | |
from datetime import datetime | |
from pathlib import Path | |
from typing import Optional | |
from dotenv import load_dotenv | |
from mcp.server.fastmcp import FastMCP | |
from pydantic import BaseModel, Field | |
from selenium import webdriver | |
from selenium.webdriver.chrome.service import Service | |
from selenium.webdriver.common.by import By | |
from aworld.logs.util import logger | |
mcp = FastMCP("youtube-server") | |
_default_driver_path = os.environ.get( | |
"CHROME_DRIVER_PATH", | |
os.path.expanduser("~/Downloads/chromedriver-mac-arm64/chromedriver"), | |
) | |
class YoutubeDownloadResults(BaseModel): | |
"""Download result model with file information""" | |
file_path: str | |
file_name: str | |
file_size: int | |
content_type: Optional[str] = None | |
success: bool | |
error: Optional[str] = None | |
def download_youtube_files( | |
url: str = Field( | |
description="The URL of youtube file to download. Must be a String." | |
), | |
output_dir: str = Field( | |
"/tmp/mcp_downloads", | |
description="Directory to save the downloaded files (default: /tmp/mcp_downloads).", | |
), | |
timeout: int = Field( | |
180, description="Download timeout in seconds (default: 180)." | |
), | |
) -> str: | |
"""Download the youtube file from the URL and save to the local filesystem. | |
Args: | |
url: The URL of youtube file to download, must be a String | |
output_dir: Directory to save the downloaded files | |
timeout: Download timeout in seconds | |
Returns: | |
JSON string with download results information | |
""" | |
# Handle Field objects if they're passed directly | |
if hasattr(url, "default") and not isinstance(url, str): | |
url = url.default | |
if hasattr(output_dir, "default") and not isinstance(output_dir, str): | |
output_dir = output_dir.default | |
if hasattr(timeout, "default") and not isinstance(timeout, int): | |
timeout = timeout.default | |
def _get_youtube_content(url: str, output_dir: str, timeout: int) -> None: | |
"""Use Selenium to download YouTube content via cobalt.tools""" | |
try: | |
options = webdriver.ChromeOptions() | |
options.add_argument("--disable-blink-features=AutomationControlled") | |
# Set download file default path | |
prefs = { | |
"download.default_directory": output_dir, | |
"download.prompt_for_download": False, | |
"download.directory_upgrade": True, | |
"safebrowsing.enabled": True, | |
} | |
options.add_experimental_option("prefs", prefs) | |
# Create WebDriver object and launch Chrome browser | |
service = Service(executable_path=_default_driver_path) | |
driver = webdriver.Chrome(service=service, options=options) | |
logger.info(f"Opening cobalt.tools to download from {url}") | |
# Open target webpage | |
driver.get("https://cobalt.tools/") | |
# Wait for page to load | |
time.sleep(5) | |
# Find input field and enter YouTube link | |
input_field = driver.find_element(By.ID, "link-area") | |
input_field.send_keys(url) | |
time.sleep(5) | |
# Find download button and click | |
download_button = driver.find_element(By.ID, "download-button") | |
download_button.click() | |
time.sleep(5) | |
try: | |
# Handle bot detection popup | |
driver.find_element( | |
By.CLASS_NAME, | |
"button.elevated.popup-button.undefined.svelte-nnawom.active", | |
).click() | |
except Exception as e: | |
logger.warning(f"Bot detection handling: {str(e)}") | |
# Wait for download to complete | |
cnt = 0 | |
while ( | |
len(os.listdir(output_dir)) == 0 | |
or os.listdir(output_dir)[0].split(".")[-1] == "crdownload" | |
): | |
time.sleep(3) | |
cnt += 3 | |
if cnt >= timeout: | |
logger.warning(f"Download timeout after {timeout} seconds") | |
break | |
logger.info("Download process completed") | |
except Exception as e: | |
logger.error(f"Error during YouTube content download: {str(e)}") | |
raise | |
finally: | |
# Close browser | |
if "driver" in locals(): | |
driver.quit() | |
def _download_single_file( | |
url: str, output_dir: str, filename: str, timeout: int | |
) -> str: | |
"""Download a single file from URL and save it to the local filesystem.""" | |
try: | |
# Validate URL | |
if not url.startswith(("http://", "https://")): | |
raise ValueError( | |
"Invalid URL format. URL must start with http:// or https://" | |
) | |
# Create output directory if it doesn't exist | |
output_path = Path(output_dir) | |
output_path.mkdir(parents=True, exist_ok=True) | |
# Determine filename if not provided | |
if not filename: | |
filename = os.path.basename(urllib.parse.urlparse(url).path) | |
if not filename: | |
filename = "downloaded_file" | |
filename += "_" + datetime.now().strftime("%Y%m%d_%H%M%S") | |
file_path = Path(os.path.join(output_path, filename)) | |
file_path.mkdir(parents=True, exist_ok=True) | |
logger.info(f"Output path: {file_path}") | |
# check if video already exists with folder: /tmp/mcp_downloads | |
video_id = url.split("?v=")[-1].split("&")[0] if "?v=" in url else "" | |
base_path = os.getenv("FILESYSTEM_SERVER_WORKDIR") | |
# checker function | |
def find_existing_video(search_dir, video_id): | |
if not video_id: | |
return None | |
for item in os.listdir(search_dir): | |
item_path = os.path.join(search_dir, item) | |
if os.path.isfile(item_path) and video_id in item: | |
return item_path | |
elif os.path.isdir(item_path): | |
found = find_existing_video(item_path, video_id) | |
if found: | |
return found | |
return None | |
existing_file = find_existing_video(base_path, video_id) | |
if existing_file: | |
result = YoutubeDownloadResults( | |
file_path=existing_file, | |
file_name=os.path.basename(existing_file), | |
file_size=os.path.getsize(existing_file), | |
content_type="mp4", | |
success=True, | |
error=None, | |
) | |
logger.info( | |
f"Found {video_id} is already downloaded in: {existing_file}" | |
) | |
return result.model_dump_json() | |
logger.info(f"Downloading file from {url} to {file_path}") | |
_get_youtube_content(url, str(file_path), timeout) | |
# Check if download was successful | |
if len(os.listdir(file_path)) == 0: | |
raise FileNotFoundError("No files were downloaded") | |
download_file = os.path.join(file_path, os.listdir(file_path)[0]) | |
# Get actual file size | |
actual_size = os.path.getsize(download_file) | |
logger.success(f"File downloaded successfully to {download_file}") | |
# Create result | |
result = YoutubeDownloadResults( | |
file_path=download_file, | |
file_name=os.listdir(file_path)[0], | |
file_size=actual_size, | |
content_type="mp4", | |
success=True, | |
error=None, | |
) | |
return result.model_dump_json() | |
except Exception as e: | |
error_msg = str(e) | |
logger.error(f"Download error: {traceback.format_exc()}") | |
result = YoutubeDownloadResults( | |
file_path="", | |
file_name="", | |
file_size=0, | |
content_type=None, | |
success=False, | |
error=error_msg, | |
) | |
return result.model_dump_json() | |
result_json = _download_single_file(url, output_dir, "", timeout) | |
result = YoutubeDownloadResults.model_validate_json(result_json) | |
return result.model_dump_json() | |
def main(): | |
load_dotenv() | |
print("Starting YoutubeDownload MCP Server...", file=sys.stderr) | |
mcp.run(transport="stdio") | |
# Make the module callable | |
def __call__(): | |
""" | |
Make the module callable for uvx. | |
This function is called when the module is executed directly. | |
""" | |
main() | |
sys.modules[__name__].__call__ = __call__ | |
# Run the server when the script is executed directly | |
if __name__ == "__main__": | |
main() | |