Spaces:
Sleeping
Sleeping
import asyncio | |
import json | |
import logging | |
import os | |
import sys | |
import aiohttp | |
from typing import List, Dict, Any, Optional | |
from dotenv import load_dotenv | |
from mcp.server import FastMCP | |
from pydantic import Field | |
from aworld.logs.util import logger | |
mcp = FastMCP("picsearch-server") | |
async def search_single(query: str, num: int = 5) -> Optional[Dict[str, Any]]: | |
"""Execute a single search query, returns None on error""" | |
try: | |
url = os.getenv('PIC_SEARCH_URL') | |
searchMode = os.getenv('PIC_SEARCH_SEARCHMODE') | |
source = os.getenv('PIC_SEARCH_SOURCE') | |
domain = os.getenv('PIC_SEARCH_DOMAIN') | |
uid = os.getenv('PIC_SEARCH_UID') | |
if not url or not searchMode or not source or not domain: | |
logger.warning(f"Query failed: url, searchMode, source, domain parameters incomplete") | |
return None | |
headers = { | |
'Content-Type': 'application/json' | |
} | |
data = { | |
"domain": domain, | |
"extParams": { | |
"contentType": "llmWholeImage" | |
}, | |
"page": 0, | |
"pageSize": num, | |
"query": query, | |
"searchMode": searchMode, | |
"source": source, | |
"userId": uid | |
} | |
async with aiohttp.ClientSession() as session: | |
try: | |
async with session.post(url, headers=headers, json=data) as response: | |
if response.status != 200: | |
logger.warning(f"Query failed: {query}, status code: {response.status}") | |
return None | |
result = await response.json() | |
return result | |
except aiohttp.ClientError: | |
logger.warning(f"Request error: {query}") | |
return None | |
except Exception: | |
logger.warning(f"Query exception: {query}") | |
return None | |
def filter_valid_docs(result: Optional[Dict[str, Any]]) -> List[Dict[str, Any]]: | |
"""Filter valid document results, returns empty list if input is None""" | |
if result is None: | |
return [] | |
try: | |
valid_docs = [] | |
# Check success field | |
if not result.get("success"): | |
return valid_docs | |
# Check searchDocs field | |
search_docs = result.get("searchImages", []) | |
if not search_docs: | |
return valid_docs | |
# Extract required fields | |
required_fields = ["title", "picUrl"] | |
for doc in search_docs: | |
# Check if all required fields exist and are not empty | |
is_valid = True | |
for field in required_fields: | |
if field not in doc or not doc[field]: | |
is_valid = False | |
break | |
if is_valid: | |
# Keep only required fields | |
filtered_doc = {field: doc[field] for field in required_fields} | |
valid_docs.append(filtered_doc) | |
return valid_docs | |
except Exception: | |
return [] | |
async def search( | |
query: str = Field( | |
description="The query to search for picture" | |
), | |
num: int = Field( | |
5, | |
description="Maximum number of results to return, default is 5" | |
) | |
) -> Any: | |
"""Execute search function for a single query""" | |
try: | |
# Get configuration from environment variables | |
env_total_num = os.getenv('PIC_SEARCH_TOTAL_NUM') | |
if env_total_num and env_total_num.isdigit(): | |
# Force override input num parameter with environment variable | |
num = int(env_total_num) | |
# If no query provided, return empty list | |
if not query: | |
return json.dumps([]) | |
# Get actual number of results to return | |
slice_num = os.getenv('PIC_SEARCH_SLICE_NUM') | |
if slice_num and slice_num.isdigit(): | |
actual_num = int(slice_num) | |
else: | |
actual_num = num | |
# Execute the query | |
result = await search_single(query, actual_num) | |
# Filter results | |
valid_docs = filter_valid_docs(result) | |
# Return results | |
result_json = json.dumps(valid_docs, ensure_ascii=False) | |
logger.info(f"Completed query: '{query}', found {len(valid_docs)} valid documents") | |
logger.info(result_json) | |
return result_json | |
except Exception as e: | |
# Return empty list on exception | |
logger.error(f"Error processing query: {str(e)}") | |
return json.dumps([]) | |
def main(): | |
from dotenv import load_dotenv | |
load_dotenv() | |
print("Starting Audio MCP picsearch-server...", file=sys.stderr) | |
mcp.run(transport="stdio") | |
# Make the module callable | |
def __call__(): | |
""" | |
Make the module callable for uvx. | |
This function is called when the module is executed directly. | |
""" | |
main() | |
sys.modules[__name__].__call__ = __call__ | |
if __name__ == "__main__": | |
main() | |
# if __name__ == "__main__": | |
# # Configure logging | |
# logging.basicConfig( | |
# level=logging.INFO, | |
# format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' | |
# ) | |
# | |
# | |
# # Test single query | |
# asyncio.run(search(query="Image search test")) | |
# | |
# # Test multiple queries no longer applies |