# Page-extraction residue (not part of the program): Spaces status "Sleeping";
# file size 5,427 bytes; revision 3e11f9b. The viewer's line-number gutter
# (1-180) has been dropped.
import asyncio
import json
import logging
import os
import sys
import aiohttp
from typing import List, Dict, Any, Optional
from dotenv import load_dotenv
from mcp.server import FastMCP
from pydantic import Field
from aworld.logs.util import logger
# MCP server instance named "picsearch-server"; tools in this module are
# registered on it through the ``@mcp.tool`` decorator.
mcp = FastMCP("picsearch-server")
async def search_single(query: str, num: int = 5) -> Optional[Dict[str, Any]]:
    """Send one picture-search request and return the decoded JSON response.

    Request settings are read from environment variables on every call:
    ``PIC_SEARCH_URL``, ``PIC_SEARCH_SEARCHMODE``, ``PIC_SEARCH_SOURCE`` and
    ``PIC_SEARCH_DOMAIN`` must all be set and non-empty; ``PIC_SEARCH_UID`` is
    optional and is sent as ``userId`` (JSON ``null`` when unset).

    Args:
        query: Search text, sent as the ``query`` field of the request body.
        num: Requested number of results, sent as ``pageSize``.

    Returns:
        The JSON-decoded response body, or ``None`` when the configuration is
        incomplete, the HTTP status is not 200, or an error occurs. Errors are
        logged as warnings (including the exception) instead of being raised.
    """
    try:
        required = {
            'PIC_SEARCH_URL': os.getenv('PIC_SEARCH_URL'),
            'PIC_SEARCH_SEARCHMODE': os.getenv('PIC_SEARCH_SEARCHMODE'),
            'PIC_SEARCH_SOURCE': os.getenv('PIC_SEARCH_SOURCE'),
            'PIC_SEARCH_DOMAIN': os.getenv('PIC_SEARCH_DOMAIN'),
        }
        missing = [name for name, value in required.items() if not value]
        if missing:
            # Name the unset variables so a misconfiguration can be fixed
            # straight from the log line.
            logger.warning(
                "Query failed: url, searchMode, source, domain parameters incomplete"
                f" (missing: {', '.join(missing)})"
            )
            return None

        headers = {
            'Content-Type': 'application/json'
        }
        data = {
            "domain": required['PIC_SEARCH_DOMAIN'],
            "extParams": {
                "contentType": "llmWholeImage"
            },
            "page": 0,
            "pageSize": num,
            "query": query,
            "searchMode": required['PIC_SEARCH_SEARCHMODE'],
            "source": required['PIC_SEARCH_SOURCE'],
            "userId": os.getenv('PIC_SEARCH_UID')
        }

        async with aiohttp.ClientSession() as session:
            async with session.post(
                required['PIC_SEARCH_URL'], headers=headers, json=data
            ) as response:
                if response.status != 200:
                    logger.warning(f"Query failed: {query}, status code: {response.status}")
                    return None
                return await response.json()
    except aiohttp.ClientError as exc:
        # Transport/protocol failures reported by aiohttp.
        logger.warning(f"Request error: {query}, error: {exc!r}")
        return None
    except Exception as exc:
        # Best-effort boundary: anything else (e.g. a body that is not valid
        # JSON) is reported to the caller as "no result".
        logger.warning(f"Query exception: {query}, error: {exc!r}")
        return None
def filter_valid_docs(result: Optional[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Extract the usable image entries from a search response.

    An entry is usable when it is a dict whose ``title`` and ``picUrl`` values
    are both present and truthy. Each returned dict holds only those two keys.

    Args:
        result: Decoded search response, or ``None`` when the search failed.
            Entries are read from its ``searchImages`` list, and only when
            its ``success`` value is truthy.

    Returns:
        The trimmed entries in their original order. The list is empty when
        ``result`` is ``None`` or not a dict, ``success`` is falsy, or
        ``searchImages`` is missing, empty, or not a list/tuple.
    """
    if not isinstance(result, dict) or not result.get("success"):
        return []

    images = result.get("searchImages")
    if not isinstance(images, (list, tuple)):
        return []

    required_fields = ("title", "picUrl")
    return [
        {field: doc[field] for field in required_fields}
        for doc in images
        # Skip malformed (non-dict) entries individually so one bad item does
        # not discard the valid ones around it.
        if isinstance(doc, dict) and all(doc.get(field) for field in required_fields)
    ]
@mcp.tool(description="Search Picture based on the user's input query")
async def search(
    query: str = Field(description="The query to search for picture"),
    num: int = Field(5, description="Maximum number of results to return, default is 5"),
) -> Any:
    """Run one picture search and return the valid hits as a JSON string.

    The requested count is resolved in this order: ``PIC_SEARCH_SLICE_NUM``
    (when it is a digit string), else ``PIC_SEARCH_TOTAL_NUM`` (when it is a
    digit string), else the ``num`` argument. An empty query, or any
    exception, yields the JSON text of an empty list.

    NOTE(review): when both variables are set, the slice value fully replaces
    the total value -- confirm that this is the intended relationship.
    """
    try:
        # Environment setting takes precedence over the caller's ``num``.
        total_override = os.getenv('PIC_SEARCH_TOTAL_NUM')
        if total_override and total_override.isdigit():
            num = int(total_override)

        # Nothing to search for.
        if not query:
            return json.dumps([])

        slice_override = os.getenv('PIC_SEARCH_SLICE_NUM')
        use_slice = bool(slice_override) and slice_override.isdigit()
        page_size = int(slice_override) if use_slice else num

        # Query the backend, then keep only well-formed entries.
        docs = filter_valid_docs(await search_single(query, page_size))

        payload = json.dumps(docs, ensure_ascii=False)
        logger.info(f"Completed query: '{query}', found {len(docs)} valid documents")
        logger.info(payload)
        return payload
    except Exception as exc:
        # Any failure is reported to the caller as an empty result list.
        logger.error(f"Error processing query: {str(exc)}")
        return json.dumps([])
def main():
    """Load settings from a ``.env`` file and serve the MCP tools over stdio."""
    from dotenv import load_dotenv

    load_dotenv()
    # The banner goes to stderr; presumably stdout is reserved for the stdio
    # transport's protocol traffic -- confirm against the MCP SDK.
    print("Starting MCP picsearch-server...", file=sys.stderr)
    mcp.run(transport="stdio")
# Expose a ``__call__`` hook on the module object
def __call__():
    """Start the server by delegating to ``main()``.

    Intended as the hook used when the module is launched through uvx.
    NOTE(review): Python looks up ``__call__`` on an object's type, so this
    attribute alone may not make ``module()`` work -- confirm how the launcher
    actually invokes it.
    """
    main()


setattr(sys.modules[__name__], "__call__", __call__)
# Start the server only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
# Disabled manual test harness, kept for reference:
# if __name__ == "__main__":
#     # Configure logging
#     logging.basicConfig(
#         level=logging.INFO,
#         format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
#     )
#
#     # Test single query
#     asyncio.run(search(query="Image search test"))
#
#     # Multi-query testing no longer applies.