from __future__ import annotations

import re
from urllib.parse import parse_qs, quote_plus, urljoin, urlparse

import httpx
from bs4 import BeautifulSoup
from markdownify import markdownify
from mcp.server.fastmcp import FastMCP
|
|
|
|
# FastMCP server instance; tool functions below register onto it via
# the @mcp.tool() decorator.
mcp = FastMCP(
    "Web Search MCP",
    instructions="MCP server for searching the web and fetching page content.",
)

# Browser-like User-Agent string, presumably to avoid bot blocking on the
# DuckDuckGo HTML endpoint — TODO confirm this is still required.
_USER_AGENT = (
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36"
)
# Default headers sent on every outgoing HTTP request.
_HEADERS = {
    "User-Agent": _USER_AGENT,
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Accept-Language": "en-US,en;q=0.9",
}

# Request timeout in seconds applied to all HTTP calls.
_TIMEOUT = 15.0
|
|
|
|
|
|
def _get_client() -> httpx.Client:
    """Create a new HTTP client preconfigured with the module defaults.

    The client sends the shared browser-like headers, applies the module
    timeout, and follows redirects so callers land on the final URL.
    """
    return httpx.Client(
        follow_redirects=True,
        timeout=_TIMEOUT,
        headers=_HEADERS,
    )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Search
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _parse_duckduckgo(html: str) -> list[dict]:
    """Parse DuckDuckGo HTML search results.

    Args:
        html: Raw HTML from the html.duckduckgo.com results page.

    Returns:
        A list of ``{"title", "url", "snippet"}`` dicts, one per result,
        in page order.
    """
    soup = BeautifulSoup(html, "html.parser")
    results: list[dict] = []

    for result_div in soup.select(".result"):
        title_tag = result_div.select_one(".result__a")
        snippet_tag = result_div.select_one(".result__snippet")
        if not title_tag:
            # Rows without a title anchor are not real results; skip them.
            continue

        href = str(title_tag.get("href", ""))
        # DuckDuckGo wraps URLs in a redirect (?uddg=<encoded-url>);
        # extract the actual URL. parse_qs percent-decodes the value.
        # NOTE: the imports were hoisted out of this loop body — they used
        # to run on every matching result.
        if "uddg=" in href:
            qs = parse_qs(urlparse(href).query)
            href = qs.get("uddg", [href])[0]

        results.append(
            {
                "title": title_tag.get_text(strip=True),
                "url": href,
                "snippet": snippet_tag.get_text(strip=True) if snippet_tag else "",
            }
        )

    return results
|
|
|
|
|
|
@mcp.tool()
def web_search(query: str, max_results: int = 10) -> list[dict]:
    """Search the web using DuckDuckGo and return results.

    Args:
        query: The search query string.
        max_results: Maximum number of results to return. Default 10.
    """
    search_url = f"https://html.duckduckgo.com/html/?q={quote_plus(query)}"

    with _get_client() as client:
        try:
            response = client.get(search_url)
            response.raise_for_status()
        except httpx.HTTPStatusError as exc:
            status = exc.response.status_code
            reason = exc.response.reason_phrase
            return [{"error": f"HTTP {status}: {reason}"}]
        except httpx.RequestError as exc:
            return [{"error": f"Request failed: {exc}"}]

    # The response body is fully read by client.get(), so parsing can
    # happen after the client is closed.
    parsed = _parse_duckduckgo(response.text)
    return parsed[:max_results]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fetch page
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Tags removed from fetched pages before markdown conversion: page chrome
# (nav, header, footer), non-text resources (script, style, svg, img,
# iframe), and noscript fallbacks.
_STRIP_TAGS = {
    "script",
    "style",
    "nav",
    "footer",
    "header",
    "noscript",
    "svg",
    "img",
    "iframe",
}
|
|
|
|
|
|
def _clean_html(html: str, base_url: str) -> str:
    """Convert HTML to clean markdown.

    Removes boilerplate tags, rewrites relative links against
    ``base_url``, and collapses runs of blank lines in the output.
    """
    soup = BeautifulSoup(html, "html.parser")

    # Drop non-content elements entirely.
    for unwanted in soup.find_all(_STRIP_TAGS):
        unwanted.decompose()

    # Make every anchor absolute so links remain usable out of context.
    for anchor in soup.find_all("a", href=True):
        anchor["href"] = urljoin(base_url, anchor["href"])

    markdown = markdownify(str(soup), heading_style="ATX", strip=["img"])

    # Squash three-or-more consecutive newlines down to a single blank line.
    markdown = re.sub(r"\n{3,}", "\n\n", markdown)
    return markdown.strip()
|
|
|
|
|
|
@mcp.tool()
def fetch_page(url: str, max_length: int = 20000) -> dict:
    """Fetch a web page and return its content as markdown.

    Args:
        url: The URL to fetch.
        max_length: Maximum character length of returned content. Default 20000.

    Returns:
        On success, a dict with "url", "title", "content", and "truncated".
        For non-HTML responses: "url", "content_type", "content", and
        "truncated". On failure: "error" and "url".
    """
    with _get_client() as client:
        try:
            resp = client.get(url)
            resp.raise_for_status()
        except httpx.HTTPStatusError as e:
            return {
                "error": f"HTTP {e.response.status_code}: {e.response.reason_phrase}",
                "url": url,
            }
        except httpx.RequestError as e:
            return {"error": str(e), "url": url}

    content_type = resp.headers.get("content-type", "")
    # Media-type values are case-insensitive (RFC 9110), so compare a
    # lowercased copy; the original check missed e.g. "TEXT/HTML".
    if (
        "text/html" not in content_type.lower()
        and "application/xhtml" not in content_type.lower()
    ):
        # Non-HTML payload: return it raw, and report truncation the same
        # way the HTML branch does (previously this branch omitted it).
        return {
            "url": str(resp.url),
            "content_type": content_type,
            "content": resp.text[:max_length],
            "truncated": len(resp.text) > max_length,
        }

    md = _clean_html(resp.text, str(resp.url))
    return {
        "url": str(resp.url),
        "title": _extract_title(resp.text),
        "content": md[:max_length],
        "truncated": len(md) > max_length,
    }
|
|
|
|
|
|
def _extract_title(html: str) -> str:
    """Return the page's <title> text, or an empty string when absent."""
    title_tag = BeautifulSoup(html, "html.parser").find("title")
    if title_tag is None:
        return ""
    return title_tag.get_text(strip=True)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Multi-search: search + fetch top results in one call
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@mcp.tool()
def search_and_read(
    query: str, num_pages: int = 3, max_page_length: int = 10000
) -> list[dict]:
    """Search the web and fetch the top results in one step.

    This combines web_search and fetch_page: it searches for the query,
    then fetches and converts the top results to markdown.

    Args:
        query: The search query string.
        num_pages: Number of top results to fetch. Default 3.
        max_page_length: Max characters per page. Default 10000.
    """
    results = web_search(query, max_results=num_pages)

    # web_search signals failure with a single {"error": ...} entry that
    # lacks the "url"/"title"/"snippet" keys; propagate it as-is instead
    # of raising KeyError in the loop below (previous behavior).
    if results and "error" in results[0]:
        return results

    output: list[dict] = []
    for r in results:
        page = fetch_page(r["url"], max_length=max_page_length)
        output.append(
            {
                "search_title": r["title"],
                "search_snippet": r["snippet"],
                **page,
            }
        )
    return output
|
|
|
|
|
|
if __name__ == "__main__":
    # Start the MCP server when executed as a script.
    mcp.run()
|