File size: 4,776 Bytes
3e11f9b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
"""
Search MCP Server

This module provides MCP server functionality for performing web searches using various search engines.
It supports structured queries and returns formatted search results.

Key features:
- Perform web searches using Exa, Google, and DuckDuckGo
- Filter and format search results
- Validate and process search queries

Main functions:
- mcpsearchexa: Searches the web using Exa
- mcpsearchgoogle: Searches the web using Google
- mcpsearchduckduckgo: Searches the web using DuckDuckGo
"""

import os
import sys
import traceback
from typing import List, Optional

import requests
from dotenv import load_dotenv
from mcp.server.fastmcp import FastMCP
from pydantic import BaseModel, Field

from aworld.logs.util import logger

# Initialize MCP server
mcp = FastMCP("search-server")


# Base search result model that all providers will use
class SearchResult(BaseModel):
    """Base search result model with common fields"""

    id: str
    title: str
    url: str
    snippet: str
    source: str  # Which search engine provided this result


class GoogleSearchResult(SearchResult):
    """Google-specific search result model"""

    displayLink: str = ""
    formattedUrl: str = ""
    htmlSnippet: str = ""
    htmlTitle: str = ""
    kind: str = ""
    link: str = ""


class SearchResponse(BaseModel):
    """Unified search response model"""

    query: str
    results: List[SearchResult]
    count: int
    source: str
    error: Optional[str] = None


@mcp.tool(description="Search the web using Google Custom Search API.")
def mcpsearchgoogle(
    query: str = Field(..., description="The search query string."),
    num_results: int = Field(
        10, description="Number of search results to return (default 10)."
    ),
    safe_search: bool = Field(
        True, description="Whether to enable safe search filtering."
    ),
    language: str = Field("en", description="Language code for search results."),
    country: str = Field("us", description="Country code for search results."),
) -> str:
    """
    Search the web using Google Custom Search API.

    Requires GOOGLE_API_KEY and GOOGLE_CSE_ID environment variables to be set.
    """
    try:
        api_key = os.environ.get("GOOGLE_API_KEY")
        cse_id = os.environ.get("GOOGLE_CSE_ID")

        if not api_key:
            raise ValueError("GOOGLE_API_KEY environment variable not set")
        if not cse_id:
            raise ValueError("GOOGLE_CSE_ID environment variable not set")

        # Ensure num_results is within valid range
        num_results = max(1, num_results)

        # Build the Google Custom Search API URL
        url = "https://www.googleapis.com/customsearch/v1"
        params = {
            "key": api_key,
            "cx": cse_id,
            "q": query,
            "num": num_results,
            "safe": "active" if safe_search else "off",
            "hl": language,
            "gl": country,
        }

        logger.info(f"Google search starts for query: {query}")
        response = requests.get(url, params=params, timeout=10)
        response.raise_for_status()

        data = response.json()
        search_results = []

        if "items" in data:
            for i, item in enumerate(data["items"]):
                result = GoogleSearchResult(
                    id=f"google-{i}",
                    title=item.get("title", ""),
                    url=item.get("link", ""),
                    snippet=item.get("snippet", ""),
                    source="google",
                    displayLink=item.get("displayLink", ""),
                    formattedUrl=item.get("formattedUrl", ""),
                    htmlSnippet=item.get("htmlSnippet", ""),
                    htmlTitle=item.get("htmlTitle", ""),
                    kind=item.get("kind", ""),
                    link=item.get("link", ""),
                )
                search_results.append(result)

        return SearchResponse(
            query=query,
            results=search_results,
            count=len(search_results),
            source="google",
        ).model_dump_json()

    except Exception as e:
        logger.error(f"Google search error: {traceback.format_exc()}")
        return SearchResponse(
            query=query, results=[], count=0, source="google", error=str(e)
        ).model_dump_json()


def main():
    load_dotenv()

    print("Starting Search MCP Server...", file=sys.stderr)
    mcp.run(transport="stdio")


# Make the module callable
def __call__():
    """
    Make the module callable for uvx.
    This function is called when the module is executed directly.
    """
    main()


sys.modules[__name__].__call__ = __call__

# Run the server when the script is executed directly
if __name__ == "__main__":
    main()