Duibonduil's picture
Upload 17 files
3e11f9b verified
"""
Youtube Download MCP Server
This module provides MCP server functionality for downloading files from Youtube URLs.
It handles various download scenarios with proper validation, error handling,
and progress tracking.
Key features:
- File downloading from Youtube HTTP/HTTPS URLs
- Download progress tracking
- File validation
- Safe file saving
Main functions:
- mcpyoutubedownload: Downloads files from URLs of Youtube to local filesystem
"""
import os
import sys
import time
import traceback
import urllib.parse
from datetime import datetime
from pathlib import Path
from typing import Optional
from dotenv import load_dotenv
from mcp.server.fastmcp import FastMCP
from pydantic import BaseModel, Field
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from aworld.logs.util import logger
mcp = FastMCP("youtube-server")
_default_driver_path = os.environ.get(
"CHROME_DRIVER_PATH",
os.path.expanduser("~/Downloads/chromedriver-mac-arm64/chromedriver"),
)
class YoutubeDownloadResults(BaseModel):
"""Download result model with file information"""
file_path: str
file_name: str
file_size: int
content_type: Optional[str] = None
success: bool
error: Optional[str] = None
@mcp.tool(
description="Download the youtube file from the URL and save to the local filesystem."
)
def download_youtube_files(
url: str = Field(
description="The URL of youtube file to download. Must be a String."
),
output_dir: str = Field(
"/tmp/mcp_downloads",
description="Directory to save the downloaded files (default: /tmp/mcp_downloads).",
),
timeout: int = Field(
180, description="Download timeout in seconds (default: 180)."
),
) -> str:
"""Download the youtube file from the URL and save to the local filesystem.
Args:
url: The URL of youtube file to download, must be a String
output_dir: Directory to save the downloaded files
timeout: Download timeout in seconds
Returns:
JSON string with download results information
"""
# Handle Field objects if they're passed directly
if hasattr(url, "default") and not isinstance(url, str):
url = url.default
if hasattr(output_dir, "default") and not isinstance(output_dir, str):
output_dir = output_dir.default
if hasattr(timeout, "default") and not isinstance(timeout, int):
timeout = timeout.default
def _get_youtube_content(url: str, output_dir: str, timeout: int) -> None:
"""Use Selenium to download YouTube content via cobalt.tools"""
try:
options = webdriver.ChromeOptions()
options.add_argument("--disable-blink-features=AutomationControlled")
# Set download file default path
prefs = {
"download.default_directory": output_dir,
"download.prompt_for_download": False,
"download.directory_upgrade": True,
"safebrowsing.enabled": True,
}
options.add_experimental_option("prefs", prefs)
# Create WebDriver object and launch Chrome browser
service = Service(executable_path=_default_driver_path)
driver = webdriver.Chrome(service=service, options=options)
logger.info(f"Opening cobalt.tools to download from {url}")
# Open target webpage
driver.get("https://cobalt.tools/")
# Wait for page to load
time.sleep(5)
# Find input field and enter YouTube link
input_field = driver.find_element(By.ID, "link-area")
input_field.send_keys(url)
time.sleep(5)
# Find download button and click
download_button = driver.find_element(By.ID, "download-button")
download_button.click()
time.sleep(5)
try:
# Handle bot detection popup
driver.find_element(
By.CLASS_NAME,
"button.elevated.popup-button.undefined.svelte-nnawom.active",
).click()
except Exception as e:
logger.warning(f"Bot detection handling: {str(e)}")
# Wait for download to complete
cnt = 0
while (
len(os.listdir(output_dir)) == 0
or os.listdir(output_dir)[0].split(".")[-1] == "crdownload"
):
time.sleep(3)
cnt += 3
if cnt >= timeout:
logger.warning(f"Download timeout after {timeout} seconds")
break
logger.info("Download process completed")
except Exception as e:
logger.error(f"Error during YouTube content download: {str(e)}")
raise
finally:
# Close browser
if "driver" in locals():
driver.quit()
def _download_single_file(
url: str, output_dir: str, filename: str, timeout: int
) -> str:
"""Download a single file from URL and save it to the local filesystem."""
try:
# Validate URL
if not url.startswith(("http://", "https://")):
raise ValueError(
"Invalid URL format. URL must start with http:// or https://"
)
# Create output directory if it doesn't exist
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)
# Determine filename if not provided
if not filename:
filename = os.path.basename(urllib.parse.urlparse(url).path)
if not filename:
filename = "downloaded_file"
filename += "_" + datetime.now().strftime("%Y%m%d_%H%M%S")
file_path = Path(os.path.join(output_path, filename))
file_path.mkdir(parents=True, exist_ok=True)
logger.info(f"Output path: {file_path}")
# check if video already exists with folder: /tmp/mcp_downloads
video_id = url.split("?v=")[-1].split("&")[0] if "?v=" in url else ""
base_path = os.getenv("FILESYSTEM_SERVER_WORKDIR")
# checker function
def find_existing_video(search_dir, video_id):
if not video_id:
return None
for item in os.listdir(search_dir):
item_path = os.path.join(search_dir, item)
if os.path.isfile(item_path) and video_id in item:
return item_path
elif os.path.isdir(item_path):
found = find_existing_video(item_path, video_id)
if found:
return found
return None
existing_file = find_existing_video(base_path, video_id)
if existing_file:
result = YoutubeDownloadResults(
file_path=existing_file,
file_name=os.path.basename(existing_file),
file_size=os.path.getsize(existing_file),
content_type="mp4",
success=True,
error=None,
)
logger.info(
f"Found {video_id} is already downloaded in: {existing_file}"
)
return result.model_dump_json()
logger.info(f"Downloading file from {url} to {file_path}")
_get_youtube_content(url, str(file_path), timeout)
# Check if download was successful
if len(os.listdir(file_path)) == 0:
raise FileNotFoundError("No files were downloaded")
download_file = os.path.join(file_path, os.listdir(file_path)[0])
# Get actual file size
actual_size = os.path.getsize(download_file)
logger.success(f"File downloaded successfully to {download_file}")
# Create result
result = YoutubeDownloadResults(
file_path=download_file,
file_name=os.listdir(file_path)[0],
file_size=actual_size,
content_type="mp4",
success=True,
error=None,
)
return result.model_dump_json()
except Exception as e:
error_msg = str(e)
logger.error(f"Download error: {traceback.format_exc()}")
result = YoutubeDownloadResults(
file_path="",
file_name="",
file_size=0,
content_type=None,
success=False,
error=error_msg,
)
return result.model_dump_json()
result_json = _download_single_file(url, output_dir, "", timeout)
result = YoutubeDownloadResults.model_validate_json(result_json)
return result.model_dump_json()
def main():
load_dotenv()
print("Starting YoutubeDownload MCP Server...", file=sys.stderr)
mcp.run(transport="stdio")
# Make the module callable
def __call__():
"""
Make the module callable for uvx.
This function is called when the module is executed directly.
"""
main()
sys.modules[__name__].__call__ = __call__
# Run the server when the script is executed directly
if __name__ == "__main__":
main()