import asyncio
import os
import time
from datetime import datetime
from typing import Optional
from urllib.parse import urlsplit

import gradio as gr
import httpx
import trafilatura
from dateutil import parser as dateparser
from limits import parse
from limits.aio.storage import MemoryStorage
from limits.aio.strategies import MovingWindowRateLimiter

from analytics import last_n_days_avg_time_df, last_n_days_df, record_request

# Configuration
SERPER_API_KEY = os.getenv("SERPER_API_KEY")
SERPER_SEARCH_ENDPOINT = "https://google.serper.dev/search"
SERPER_NEWS_ENDPOINT = "https://google.serper.dev/news"
HEADERS = {"X-API-KEY": SERPER_API_KEY, "Content-Type": "application/json"}

# Rate limiting.
# The hourly quota lives in one constant so the limiter and the user-facing
# error message can never disagree about the limit.
RATE_LIMIT_PER_HOUR = 360
storage = MemoryStorage()
limiter = MovingWindowRateLimiter(storage)
rate_limit = parse(f"{RATE_LIMIT_PER_HOUR}/hour")


def _extract_domain(url: str) -> str:
    """Return the host part of *url* without a leading ``www.``."""
    host = urlsplit(url).netloc
    # Strip only a leading "www."; a blanket replace would also mangle hosts
    # that merely contain that substring.
    return host[4:] if host.startswith("www.") else host


def _format_chunk(meta: dict, body: str, search_type: str) -> str:
    """Render one search result plus its extracted text as a Markdown section.

    Args:
        meta: Result entry from the search API response; ``title`` and
            ``link`` are read for every result, ``date`` and ``source``
            additionally for news results.
        body: Main text extracted from the result page.
        search_type: ``"news"`` or ``"search"``; selects the header layout.
    """
    if search_type == "news":
        # News results carry a date and a source.
        date_iso = "Unknown"
        date_str = meta.get("date", "")
        if date_str:
            try:
                date_iso = dateparser.parse(date_str, fuzzy=True).strftime(
                    "%Y-%m-%d"
                )
            except Exception:
                # Best effort: an unparseable date must not drop the article.
                date_iso = "Unknown"
        return (
            f"## {meta['title']}\n"
            f"**Source:** {meta.get('source', 'Unknown')} "
            f"**Date:** {date_iso}\n"
            f"**URL:** {meta['link']}\n\n"
            f"{body.strip()}\n"
        )

    # General search results have no date/source; show the domain instead.
    return (
        f"## {meta['title']}\n"
        f"**Domain:** {_extract_domain(meta['link'])}\n"
        f"**URL:** {meta['link']}\n\n"
        f"{body.strip()}\n"
    )


