File size: 14,282 Bytes
6ef48c6
 
59993d0
6ef48c6
d454433
6ef48c6
 
 
e90574b
edda836
531d6f9
 
59993d0
e90574b
6ef48c6
e90574b
9d978bc
 
e90574b
 
531d6f9
 
 
f2aca49
531d6f9
 
36a4f5e
 
 
6ef48c6
9d978bc
6ef48c6
9d978bc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6ef48c6
 
 
 
9d978bc
 
 
6ef48c6
 
e90574b
6ef48c6
9d978bc
6ef48c6
 
e90574b
6ef48c6
36a4f5e
9d978bc
 
 
6ef48c6
59993d0
f4bd350
6ef48c6
59993d0
6ef48c6
 
 
 
 
 
36a4f5e
9d978bc
 
 
6ef48c6
 
 
 
f2aca49
59993d0
 
f2aca49
6ef48c6
9d978bc
36a4f5e
 
 
 
9d978bc
 
 
 
 
36a4f5e
6ef48c6
9d978bc
6ef48c6
 
59993d0
 
6ef48c6
 
9d978bc
 
 
 
 
36a4f5e
9d978bc
59993d0
 
36a4f5e
e90574b
6ef48c6
9d978bc
6ef48c6
 
 
e90574b
6ef48c6
 
 
e90574b
9d978bc
6ef48c6
 
 
 
 
 
 
 
 
 
 
 
f2aca49
 
 
6ef48c6
4424462
 
 
 
 
 
36a4f5e
 
 
4424462
 
 
9d978bc
36a4f5e
4424462
 
 
 
 
 
 
 
 
36a4f5e
 
4424462
 
 
 
 
 
36a4f5e
6ef48c6
 
 
59993d0
 
9d978bc
6ef48c6
 
9d978bc
e90574b
f2aca49
 
 
f4bd350
59993d0
 
 
f4bd350
6ef48c6
 
 
59993d0
 
 
6ef48c6
 
 
 
 
d454433
 
 
 
 
 
 
 
 
1d115f5
59993d0
 
 
 
 
 
 
 
 
 
f4bd350
59993d0
 
 
 
 
 
 
 
 
9d978bc
59993d0
 
 
 
 
 
 
 
 
 
 
 
9d978bc
36a4f5e
59993d0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6ef48c6
59993d0
4e72c55
59993d0
 
 
 
 
 
6ef48c6
59993d0
 
 
 
 
 
 
 
 
 
 
 
 
 
6ef48c6
59993d0
 
f4bd350
 
 
 
59993d0
 
 
f4bd350
 
 
59993d0
 
 
 
 
 
 
 
f4bd350
59993d0
 
 
 
 
 
 
 
 
 
 
b4fa9b6
6ef48c6
59993d0
36a4f5e
59993d0
85f4370
6ef48c6
e90574b
59993d0
 
f4bd350
 
 
59993d0
b4fa9b6
85f4370
 
 
e90574b
 
6ef48c6
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
import os
import asyncio
import time
from typing import Optional
from datetime import datetime
import httpx
import trafilatura
import gradio as gr
from dateutil import parser as dateparser
from limits import parse
from limits.aio.storage import MemoryStorage
from limits.aio.strategies import MovingWindowRateLimiter
from analytics import record_request, last_n_days_df, last_n_days_avg_time_df

# Configuration
SERPER_API_KEY = os.getenv("SERPER_API_KEY")
SERPER_SEARCH_ENDPOINT = "https://google.serper.dev/search"
SERPER_NEWS_ENDPOINT = "https://google.serper.dev/news"
HEADERS = {"X-API-KEY": SERPER_API_KEY, "Content-Type": "application/json"}

# Rate limiting
storage = MemoryStorage()
limiter = MovingWindowRateLimiter(storage)
rate_limit = parse("360/hour")


