Spaces:
Sleeping
Sleeping
File size: 5,635 Bytes
3e11f9b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 |
"""
Download MCP Server
This module provides MCP server functionality for downloading files from URLs.
It validates each URL, streams the response body to disk, and reports a
per-file result that includes any error encountered.
Key features:
- File downloading from HTTP/HTTPS URLs
- Streaming downloads written to disk in chunks
- URL scheme validation (http:// or https:// only)
- Safe file saving
Main functions:
- mcpdownloadfiles: Downloads files from a list of URLs to the local filesystem
"""
import os
import sys
import traceback
import urllib.parse
from pathlib import Path
from typing import List, Optional
import requests
from dotenv import load_dotenv
from mcp.server.fastmcp import FastMCP
from pydantic import BaseModel, Field
from aworld.logs.util import logger
# Shared FastMCP server instance: tools register on it through ``@mcp.tool``
# and ``main()`` starts it.
mcp = FastMCP("download-server")
class DownloadResult(BaseModel):
    """Outcome of a single download attempt.

    A failed attempt is reported with ``success=False``, the failure text in
    ``error``, and empty/zero values in the file fields.
    """

    # Where the file was saved and the name it was saved under.
    file_path: str
    file_name: str
    # Size in bytes of the saved file.
    file_size: int
    # Value of the response's Content-Type header, when one was sent.
    content_type: Optional[str] = Field(default=None)
    success: bool
    # Failure description; stays None on success.
    error: Optional[str] = Field(default=None)
class DownloadResults(BaseModel):
    """Aggregate report for a batch of download attempts."""

    # One entry per attempted download.
    results: List[DownloadResult] = Field(...)
    # How many entries in ``results`` succeeded / failed.
    success_count: int = Field(...)
    failed_count: int = Field(...)
@mcp.tool(description="Download files from URLs and save to the local filesystem.")
def mcpdownloadfiles(
    urls: List[str] = Field(
        ..., description="The URLs of the files to download. Must be a list of URLs."
    ),
    output_dir: str = Field(
        "/tmp/mcp_downloads",
        description="Directory to save the downloaded files (default: /tmp/mcp_downloads).",
    ),
    timeout: int = Field(60, description="Download timeout in seconds (default: 60)."),
) -> str:
    """Fetch every URL in ``urls`` into ``output_dir`` and summarise the batch.

    Each URL is downloaded independently through ``_download_single_file``,
    which picks the file name itself (an empty name is passed), so one failing
    URL does not stop the remaining ones.

    Args:
        urls: URLs to download.
        output_dir: Directory the files are saved into.
        timeout: Per-request timeout in seconds.

    Returns:
        A ``DownloadResults`` model serialised to JSON: the per-URL results in
        request order plus the success and failure counts.
    """
    # The helper returns JSON, so each outcome is parsed back into a model.
    outcomes = [
        DownloadResult.model_validate_json(
            _download_single_file(link, output_dir, "", timeout)
        )
        for link in urls
    ]
    succeeded = sum(1 for outcome in outcomes if outcome.success)

    summary = DownloadResults(
        results=outcomes,
        success_count=succeeded,
        failed_count=len(outcomes) - succeeded,
    )
    return summary.model_dump_json()
