import os import asyncio import time from typing import Optional from datetime import datetime import httpx import trafilatura import gradio as gr from dateutil import parser as dateparser from limits import parse from limits.aio.storage import MemoryStorage from limits.aio.strategies import MovingWindowRateLimiter from analytics import record_request, last_n_days_df, last_n_days_avg_time_df # Configuration SERPER_API_KEY = os.getenv("SERPER_API_KEY") SERPER_SEARCH_ENDPOINT = "https://google.serper.dev/search" SERPER_NEWS_ENDPOINT = "https://google.serper.dev/news" HEADERS = {"X-API-KEY": SERPER_API_KEY, "Content-Type": "application/json"} # Rate limiting storage = MemoryStorage() limiter = MovingWindowRateLimiter(storage) rate_limit = parse("360/hour") async def search_web( query: str, search_type: str = "search", num_results: Optional[int] = 4 ) -> str: """ Search the web for information or fresh news, returning extracted content. This tool can perform two types of searches: - "search" (default): General web search for diverse, relevant content from various sources - "news": Specifically searches for fresh news articles and breaking stories Use "news" mode when looking for: - Breaking news or very recent events - Time-sensitive information - Current affairs and latest developments - Today's/this week's happenings Use "search" mode (default) for: - General information and research - Technical documentation or guides - Historical information - Diverse perspectives from various sources Args: query (str): The search query. This is REQUIRED. Examples: "apple inc earnings", "climate change 2024", "AI developments" search_type (str): Type of search. This is OPTIONAL. Default is "search". Options: "search" (general web search) or "news" (fresh news articles). Use "news" for time-sensitive, breaking news content. num_results (int): Number of results to fetch. This is OPTIONAL. Default is 4. Range: 1-20. More results = more context but longer response time. Returns: str: Formatted text containing extracted content with metadata (title, source, date, URL, and main text) for each result, separated by dividers. Returns error message if API key is missing or search fails. Examples: - search_web("OpenAI GPT-5", "news") - Get 5 fresh news articles about OpenAI - search_web("python tutorial", "search") - Get 4 general results about Python (default count) - search_web("stock market today", "news", 10) - Get 10 news articles about today's market - search_web("machine learning basics") - Get 4 general search results (all defaults) """ start_time = time.time() if not SERPER_API_KEY: await record_request(None, num_results) # Record even failed requests return "Error: SERPER_API_KEY environment variable is not set. Please set it to use this tool." # Validate and constrain num_results if num_results is None: num_results = 4 num_results = max(1, min(20, num_results)) # Validate search_type if search_type not in ["search", "news"]: search_type = "search" try: # Check rate limit if not await limiter.hit(rate_limit, "global"): print(f"[{datetime.now().isoformat()}] Rate limit exceeded") duration = time.time() - start_time await record_request(duration, num_results) return "Error: Rate limit exceeded. Please try again later (limit: 500 requests per hour)." # Select endpoint based on search type endpoint = ( SERPER_NEWS_ENDPOINT if search_type == "news" else SERPER_SEARCH_ENDPOINT ) # Prepare payload payload = {"q": query, "num": num_results} if search_type == "news": payload["type"] = "news" payload["page"] = 1 async with httpx.AsyncClient(timeout=15) as client: resp = await client.post(endpoint, headers=HEADERS, json=payload) if resp.status_code != 200: duration = time.time() - start_time await record_request(duration, num_results) return f"Error: Search API returned status {resp.status_code}. Please check your API key and try again." # Extract results based on search type if search_type == "news": results = resp.json().get("news", []) else: results = resp.json().get("organic", []) if not results: duration = time.time() - start_time await record_request(duration, num_results) return f"No {search_type} results found for query: '{query}'. Try a different search term or search type." # Fetch HTML content concurrently urls = [r["link"] for r in results] async with httpx.AsyncClient(timeout=20, follow_redirects=True) as client: tasks = [client.get(u) for u in urls] responses = await asyncio.gather(*tasks, return_exceptions=True) # Extract and format content chunks = [] successful_extractions = 0 for meta, response in zip(results, responses): if isinstance(response, Exception): continue # Extract main text content body = trafilatura.extract( response.text, include_formatting=False, include_comments=False ) if not body: continue successful_extractions += 1 print( f"[{datetime.now().isoformat()}] Successfully extracted content from {meta['link']}" ) # Format the chunk based on search type if search_type == "news": # News results have date and source try: date_str = meta.get("date", "") if date_str: date_iso = dateparser.parse(date_str, fuzzy=True).strftime( "%Y-%m-%d" ) else: date_iso = "Unknown" except Exception: date_iso = "Unknown" chunk = ( f"## {meta['title']}\n" f"**Source:** {meta.get('source', 'Unknown')} " f"**Date:** {date_iso}\n" f"**URL:** {meta['link']}\n\n" f"{body.strip()}\n" ) else: # Search results don't have date/source but have domain domain = meta["link"].split("/")[2].replace("www.", "") chunk = ( f"## {meta['title']}\n" f"**Domain:** {domain}\n" f"**URL:** {meta['link']}\n\n" f"{body.strip()}\n" ) chunks.append(chunk) if not chunks: duration = time.time() - start_time await record_request(duration, num_results) return f"Found {len(results)} {search_type} results for '{query}', but couldn't extract readable content from any of them. The websites might be blocking automated access." result = "\n---\n".join(chunks) summary = f"Successfully extracted content from {successful_extractions} out of {len(results)} {search_type} results for query: '{query}'\n\n---\n\n" print( f"[{datetime.now().isoformat()}] Extraction complete: {successful_extractions}/{len(results)} successful for query '{query}'" ) # Record successful request with duration duration = time.time() - start_time await record_request(duration, num_results) return summary + result except Exception as e: # Record failed request with duration duration = time.time() - start_time return f"Error occurred while searching: {str(e)}. Please try again or check your query." # Create Gradio interface with gr.Blocks(title="Web Search MCP Server") as demo: gr.HTML( """
🤝 Community resource — please use responsibly to keep this service available for everyone