Spaces:
Sleeping
Sleeping
File size: 6,421 Bytes
3e11f9b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 |
import asyncio
import json
import os
import tempfile
from typing import List, Optional, Tuple
from urllib.parse import urlparse
import requests
from mcp.server import FastMCP
from aworld.logs.util import logger
def get_mime_type(file_path: str, default_mime: Optional[str] = None) -> str:
    """
    Determine the MIME type of a file from its extension.

    The lookup is case-insensitive and purely extension-based; the file's
    contents are never read, so the path does not need to exist.

    Args:
        file_path: Path to the file
        default_mime: MIME type to return when the extension is not recognised

    Returns:
        str: The mapped MIME type, or ``default_mime`` when the extension is
        unknown, or "application/octet-stream" when no default is supplied
    """
    extension_mime_map = {
        # Audio formats
        ".mp3": "audio/mpeg",
        ".wav": "audio/wav",
        ".ogg": "audio/ogg",
        ".m4a": "audio/mp4",
        ".flac": "audio/flac",
        # Image formats
        ".jpg": "image/jpeg",
        ".jpeg": "image/jpeg",
        ".png": "image/png",
        ".gif": "image/gif",
        ".webp": "image/webp",
        ".bmp": "image/bmp",
        ".tiff": "image/tiff",
        # Video formats
        ".mp4": "video/mp4",
        ".avi": "video/x-msvideo",
        ".mov": "video/quicktime",
        ".mkv": "video/x-matroska",
        ".webm": "video/webm",
    }
    # Previously a hard-coded `return "audio/mpeg"` short-circuited this map,
    # so every file was reported as MP3 audio regardless of its extension.
    ext = os.path.splitext(file_path)[1].lower()
    return extension_mime_map.get(ext, default_mime or "application/octet-stream")
def is_url(path_or_url: str) -> bool:
    """
    Tell whether a string looks like a URL rather than a local path.

    A string counts as a URL only when parsing it yields both a scheme
    (e.g. "https") and a network location (e.g. "example.com").

    Args:
        path_or_url: Candidate string to classify

    Returns:
        bool: True when both scheme and network location are present
    """
    parts = urlparse(path_or_url)
    # Guard clause: without a scheme the string cannot be a URL.
    if not parts.scheme:
        return False
    return bool(parts.netloc)
def get_file_from_source(
    source: str,
    allowed_mime_prefixes: Optional[List[str]] = None,
    max_size_mb: float = 100.0,
    timeout: int = 60,
    type: str = "image",
) -> Tuple[str, str, bytes]:
    """
    Unified function to get file content from a URL or local path with validation.

    Args:
        source: URL or local file path
        allowed_mime_prefixes: List of allowed MIME type prefixes (e.g., ['audio/', 'video/'])
        max_size_mb: Maximum allowed file size in MB
        timeout: Timeout for URL requests in seconds
        type: Declared media category. "audio", "image" and "video" map to
            "audio/mpeg", "image/jpeg" and "video/mp4" respectively. Any other
            value maps to "application/octet-stream", which for URLs is
            replaced by the response's Content-Type header when one is present.

    Returns:
        Tuple[str, str, bytes]: (file_path, mime_type, file_content)
        - For URLs, file_path will be a temporary file path (the caller is
          responsible for deleting it)
        - For local files, file_path will be the absolute form of the given path

    Raises:
        ValueError: When file doesn't exist, exceeds size limit, or has invalid MIME type
        IOError: When file cannot be read
        requests.RequestException: When URL request fails
    """
    # The MIME type is not sniffed from the content; it is derived from the
    # media category declared by the caller.
    mime_by_category = {
        "audio": "audio/mpeg",
        "image": "image/jpeg",
        "video": "video/mp4",
    }
    # An unknown category used to leave mime_type unbound (UnboundLocalError).
    mime_type = mime_by_category.get(type, "application/octet-stream")

    max_size_bytes = max_size_mb * 1024 * 1024
    size_error = f"File size exceeds limit of {max_size_mb}MB"
    temp_path = None  # set only once a download temp file has been created

    try:
        if is_url(source):
            logger.info(f"Downloading file from URL: {source}")
            # The context manager releases the streamed connection on every
            # exit path, including the size-limit and HTTP-error failures.
            with requests.get(source, stream=True, timeout=timeout) as response:
                response.raise_for_status()

                # Early rejection based on the advertised size. A missing or
                # malformed header is ignored because the streamed byte count
                # below enforces the limit regardless.
                content_length = response.headers.get("Content-Length")
                try:
                    declared_size = int(content_length) if content_length else None
                except ValueError:
                    declared_size = None
                if declared_size is not None and declared_size > max_size_bytes:
                    raise ValueError(size_error)

                chunks = []
                downloaded_size = 0
                # delete=False: the file must outlive this function so the
                # caller can use the returned path. The `with` guarantees the
                # handle is closed before any cleanup unlink below.
                with tempfile.NamedTemporaryFile(delete=False) as temp_file:
                    temp_path = temp_file.name
                    for chunk in response.iter_content(chunk_size=8192):
                        downloaded_size += len(chunk)
                        if downloaded_size > max_size_bytes:
                            raise ValueError(size_error)
                        temp_file.write(chunk)
                        chunks.append(chunk)

                file_path = temp_path
                # Join once so the result is real `bytes`, as the signature
                # promises (the local-file branch already returns bytes).
                content = b"".join(chunks)

                # No category-specific type: trust the server's Content-Type.
                if mime_type == "application/octet-stream":
                    content_type = (
                        response.headers.get("Content-Type", "").split(";")[0].strip()
                    )
                    if content_type:
                        mime_type = content_type
        else:
            file_path = os.path.abspath(source)

            if not os.path.exists(file_path):
                raise ValueError(f"File not found: {file_path}")

            if os.path.getsize(file_path) > max_size_bytes:
                raise ValueError(size_error)

            with open(file_path, "rb") as f:
                content = f.read()

        # Validate MIME type if allowed_mime_prefixes is provided
        if allowed_mime_prefixes and not any(
            mime_type.startswith(prefix) for prefix in allowed_mime_prefixes
        ):
            allowed_types = ", ".join(allowed_mime_prefixes)
            raise ValueError(
                f"Invalid file type: {mime_type}. Allowed types: {allowed_types}"
            )

        return file_path, mime_type, content
    except Exception:
        # Only downloaded temp files are removed; a local source file is
        # never deleted.
        if temp_path and os.path.exists(temp_path):
            os.unlink(temp_path)
        raise
# Script entry point: log the (currently empty) tool list as pretty-printed JSON.
if __name__ == "__main__":
    mcp_tools: list = []
    logger.info(f"{json.dumps(mcp_tools, indent=4, ensure_ascii=False)}")
|