# Duibonduil's picture
# Upload 17 files
# 3e11f9b verified
import asyncio
import json
import os
import tempfile
from typing import List, Optional, Tuple
from urllib.parse import urlparse
import requests
from mcp.server import FastMCP
from aworld.logs.util import logger
# Lower-cased file extension (including the leading dot) -> MIME type.
_EXTENSION_MIME_MAP = {
    # Audio formats
    ".mp3": "audio/mpeg",
    ".wav": "audio/wav",
    ".ogg": "audio/ogg",
    ".m4a": "audio/mp4",
    ".flac": "audio/flac",
    # Image formats
    ".jpg": "image/jpeg",
    ".jpeg": "image/jpeg",
    ".png": "image/png",
    ".gif": "image/gif",
    ".webp": "image/webp",
    ".bmp": "image/bmp",
    ".tiff": "image/tiff",
    # Video formats
    ".mp4": "video/mp4",
    ".avi": "video/x-msvideo",
    ".mov": "video/quicktime",
    ".mkv": "video/x-matroska",
    ".webm": "video/webm",
}


def get_mime_type(file_path: str, default_mime: Optional[str] = None) -> str:
    """
    Guess the MIME type of a file from its extension.

    The file is never opened and its content is not inspected; only the
    extension of ``file_path`` is looked up (case-insensitively) in a fixed
    table of common audio, image and video formats.

    Args:
        file_path: Path (or file name) whose extension is examined.
        default_mime: MIME type to return when the extension is unknown or
            missing. When omitted or empty, "application/octet-stream" is
            used instead.

    Returns:
        str: The MIME type mapped to the extension, or the default.
    """
    # splitext keeps the leading dot, which is how the table is keyed.
    ext = os.path.splitext(file_path)[1].lower()
    return _EXTENSION_MIME_MAP.get(ext, default_mime or "application/octet-stream")
def is_url(path_or_url: str) -> bool:
    """
    Tell whether a string looks like a URL rather than a local path.

    A string counts as a URL only when parsing it yields both a scheme
    (e.g. "https") and a network location (e.g. "example.com").

    Args:
        path_or_url: Candidate string to classify.

    Returns:
        bool: True when both scheme and network location are present,
        False otherwise.
    """
    parts = urlparse(path_or_url)
    # No scheme at all: plain relative/absolute paths end up here.
    if not parts.scheme:
        return False
    # A scheme without a host (e.g. "C:\\dir" or "mailto:x") is not a URL here.
    return bool(parts.netloc)
def get_file_from_source(
    source: str,
    allowed_mime_prefixes: Optional[List[str]] = None,
    max_size_mb: float = 100.0,
    timeout: int = 60,
    type: str = "image",
) -> Tuple[str, str, bytes]:
    """
    Load a file from a URL or a local path, enforcing size and MIME limits.

    The MIME type is not sniffed from the file content. It is assumed from
    ``type`` ("audio" -> "audio/mpeg", "image" -> "image/jpeg",
    "video" -> "video/mp4"). For any other ``type`` value it starts as
    "application/octet-stream"; for URL sources that generic value is then
    replaced by the response's Content-Type header when one is present.

    Args:
        source: URL or local file path.
        allowed_mime_prefixes: Allowed MIME type prefixes
            (e.g., ['audio/', 'video/']). No validation when None or empty.
        max_size_mb: Maximum allowed file size in MB.
        timeout: Timeout for URL requests in seconds.
        type: Expected media kind, used to pick the assumed MIME type.
            (The name shadows the builtin but is kept for caller
            compatibility.)

    Returns:
        Tuple[str, str, bytes]: (file_path, mime_type, file_content)
            - For URLs, file_path is a temporary file that is NOT deleted on
              success; the caller owns it.
            - For local files, file_path is the absolute form of ``source``.

    Raises:
        ValueError: When the file doesn't exist, exceeds the size limit, or
            has a MIME type outside ``allowed_mime_prefixes``.
        IOError: When the file cannot be read.
        requests.RequestException: When the URL request fails.
    """
    max_size_bytes = max_size_mb * 1024 * 1024
    size_error = f"File size exceeds limit of {max_size_mb}MB"

    assumed_mime_by_type = {
        "audio": "audio/mpeg",
        "image": "image/jpeg",
        "video": "video/mp4",
    }
    # Unknown kinds get the generic type instead of leaving mime_type unbound.
    mime_type = assumed_mime_by_type.get(type, "application/octet-stream")

    temp_path = None
    try:
        if is_url(source):
            logger.info(f"Downloading file from URL: {source}")
            # The context manager releases the connection on every exit path.
            with requests.get(source, stream=True, timeout=timeout) as response:
                response.raise_for_status()

                # Early rejection when the server announces the size. A
                # malformed header is ignored; the streamed byte count below
                # still enforces the limit.
                content_length = response.headers.get("Content-Length", "").strip()
                if content_length.isdigit() and int(content_length) > max_size_bytes:
                    raise ValueError(size_error)

                chunks = []
                downloaded_size = 0
                # delete=False: the file must outlive this function on success.
                # The with-block closes the handle even on error so the
                # cleanup below can unlink it on every platform.
                with tempfile.NamedTemporaryFile(delete=False) as temp_file:
                    temp_path = temp_file.name
                    for chunk in response.iter_content(chunk_size=8192):
                        downloaded_size += len(chunk)
                        if downloaded_size > max_size_bytes:
                            raise ValueError(size_error)
                        temp_file.write(chunk)
                        chunks.append(chunk)

                header_mime = (
                    response.headers.get("Content-Type", "").split(";")[0].strip()
                )

            file_path = temp_path
            content = b"".join(chunks)
            # Only trust the server's Content-Type when nothing better is known.
            if mime_type == "application/octet-stream" and header_mime:
                mime_type = header_mime
        else:
            file_path = os.path.abspath(source)
            if not os.path.exists(file_path):
                raise ValueError(f"File not found: {file_path}")
            if os.path.getsize(file_path) > max_size_bytes:
                raise ValueError(size_error)
            with open(file_path, "rb") as f:
                content = f.read()

        # Validate MIME type if allowed_mime_prefixes is provided.
        if allowed_mime_prefixes and not mime_type.startswith(
            tuple(allowed_mime_prefixes)
        ):
            allowed_types = ", ".join(allowed_mime_prefixes)
            raise ValueError(
                f"Invalid file type: {mime_type}. Allowed types: {allowed_types}"
            )

        return file_path, mime_type, content
    except Exception:
        # Do not leave a partial or rejected download behind.
        if temp_path and os.path.exists(temp_path):
            os.unlink(temp_path)
        raise
if __name__ == "__main__":
    # This module registers no MCP tools itself; when run directly it only
    # logs the (empty) tool list as pretty-printed JSON.
    mcp_tools = []
    tools_json = json.dumps(mcp_tools, indent=4, ensure_ascii=False)
    logger.info(f"{tools_json}")