async def search_web(
    query: str, search_type: str = "search", num_results: Optional[int] = 4
) -> str:
    """
    Search the web for information or fresh news, returning extracted content.

    This tool can perform two types of searches:
    - "search" (default): General web search for diverse, relevant content
      from various sources
    - "news": Specifically searches for fresh news articles and breaking
      stories

    Use "news" mode when looking for:
    - Breaking news or very recent events
    - Time-sensitive information
    - Current affairs and latest developments
    - Today's/this week's happenings

    Use "search" mode (default) for:
    - General information and research
    - Technical documentation or guides
    - Historical information
    - Diverse perspectives from various sources

    Args:
        query (str): The search query. This is REQUIRED. Examples: "apple inc
            earnings", "climate change 2024", "AI developments"
        search_type (str): Type of search. This is OPTIONAL. Default is
            "search". Options: "search" (general web search) or "news" (fresh
            news articles). Use "news" for time-sensitive, breaking news
            content. Any other value falls back to "search".
        num_results (int): Number of results to fetch. This is OPTIONAL.
            Default is 4. Range: 1-20 (values outside the range are clamped).
            More results = more context but longer response time.

    Returns:
        str: Formatted text containing extracted content with metadata (title,
        source, date, URL, and main text) for each result, separated by
        dividers. Returns error message if API key is missing or search fails.

    Examples:
        - search_web("OpenAI GPT-5", "news") - Get 4 fresh news articles about
          OpenAI (default count)
        - search_web("python tutorial", "search") - Get 4 general results about
          Python (default count)
        - search_web("stock market today", "news", 10) - Get 10 news articles
          about today's market
        - search_web("machine learning basics") - Get 4 general search results
          (all defaults)
    """
    start_time = time.time()

    if not SERPER_API_KEY:
        await record_request(None, num_results)  # Record even failed requests
        return "Error: SERPER_API_KEY environment variable is not set. Please set it to use this tool."

    # Validate and constrain num_results
    if num_results is None:
        num_results = 4
    num_results = max(1, min(20, num_results))

    # Validate search_type
    if search_type not in ["search", "news"]:
        search_type = "search"

    try:
        # Check rate limit (one shared bucket for all callers)
        if not await limiter.hit(rate_limit, "global"):
            print(f"[{datetime.now().isoformat()}] Rate limit exceeded")
            duration = time.time() - start_time
            await record_request(duration, num_results)
            return (
                "Error: Rate limit exceeded. Please try again later "
                f"(limit: {RATE_LIMIT_PER_HOUR} requests per hour)."
            )

        # Select endpoint based on search type
        endpoint = (
            SERPER_NEWS_ENDPOINT if search_type == "news" else SERPER_SEARCH_ENDPOINT
        )

        # Prepare payload
        payload = {"q": query, "num": num_results}
        if search_type == "news":
            payload["type"] = "news"
            payload["page"] = 1

        async with httpx.AsyncClient(timeout=15) as client:
            resp = await client.post(endpoint, headers=HEADERS, json=payload)

        if resp.status_code != 200:
            duration = time.time() - start_time
            await record_request(duration, num_results)
            return f"Error: Search API returned status {resp.status_code}. Please check your API key and try again."

        # Extract results based on search type
        results_key = "news" if search_type == "news" else "organic"
        results = resp.json().get(results_key, [])

        if not results:
            duration = time.time() - start_time
            await record_request(duration, num_results)
            return f"No {search_type} results found for query: '{query}'. Try a different search term or search type."

        # Fetch HTML content concurrently; failures come back as exception
        # objects so one bad site cannot sink the whole batch.
        urls = [r["link"] for r in results]
        async with httpx.AsyncClient(timeout=20, follow_redirects=True) as client:
            tasks = [client.get(u) for u in urls]
            responses = await asyncio.gather(*tasks, return_exceptions=True)

        # Extract and format content
        chunks = []
        successful_extractions = 0

        for meta, response in zip(results, responses):
            if isinstance(response, Exception):
                continue

            # Extract main text content
            body = trafilatura.extract(
                response.text, include_formatting=False, include_comments=False
            )
            if not body:
                continue

            successful_extractions += 1
            print(
                f"[{datetime.now().isoformat()}] Successfully extracted content from {meta['link']}"
            )
            chunks.append(_format_chunk(meta, body, search_type))

        if not chunks:
            duration = time.time() - start_time
            await record_request(duration, num_results)
            return f"Found {len(results)} {search_type} results for '{query}', but couldn't extract readable content from any of them. The websites might be blocking automated access."

        result = "\n---\n".join(chunks)
        summary = f"Successfully extracted content from {successful_extractions} out of {len(results)} {search_type} results for query: '{query}'\n\n---\n\n"

        print(
            f"[{datetime.now().isoformat()}] Extraction complete: {successful_extractions}/{len(results)} successful for query '{query}'"
        )

        # Record successful request with duration
        duration = time.time() - start_time
        await record_request(duration, num_results)

        return summary + result

    except Exception as e:
        # Record failed request with duration. Recording is itself guarded so
        # that an analytics failure cannot mask the original error message.
        duration = time.time() - start_time
        try:
            await record_request(duration, num_results)
        except Exception as record_error:
            print(
                f"[{datetime.now().isoformat()}] Failed to record request: {record_error}"
            )
        return f"Error occurred while searching: {str(e)}. Please try again or check your query."


