import os
import time
import re
import html
import asyncio
import ipaddress
import socket
from typing import Optional, Dict, Any, List, Tuple
import fnmatch
from urllib.parse import urlsplit
from datetime import datetime, timezone
import httpx
import trafilatura
import gradio as gr
from dateutil import parser as dateparser
from limits import parse
from limits.aio.storage import MemoryStorage
from limits.aio.strategies import MovingWindowRateLimiter
from analytics import record_request, last_n_days_count_df
# ──────────────────────────────────────────────────────────────────────────────
# Configuration
# ──────────────────────────────────────────────────────────────────────────────
def _env_int(name: str, default: int) -> int:
    """Read environment variable *name* as an int, using *default* when unset."""
    return int(os.getenv(name, str(default)))


# Serper credentials and endpoints. The API key header is sent on every
# request made through ``serper_client``.
SERPER_API_KEY = os.getenv("SERPER_API_KEY")
SERPER_SEARCH_ENDPOINT = "https://google.serper.dev/search"
SERPER_NEWS_ENDPOINT = "https://google.serper.dev/news"
HEADERS = {"X-API-KEY": SERPER_API_KEY or "", "Content-Type": "application/json"}

# Pooled HTTP clients: one for the Serper API, one for arbitrary web pages.
SERPER_TIMEOUT = httpx.Timeout(connect=5.0, read=10.0, write=5.0, pool=5.0)
WEB_TIMEOUT = httpx.Timeout(connect=5.0, read=20.0, write=5.0, pool=5.0)

SERPER_LIMITS = httpx.Limits(
    max_keepalive_connections=_env_int("SERPER_KEEPALIVE", 32),
    max_connections=_env_int("SERPER_MAX_CONNECTIONS", 128),
)
WEB_LIMITS = httpx.Limits(
    max_keepalive_connections=_env_int("WEB_KEEPALIVE", 128),
    max_connections=_env_int("WEB_MAX_CONNECTIONS", 512),
)

serper_client = httpx.AsyncClient(
    headers=HEADERS,
    http2=True,
    limits=SERPER_LIMITS,
    timeout=SERPER_TIMEOUT,
)

DEFAULT_USER_AGENT = os.getenv(
    "FETCH_USER_AGENT",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/124.0 Safari/537.36",
)

web_client = httpx.AsyncClient(
    headers={"User-Agent": DEFAULT_USER_AGENT},
    follow_redirects=True,
    http2=True,
    limits=WEB_LIMITS,
    timeout=WEB_TIMEOUT,
)

# Rate limiting: a single in-memory (process-local) limiter shared by both tools.
GLOBAL_RATE = parse(os.getenv("GLOBAL_RATE", "3000/minute"))
PER_IP_RATE = parse(os.getenv("PER_IP_RATE", "60/minute"))
storage = MemoryStorage()
limiter = MovingWindowRateLimiter(storage)

# Resource caps and concurrency ceilings. Each value is clamped to a sane
# minimum so a bad environment value cannot disable the tool outright.
FETCH_MAX_BYTES = max(1024, _env_int("FETCH_MAX_BYTES", 1500000))
FETCH_CONCURRENCY = max(1, _env_int("FETCH_CONCURRENCY", 64))
SEARCH_CONCURRENCY = max(1, _env_int("SEARCH_CONCURRENCY", 64))
# Extraction runs in worker threads; the default scales with the CPU count.
EXTRACT_CONCURRENCY = max(
    1,
    _env_int("EXTRACT_CONCURRENCY", max(4, (os.cpu_count() or 2) * 2)),
)

