File size: 6,421 Bytes
3e11f9b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
import asyncio
import json
import os
import tempfile
from typing import List, Optional, Tuple
from urllib.parse import urlparse

import requests
from mcp.server import FastMCP

from aworld.logs.util import logger


def get_mime_type(file_path: str, default_mime: Optional[str] = None) -> str:
    """
    Detect the MIME type of a file.

    python-magic is used when it can be imported and can read the file;
    otherwise the type is looked up from the file extension.

    Args:
        file_path: Path to the file
        default_mime: MIME type to return when the extension is not recognised
            (falls back to "application/octet-stream" when not given)

    Returns:
        str: Detected MIME type
    """
    # python-magic is an optional dependency: import it lazily so that a
    # missing package (or missing libmagic) simply triggers the fallback.
    try:
        import magic

        return magic.Magic(mime=True).from_file(file_path)
    except (ImportError, AttributeError, OSError):
        # ImportError: python-magic / libmagic not installed.
        # AttributeError: a different "magic" module without this API.
        # OSError: the file could not be opened for inspection.
        pass

    # Fallback to extension-based detection
    extension_mime_map = {
        # Audio formats
        ".mp3": "audio/mpeg",
        ".wav": "audio/wav",
        ".ogg": "audio/ogg",
        ".m4a": "audio/mp4",
        ".flac": "audio/flac",
        # Image formats
        ".jpg": "image/jpeg",
        ".jpeg": "image/jpeg",
        ".png": "image/png",
        ".gif": "image/gif",
        ".webp": "image/webp",
        ".bmp": "image/bmp",
        ".tiff": "image/tiff",
        # Video formats
        ".mp4": "video/mp4",
        ".avi": "video/x-msvideo",
        ".mov": "video/quicktime",
        ".mkv": "video/x-matroska",
        ".webm": "video/webm",
    }

    # Extensions are matched case-insensitively (".JPG" == ".jpg").
    ext = os.path.splitext(file_path)[1].lower()
    return extension_mime_map.get(ext, default_mime or "application/octet-stream")


def is_url(path_or_url: str) -> bool:
    """
    Tell whether a string looks like a URL rather than a local path.

    A string counts as a URL only when parsing it yields both a scheme
    and a network location.

    Args:
        path_or_url: String to check

    Returns:
        bool: True if the string is a URL, False otherwise
    """
    scheme, netloc = urlparse(path_or_url)[:2]
    if not scheme:
        return False
    return bool(netloc)


def get_file_from_source(
    source: str,
    allowed_mime_prefixes: Optional[List[str]] = None,
    max_size_mb: float = 100.0,
    timeout: int = 60,
    type: str = "image",
) -> Tuple[str, str, bytes]:
    """
    Unified function to get file content from a URL or local path with validation.

    The MIME type is not sniffed from the content: it is assumed from the
    ``type`` hint supplied by the caller.

    Args:
        source: URL or local file path
        allowed_mime_prefixes: List of allowed MIME type prefixes (e.g., ['audio/', 'video/'])
        max_size_mb: Maximum allowed file size in MB
        timeout: Timeout for URL requests in seconds
        type: Expected media kind. "audio", "image" and "video" map to
            "audio/mpeg", "image/jpeg" and "video/mp4" respectively. Any other
            value yields "application/octet-stream", which for URLs is replaced
            by the response's Content-Type header when one is present.

    Returns:
        Tuple[str, str, bytes]: (file_path, mime_type, file_content)
        - For URLs, file_path will be a temporary file path. The temporary
          file is not deleted on success; the caller must remove it.
        - For local files, file_path will be the absolute form of the given path

    Raises:
        ValueError: When file doesn't exist, exceeds size limit, or has invalid MIME type
        IOError: When file cannot be read
        requests.RequestException: When URL request fails
    """
    max_size_bytes = max_size_mb * 1024 * 1024
    size_error = f"File size exceeds limit of {max_size_mb}MB"

    # Resolve the MIME type up front so it is always bound, even for an
    # unrecognised ``type`` value.
    mime_by_type = {
        "audio": "audio/mpeg",
        "image": "image/jpeg",
        "video": "video/mp4",
    }
    mime_type = mime_by_type.get(type, "application/octet-stream")

    temp_file = None

    try:
        if is_url(source):
            # Handle URL
            logger.info(f"Downloading file from URL: {source}")
            # The context manager releases the streamed connection even when
            # the download is aborted part-way through.
            with requests.get(source, stream=True, timeout=timeout) as response:
                response.raise_for_status()

                # Check Content-Length if available. A non-numeric header is
                # ignored; the streaming check below still enforces the limit.
                content_length = response.headers.get("Content-Length", "").strip()
                if content_length.isdigit() and int(content_length) > max_size_bytes:
                    raise ValueError(size_error)

                # Create a temporary file (kept after return for the caller)
                temp_file = tempfile.NamedTemporaryFile(delete=False)
                file_path = temp_file.name

                # Download in chunks so an oversized body is rejected before
                # it has been fully buffered.
                chunks = []
                downloaded_size = 0
                for chunk in response.iter_content(chunk_size=8192):
                    downloaded_size += len(chunk)
                    if downloaded_size > max_size_bytes:
                        raise ValueError(size_error)
                    temp_file.write(chunk)
                    chunks.append(chunk)

                temp_file.close()
                content = b"".join(chunks)

                # No specific type known: trust the server's Content-Type
                if mime_type == "application/octet-stream":
                    content_type = (
                        response.headers.get("Content-Type", "").split(";")[0].strip()
                    )
                    if content_type:
                        mime_type = content_type
        else:
            # Handle local file
            file_path = os.path.abspath(source)

            # Check if file exists
            if not os.path.exists(file_path):
                raise ValueError(f"File not found: {file_path}")

            # Check file size
            if os.path.getsize(file_path) > max_size_bytes:
                raise ValueError(size_error)

            # Read file content
            with open(file_path, "rb") as f:
                content = f.read()

        # Validate MIME type if allowed_mime_prefixes is provided
        if allowed_mime_prefixes and not any(
            mime_type.startswith(prefix) for prefix in allowed_mime_prefixes
        ):
            allowed_types = ", ".join(allowed_mime_prefixes)
            raise ValueError(
                f"Invalid file type: {mime_type}. Allowed types: {allowed_types}"
            )

        return file_path, mime_type, content

    except Exception:
        # Clean up the temporary file if an error occurs. Close the handle
        # first (close() is idempotent) so the descriptor is not leaked and
        # the unlink also succeeds on platforms that lock open files.
        if temp_file is not None:
            temp_file.close()
            if os.path.exists(temp_file.name):
                os.unlink(temp_file.name)
        # Bare raise keeps the original traceback intact.
        raise


if __name__ == "__main__":
    # Script entry point: no MCP tools are registered in this module, so the
    # (empty) tool list is simply serialised and logged.
    mcp_tools = []
    serialized_tools = json.dumps(mcp_tools, indent=4, ensure_ascii=False)
    logger.info(f"{serialized_tools}")