async def search_web(
    query: str, search_type: str = "search", num_results: Optional[int] = 4
) -> str:
    """
    Search the web for information or fresh news, returning extracted content.

    This tool can perform two types of searches:
    - "search" (default): General web search for diverse, relevant content from various sources
    - "news": Specifically searches for fresh news articles and breaking stories

    Use "news" mode when looking for:
    - Breaking news or very recent events
    - Time-sensitive information
    - Current affairs and latest developments
    - Today's/this week's happenings

    Use "search" mode (default) for:
    - General information and research
    - Technical documentation or guides
    - Historical information
    - Diverse perspectives from various sources

    Args:
        query (str): The search query. This is REQUIRED. Examples: "apple inc earnings",
                    "climate change 2024", "AI developments"
        search_type (str): Type of search. This is OPTIONAL. Default is "search".
                          Options: "search" (general web search) or "news" (fresh news articles).
                          Use "news" for time-sensitive, breaking news content.
        num_results (int): Number of results to fetch. This is OPTIONAL. Default is 4.
                          Range: 1-20. More results = more context but longer response time.

    Returns:
        str: Formatted text containing extracted content with metadata (title,
             source, date, URL, and main text) for each result, separated by dividers.
             Returns error message if API key is missing or search fails.

    Examples:
        - search_web("OpenAI GPT-5", "news") - Get 5 fresh news articles about OpenAI
        - search_web("python tutorial", "search") - Get 4 general results about Python (default count)
        - search_web("stock market today", "news", 10) - Get 10 news articles about today's market
        - search_web("machine learning basics") - Get 4 general search results (all defaults)
    """
    start_time = time.time()

    if not SERPER_API_KEY:
        await record_request()  # Record even failed requests
        return "Error: SERPER_API_KEY environment variable is not set. Please set it to use this tool."

    # Validate and constrain num_results
    if num_results is None:
        num_results = 4
    num_results = max(1, min(20, num_results))

    # Validate search_type
    if search_type not in ["search", "news"]:
        search_type = "search"

    try:
        # Check rate limit
        if not await limiter.hit(rate_limit, "global"):
            print(f"[{datetime.now().isoformat()}] Rate limit exceeded")
            duration = time.time() - start_time
            await record_request(duration)
            return "Error: Rate limit exceeded. Please try again later (limit: 500 requests per hour)."

        # Select endpoint based on search type
        endpoint = (
            SERPER_NEWS_ENDPOINT if search_type == "news" else SERPER_SEARCH_ENDPOINT
        )

        # Prepare payload
        payload = {"q": query, "num": num_results}
        if search_type == "news":
            payload["type"] = "news"
            payload["page"] = 1

        async with httpx.AsyncClient(timeout=15) as client:
            resp = await client.post(endpoint, headers=HEADERS, json=payload)

        if resp.status_code != 200:
            duration = time.time() - start_time
            await record_request(duration)
            return f"Error: Search API returned status {resp.status_code}. Please check your API key and try again."

        # Extract results based on search type
        if search_type == "news":
            results = resp.json().get("news", [])
        else:
            results = resp.json().get("organic", [])

        if not results:
            duration = time.time() - start_time
            await record_request(duration)
            return f"No {search_type} results found for query: '{query}'. Try a different search term or search type."

        # Fetch HTML content concurrently
        urls = [r["link"] for r in results]
        async with httpx.AsyncClient(timeout=20, follow_redirects=True) as client:
            tasks = [client.get(u) for u in urls]
            responses = await asyncio.gather(*tasks, return_exceptions=True)

        # Extract and format content
        chunks = []
        successful_extractions = 0

        for meta, response in zip(results, responses):
            if isinstance(response, Exception):
                continue

            # Extract main text content
            body = trafilatura.extract(
                response.text, include_formatting=False, include_comments=False
            )

            if not body:
                continue

            successful_extractions += 1
            print(
                f"[{datetime.now().isoformat()}] Successfully extracted content from {meta['link']}"
            )

            # Format the chunk based on search type
            if search_type == "news":
                # News results have date and source
                try:
                    date_str = meta.get("date", "")
                    if date_str:
                        date_iso = dateparser.parse(date_str, fuzzy=True).strftime(
                            "%Y-%m-%d"
                        )
                    else:
                        date_iso = "Unknown"
                except Exception:
                    date_iso = "Unknown"

                chunk = (
                    f"## {meta['title']}\n"
                    f"**Source:** {meta.get('source', 'Unknown')}   "
                    f"**Date:** {date_iso}\n"
                    f"**URL:** {meta['link']}\n\n"
                    f"{body.strip()}\n"
                )
            else:
                # Search results don't have date/source but have domain
                domain = meta["link"].split("/")[2].replace("www.", "")

                chunk = (
                    f"## {meta['title']}\n"
                    f"**Domain:** {domain}\n"
                    f"**URL:** {meta['link']}\n\n"
                    f"{body.strip()}\n"
                )

            chunks.append(chunk)

        if not chunks:
            duration = time.time() - start_time
            await record_request(duration)
            return f"Found {len(results)} {search_type} results for '{query}', but couldn't extract readable content from any of them. The websites might be blocking automated access."

        result = "\n---\n".join(chunks)
        summary = f"Successfully extracted content from {successful_extractions} out of {len(results)} {search_type} results for query: '{query}'\n\n---\n\n"

        print(
            f"[{datetime.now().isoformat()}] Extraction complete: {successful_extractions}/{len(results)} successful for query '{query}'"
        )

        # Record successful request with duration
        duration = time.time() - start_time
        await record_request(duration)

        return summary + result

    except Exception as e:
        # Record failed request with duration
        duration = time.time() - start_time
        await record_request(duration)
        return f"Error occurred while searching: {str(e)}. Please try again or check your query."