# Cache lifetimes in seconds; 0 disables the corresponding cache.
SEARCH_CACHE_TTL = max(0, _env_int("SEARCH_CACHE_TTL", 30))
FETCH_CACHE_TTL = max(0, _env_int("FETCH_CACHE_TTL", 300))
# Controls for private/local address handling in fetch()
def _env_flag(name: str, default: bool = False) -> bool:
"""Parse boolean-like env vars such as 1/true/yes/on."""
v = os.getenv(name)
if v is None:
return default
return str(v).strip().lower() in {"1", "true", "yes", "on", "y"}
# When True, allow any destination (disables the SSRF guard — not recommended).
FETCH_ALLOW_PRIVATE = _env_flag("FETCH_ALLOW_PRIVATE", False)

# Optional comma/space separated host patterns to allow even if private, e.g.:
# FETCH_PRIVATE_ALLOWLIST="*.internal.example.com, my-proxy.local"
FETCH_PRIVATE_ALLOWLIST = [
    p
    for p in re.split(r"[\s,]+", os.getenv("FETCH_PRIVATE_ALLOWLIST", "").strip())
    if p
]

# In-memory TTL caches. Each entry is {"expires_at": <epoch seconds>, "value": <result>}.
# search() keys on (query, search_type, num_results).
_search_cache: Dict[Tuple[str, str, int], Dict[str, Any]] = {}
# fetch() keys on (url, timeout), so the key type is a tuple, not a bare str.
_fetch_cache: Dict[Tuple[str, int], Dict[str, Any]] = {}

# Locks and semaphores start as None and are built on first use by
# _get_cache_lock() / _get_semaphore().
_search_cache_lock: Optional[asyncio.Lock] = None
_fetch_cache_lock: Optional[asyncio.Lock] = None
_search_sema: Optional[asyncio.Semaphore] = None
_fetch_sema: Optional[asyncio.Semaphore] = None
_extract_sema: Optional[asyncio.Semaphore] = None
# ──────────────────────────────────────────────────────────────────────────────
# Helpers
# ──────────────────────────────────────────────────────────────────────────────
def _get_cache_lock(name: str) -> asyncio.Lock:
    """Return the lock guarding the named cache, creating it on first use.

    Args:
        name: Either ``"search"`` or ``"fetch"``.

    Raises:
        ValueError: If *name* is not a known cache.
    """
    global _search_cache_lock, _fetch_cache_lock
    if name not in ("search", "fetch"):
        raise ValueError(f"Unknown cache lock: {name}")
    if name == "search":
        lock = _search_cache_lock
        if lock is None:
            lock = _search_cache_lock = asyncio.Lock()
    else:
        lock = _fetch_cache_lock
        if lock is None:
            lock = _fetch_cache_lock = asyncio.Lock()
    return lock
def _get_semaphore(name: str) -> asyncio.Semaphore:
    """Return the concurrency semaphore for *name*, creating it on first use.

    Args:
        name: One of ``"search"``, ``"fetch"`` or ``"extract"``; each is sized
            from the matching ``*_CONCURRENCY`` setting.

    Raises:
        ValueError: If *name* is not a known semaphore.
    """
    global _search_sema, _fetch_sema, _extract_sema
    if name not in ("search", "fetch", "extract"):
        raise ValueError(f"Unknown semaphore: {name}")
    if name == "search":
        sema = _search_sema
        if sema is None:
            sema = _search_sema = asyncio.Semaphore(SEARCH_CONCURRENCY)
    elif name == "fetch":
        sema = _fetch_sema
        if sema is None:
            sema = _fetch_sema = asyncio.Semaphore(FETCH_CONCURRENCY)
    else:
        sema = _extract_sema
        if sema is None:
            sema = _extract_sema = asyncio.Semaphore(EXTRACT_CONCURRENCY)
    return sema
async def _cache_get(name: str, cache: Dict[Any, Any], key: Any):
    """Look up *key* in *cache* and return its value if it has not expired.

    The lookup runs under the lock for cache *name*. An expired entry is
    removed as a side effect. Returns ``None`` on a miss or on expiry.
    """
    async with _get_cache_lock(name):
        record = cache.get(key)
        if record:
            if time.time() <= record["expires_at"]:
                return record["value"]
            # Stale entry: evict it so it does not linger in the cache.
            cache.pop(key, None)
        return None
