Duibonduil's picture
Upload 17 files
3e11f9b verified
"""
Image MCP Server
This module provides MCP server functionality for image processing and analysis.
It handles image encoding, optimization, and various image analysis tasks such as
OCR (Optical Character Recognition) and visual reasoning.
The server supports both local image files and remote image URLs with proper validation
and handles various image formats including JPEG, PNG, GIF, and others.
Main functions:
- encode_images: Encodes images to base64 format with optimization
- optimize_image: Resizes and optimizes images for better performance
- Various MCP tools for image analysis and processing
"""
# import asyncio
import base64
import os
from io import BytesIO
from typing import Any, Dict, List
from mcp.server.fastmcp import FastMCP
from openai import OpenAI
from PIL import Image
from pydantic import Field
from aworld.logs.util import logger
from mcp_servers.utils import get_file_from_source
# Initialize MCP server
mcp = FastMCP("image-server")
IMAGE_OCR = (
"Input is a base64 encoded image. Read text from image if present. "
"Return a json string with the following format: "
'{"image_text": "text from image"}'
)
IMAGE_REASONING = (
"Input is a base64 encoded image. Given user's task: {task}, "
"solve it following the guide line:\n"
"1. Careful visual inspection\n"
"2. Contextual reasoning\n"
"3. Text transcription where relevant\n"
"4. Logical deduction from visual evidence\n"
"Return a json string with the following format: "
'{"image_reasoning_result": "reasoning result given task and image"}'
)
def optimize_image(image_data: bytes, max_size: int = 1024) -> bytes:
"""
Optimize image by resizing if needed
Args:
image_data: Raw image data
max_size: Maximum dimension size in pixels
Returns:
bytes: Optimized image data
Raises:
ValueError: When image cannot be processed
"""
try:
image = Image.open(BytesIO(image_data))
# Resize if image is too large
if max(image.size) > max_size:
ratio = max_size / max(image.size)
new_size = (int(image.size[0] * ratio), int(image.size[1] * ratio))
image = image.resize(new_size, Image.Resampling.LANCZOS)
# Save to buffer
buffered = BytesIO()
image_format = image.format if image.format else "JPEG"
image.save(buffered, format=image_format)
return buffered.getvalue()
except Exception as e:
logger.warning(f"Failed to optimize image: {str(e)}")
return image_data # Return original data if optimization fails
def encode_images(image_sources: List[str], with_header: bool = True) -> List[str]:
"""
Encode images to base64 format with robust file handling
Args:
image_sources: List of URLs or local file paths of images
with_header: Whether to include MIME type header
Returns:
List[str]: Base64 encoded image strings, with MIME type prefix if with_header is True
Raises:
ValueError: When image source is invalid or image format is not supported
"""
if not image_sources:
raise ValueError("Image sources cannot be empty")
images = []
for image_source in image_sources:
try:
# Get file with validation (only image files allowed)
file_path, mime_type, content = get_file_from_source(
image_source,
allowed_mime_prefixes=["image/"],
max_size_mb=10.0, # 10MB limit for images
type="image",
)
# Optimize image
optimized_content = optimize_image(content)
# Encode to base64
image_base64 = base64.b64encode(optimized_content).decode()
# Format with header if requested
final_image = f"data:{mime_type};base64,{image_base64}" if with_header else image_base64
images.append(final_image)
# Clean up temporary file if it was created for a URL
if file_path != os.path.abspath(image_source) and os.path.exists(file_path):
os.unlink(file_path)
except Exception as e:
logger.error(f"Error encoding image from {image_source}: {str(e)}")
raise
return images
def image_to_base64(image_path):
try:
# 打开图片
with Image.open(image_path) as image:
buffered = BytesIO()
image_format = image.format if image.format else "JPEG"
image.save(buffered, format=image_format)
image_bytes = buffered.getvalue()
base64_encoded = base64.b64encode(image_bytes).decode("utf-8")
return base64_encoded
except Exception as e:
print(f"Base64 error: {e}")
return None
def create_image_contents(prompt: str, image_base64: List[str]) -> List[Dict[str, Any]]:
"""Create uniform image format for querying llm."""
content = [
{"type": "text", "text": prompt},
]
content.extend([{"type": "image_url", "image_url": {"url": url}} for url in image_base64])
return content
@mcp.tool(
description="solve the question by careful reasoning given the image(s) in given filepath or url, including reasoning, ocr, etc."
)
def mcp_image_recognition(
image_urls: List[str] = Field(description="The input image(s) in given a list of filepaths or urls."),
question: str = Field(description="The question to ask."),
) -> str:
"""solve the question by careful reasoning given the image(s) in given filepath or url."""
try:
image_base64 = image_to_base64(image_urls[0])
logger.info(f"image_base64:{image_urls[0]}")
reasoning_prompt = question
messages = [
{"role": "system", "content": "You are a helpful assistant."},
{
"role": "user",
"content": [
{"type": "text", "text": reasoning_prompt},
{
"type": "image_url",
"image_url": {"url": f"data:image/jpeg;base64,{image_base64}"},
},
],
},
]
client = OpenAI(api_key=os.getenv("IMAGE_LLM_API_KEY"), base_url=os.getenv("IMAGE_LLM_BASE_URL"))
response = client.chat.completions.create(
model=os.getenv("IMAGE_LLM_MODEL_NAME"),
messages=messages,
)
logger.info(f"response:{response.choices[0]}")
image_reasoning_result = response.choices[0].message.content
except Exception as e:
image_reasoning_result = ""
import traceback
traceback.print_exc()
logger.error(f"image_reasoning_result-Execute error: {e}")
logger.info(f"---get_reasoning_by_image-image_reasoning_result:{image_reasoning_result}")
return image_reasoning_result
def main():
from dotenv import load_dotenv
load_dotenv()
print("Starting Image MCP Server...", file=sys.stderr)
mcp.run(transport="stdio")
# Make the module callable
def __call__():
"""
Make the module callable for uvx.
This function is called when the module is executed directly.
"""
main()
# Add this for compatibility with uvx
import sys
sys.modules[__name__].__call__ = __call__
# Run the server when the script is executed directly
if __name__ == "__main__":
main()