# Create Gradio interface
with gr.Blocks(title="Web Search MCP Server") as demo:
    gr.HTML(
        """
        <div style="background-color: rgba(59, 130, 246, 0.1); border: 1px solid rgba(59, 130, 246, 0.3); border-radius: 8px; padding: 12px; margin-bottom: 16px; text-align: center;">
            <p style="color: rgb(59, 130, 246); margin: 0; font-size: 14px; font-weight: 500;">
                🀝 Community resource β€” please use responsibly to keep this service available for everyone
            </p>
        </div>
        """
    )

    gr.Markdown("# πŸ” Web Search MCP Server")

    with gr.Tabs():
        with gr.Tab("App"):
            gr.Markdown(
                """
                This MCP server provides web search capabilities to LLMs. It can perform general web searches
                or specifically search for fresh news articles, extracting the main content from results.
                
                **⚑ Speed-Focused:** Optimized to complete the entire search process - from query to 
                fully extracted web content - in under 2 seconds. Check out the Analytics tab 
                to see real-time performance metrics.
                
                **Search Types:**
                - **General Search**: Diverse results from various sources (blogs, docs, articles, etc.)
                - **News Search**: Fresh news articles and breaking stories from news sources
                
                **Note:** This interface is primarily designed for MCP tool usage by LLMs, but you can 
                also test it manually below.
                """
            )

            gr.HTML(
                """
                <div style="margin-bottom: 24px;">
                    <a href="https://huggingface.co/spaces/victor/websearch?view=api">
                        <img src="https://huggingface.co/datasets/huggingface/badges/resolve/main/use-with-mcp-lg-dark.svg" 
                             alt="Use with MCP" 
                             style="height: 36px;">
                    </a>
                </div>
                """,
                padding=0,
            )

            with gr.Row():
                with gr.Column(scale=3):
                    query_input = gr.Textbox(
                        label="Search Query",
                        placeholder='e.g. "OpenAI news", "climate change 2024", "AI developments"',
                        info="Required: Enter your search query",
                    )
                with gr.Column(scale=1):
                    search_type_input = gr.Radio(
                        choices=["search", "news"],
                        value="search",
                        label="Search Type",
                        info="Choose search type",
                    )

            with gr.Row():
                num_results_input = gr.Slider(
                    minimum=1,
                    maximum=20,
                    value=4,
                    step=1,
                    label="Number of Results",
                    info="Optional: How many results to fetch (default: 4)",
                )

            search_button = gr.Button("Search", variant="primary")

            output = gr.Textbox(
                label="Extracted Content",
                lines=25,
                max_lines=50,
                info="The extracted article content will appear here",
            )

            # Add examples
            gr.Examples(
                examples=[
                    ["OpenAI GPT-5 latest developments", "news", 5],
                    ["React hooks useState", "search", 4],
                    ["Tesla stock price today", "news", 6],
                    ["Apple Vision Pro reviews", "search", 4],
                    ["best Italian restaurants NYC", "search", 4],
                ],
                inputs=[query_input, search_type_input, num_results_input],
                outputs=output,
                fn=search_web,
                cache_examples=False,
            )

        with gr.Tab("Analytics"):
            gr.Markdown("## Community Usage Analytics")
            gr.Markdown(
                "Track daily request counts and average response times from all community users."
            )

            with gr.Row():
                with gr.Column():
                    requests_plot = gr.BarPlot(
                        value=last_n_days_df(
                            14
                        ),  # Show only last 14 days for better visibility
                        x="date",
                        y="count",
                        title="Daily Request Count",
                        tooltip=["date", "count"],
                        height=350,
                        x_label_angle=-45,  # Rotate labels to prevent overlap
                        container=False,
                    )

                with gr.Column():
                    avg_time_plot = gr.BarPlot(
                        value=last_n_days_avg_time_df(14),  # Show only last 14 days
                        x="date",
                        y="avg_time",
                        title="Average Request Time (seconds)",
                        tooltip=["date", "avg_time", "request_count"],
                        height=350,
                        x_label_angle=-45,
                        container=False,
                    )

    search_button.click(
        fn=search_web,  # Use search_web directly instead of search_and_log
        inputs=[query_input, search_type_input, num_results_input],
        outputs=output,
        api_name=False,  # Hide this endpoint from API & MCP
    )

    # Load fresh analytics data when the page loads or Analytics tab is clicked
    demo.load(
        fn=lambda: (last_n_days_df(14), last_n_days_avg_time_df(14)),
        outputs=[requests_plot, avg_time_plot],
        api_name=False,
    )

    # Expose search_web as the only MCP tool
    gr.api(search_web, api_name="search_web")


if __name__ == "__main__":
    # Launch with MCP server enabled
    # The MCP endpoint will be available at: http://localhost:7860/gradio_api/mcp/sse
    demo.launch(mcp_server=True, show_api=True)