async def _cache_set(name: str, cache: Dict[Any, Any], key: Any, value: Any, ttl: int):
    """Store *value* under *key* in *cache* for *ttl* seconds.

    A non-positive *ttl* means caching is disabled, so nothing is stored and
    the lock for cache *name* is never taken.
    """
    if ttl > 0:
        async with _get_cache_lock(name):
            cache[key] = {"expires_at": time.time() + ttl, "value": value}
def _client_ip(request: Optional[gr.Request]) -> str:
    """Best-effort client address for rate limiting.

    Prefers the first entry of the ``x-forwarded-for`` header, then falls back
    to ``request.client.host``. Returns ``"unknown"`` when there is no request,
    no usable address, or anything goes wrong while inspecting the request.
    """
    if request is None:
        return "unknown"
    try:
        header_map = getattr(request, "headers", None) or {}
        forwarded = header_map.get("x-forwarded-for")
        if forwarded:
            # The header may hold a comma-separated chain; take the first hop.
            first_hop, _, _ = forwarded.partition(",")
            return first_hop.strip()
        peer = getattr(request, "client", None)
        if peer and getattr(peer, "host", None):
            return peer.host
    except Exception:
        # Deliberately best-effort: never fail a request over IP detection.
        pass
    return "unknown"
def _host_matches_allowlist(host: str) -> bool:
    """Return True if *host* matches any pattern in FETCH_PRIVATE_ALLOWLIST.

    Patterns may be bare host names or fnmatch-style globs (``*.foo.bar``).
    Matching is case-insensitive: host names are case-insensitive, and the
    host handed in by fetch() comes from ``urlsplit(...).hostname``, which is
    already lower-cased, so a pattern written with capitals would otherwise
    never match. ``fnmatchcase`` is used so the result does not depend on the
    platform's file-name case rules.

    Args:
        host: Host name to test; an empty value never matches.
    """
    if not host:
        return False
    candidate = host.lower()
    for pat in FETCH_PRIVATE_ALLOWLIST:
        pattern = pat.lower()
        # Equality first so a literal entry matches even if it contains
        # characters that fnmatch would treat as glob syntax.
        if candidate == pattern or fnmatch.fnmatchcase(candidate, pattern):
            return True
    return False
async def _resolve_addresses(host: str) -> List[str]:
def _resolve() -> List[str]:
try:
return list({ai[4][0] for ai in socket.getaddrinfo(host, None)})
except Exception:
return []
return await asyncio.to_thread(_resolve)
async def _host_is_public(host: str) -> Tuple[bool, List[str]]:
"""Return (is_public, resolved_addresses).
- If resolution fails, treat as public and let HTTP request decide.
- Honors allowlist/env flags via the caller.
"""
if not host:
return False, []
addresses = await _resolve_addresses(host)
if not addresses:
return True, []
for addr in addresses:
ip_obj = ipaddress.ip_address(addr)
if (
ip_obj.is_private
or ip_obj.is_loopback
or ip_obj.is_link_local
or ip_obj.is_multicast
or ip_obj.is_reserved
or ip_obj.is_unspecified
):
return False, addresses
return True, addresses
async def _check_rate_limits(bucket: str, ip: str) -> Optional[str]:
    """Record one request against the rate limits and report any violation.

    The per-IP limit is checked first. Each ``hit`` consumes quota, so if the
    global limit were checked first, a client that is already throttled would
    keep draining the shared global budget with every rejected request.

    Args:
        bucket: Tool name used to namespace the per-IP counter.
        ip: Client address; ``"unknown"`` skips the per-IP limit.

    Returns:
        An error message if a limit is exceeded, otherwise ``None``.
    """
    if ip != "unknown" and not await limiter.hit(PER_IP_RATE, f"{bucket}:{ip}"):
        return f"Per-IP rate limit exceeded. Limit: {PER_IP_RATE}."
    if not await limiter.hit(GLOBAL_RATE, "global"):
        return f"Global rate limit exceeded. Limit: {GLOBAL_RATE}."
    return None