def _download_single_file(
    url: str, output_dir: str, filename: str, timeout: int
) -> str:
    """Download one URL into ``output_dir`` and report the outcome as JSON.

    Args:
        url: Source URL; must start with ``http://`` or ``https://``.
        output_dir: Directory to save into; created (with parents) if missing.
        filename: Name to save the file under. When empty, the last segment of
            the URL path is used, falling back to ``"downloaded_file"``. Only
            the final path component is honoured, so the file always lands
            directly inside ``output_dir``.
        timeout: Timeout in seconds handed to ``requests.get``.

    Returns:
        A ``DownloadResult`` serialised with ``model_dump_json()``. Failures
        never raise: they are logged and returned with ``success=False`` and
        the exception text in ``error``.
    """
    # Path of the in-progress temporary file, if one may exist on disk.
    partial_path = None
    try:
        # Validate URL
        if not url.startswith(("http://", "https://")):
            raise ValueError(
                "Invalid URL format. URL must start with http:// or https://"
            )

        # Create output directory if it doesn't exist
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)

        # Determine filename if not provided
        if not filename:
            filename = os.path.basename(urllib.parse.urlparse(url).path)
        # Drop any directory part and refuse "."/".." so neither a supplied
        # name nor the URL can point outside output_dir.
        filename = os.path.basename(filename)
        if filename in ("", ".", ".."):
            filename = "downloaded_file"

        # Full path to save the file
        file_path = os.path.join(output_path, filename)
        logger.info(f"Downloading file from {url} to {file_path}")

        # Accept-Encoding is deliberately not set here: requests advertises
        # only the encodings it can decode. Forcing "br" without a Brotli
        # decoder installed would write still-compressed bytes to disk.
        headers = {
            "User-Agent": (
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                "AWorld/1.0 (https://github.com/inclusionAI/AWorld; qintong.wqt@antgroup.com) "
                "Python/requests "
            ),
            "Accept": "text/html,application/xhtml+xml,application/xml,application/pdf;q=0.9,image/webp,*/*;q=0.8",
            "Accept-Language": "en-US,en;q=0.5",
            "Connection": "keep-alive",
        }

        # A streamed response holds its connection until it is closed, so it
        # is used as a context manager to release it on every path.
        with requests.get(
            url, headers=headers, stream=True, timeout=timeout
        ) as response:
            response.raise_for_status()
            content_type = response.headers.get("Content-Type")

            # Stream into a temporary sibling file so that a failed transfer
            # neither leaves a truncated file at file_path nor destroys a
            # file that was already there.
            partial_path = f"{file_path}.part"
            with open(partial_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)

        # Publish the finished download under its final name.
        os.replace(partial_path, file_path)
        partial_path = None

        # Get actual file size
        actual_size = os.path.getsize(file_path)
        logger.info(f"File downloaded successfully to {file_path}")

        result = DownloadResult(
            file_path=file_path,
            file_name=filename,
            file_size=actual_size,
            content_type=content_type,
            success=True,
            error=None,
        )
        return result.model_dump_json()
    except Exception as e:
        # Boundary handler: every failure becomes a failed DownloadResult so
        # a batch caller can keep going with the remaining URLs.
        error_msg = str(e)
        logger.error(f"Download error: {traceback.format_exc()}")

        if partial_path is not None:
            try:
                os.remove(partial_path)
            except OSError:
                # Best-effort cleanup; the temporary file may never have been
                # created, and the original error is what gets reported.
                pass

        result = DownloadResult(
            file_path="",
            file_name="",
            file_size=0,
            content_type=None,
            success=False,
            error=error_msg,
        )
        return result.model_dump_json()
def main():
    """Start the download MCP server on the stdio transport.

    Calls ``load_dotenv()`` first, then writes a startup banner to stderr
    before handing control to ``mcp.run``.
    """
    load_dotenv()

    # NOTE(review): the banner goes to stderr, presumably because stdout
    # carries the stdio transport's protocol traffic -- confirm.
    banner = "Starting Download MCP Server..."
    print(banner, file=sys.stderr)

    mcp.run(transport="stdio")
# Make the module callable
def __call__():
    """Run ``main()``; exposed as a module-level ``__call__`` hook for uvx.

    NOTE(review): Python looks special methods up on an object's type, so a
    ``__call__`` attribute on the module does not by itself make
    ``module()`` work -- confirm how the launcher actually uses this hook.
    """
    main()


# Attach the hook to this module's entry in ``sys.modules``.
setattr(sys.modules[__name__], "__call__", __call__)

# Run the server when the script is executed directly
if __name__ == "__main__":
    main()
|