File size: 7,420 Bytes
3e11f9b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
"""
Image MCP Server

This module provides MCP server functionality for image processing and analysis.
It handles image encoding, optimization, and various image analysis tasks such as
OCR (Optical Character Recognition) and visual reasoning.

The server supports both local image files and remote image URLs with proper validation
and handles various image formats including JPEG, PNG, GIF, and others.

Main functions:
- encode_images: Encodes images to base64 format with optimization
- optimize_image: Resizes and optimizes images for better performance
- Various MCP tools for image analysis and processing
"""

# import asyncio
import base64
import os
from io import BytesIO
from typing import Any, Dict, List

from mcp.server.fastmcp import FastMCP
from openai import OpenAI
from PIL import Image
from pydantic import Field

from aworld.logs.util import logger
from mcp_servers.utils import get_file_from_source

# Initialize MCP server
# The instance is used below to register tools (@mcp.tool) and to run the server.
mcp = FastMCP("image-server")


# Prompt asking the model to read any text present in a base64-encoded image and
# to answer with a JSON string of the form {"image_text": ...}.
# Not referenced by any code in this file as shown.
IMAGE_OCR = (
    "Input is a base64 encoded image. Read text from image if present. "
    "Return a json string with the following format: "
    '{"image_text": "text from image"}'
)

# Prompt template for task-driven visual reasoning; the answer is requested as a
# JSON string of the form {"image_reasoning_result": ...}.
# Not referenced by any code in this file as shown.
# NOTE(review): "{task}" looks like a placeholder, but the literal JSON braces in
# the last line are not escaped, so str.format(task=...) would raise KeyError.
# Presumably callers substitute it another way (e.g. str.replace) - confirm.
IMAGE_REASONING = (
    "Input is a base64 encoded image. Given user's task: {task}, "
    "solve it following the guide line:\n"
    "1. Careful visual inspection\n"
    "2. Contextual reasoning\n"
    "3. Text transcription where relevant\n"
    "4. Logical deduction from visual evidence\n"
    "Return a json string with the following format: "
    '{"image_reasoning_result": "reasoning result given task and image"}'
)


def optimize_image(image_data: bytes, max_size: int = 1024) -> bytes:
    """
    Optimize image by downscaling it when it exceeds ``max_size``

    The image is decoded, shrunk with its aspect ratio preserved if its longest
    side is larger than ``max_size``, and re-encoded in its original format.
    This function never raises: any failure is logged as a warning and the
    input bytes are returned unchanged.

    Args:
        image_data: Raw (encoded) image data
        max_size: Maximum dimension size in pixels; must be at least 1

    Returns:
        bytes: Re-encoded image data, or the unmodified ``image_data`` if the
            image could not be processed
    """
    try:
        if max_size < 1:
            raise ValueError(f"max_size must be a positive integer, got {max_size}")

        with Image.open(BytesIO(image_data)) as source:
            # Capture the format before resizing: resize() returns a new image
            # whose ``format`` is None, which previously forced every resized
            # image to be saved as JPEG regardless of its real type.
            image_format = source.format or "JPEG"

            processed = source
            longest_side = max(source.size)

            # Resize if image is too large
            if longest_side > max_size:
                ratio = max_size / longest_side
                # Clamp to 1 pixel so extreme aspect ratios cannot round a side to 0
                new_size = (
                    max(1, int(source.size[0] * ratio)),
                    max(1, int(source.size[1] * ratio)),
                )
                processed = source.resize(new_size, Image.Resampling.LANCZOS)

            # Save to buffer
            buffered = BytesIO()
            processed.save(buffered, format=image_format)
            return buffered.getvalue()

    except Exception as e:
        logger.warning(f"Failed to optimize image: {str(e)}")
        return image_data  # Return original data if optimization fails


def encode_images(image_sources: List[str], with_header: bool = True) -> List[str]:
    """
    Encode images to base64 format with robust file handling

    Each source is fetched through ``get_file_from_source`` (restricted to
    ``image/*`` content, 10MB limit), passed through ``optimize_image`` and
    base64 encoded. A file that ``get_file_from_source`` placed somewhere other
    than the source path itself is treated as temporary and removed afterwards,
    whether or not encoding succeeded.

    Args:
        image_sources: List of URLs or local file paths of images
        with_header: Whether to include MIME type header

    Returns:
        List[str]: Base64 encoded image strings, with MIME type prefix if with_header is True

    Raises:
        ValueError: When ``image_sources`` is empty
        Exception: Any error raised while fetching or encoding a source is
            logged and re-raised unchanged
    """
    if not image_sources:
        raise ValueError("Image sources cannot be empty")

    images = []
    for image_source in image_sources:
        file_path = None
        try:
            # Get file with validation (only image files allowed)
            file_path, mime_type, content = get_file_from_source(
                image_source,
                allowed_mime_prefixes=["image/"],
                max_size_mb=10.0,  # 10MB limit for images
                type="image",
            )

            # Optimize image
            optimized_content = optimize_image(content)

            # Encode to base64
            image_base64 = base64.b64encode(optimized_content).decode()

            # Format with header if requested
            final_image = f"data:{mime_type};base64,{image_base64}" if with_header else image_base64

            images.append(final_image)

        except Exception as e:
            logger.error(f"Error encoding image from {image_source}: {str(e)}")
            raise

        finally:
            # Clean up temporary file if it was created for a URL.
            # Both paths are normalised before comparing so that a relative or
            # non-string path pointing at the caller's own file is never deleted.
            if (
                file_path
                and os.path.abspath(file_path) != os.path.abspath(image_source)
                and os.path.exists(file_path)
            ):
                try:
                    os.unlink(file_path)
                except OSError as cleanup_error:
                    # A failed cleanup must not mask the real result or error
                    logger.warning(f"Failed to remove temporary file {file_path}: {cleanup_error}")

    return images