def _domain_from_url(url: str) -> str:
try:
netloc = urlsplit(url).netloc
return netloc.replace("www.", "")
except Exception:
return ""
def _iso_date_or_unknown(date_str: Optional[str]) -> Optional[str]:
if not date_str:
return None
try:
return dateparser.parse(date_str, fuzzy=True).strftime("%Y-%m-%d")
except Exception:
return None
def _extract_title_from_html(html_text: str) -> Optional[str]:
m = re.search(r"
]*>(.*?)", html_text, re.IGNORECASE | re.DOTALL)
if not m:
return None
title = re.sub(r"\s+", " ", m.group(1)).strip()
return html.unescape(title) if title else None
# ──────────────────────────────────────────────────────────────────────────────
# Tool: search (metadata only)
# ──────────────────────────────────────────────────────────────────────────────
async def search(
    query: str,
    search_type: str = "search",
    num_results: Optional[int] = 4,
    request: Optional[gr.Request] = None,
) -> Dict[str, Any]:
    """Perform a web or news search via Serper and return metadata only.

    Args:
        query: Search query string (required, surrounding whitespace is trimmed).
        search_type: "search" for general web results or "news"; any other
            value falls back to "search".
        num_results: Number of results to return, clamped to 1-20 (default 4).

    Returns:
        A dict with "query", "search_type", "count", "results" and
        "duration_s" (plus "message" when nothing was found), or a dict with
        a single "error" key on failure.
    """
    # `request` is supplied by Gradio and is only used for per-IP rate limiting.
    started = time.time()

    query = query.strip() if query else ""
    if not query:
        await record_request("search")
        return {"error": "Missing 'query'. Please provide a search query string."}

    # Coerce and clamp the requested result count; bad values fall back to 4.
    try:
        num_results = 4 if num_results is None else max(1, min(20, int(num_results)))
    except (TypeError, ValueError):
        num_results = 4

    if search_type not in ("search", "news"):
        search_type = "search"
    wants_news = search_type == "news"

    if not SERPER_API_KEY:
        await record_request("search")
        return {
            "error": "SERPER_API_KEY is not set. Export SERPER_API_KEY and try again."
        }

    ip = _client_ip(request)
    try:
        rl_message = await _check_rate_limits("search", ip)
        if rl_message:
            await record_request("search")
            return {"error": rl_message}

        cache_key = (query, search_type, num_results)
        cached = await _cache_get("search", _search_cache, cache_key)
        if cached:
            await record_request("search")
            return cached

        endpoint = SERPER_NEWS_ENDPOINT if wants_news else SERPER_SEARCH_ENDPOINT
        payload: Dict[str, Any] = {"q": query, "num": num_results}
        if wants_news:
            payload["type"] = "news"
            payload["page"] = 1

        # Cap the number of in-flight Serper calls.
        async with _get_semaphore("search"):
            resp = await serper_client.post(endpoint, json=payload)

        if resp.status_code != 200:
            await record_request("search")
            return {
                "error": f"Search API returned status {resp.status_code}. Check your API key and query."
            }

        data = resp.json()
        raw_results: List[Dict[str, Any]] = data.get(
            "news" if wants_news else "organic", []
        )

        formatted: List[Dict[str, Any]] = []
        for position, item in enumerate(raw_results[:num_results], start=1):
            entry = {
                "position": position,
                "title": item.get("title"),
                "link": item.get("link"),
                "domain": _domain_from_url(item.get("link", "")),
                "snippet": item.get("snippet") or item.get("description"),
            }
            if wants_news:
                entry["source"] = item.get("source")
                entry["date"] = _iso_date_or_unknown(item.get("date"))
            formatted.append(entry)

        result = {
            "query": query,
            "search_type": search_type,
            "count": len(formatted),
            "results": formatted,
            "duration_s": round(time.time() - started, 2),
        }
        if not formatted:
            result["message"] = f"No {search_type} results found."

        await _cache_set("search", _search_cache, cache_key, result, SEARCH_CACHE_TTL)
        await record_request("search")
        return result
    except Exception as e:
        # Tool boundary: report the failure as a payload instead of raising.
        await record_request("search")
        return {"error": f"Search failed: {str(e)}"}
