Spaces:
Sleeping
Sleeping
import asyncio | |
import json | |
import os | |
import tempfile | |
from typing import List, Optional, Tuple | |
from urllib.parse import urlparse | |
import requests | |
from mcp.server import FastMCP | |
from aworld.logs.util import logger | |
# Extension -> MIME type table consulted by get_mime_type (keys are lowercase).
_EXTENSION_MIME_MAP = {
    # Audio formats
    ".mp3": "audio/mpeg",
    ".wav": "audio/wav",
    ".ogg": "audio/ogg",
    ".m4a": "audio/mp4",
    ".flac": "audio/flac",
    # Image formats
    ".jpg": "image/jpeg",
    ".jpeg": "image/jpeg",
    ".png": "image/png",
    ".gif": "image/gif",
    ".webp": "image/webp",
    ".bmp": "image/bmp",
    ".tiff": "image/tiff",
    # Video formats
    ".mp4": "video/mp4",
    ".avi": "video/x-msvideo",
    ".mov": "video/quicktime",
    ".mkv": "video/x-matroska",
    ".webm": "video/webm",
}


def get_mime_type(file_path: str, default_mime: Optional[str] = None) -> str:
    """
    Detect the MIME type of a file from its extension.

    Content-based detection (python-magic) is not used by this module, so the
    lookup relies solely on the file extension, compared case-insensitively.
    The file itself is never opened and does not need to exist.

    Args:
        file_path: Path to the file
        default_mime: MIME type to return when the extension is unknown

    Returns:
        str: The mapped MIME type, otherwise ``default_mime`` if given,
            otherwise "application/octet-stream"
    """
    ext = os.path.splitext(file_path)[1].lower()
    return _EXTENSION_MIME_MAP.get(ext, default_mime or "application/octet-stream")
def is_url(path_or_url: str) -> bool:
    """
    Tell whether a string looks like a URL rather than a filesystem path.

    A string counts as a URL only when parsing it yields both a scheme
    (e.g. "https") and a network location (e.g. "example.com").

    Args:
        path_or_url: Candidate string to classify

    Returns:
        bool: True when both scheme and network location are present
    """
    parts = urlparse(path_or_url)
    # Without a scheme it cannot be a URL, whatever the rest looks like.
    if not parts.scheme:
        return False
    return bool(parts.netloc)
# MIME type assumed for each supported media category passed as ``type``.
_DEFAULT_MIME_BY_TYPE = {
    "audio": "audio/mpeg",
    "image": "image/jpeg",
    "video": "video/mp4",
}
# MIME type used when the media category is not one of the known ones.
_UNKNOWN_MIME = "application/octet-stream"


def get_file_from_source(
    source: str,
    allowed_mime_prefixes: Optional[List[str]] = None,
    max_size_mb: float = 100.0,
    timeout: int = 60,
    type: str = "image",
) -> Tuple[str, str, bytes]:
    """
    Unified function to get file content from a URL or local path with validation.

    Args:
        source: URL or local file path
        allowed_mime_prefixes: List of allowed MIME type prefixes (e.g., ['audio/', 'video/'])
        max_size_mb: Maximum allowed file size in MB
        timeout: Timeout for URL requests in seconds
        type: Media category ("audio", "image" or "video") that selects the
            reported MIME type. Any other value yields
            "application/octet-stream"; for URLs the response's Content-Type
            header, when present, is used instead in that case.

    Returns:
        Tuple[str, str, bytes]: (file_path, mime_type, file_content)
            - For URLs, file_path will be a temporary file path (the caller
              is responsible for deleting it)
            - For local files, file_path will be the absolute original path

    Raises:
        ValueError: When file doesn't exist, exceeds size limit, or has invalid MIME type
        IOError: When file cannot be read
        requests.RequestException: When URL request fails
    """
    max_size_bytes = max_size_mb * 1024 * 1024
    # Resolve the MIME type up front so it is always bound, even for an
    # unrecognised ``type``.
    mime_type = _DEFAULT_MIME_BY_TYPE.get(type, _UNKNOWN_MIME)
    temp_path = None
    try:
        if is_url(source):
            logger.info(f"Downloading file from URL: {source}")
            # The context manager releases the streamed connection on every
            # exit path, including errors raised mid-download.
            with requests.get(source, stream=True, timeout=timeout) as response:
                response.raise_for_status()

                # Cheap early rejection when the server declares a size. A
                # malformed header is ignored; the streaming check below
                # still enforces the limit.
                content_length = response.headers.get("Content-Length")
                if (
                    content_length
                    and content_length.isdigit()
                    and int(content_length) > max_size_bytes
                ):
                    raise ValueError(f"File size exceeds limit of {max_size_mb}MB")

                chunks = []
                downloaded_size = 0
                # delete=False: the file must outlive this function; the
                # ``with`` only guarantees the handle is closed.
                with tempfile.NamedTemporaryFile(delete=False) as temp_file:
                    temp_path = temp_file.name
                    for chunk in response.iter_content(chunk_size=8192):
                        downloaded_size += len(chunk)
                        if downloaded_size > max_size_bytes:
                            raise ValueError(
                                f"File size exceeds limit of {max_size_mb}MB"
                            )
                        temp_file.write(chunk)
                        chunks.append(chunk)

                # Unknown media category: trust the server's Content-Type.
                if mime_type == _UNKNOWN_MIME:
                    content_type = (
                        response.headers.get("Content-Type", "").split(";")[0].strip()
                    )
                    if content_type:
                        mime_type = content_type

            file_path = temp_path
            # Join once so the result is real ``bytes``, as the signature states.
            content = b"".join(chunks)
        else:
            file_path = os.path.abspath(source)
            if not os.path.exists(file_path):
                raise ValueError(f"File not found: {file_path}")
            if os.path.getsize(file_path) > max_size_bytes:
                raise ValueError(f"File size exceeds limit of {max_size_mb}MB")
            with open(file_path, "rb") as f:
                content = f.read()

        # Validate MIME type if allowed_mime_prefixes is provided
        if allowed_mime_prefixes and not any(
            mime_type.startswith(prefix) for prefix in allowed_mime_prefixes
        ):
            allowed_types = ", ".join(allowed_mime_prefixes)
            raise ValueError(
                f"Invalid file type: {mime_type}. Allowed types: {allowed_types}"
            )
        return file_path, mime_type, content
    except Exception:
        # Remove the partially/fully downloaded temp file; local source files
        # are never touched because temp_path stays None for them.
        if temp_path and os.path.exists(temp_path):
            os.unlink(temp_path)
        raise
if __name__ == "__main__":
    # This module registers no MCP tools; when run as a script it only logs
    # the (empty) tool list rendered as pretty-printed JSON.
    tool_specs = []
    rendered = json.dumps(tool_specs, indent=4, ensure_ascii=False)
    logger.info(f"{rendered}")