# Create Gradio interface
with gr.Blocks(title="Web Search MCP Server") as demo:
    # NOTE(review): this snippet contains no HTML tags; the markup may have
    # been stripped at some point - confirm against the deployed version.
    gr.HTML(
        """

🤝 Community resource — please use responsibly to keep this service available for everyone

"""
    )

    gr.Markdown("# 🔍 Web Search MCP Server")

    with gr.Tabs():
        with gr.Tab("App"):
            gr.Markdown(
                """
                This MCP server provides web search capabilities to LLMs. It can perform general web searches
                or specifically search for fresh news articles, extracting the main content from results.

                **⚡ Speed-Focused:** Optimized to complete the entire search process - from query to
                fully extracted web content - in under 2 seconds. Check out the Analytics tab
                to see real-time performance metrics.

                **Search Types:**
                - **General Search**: Diverse results from various sources (blogs, docs, articles, etc.)
                - **News Search**: Fresh news articles and breaking stories from news sources

                **Note:** This interface is primarily designed for MCP tool usage by LLMs, but you can
                also test it manually below.
                """
            )

            # NOTE(review): as above, this snippet carries no tags - confirm
            # the intended markup.
            gr.HTML(
                """
Use with MCP
""",
                padding=0,
            )

            with gr.Row():
                with gr.Column(scale=3):
                    query_input = gr.Textbox(
                        label="Search Query",
                        placeholder='e.g. "OpenAI news", "climate change 2024", "AI developments"',
                        info="Required: Enter your search query",
                    )
                with gr.Column(scale=1):
                    search_type_input = gr.Radio(
                        choices=["search", "news"],
                        value="search",
                        label="Search Type",
                        info="Choose search type",
                    )

            with gr.Row():
                num_results_input = gr.Slider(
                    minimum=1,
                    maximum=20,
                    value=4,
                    step=1,
                    label="Number of Results",
                    info="Optional: How many results to fetch (default: 4)",
                )

            search_button = gr.Button("Search", variant="primary")

            output = gr.Textbox(
                label="Extracted Content",
                lines=25,
                max_lines=50,
                info="The extracted article content will appear here",
            )

            # Add examples
            gr.Examples(
                examples=[
                    ["OpenAI GPT-5 latest developments", "news", 5],
                    ["React hooks useState", "search", 4],
                    ["Tesla stock price today", "news", 6],
                    ["Apple Vision Pro reviews", "search", 4],
                    ["best Italian restaurants NYC", "search", 4],
                ],
                inputs=[query_input, search_type_input, num_results_input],
                outputs=output,
                fn=search_web,
                cache_examples=False,
            )

        with gr.Tab("Analytics"):
            gr.Markdown("## Community Usage Analytics")
            gr.Markdown(
                "Track daily request counts and average response times from all community users."
            )

            with gr.Row():
                with gr.Column():
                    requests_plot = gr.BarPlot(
                        value=last_n_days_df(
                            14
                        ),  # Show only last 14 days for better visibility
                        x="date",
                        y="count",
                        title="Daily Request Count",
                        tooltip=["date", "count"],
                        height=350,
                        x_label_angle=-45,  # Rotate labels to prevent overlap
                        container=False,
                    )

                with gr.Column():
                    avg_time_plot = gr.BarPlot(
                        value=last_n_days_avg_time_df(14),  # Show only last 14 days
                        x="date",
                        y="avg_time",
                        title="Average Request Time (seconds)",
                        tooltip=["date", "avg_time", "request_count"],
                        height=350,
                        x_label_angle=-45,
                        container=False,
                    )

    search_button.click(
        fn=search_web,  # Use search_web directly instead of search_and_log
        inputs=[query_input, search_type_input, num_results_input],
        outputs=output,
        api_name=False,  # Hide this endpoint from API & MCP
    )

    # Load fresh analytics data when the page loads
    demo.load(
        fn=lambda: (last_n_days_df(14), last_n_days_avg_time_df(14)),
        outputs=[requests_plot, avg_time_plot],
        api_name=False,
    )

    # Expose search_web as the only MCP tool
    gr.api(search_web, api_name="search_web")


if __name__ == "__main__":
    # Launch with MCP server enabled
    # The MCP endpoint will be available at: http://localhost:7860/gradio_api/mcp/sse
    demo.launch(mcp_server=True, show_api=True)