# ──────────────────────────────────────────────────────────────────────────────
# Tool: fetch (single URL fetch + extraction)
# ──────────────────────────────────────────────────────────────────────────────
# Upper bound on redirect hops followed per fetch (httpx's own default budget is 20).
_FETCH_MAX_REDIRECTS = 20


class _FetchRefused(Exception):
    """Signal that a fetch was refused; ``payload`` is the error dict to return."""

    def __init__(self, payload: Dict[str, Any]) -> None:
        super().__init__(payload.get("error", ""))
        self.payload = payload


async def _ensure_host_allowed(host: str, *, redirected: bool) -> None:
    """Raise _FetchRefused if the SSRF guard forbids contacting *host*."""
    if FETCH_ALLOW_PRIVATE or _host_matches_allowlist(host):
        return
    is_public, addrs = await _host_is_public(host)
    if is_public:
        return
    if redirected:
        message = "Refusing to fetch private or local addresses after redirect."
    else:
        detail = f" (resolved: {', '.join(addrs)})" if addrs else ""
        message = "Refusing to fetch private or local addresses." + detail
    raise _FetchRefused({"error": message, "host": host})


async def _download_guarded(url: str, timeout: int) -> Tuple[int, str, str, bytes, bool]:
    """Stream *url*, validating every redirect hop before it is requested.

    Returns ``(status_code, final_url, encoding, body, truncated)``. The body
    is capped at FETCH_MAX_BYTES; the chunk that crosses the cap is dropped.

    NOTE: the guard resolves DNS separately from the HTTP client, so a host
    whose DNS answer changes between the check and the request is not covered.
    """
    current_url = url
    for hop in range(_FETCH_MAX_REDIRECTS + 1):
        await _ensure_host_allowed(
            urlsplit(current_url).hostname or "", redirected=hop > 0
        )
        # Redirects are followed manually so that no request is ever sent to
        # a hop that has not passed the guard above.
        async with web_client.stream(
            "GET", current_url, timeout=timeout, follow_redirects=False
        ) as resp:
            if resp.next_request is not None:
                current_url = str(resp.next_request.url)
                continue
            total = 0
            chunks: List[bytes] = []
            async for chunk in resp.aiter_bytes():
                total += len(chunk)
                if total > FETCH_MAX_BYTES:
                    break
                chunks.append(chunk)
            return (
                resp.status_code,
                str(resp.url),
                resp.encoding or "utf-8",
                b"".join(chunks),
                total > FETCH_MAX_BYTES,
            )
    raise _FetchRefused(
        {"error": "Network error while fetching: Exceeded maximum allowed redirects."}
    )