def image_to_base64(image_path):
    """
    Load an image with Pillow and return it re-encoded as a base64 string

    The image is re-saved in the format Pillow detected for it (JPEG when no
    format is reported) and the resulting bytes are base64 encoded.

    Args:
        image_path: Image location, passed directly to ``PIL.Image.open``

    Returns:
        The base64 text (without a ``data:`` header), or None if the image
        could not be opened or encoded; the failure is logged
    """
    try:
        # Open the image
        with Image.open(image_path) as image:
            buffered = BytesIO()
            image_format = image.format if image.format else "JPEG"
            image.save(buffered, format=image_format)
        return base64.b64encode(buffered.getvalue()).decode("utf-8")
    except Exception as e:
        # Use the module logger rather than print(): the server is run with
        # transport="stdio", so stray text on stdout should be avoided.
        logger.error(f"Base64 error: {e}")
        return None


def create_image_contents(prompt: str, image_base64: List[str]) -> List[Dict[str, Any]]:
    """Build the multi-part message content for an LLM query: the prompt first, then one entry per image."""
    parts: List[Dict[str, Any]] = [{"type": "text", "text": prompt}]
    for image_url in image_base64:
        # Each image string is wrapped as an "image_url" part, in input order
        parts.append({"type": "image_url", "image_url": {"url": image_url}})
    return parts


@mcp.tool(
    description="solve the question by careful reasoning given the image(s) in given filepath or url, including reasoning, ocr, etc."
)
def mcp_image_recognition(
    image_urls: List[str] = Field(description="The input image(s) in given a list of filepaths or urls."),
    question: str = Field(description="The question to ask."),
) -> str:
    """
    Solve the question by careful reasoning given the image(s) in given filepath or url.

    Every entry of ``image_urls`` is encoded with ``image_to_base64`` and sent,
    together with ``question``, in a single chat-completion request. The client
    is configured from the environment variables ``IMAGE_LLM_API_KEY``,
    ``IMAGE_LLM_BASE_URL`` and ``IMAGE_LLM_MODEL_NAME``.

    Args:
        image_urls: Images to analyse; entries that cannot be encoded are
            skipped with a warning
        question: The question to ask about the image(s)

    Returns:
        str: The model's answer, or an empty string if no image could be
            encoded or the request failed (the error is logged)
    """
    import mimetypes
    import traceback

    try:
        if not image_urls:
            raise ValueError("image_urls cannot be empty")

        data_urls = []
        for image_url in image_urls:
            image_base64 = image_to_base64(image_url)
            if not image_base64:
                # image_to_base64 returns None on failure; never embed that in a data URL
                logger.warning(f"Skipping image that could not be encoded: {image_url}")
                continue
            logger.info(f"image_base64:{image_url}")

            # Derive the MIME type from the file name; fall back to JPEG when
            # the extension is missing or is not an image type.
            guessed_type, _ = mimetypes.guess_type(image_url)
            if not guessed_type or not guessed_type.startswith("image/"):
                guessed_type = "image/jpeg"
            data_urls.append(f"data:{guessed_type};base64,{image_base64}")

        if not data_urls:
            raise ValueError("None of the given images could be encoded")

        reasoning_prompt = question
        messages = [
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": create_image_contents(reasoning_prompt, data_urls)},
        ]

        client = OpenAI(api_key=os.getenv("IMAGE_LLM_API_KEY"), base_url=os.getenv("IMAGE_LLM_BASE_URL"))
        response = client.chat.completions.create(
            model=os.getenv("IMAGE_LLM_MODEL_NAME"),
            messages=messages,
        )

        logger.info(f"response:{response.choices[0]}")
        image_reasoning_result = response.choices[0].message.content

    except Exception as e:
        # Best-effort tool: report the failure in the logs and return an empty answer
        image_reasoning_result = ""
        traceback.print_exc()
        logger.error(f"image_reasoning_result-Execute error: {e}")

    logger.info(f"---get_reasoning_by_image-image_reasoning_result:{image_reasoning_result}")

    return image_reasoning_result


def main():
    """Load environment variables via python-dotenv, then run the MCP server over stdio."""
    import dotenv

    dotenv.load_dotenv()

    # The startup banner is written to stderr
    startup_message = "Starting Image MCP Server..."
    print(startup_message, file=sys.stderr)

    mcp.run(transport="stdio")


# Make the module callable
def __call__():
    """
    Entry point intended for uvx: start the server.

    Simply delegates to main().
    """
    main()


# Add this for compatibility with uvx
# NOTE: main() above uses sys.stderr, so this module-level import has to run
# before main() is called; the __main__ guard below comes after it.
import sys

# Store __call__ as an attribute on this module's entry in sys.modules.
# NOTE(review): Python resolves call syntax through type(obj).__call__, so this
# attribute presumably does not make `module()` work on its own - confirm what
# uvx actually relies on.
sys.modules[__name__].__call__ = __call__

# Run the server when the script is executed directly
if __name__ == "__main__":
    main()