import os
import asyncio
import time
import json
from typing import Optional, List, Dict, Any
from datetime import datetime

import httpx
import trafilatura
import gradio as gr
from dateutil import parser as dateparser
from limits import parse
from limits.aio.storage import MemoryStorage
from limits.aio.strategies import MovingWindowRateLimiter

from analytics import record_request, last_n_days_df, last_n_days_avg_time_df

# Configuration
SERPER_API_KEY_ENV = os.getenv("SERPER_API_KEY")
SERPER_API_KEY_OVERRIDE: Optional[str] = None
SERPER_SEARCH_ENDPOINT = "https://google.serper.dev/search"
SERPER_NEWS_ENDPOINT = "https://google.serper.dev/news"


def _get_serper_api_key() -> Optional[str]:
    """Return the currently active Serper API key (override wins, else env)."""
    return SERPER_API_KEY_OVERRIDE or SERPER_API_KEY_ENV or None


def _get_headers() -> Dict[str, str]:
    api_key = _get_serper_api_key()
    return {"X-API-KEY": api_key or "", "Content-Type": "application/json"}


# Rate limiting
storage = MemoryStorage()
limiter = MovingWindowRateLimiter(storage)
rate_limit = parse("360/hour")
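
# A quick sketch of how the limiter above is consumed (illustrative only; the
# handlers below await it with the shared "global" key):
#
#     allowed = await limiter.hit(rate_limit, "global")
#     if not allowed:
#         ...  # more than 360 hits in the trailing hour; reject until the window clears
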
async def search_web(
    query: str, search_type: str = "search", num_results: Optional[int] = 4
) -> str:
    """
    Search the web for information or fresh news, returning extracted content.

    This tool can perform two types of searches:
    - "search" (default): General web search for diverse, relevant content from various sources
    - "news": Specifically searches for fresh news articles and breaking stories

    Use "news" mode when looking for:
    - Breaking news or very recent events
    - Time-sensitive information
    - Current affairs and latest developments
    - Today's/this week's happenings

    Use "search" mode (default) for:
    - General information and research
    - Technical documentation or guides
    - Historical information
    - Diverse perspectives from various sources

    Args:
        query (str): The search query. This is REQUIRED.
            Examples: "apple inc earnings", "climate change 2024", "AI developments"
        search_type (str): Type of search. This is OPTIONAL. Default is "search".
            Options: "search" (general web search) or "news" (fresh news articles).
            Use "news" for time-sensitive, breaking news content.
        num_results (int): Number of results to fetch. This is OPTIONAL. Default is 4.
            Range: 1-20. More results = more context but longer response time.

    Returns:
        str: Formatted text containing extracted content with metadata (title,
            source, date, URL, and main text) for each result, separated by
            dividers. Returns an error message if the API key is missing or the
            search fails.

    Examples:
        - search_web("OpenAI GPT-5", "news") - Get 4 fresh news articles about OpenAI (default count)
        - search_web("python tutorial", "search") - Get 4 general results about Python (default count)
        - search_web("stock market today", "news", 10) - Get 10 news articles about today's market
        - search_web("machine learning basics") - Get 4 general search results (all defaults)
    """
    start_time = time.time()

    if not _get_serper_api_key():
        await record_request(None, num_results)  # Record even failed requests
        return "Error: SERPER_API_KEY environment variable is not set. Please set it to use this tool."

    # Validate and constrain num_results
    if num_results is None:
        num_results = 4
    num_results = max(1, min(20, num_results))

    # Validate search_type
    if search_type not in ["search", "news"]:
        search_type = "search"

    try:
        # Check rate limit
        if not await limiter.hit(rate_limit, "global"):
            print(f"[{datetime.now().isoformat()}] Rate limit exceeded")
            duration = time.time() - start_time
            await record_request(duration, num_results)
            return "Error: Rate limit exceeded. Please try again later (limit: 360 requests per hour)."

        # Select endpoint based on search type
        endpoint = (
            SERPER_NEWS_ENDPOINT if search_type == "news" else SERPER_SEARCH_ENDPOINT
        )

        # Prepare payload
        payload = {"q": query, "num": num_results}
        if search_type == "news":
            payload["type"] = "news"
            payload["page"] = 1

        async with httpx.AsyncClient(timeout=15) as client:
            resp = await client.post(endpoint, headers=_get_headers(), json=payload)

        if resp.status_code != 200:
            duration = time.time() - start_time
            await record_request(duration, num_results)
            return f"Error: Search API returned status {resp.status_code}. Please check your API key and try again."

        # Extract results based on search type
        if search_type == "news":
            results = resp.json().get("news", [])
        else:
            results = resp.json().get("organic", [])

        if not results:
            duration = time.time() - start_time
            await record_request(duration, num_results)
            return f"No {search_type} results found for query: '{query}'. Try a different search term or search type."

        # Fetch HTML content concurrently
        urls = [r["link"] for r in results]
        async with httpx.AsyncClient(timeout=20, follow_redirects=True) as client:
            tasks = [client.get(u) for u in urls]
            responses = await asyncio.gather(*tasks, return_exceptions=True)

        # Extract and format content
        chunks = []
        successful_extractions = 0

        for meta, response in zip(results, responses):
            if isinstance(response, Exception):
                continue

            # Extract main text content
            body = trafilatura.extract(
                response.text, include_formatting=True, include_comments=False
            )
            if not body:
                continue

            successful_extractions += 1
            print(
                f"[{datetime.now().isoformat()}] Successfully extracted content from {meta['link']}"
            )

            # Format the chunk based on search type
            if search_type == "news":
                # News results have date and source
                try:
                    date_str = meta.get("date", "")
                    if date_str:
                        date_iso = dateparser.parse(date_str, fuzzy=True).strftime(
                            "%Y-%m-%d"
                        )
                    else:
                        date_iso = "Unknown"
                except Exception:
                    date_iso = "Unknown"

                chunk = (
                    f"## {meta['title']}\n"
                    f"**Source:** {meta.get('source', 'Unknown')} "
                    f"**Date:** {date_iso}\n"
                    f"**URL:** {meta['link']}\n\n"
                    f"{body.strip()}\n"
                )
            else:
                # Search results don't have date/source but have domain
                domain = meta["link"].split("/")[2].replace("www.", "")
                chunk = (
                    f"## {meta['title']}\n"
                    f"**Domain:** {domain}\n"
                    f"**URL:** {meta['link']}\n\n"
                    f"{body.strip()}\n"
                )

            chunks.append(chunk)

        if not chunks:
            duration = time.time() - start_time
            await record_request(duration, num_results)
            return f"Found {len(results)} {search_type} results for '{query}', but couldn't extract readable content from any of them. The websites might be blocking automated access."

        result = "\n---\n".join(chunks)
        summary = f"Successfully extracted content from {successful_extractions} out of {len(results)} {search_type} results for query: '{query}'\n\n---\n\n"

        print(
            f"[{datetime.now().isoformat()}] Extraction complete: {successful_extractions}/{len(results)} successful for query '{query}'"
        )

        # Record successful request with duration
        duration = time.time() - start_time
        await record_request(duration, num_results)

        return summary + result

    except Exception as e:
        # Record failed request with duration
        duration = time.time() - start_time
        await record_request(duration, num_results)
        return f"Error occurred while searching: {str(e)}. Please try again or check your query."
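
# search_web is a coroutine, so a manual smoke test outside the running app
# needs an event loop. A hypothetical check (not part of the server):
#
#     text = asyncio.run(search_web("python tutorial", "search", 3))
#     print(text[:500])
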
async def search_and_chunk(
    query: str,
    search_type: str,
    num_results: Optional[int],
    tokenizer_or_token_counter: str,
    chunk_size: int,
    chunk_overlap: int,
    heading_level: int,
    min_characters_per_chunk: int,
    max_characters_per_section: int,
    clean_text: bool,
) -> str:
    """
    Complete flow: search -> fetch -> extract with trafilatura -> chunk with
    MarkdownChunker/Parser.

    Returns a JSON string of a list[dict] where each dict is a chunk enriched
    with source metadata.
    """
    start_time = time.time()

    if not _get_serper_api_key():
        await record_request(None, num_results)
        return json.dumps(
            [{"error": "SERPER_API_KEY not set", "hint": "Set env or paste in the UI"}]
        )

    # Normalize inputs
    if num_results is None:
        num_results = 4
    num_results = max(1, min(20, int(num_results)))
    if search_type not in ["search", "news"]:
        search_type = "search"

    try:
        # Rate limit
        if not await limiter.hit(rate_limit, "global"):
            duration = time.time() - start_time
            await record_request(duration, num_results)
            return json.dumps([{"error": "rate_limited", "limit": "360/hour"}])

        endpoint = (
            SERPER_NEWS_ENDPOINT if search_type == "news" else SERPER_SEARCH_ENDPOINT
        )
        payload = {"q": query, "num": num_results}
        if search_type == "news":
            payload["type"] = "news"
            payload["page"] = 1

        async with httpx.AsyncClient(timeout=15) as client:
            resp = await client.post(endpoint, headers=_get_headers(), json=payload)

        if resp.status_code != 200:
            duration = time.time() - start_time
            await record_request(duration, num_results)
            return json.dumps([{"error": "bad_status", "status": resp.status_code}])

        results = resp.json().get("news" if search_type == "news" else "organic", [])
        if not results:
            duration = time.time() - start_time
            await record_request(duration, num_results)
            return json.dumps([])

        # Fetch pages concurrently
        urls = [r.get("link") for r in results]
        async with httpx.AsyncClient(timeout=20, follow_redirects=True) as client:
            responses = await asyncio.gather(
                *[client.get(u) for u in urls], return_exceptions=True
            )

        all_chunks: List[Dict[str, Any]] = []
        for meta, response in zip(results, responses):
            if isinstance(response, Exception):
                continue

            extracted = trafilatura.extract(
                response.text, include_formatting=True, include_comments=False
            )
            if not extracted:
                continue

            # Build a markdown doc with metadata header to help heading-aware chunking
            if search_type == "news":
                # Parse date if present
                try:
                    date_str = meta.get("date", "")
                    date_iso = (
                        dateparser.parse(date_str, fuzzy=True).strftime("%Y-%m-%d")
                        if date_str
                        else "Unknown"
                    )
                except Exception:
                    date_iso = "Unknown"
                markdown_doc = (
                    f"# {meta.get('title', 'Untitled')}\n\n"
                    f"**Source:** {meta.get('source', 'Unknown')} **Date:** {date_iso}\n\n"
                    f"**URL:** {meta.get('link', '')}\n\n"
                    f"{extracted.strip()}\n"
                )
            else:
                domain = (
                    meta.get("link", "").split("/")[2].replace("www.", "")
                    if meta.get("link")
                    else ""
                )
                markdown_doc = (
                    f"# {meta.get('title', 'Untitled')}\n\n"
                    f"**Domain:** {domain}\n\n"
                    f"**URL:** {meta.get('link', '')}\n\n"
                    f"{extracted.strip()}\n"
                )

            # Run markdown chunker
            chunks = _run_markdown_chunker(
                markdown_doc,
                tokenizer_or_token_counter=tokenizer_or_token_counter,
                chunk_size=chunk_size,
                chunk_overlap=chunk_overlap,
                heading_level=heading_level,
                min_characters_per_chunk=min_characters_per_chunk,
                max_characters_per_section=max_characters_per_section,
                clean_text=clean_text,
            )

            # Enrich with metadata for traceability
            for c in chunks:
                c.setdefault("source_title", meta.get("title"))
                c.setdefault("url", meta.get("link"))
                if search_type == "news":
                    c.setdefault("source", meta.get("source"))
                    c.setdefault("date", meta.get("date"))
                else:
                    c.setdefault("domain", domain)
                all_chunks.append(c)

        duration = time.time() - start_time
        await record_request(duration, num_results)
        return json.dumps(all_chunks, ensure_ascii=False)

    except Exception as e:
        duration = time.time() - start_time
        await record_request(duration, num_results)
        return json.dumps([{"error": str(e)}])
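
# For orientation, the JSON string returned above decodes to something like the
# following (values are illustrative; the exact chunk fields depend on what
# chonkie emits):
#
#     [
#       {
#         "text": "# Page title\n\n**Domain:** example.com ...",
#         "token_count": 512,
#         "source_title": "Page title",
#         "url": "https://example.com/article",
#         "domain": "example.com"
#       }
#     ]
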
f"**Domain:** {domain}\n\n" f"**URL:** {meta.get('link', '')}\n\n" f"{extracted.strip()}\n" ) # Run markdown chunker chunks = _run_markdown_chunker( markdown_doc, tokenizer_or_token_counter=tokenizer_or_token_counter, chunk_size=chunk_size, chunk_overlap=chunk_overlap, heading_level=heading_level, min_characters_per_chunk=min_characters_per_chunk, max_characters_per_section=max_characters_per_section, clean_text=clean_text, ) # Enrich with metadata for traceability for c in chunks: c.setdefault("source_title", meta.get("title")) c.setdefault("url", meta.get("link")) if search_type == "news": c.setdefault("source", meta.get("source")) c.setdefault("date", meta.get("date")) else: c.setdefault("domain", domain) all_chunks.append(c) duration = time.time() - start_time await record_request(duration, num_results) return json.dumps(all_chunks, ensure_ascii=False) except Exception as e: duration = time.time() - start_time await record_request(duration, num_results) return json.dumps([{"error": str(e)}]) # Create Gradio interface with gr.Blocks(title="Web Search MCP Server") as demo: gr.HTML( """

🤝 Community resource — please use responsibly to keep this service available for everyone

""" ) gr.Markdown("# 🔍 Web Search MCP Server") with gr.Tabs(): with gr.Tab("App"): gr.Markdown( """ This MCP server provides web search capabilities to LLMs. It can perform general web searches or specifically search for fresh news articles, extracting the main content from results. **⚡ Speed-Focused:** Optimized to complete the entire search process - from query to fully extracted web content - in under 2 seconds. Check out the Analytics tab to see real-time performance metrics. **Search Types:** - **General Search**: Diverse results from various sources (blogs, docs, articles, etc.) - **News Search**: Fresh news articles and breaking stories from news sources **Note:** This interface is primarily designed for MCP tool usage by LLMs, but you can also test it manually below. """ ) gr.HTML( """
Use with MCP
""", padding=0, ) with gr.Row(): with gr.Column(scale=3): query_input = gr.Textbox( label="Search Query", placeholder='e.g. "OpenAI news", "climate change 2024", "AI developments"', info="Required: Enter your search query", ) with gr.Column(scale=1): search_type_input = gr.Radio( choices=["search", "news"], value="search", label="Search Type", info="Choose search type", ) with gr.Row(): with gr.Column(scale=3): serper_key_input = gr.Textbox( label="Serper API Key", placeholder="Enter your Serper API key or set SERPER_API_KEY env var", type="password", ) with gr.Column(scale=1): set_key_btn = gr.Button("Save API Key") with gr.Accordion("Chunking Parameters", open=False): with gr.Row(): num_results_input = gr.Slider( minimum=1, maximum=20, value=4, step=1, label="Number of Results", info="Results to fetch (1-20)", ) chunk_size_input = gr.Slider(100, 4000, value=1000, step=50, label="Chunk Size (characters)") heading_level_input = gr.Slider(1, 6, value=3, step=1, label="Max Heading Level") with gr.Row(): min_chars_input = gr.Slider(0, 1000, value=50, step=10, label="Min characters per chunk") max_chars_input = gr.Slider(500, 10000, value=4000, step=100, label="Max characters per section") with gr.Row(): tokenizer_input = gr.Dropdown(choices=["character"], value="character", label="Tokenizer") overlap_input = gr.Slider(0, 400, value=0, step=10, label="Chunk overlap (reserved)") clean_text_input = gr.Checkbox(value=True, label="Clean text (strip inline markdown/URLs)") search_button = gr.Button("Search + Chunk", variant="primary") output = gr.Textbox( label="Chunks (JSON List[Dict])", lines=25, max_lines=50, info="Output is a JSON string list of chunk dicts", ) # Add examples gr.Examples( examples=[ ["OpenAI GPT-5 latest developments", "news", 5], ["React hooks useState", "search", 4], ["Tesla stock price today", "news", 6], ["Apple Vision Pro reviews", "search", 4], ["best Italian restaurants NYC", "search", 4], ], inputs=[ query_input, search_type_input, num_results_input, tokenizer_input, chunk_size_input, overlap_input, heading_level_input, min_chars_input, max_chars_input, clean_text_input, ], outputs=output, fn=search_and_chunk, cache_examples=False, ) def _set_serper_key(key: str) -> str: global SERPER_API_KEY_OVERRIDE SERPER_API_KEY_OVERRIDE = (key or "").strip() or None # Minimal validation/echo without exposing the full key if SERPER_API_KEY_OVERRIDE: return "Serper API key saved in-session." return "Cleared in-session API key. Using environment if set." set_key_btn.click(fn=_set_serper_key, inputs=serper_key_input, outputs=output) with gr.Tab("Analytics"): gr.Markdown("## Community Usage Analytics") gr.Markdown( "Track daily request counts and average response times from all community users." 
            with gr.Row():
                with gr.Column():
                    requests_plot = gr.BarPlot(
                        value=last_n_days_df(14),  # Show only last 14 days for better visibility
                        x="date",
                        y="count",
                        title="Daily Request Count",
                        tooltip=["date", "count"],
                        height=350,
                        x_label_angle=-45,  # Rotate labels to prevent overlap
                        container=False,
                    )
                with gr.Column():
                    avg_time_plot = gr.BarPlot(
                        value=last_n_days_avg_time_df(14),  # Show only last 14 days
                        x="date",
                        y="avg_time",
                        title="Average Request Time (seconds)",
                        tooltip=["date", "avg_time", "request_count"],
                        height=350,
                        x_label_angle=-45,
                        container=False,
                    )

    search_button.click(
        fn=search_and_chunk,
        inputs=[
            query_input,
            search_type_input,
            num_results_input,
            tokenizer_input,
            chunk_size_input,
            overlap_input,
            heading_level_input,
            min_chars_input,
            max_chars_input,
            clean_text_input,
        ],
        outputs=output,
        api_name=False,
    )

    # Load fresh analytics data when the page loads or Analytics tab is clicked
    demo.load(
        fn=lambda: (last_n_days_df(14), last_n_days_avg_time_df(14)),
        outputs=[requests_plot, avg_time_plot],
        api_name=False,
    )

    # Expose search_and_chunk as the MCP tool
    gr.api(search_and_chunk, api_name="search_and_chunk")

# -------- Markdown chunk helper (from chonkie) --------
def _run_markdown_chunker(
    markdown_text: str,
    tokenizer_or_token_counter: str = "character",
    chunk_size: int = 1000,
    chunk_overlap: int = 0,
    heading_level: int = 3,
    min_characters_per_chunk: int = 50,
    max_characters_per_section: int = 4000,
    clean_text: bool = True,
) -> List[Dict[str, Any]]:
    """
    Use chonkie's MarkdownChunker or MarkdownParser to chunk markdown text and
    return a List[Dict] with useful fields.

    This follows the documentation in the chonkie commit introducing
    MarkdownChunker and its parameters.
    """
    markdown_text = markdown_text or ""
    if not markdown_text.strip():
        return []

    # Lazy import so the app can still run without the dependency until this is used
    try:
        try:
            from chonkie import MarkdownParser  # type: ignore
        except Exception:
            try:
                from chonkie.chunker.markdown import MarkdownParser  # type: ignore
            except Exception:
                MarkdownParser = None  # type: ignore
        try:
            from chonkie import MarkdownChunker  # type: ignore
        except Exception:
            from chonkie.chunker.markdown import MarkdownChunker  # type: ignore
    except Exception as exc:
        return [
            {
                "error": "chonkie not installed",
                "detail": "Install chonkie from the feat/markdown-chunker branch",
                "exception": str(exc),
            }
        ]

    # Prefer MarkdownParser if available and it yields dicts. (The imports above
    # bind local names, so check the local binding rather than globals().)
    if MarkdownParser is not None:
        try:
            parser = MarkdownParser(
                tokenizer_or_token_counter=tokenizer_or_token_counter,
                chunk_size=int(chunk_size),
                chunk_overlap=int(chunk_overlap),
                heading_level=int(heading_level),
                min_characters_per_chunk=int(min_characters_per_chunk),
                max_characters_per_section=int(max_characters_per_section),
                clean_text=bool(clean_text),
            )
            result = (
                parser.parse(markdown_text)
                if hasattr(parser, "parse")
                else parser(markdown_text)  # type: ignore
            )
            # If the parser returns a list of dicts already, pass through
            if isinstance(result, list) and (not result or isinstance(result[0], dict)):
                return result  # type: ignore
            # Else, normalize below
            chunks = result
        except Exception:
            # Fall back to the chunker if parser invocation fails
            chunks = None
    else:
        chunks = None

    # Fall back to MarkdownChunker if needed, then normalize non-dict chunks
    if chunks is None:
        chunker = MarkdownChunker(
            tokenizer_or_token_counter=tokenizer_or_token_counter,
            chunk_size=int(chunk_size),
            chunk_overlap=int(chunk_overlap),
            heading_level=int(heading_level),
            min_characters_per_chunk=int(min_characters_per_chunk),
            max_characters_per_section=int(max_characters_per_section),
            clean_text=bool(clean_text),
        )
        if hasattr(chunker, "chunk"):
            chunks = chunker.chunk(markdown_text)  # type: ignore
        elif hasattr(chunker, "split_text"):
            chunks = chunker.split_text(markdown_text)  # type: ignore
        elif callable(chunker):
            chunks = chunker(markdown_text)  # type: ignore
        else:
            return [{"error": "Unknown MarkdownChunker interface"}]

    # Normalize chunks to a list of dicts
    normalized: List[Dict[str, Any]] = []
    for c in chunks or []:
        if isinstance(c, dict):
            normalized.append(c)
            continue
        item: Dict[str, Any] = {}
        for field in (
            "text",
            "start_index",
            "end_index",
            "token_count",
            "heading",
            "metadata",
        ):
            if hasattr(c, field):
                try:
                    item[field] = getattr(c, field)
                except Exception:
                    pass
        if not item:
            # Last resort: string representation
            item = {"text": str(c)}
        normalized.append(item)

    return normalized


with demo:
    pass


if __name__ == "__main__":
    # Launch with MCP server enabled
    # The MCP endpoint will be available at: http://localhost:7860/gradio_api/mcp/sse
    demo.launch(mcp_server=True, show_api=True)
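
# A minimal sketch of a client-side MCP configuration pointing at this server.
# The "mcpServers" shape below follows the convention used by common MCP
# clients; the "web-search" key name is an arbitrary label, so adjust the name
# and URL for your own deployment:
#
#     {
#       "mcpServers": {
#         "web-search": {
#           "url": "http://localhost:7860/gradio_api/mcp/sse"
#         }
#       }
#     }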