async def fetch(
    url: str,
    timeout: int = 20,
    request: Optional[gr.Request] = None,
) -> Dict[str, Any]:
    """Fetch a single URL and extract the main readable content.

    Args:
        url: Absolute URL to download; must start with http:// or https://.
        timeout: Timeout in seconds handed to the HTTP client, clamped to
            5-60 (default 20).

    Returns:
        On success, a dict with "url", "final_url", "domain", "status_code",
        "title", "fetched_at" (UTC, ISO-8601), "word_count", "content" and
        "duration_s", plus "truncated": True when the body exceeded the size
        cap. On failure, a dict with an "error" message (and "host" when the
        private-address guard refused the request).
    """
    # `request` is supplied by Gradio and is only used for per-IP rate limiting.
    start_time = time.time()
    if not url or not isinstance(url, str):
        await record_request("fetch")
        return {"error": "Missing 'url'. Please provide a valid URL string."}
    if not url.lower().startswith(("http://", "https://")):
        await record_request("fetch")
        return {"error": "URL must start with http:// or https://."}
    try:
        timeout = max(5, min(60, int(timeout)))
    except (TypeError, ValueError):
        timeout = 20
    ip = _client_ip(request)
    try:
        host = urlsplit(url).hostname or ""
        if not host:
            await record_request("fetch")
            return {"error": "Invalid URL; unable to determine host."}

        rl_message = await _check_rate_limits("fetch", ip)
        if rl_message:
            await record_request("fetch")
            return {"error": rl_message}

        cache_key = (url, timeout)
        cached = await _cache_get("fetch", _fetch_cache, cache_key)
        if cached:
            await record_request("fetch")
            return cached

        # Cap concurrent downloads; the guard runs per hop inside the helper.
        async with _get_semaphore("fetch"):
            (
                status_code,
                final_url_str,
                encoding,
                body,
                truncated,
            ) = await _download_guarded(url, timeout)

        try:
            text = body.decode(encoding, errors="ignore")
        except LookupError:
            # The server declared a charset Python does not know; fall back.
            text = body.decode("utf-8", errors="ignore")

        # Extraction is CPU-bound, so it runs in a worker thread under its own cap.
        async with _get_semaphore("extract"):
            content = await asyncio.to_thread(
                trafilatura.extract,
                text,
                include_formatting=False,
                include_comments=False,
            )
        content = (content or "").strip()
        title = _extract_title_from_html(text) or ""
        domain = _domain_from_url(final_url_str)
        word_count = len(content.split()) if content else 0

        result = {
            "url": url,
            "final_url": final_url_str,
            "domain": domain,
            "status_code": status_code,
            "title": title,
            "fetched_at": datetime.now(timezone.utc).isoformat(),
            "word_count": word_count,
            "content": content,
            "duration_s": round(time.time() - start_time, 2),
        }
        if truncated:
            result["truncated"] = True

        await _cache_set("fetch", _fetch_cache, cache_key, result, FETCH_CACHE_TTL)
        await record_request("fetch")
        return result
    except _FetchRefused as refusal:
        await record_request("fetch")
        return refusal.payload
    except httpx.HTTPError as e:
        await record_request("fetch")
        return {"error": f"Network error while fetching: {str(e)}"}
    except Exception as e:
        # Tool boundary: report the failure as a payload instead of raising.
        await record_request("fetch")
        return {"error": f"Unexpected error while fetching: {str(e)}"}
# ──────────────────────────────────────────────────────────────────────────────
# Gradio UI
# ──────────────────────────────────────────────────────────────────────────────
# Banner shown at the top of the page.
_BANNER_HTML = (
    "\n"
    "🤝 Community resource — please use responsibly to keep this service available for everyone\n"
)

# Example rows offered under each panel (not cached, so they only fill in the inputs).
_SEARCH_EXAMPLES = [
    ["OpenAI GPT-5 latest developments", "news", 5],
    ["React hooks useState", "search", 4],
    ["Apple Vision Pro reviews", "search", 4],
    ["Tesla stock price today", "news", 6],
]
_FETCH_EXAMPLES = [
    ["https://news.ycombinator.com/"],
    ["https://www.python.org/dev/peps/pep-0008/"],
    ["https://en.wikipedia.org/wiki/Model_Context_Protocol"],
]


def _analytics_frames():
    """Return the (search, fetch) daily-count frames for the Analytics tab."""
    return (
        last_n_days_count_df("search", 14),
        last_n_days_count_df("fetch", 14),
    )


with gr.Blocks(title="Web MCP Server") as demo:
    gr.HTML(_BANNER_HTML)
    gr.Markdown("# 🔍 Web Search MCP Server")
    gr.Markdown(
        "This server provides two composable MCP tools: **search** (metadata only) and **fetch** (single-URL extraction)."
    )

    with gr.Tabs():
        with gr.Tab("App"):
            with gr.Row():
                # Search panel
                with gr.Column(scale=3):
                    gr.Markdown("## Search (metadata only)")
                    query_input = gr.Textbox(
                        label="Search Query",
                        placeholder='e.g. "OpenAI news", "climate change 2024", "React hooks useState"',
                        info="Required",
                    )
                    search_type_input = gr.Radio(
                        label="Search Type",
                        choices=["search", "news"],
                        value="search",
                        info="Choose general web search or news",
                    )
                    num_results_input = gr.Slider(
                        label="Number of Results",
                        minimum=1,
                        maximum=20,
                        step=1,
                        value=4,
                        info="Optional (default 4)",
                    )
                    search_button = gr.Button("Run Search", variant="primary")
                    search_output = gr.JSON(
                        label="Search Results (metadata only)",
                    )
                    gr.Examples(
                        examples=_SEARCH_EXAMPLES,
                        inputs=[query_input, search_type_input, num_results_input],
                        outputs=search_output,
                        fn=search,
                        cache_examples=False,
                    )

                # Fetch panel
                with gr.Column(scale=2):
                    gr.Markdown("## Fetch (single URL → extracted content)")
                    url_input = gr.Textbox(
                        label="URL",
                        placeholder="https://example.com/article",
                        info="Required: the URL to fetch and extract",
                    )
                    timeout_input = gr.Slider(
                        label="Timeout (seconds)",
                        minimum=5,
                        maximum=60,
                        step=1,
                        value=20,
                        info="Optional (default 20)",
                    )
                    fetch_button = gr.Button("Fetch & Extract", variant="primary")
                    fetch_output = gr.JSON(label="Fetched Content (structured)")
                    gr.Examples(
                        examples=_FETCH_EXAMPLES,
                        inputs=[url_input],
                        outputs=fetch_output,
                        fn=fetch,
                        cache_examples=False,
                    )

            # Button wiring. api_name=False keeps these UI handlers out of the
            # API listing; the tools are exposed once via gr.api below.
            search_button.click(
                fn=search,
                inputs=[query_input, search_type_input, num_results_input],
                outputs=search_output,
                api_name=False,
            )
            fetch_button.click(
                fn=fetch,
                inputs=[url_input, timeout_input],
                outputs=fetch_output,
                api_name=False,
            )

        with gr.Tab("Analytics"):
            gr.Markdown("## Community Usage Analytics")
            gr.Markdown("Daily request counts (UTC), split by tool.")
            with gr.Row():
                with gr.Column():
                    search_plot = gr.BarPlot(
                        value=last_n_days_count_df("search", 14),
                        x="date",
                        y="count",
                        title="Daily Search Count",
                        tooltip=["date", "count", "full_date"],
                        height=350,
                        x_label_angle=-45,
                        container=False,
                    )
                with gr.Column():
                    fetch_plot = gr.BarPlot(
                        value=last_n_days_count_df("fetch", 14),
                        x="date",
                        y="count",
                        title="Daily Fetch Count",
                        tooltip=["date", "count", "full_date"],
                        height=350,
                        x_label_angle=-45,
                        container=False,
                    )

    # Re-read the analytics every time the page loads.
    demo.load(
        fn=_analytics_frames,
        outputs=[search_plot, fetch_plot],
        api_name=False,
    )

    # Expose the two functions as named API endpoints / MCP tools.
    gr.api(search, api_name="search")
    gr.api(fetch, api_name="fetch")

# Request queue bounds, both overridable through the environment.
demo.queue(
    max_size=int(os.getenv("GRADIO_MAX_QUEUE", "256")),
    default_concurrency_limit=int(os.getenv("GRADIO_CONCURRENCY", "32")),
)

if __name__ == "__main__":
    # Launch with the MCP server enabled.
    demo.launch(mcp_server=True, show_api=True)