Initial commit

This commit is contained in:
2026-03-17 07:15:38 +00:00
commit ed207ab5a4
9 changed files with 2137 additions and 0 deletions

290
mcps/docker_mcp.py Normal file
View File

@@ -0,0 +1,290 @@
import docker
from docker.errors import APIError, NotFound
from mcp.server.fastmcp import FastMCP
mcp = FastMCP("Docker MCP", instructions="MCP server for managing Docker containers.")
_client: docker.DockerClient | None = None
def get_client() -> docker.DockerClient:
global _client
if _client is None:
_client = docker.from_env()
return _client
def _container_summary(c) -> dict:
return {
"id": c.short_id,
"name": c.name,
"image": str(c.image.tags[0] if c.image.tags else c.image.short_id),
"status": c.status,
"ports": c.ports,
}
@mcp.tool()
def list_containers(all: bool = False) -> list[dict]:
"""List Docker containers.
Args:
all: If True, include stopped containers. Default is only running.
"""
client = get_client()
containers = client.containers.list(all=all)
return [_container_summary(c) for c in containers]
@mcp.tool()
def inspect_container(container_id: str) -> dict:
"""Get detailed information about a container.
Args:
container_id: Container ID or name.
"""
client = get_client()
try:
c = client.containers.get(container_id)
except NotFound:
return {"error": f"Container '{container_id}' not found."}
return {
"id": c.id,
"short_id": c.short_id,
"name": c.name,
"status": c.status,
"image": str(c.image.tags[0] if c.image.tags else c.image.short_id),
"labels": c.labels,
"ports": c.ports,
"created": str(c.attrs.get("Created", "")),
"platform": c.attrs.get("Platform", ""),
"state": c.attrs.get("State", {}),
"network_settings": {
k: v.get("IPAddress", "") if isinstance(v, dict) else v
for k, v in (c.attrs.get("NetworkSettings", {}).get("Networks", {})).items()
},
"mounts": [
{
"source": m.get("Source", ""),
"destination": m.get("Destination", ""),
"mode": m.get("Mode", ""),
}
for m in c.attrs.get("Mounts", [])
],
"env": c.attrs.get("Config", {}).get("Env", []),
"cmd": c.attrs.get("Config", {}).get("Cmd", []),
}
@mcp.tool()
def start_container(container_id: str) -> str:
"""Start a stopped container.
Args:
container_id: Container ID or name.
"""
client = get_client()
try:
c = client.containers.get(container_id)
c.start()
return f"Container '{c.name}' started."
except NotFound:
return f"Container '{container_id}' not found."
except APIError as e:
return f"Failed to start container: {e.explanation}"
@mcp.tool()
def stop_container(container_id: str, timeout: int = 10) -> str:
"""Stop a running container.
Args:
container_id: Container ID or name.
timeout: Seconds to wait before killing. Default 10.
"""
client = get_client()
try:
c = client.containers.get(container_id)
c.stop(timeout=timeout)
return f"Container '{c.name}' stopped."
except NotFound:
return f"Container '{container_id}' not found."
except APIError as e:
return f"Failed to stop container: {e.explanation}"
@mcp.tool()
def restart_container(container_id: str, timeout: int = 10) -> str:
"""Restart a container.
Args:
container_id: Container ID or name.
timeout: Seconds to wait before killing during restart. Default 10.
"""
client = get_client()
try:
c = client.containers.get(container_id)
c.restart(timeout=timeout)
return f"Container '{c.name}' restarted."
except NotFound:
return f"Container '{container_id}' not found."
except APIError as e:
return f"Failed to restart container: {e.explanation}"
@mcp.tool()
def remove_container(container_id: str, force: bool = False) -> str:
"""Remove a container.
Args:
container_id: Container ID or name.
force: Force remove a running container. Default False.
"""
client = get_client()
try:
c = client.containers.get(container_id)
name = c.name
c.remove(force=force)
return f"Container '{name}' removed."
except NotFound:
return f"Container '{container_id}' not found."
except APIError as e:
return f"Failed to remove container: {e.explanation}"
@mcp.tool()
def container_logs(container_id: str, tail: int = 100, timestamps: bool = False) -> str:
"""Get logs from a container.
Args:
container_id: Container ID or name.
tail: Number of lines from the end. Default 100.
timestamps: Include timestamps in output. Default False.
"""
client = get_client()
try:
c = client.containers.get(container_id)
logs = c.logs(tail=tail, timestamps=timestamps)
return logs.decode("utf-8", errors="replace")
except NotFound:
return f"Container '{container_id}' not found."
except APIError as e:
return f"Failed to get logs: {e.explanation}"
@mcp.tool()
def exec_in_container(container_id: str, command: str) -> str:
"""Execute a command inside a running container.
Args:
container_id: Container ID or name.
command: The command to run (e.g. "ls -la /app").
"""
client = get_client()
try:
c = client.containers.get(container_id)
exit_code, output = c.exec_run(command)
decoded = output.decode("utf-8", errors="replace")
return f"exit_code={exit_code}\n{decoded}"
except NotFound:
return f"Container '{container_id}' not found."
except APIError as e:
return f"Failed to exec command: {e.explanation}"
@mcp.tool()
def run_container(
image: str,
name: str | None = None,
command: str | None = None,
detach: bool = True,
ports: dict[str, int] | None = None,
environment: dict[str, str] | None = None,
volumes: dict[str, dict[str, str]] | None = None,
) -> str:
"""Run a new container from an image.
Args:
image: Docker image to run (e.g. "nginx:latest").
name: Optional container name.
command: Optional command to run.
detach: Run in background. Default True.
ports: Port mapping, e.g. {"80/tcp": 8080}.
environment: Environment variables, e.g. {"KEY": "value"}.
volumes: Volume mounts, e.g. {"/host/path": {"bind": "/container/path", "mode": "rw"}}.
"""
client = get_client()
try:
c = client.containers.run(
image,
command=command,
name=name,
detach=detach,
ports=ports,
environment=environment,
volumes=volumes,
)
if detach:
return f"Container '{c.name}' ({c.short_id}) started from image '{image}'."
return c.decode("utf-8", errors="replace") if isinstance(c, bytes) else str(c)
except APIError as e:
return f"Failed to run container: {e.explanation}"
@mcp.tool()
def container_stats(container_id: str) -> dict:
"""Get resource usage statistics for a container (CPU, memory, network I/O).
Args:
container_id: Container ID or name.
"""
client = get_client()
try:
c = client.containers.get(container_id)
stats = c.stats(stream=False)
# CPU
cpu_delta = (
stats["cpu_stats"]["cpu_usage"]["total_usage"]
- stats["precpu_stats"]["cpu_usage"]["total_usage"]
)
system_delta = stats["cpu_stats"].get("system_cpu_usage", 0) - stats[
"precpu_stats"
].get("system_cpu_usage", 0)
num_cpus = stats["cpu_stats"].get("online_cpus", 1)
cpu_percent = (
(cpu_delta / system_delta) * num_cpus * 100.0 if system_delta > 0 else 0.0
)
# Memory
mem_usage = stats["memory_stats"].get("usage", 0)
mem_limit = stats["memory_stats"].get("limit", 1)
mem_percent = (mem_usage / mem_limit) * 100.0
# Network
net_rx = 0
net_tx = 0
for iface_stats in stats.get("networks", {}).values():
net_rx += iface_stats.get("rx_bytes", 0)
net_tx += iface_stats.get("tx_bytes", 0)
return {
"container": c.name,
"cpu_percent": round(cpu_percent, 2),
"memory_usage_mb": round(mem_usage / (1024 * 1024), 2),
"memory_limit_mb": round(mem_limit / (1024 * 1024), 2),
"memory_percent": round(mem_percent, 2),
"network_rx_mb": round(net_rx / (1024 * 1024), 2),
"network_tx_mb": round(net_tx / (1024 * 1024), 2),
}
except NotFound:
return {"error": f"Container '{container_id}' not found."}
except APIError as e:
return {"error": f"Failed to get stats: {e.explanation}"}
except KeyError, ZeroDivisionError:
return {"error": "Stats unavailable for this container."}
if __name__ == "__main__":
mcp.run()

855
mcps/gitea_mcp.py Normal file
View File

@@ -0,0 +1,855 @@
"""Gitea MCP server — git-style tools for a self-hosted Gitea instance."""
from __future__ import annotations
import base64
import os
import httpx
from mcp.server.fastmcp import FastMCP
mcp = FastMCP(
"Gitea MCP",
instructions="MCP server for interacting with a Gitea instance. "
"Provides tools for repositories, issues, pull requests, files, branches, "
"releases, organizations, and users.",
)
GITEA_URL = os.environ.get("GITEA_URL", "http://localhost:3000")
GITEA_TOKEN = os.environ.get("GITEA_TOKEN", "")
_TIMEOUT = 15.0
def _client() -> httpx.Client:
return httpx.Client(
base_url=f"{GITEA_URL}/api/v1",
headers={
"Authorization": f"token {GITEA_TOKEN}",
"Accept": "application/json",
"Content-Type": "application/json",
},
timeout=_TIMEOUT,
)
def _request(method: str, path: str, **kwargs) -> dict | list | str:
with _client() as c:
try:
resp = c.request(method, path, **kwargs)
resp.raise_for_status()
if resp.status_code == 204:
return {"ok": True}
return resp.json()
except httpx.HTTPStatusError as e:
body = e.response.text[:500]
return {"error": f"HTTP {e.response.status_code}", "detail": body}
except httpx.RequestError as e:
return {"error": str(e)}
def _get(path: str, **params) -> dict | list | str:
return _request("GET", path, params=params)
def _post(path: str, body: dict | None = None) -> dict | list | str:
return _request("POST", path, json=body or {})
def _patch(path: str, body: dict) -> dict | list | str:
return _request("PATCH", path, json=body)
def _delete(path: str) -> dict | list | str:
return _request("DELETE", path)
# ───────────────────────────────────────────────────────────────────
# User / Auth
# ───────────────────────────────────────────────────────────────────
@mcp.tool()
def get_authenticated_user() -> dict | list | str:
"""Get the currently authenticated Gitea user."""
return _get("/user")
# ───────────────────────────────────────────────────────────────────
# Repositories
# ───────────────────────────────────────────────────────────────────
@mcp.tool()
def list_repos(limit: int = 20, page: int = 1) -> dict | list | str:
"""List repositories accessible to the authenticated user.
Args:
limit: Number of results per page. Default 20.
page: Page number. Default 1.
"""
return _get("/user/repos", limit=limit, page=page)
@mcp.tool()
def get_repo(owner: str, repo: str) -> dict | list | str:
"""Get details of a repository.
Args:
owner: Repository owner username.
repo: Repository name.
"""
return _get(f"/repos/{owner}/{repo}")
@mcp.tool()
def create_repo(
name: str,
description: str = "",
private: bool = False,
auto_init: bool = True,
default_branch: str = "main",
) -> dict | list | str:
"""Create a new repository for the authenticated user.
Args:
name: Repository name.
description: Repository description.
private: Whether the repo is private. Default False.
auto_init: Initialize with a README. Default True.
default_branch: Default branch name. Default "main".
"""
return _post(
"/user/repos",
{
"name": name,
"description": description,
"private": private,
"auto_init": auto_init,
"default_branch": default_branch,
},
)
@mcp.tool()
def delete_repo(owner: str, repo: str) -> dict | list | str:
"""Delete a repository. This is irreversible.
Args:
owner: Repository owner username.
repo: Repository name.
"""
return _delete(f"/repos/{owner}/{repo}")
@mcp.tool()
def search_repos(query: str, limit: int = 10) -> dict | list | str:
"""Search for repositories.
Args:
query: Search query string.
limit: Maximum results to return. Default 10.
"""
return _get("/repos/search", q=query, limit=limit)
@mcp.tool()
def fork_repo(owner: str, repo: str, new_name: str | None = None) -> dict | list | str:
"""Fork a repository.
Args:
owner: Owner of the repository to fork.
repo: Repository name to fork.
new_name: Optional new name for the fork.
"""
body: dict = {}
if new_name:
body["name"] = new_name
return _post(f"/repos/{owner}/{repo}/forks", body)
# ───────────────────────────────────────────────────────────────────
# Branches
# ───────────────────────────────────────────────────────────────────
@mcp.tool()
def list_branches(owner: str, repo: str) -> dict | list | str:
"""List branches of a repository.
Args:
owner: Repository owner.
repo: Repository name.
"""
return _get(f"/repos/{owner}/{repo}/branches")
@mcp.tool()
def get_branch(owner: str, repo: str, branch: str) -> dict | list | str:
"""Get details of a specific branch.
Args:
owner: Repository owner.
repo: Repository name.
branch: Branch name.
"""
return _get(f"/repos/{owner}/{repo}/branches/{branch}")
@mcp.tool()
def create_branch(
owner: str, repo: str, branch_name: str, old_branch: str = "main"
) -> dict | list | str:
"""Create a new branch.
Args:
owner: Repository owner.
repo: Repository name.
branch_name: Name of the new branch.
old_branch: Branch to create from. Default "main".
"""
return _post(
f"/repos/{owner}/{repo}/branches",
{
"new_branch_name": branch_name,
"old_branch_name": old_branch,
},
)
@mcp.tool()
def delete_branch(owner: str, repo: str, branch: str) -> dict | list | str:
"""Delete a branch.
Args:
owner: Repository owner.
repo: Repository name.
branch: Branch name to delete.
"""
return _delete(f"/repos/{owner}/{repo}/branches/{branch}")
# ───────────────────────────────────────────────────────────────────
# File contents
# ───────────────────────────────────────────────────────────────────
@mcp.tool()
def get_file(
owner: str, repo: str, filepath: str, ref: str | None = None
) -> dict | list | str:
"""Get the contents of a file from a repository. Returns decoded text content.
Args:
owner: Repository owner.
repo: Repository name.
filepath: Path to the file in the repo.
ref: Optional branch/tag/commit to read from.
"""
params = {}
if ref:
params["ref"] = ref
result = _get(f"/repos/{owner}/{repo}/contents/{filepath}", **params)
# Decode base64 content for convenience
if (
isinstance(result, dict)
and "content" in result
and result.get("encoding") == "base64"
):
try:
result["content"] = base64.b64decode(result["content"]).decode(
"utf-8", errors="replace"
)
result["encoding"] = "utf-8"
except Exception:
pass
return result
@mcp.tool()
def create_or_update_file(
owner: str,
repo: str,
filepath: str,
content: str,
message: str,
branch: str | None = None,
sha: str | None = None,
) -> dict | list | str:
"""Create or update a file in a repository.
To update an existing file you must provide the current sha (from get_file).
Args:
owner: Repository owner.
repo: Repository name.
filepath: Path where the file will be created/updated.
content: File content (plain text, will be base64-encoded).
message: Commit message.
branch: Branch to commit to. Uses repo default if omitted.
sha: Current SHA of the file (required for updates, omit for creation).
"""
body: dict = {
"content": base64.b64encode(content.encode()).decode(),
"message": message,
}
if branch:
body["branch"] = branch
if sha:
body["sha"] = sha
method = "PUT"
with _client() as c:
try:
resp = c.request(
method, f"/repos/{owner}/{repo}/contents/{filepath}", json=body
)
resp.raise_for_status()
return resp.json()
except httpx.HTTPStatusError as e:
return {
"error": f"HTTP {e.response.status_code}",
"detail": e.response.text[:500],
}
except httpx.RequestError as e:
return {"error": str(e)}
@mcp.tool()
def delete_file(
owner: str,
repo: str,
filepath: str,
message: str,
sha: str,
branch: str | None = None,
) -> dict | list | str:
"""Delete a file from a repository.
Args:
owner: Repository owner.
repo: Repository name.
filepath: Path of the file to delete.
message: Commit message.
sha: Current SHA of the file (from get_file).
branch: Branch to delete from. Uses repo default if omitted.
"""
body: dict = {"message": message, "sha": sha}
if branch:
body["branch"] = branch
with _client() as c:
try:
resp = c.request(
"DELETE", f"/repos/{owner}/{repo}/contents/{filepath}", json=body
)
resp.raise_for_status()
return resp.json()
except httpx.HTTPStatusError as e:
return {
"error": f"HTTP {e.response.status_code}",
"detail": e.response.text[:500],
}
except httpx.RequestError as e:
return {"error": str(e)}
@mcp.tool()
def list_directory(
owner: str, repo: str, path: str = "", ref: str | None = None
) -> dict | list | str:
"""List files and directories at a path in a repository.
Args:
owner: Repository owner.
repo: Repository name.
path: Directory path. Empty string for root.
ref: Optional branch/tag/commit.
"""
params = {}
if ref:
params["ref"] = ref
return _get(f"/repos/{owner}/{repo}/contents/{path}", **params)
# ───────────────────────────────────────────────────────────────────
# Commits
# ───────────────────────────────────────────────────────────────────
@mcp.tool()
def list_commits(
owner: str, repo: str, sha: str | None = None, limit: int = 10, page: int = 1
) -> dict | list | str:
"""List commits in a repository.
Args:
owner: Repository owner.
repo: Repository name.
sha: Optional branch/tag/commit SHA to list from.
limit: Number of commits per page. Default 10.
page: Page number. Default 1.
"""
params: dict = {"limit": limit, "page": page}
if sha:
params["sha"] = sha
return _get(f"/repos/{owner}/{repo}/git/commits", **params)
@mcp.tool()
def get_commit(owner: str, repo: str, sha: str) -> dict | list | str:
"""Get details of a specific commit.
Args:
owner: Repository owner.
repo: Repository name.
sha: Commit SHA.
"""
return _get(f"/repos/{owner}/{repo}/git/commits/{sha}")
# ───────────────────────────────────────────────────────────────────
# Issues
# ───────────────────────────────────────────────────────────────────
@mcp.tool()
def list_issues(
owner: str,
repo: str,
state: str = "open",
labels: str | None = None,
limit: int = 20,
page: int = 1,
) -> dict | list | str:
"""List issues in a repository.
Args:
owner: Repository owner.
repo: Repository name.
state: Filter by state: "open", "closed", or "all". Default "open".
labels: Comma-separated label names to filter by.
limit: Results per page. Default 20.
page: Page number. Default 1.
"""
params: dict = {"state": state, "limit": limit, "page": page, "type": "issues"}
if labels:
params["labels"] = labels
return _get(f"/repos/{owner}/{repo}/issues", **params)
@mcp.tool()
def get_issue(owner: str, repo: str, index: int) -> dict | list | str:
"""Get details of an issue.
Args:
owner: Repository owner.
repo: Repository name.
index: Issue number.
"""
return _get(f"/repos/{owner}/{repo}/issues/{index}")
@mcp.tool()
def create_issue(
owner: str,
repo: str,
title: str,
body: str = "",
labels: list[int] | None = None,
assignees: list[str] | None = None,
) -> dict | list | str:
"""Create a new issue.
Args:
owner: Repository owner.
repo: Repository name.
title: Issue title.
body: Issue body/description.
labels: List of label IDs to add.
assignees: List of usernames to assign.
"""
payload: dict = {"title": title, "body": body}
if labels:
payload["labels"] = labels
if assignees:
payload["assignees"] = assignees
return _post(f"/repos/{owner}/{repo}/issues", payload)
@mcp.tool()
def edit_issue(
owner: str,
repo: str,
index: int,
title: str | None = None,
body: str | None = None,
state: str | None = None,
) -> dict | list | str:
"""Edit an existing issue.
Args:
owner: Repository owner.
repo: Repository name.
index: Issue number.
title: New title (omit to keep current).
body: New body (omit to keep current).
state: Set to "open" or "closed".
"""
payload: dict = {}
if title is not None:
payload["title"] = title
if body is not None:
payload["body"] = body
if state is not None:
payload["state"] = state
return _patch(f"/repos/{owner}/{repo}/issues/{index}", payload)
@mcp.tool()
def list_issue_comments(owner: str, repo: str, index: int) -> dict | list | str:
"""List comments on an issue.
Args:
owner: Repository owner.
repo: Repository name.
index: Issue number.
"""
return _get(f"/repos/{owner}/{repo}/issues/{index}/comments")
@mcp.tool()
def create_issue_comment(
owner: str, repo: str, index: int, body: str
) -> dict | list | str:
"""Add a comment to an issue.
Args:
owner: Repository owner.
repo: Repository name.
index: Issue number.
body: Comment text.
"""
return _post(f"/repos/{owner}/{repo}/issues/{index}/comments", {"body": body})
# ───────────────────────────────────────────────────────────────────
# Labels
# ───────────────────────────────────────────────────────────────────
@mcp.tool()
def list_labels(owner: str, repo: str) -> dict | list | str:
"""List labels in a repository.
Args:
owner: Repository owner.
repo: Repository name.
"""
return _get(f"/repos/{owner}/{repo}/labels")
@mcp.tool()
def create_label(
owner: str, repo: str, name: str, color: str = "#0075ca", description: str = ""
) -> dict | list | str:
"""Create a label in a repository.
Args:
owner: Repository owner.
repo: Repository name.
name: Label name.
color: Hex color code. Default "#0075ca".
description: Label description.
"""
return _post(
f"/repos/{owner}/{repo}/labels",
{
"name": name,
"color": color,
"description": description,
},
)
# ───────────────────────────────────────────────────────────────────
# Pull Requests
# ───────────────────────────────────────────────────────────────────
@mcp.tool()
def list_pull_requests(
owner: str, repo: str, state: str = "open", limit: int = 20, page: int = 1
) -> dict | list | str:
"""List pull requests in a repository.
Args:
owner: Repository owner.
repo: Repository name.
state: Filter by state: "open", "closed", or "all". Default "open".
limit: Results per page. Default 20.
page: Page number. Default 1.
"""
return _get(f"/repos/{owner}/{repo}/pulls", state=state, limit=limit, page=page)
@mcp.tool()
def get_pull_request(owner: str, repo: str, index: int) -> dict | list | str:
"""Get details of a pull request.
Args:
owner: Repository owner.
repo: Repository name.
index: Pull request number.
"""
return _get(f"/repos/{owner}/{repo}/pulls/{index}")
@mcp.tool()
def create_pull_request(
owner: str,
repo: str,
title: str,
head: str,
base: str = "main",
body: str = "",
) -> dict | list | str:
"""Create a new pull request.
Args:
owner: Repository owner.
repo: Repository name.
title: PR title.
head: Source branch.
base: Target branch. Default "main".
body: PR description.
"""
return _post(
f"/repos/{owner}/{repo}/pulls",
{
"title": title,
"head": head,
"base": base,
"body": body,
},
)
@mcp.tool()
def merge_pull_request(
owner: str, repo: str, index: int, merge_style: str = "merge", message: str = ""
) -> dict | list | str:
"""Merge a pull request.
Args:
owner: Repository owner.
repo: Repository name.
index: Pull request number.
merge_style: One of "merge", "rebase", "rebase-merge", "squash". Default "merge".
message: Optional merge commit message.
"""
payload: dict = {"Do": merge_style}
if message:
payload["merge_message_field"] = message
return _post(f"/repos/{owner}/{repo}/pulls/{index}/merge", payload)
@mcp.tool()
def list_pr_comments(owner: str, repo: str, index: int) -> dict | list | str:
"""List review comments on a pull request.
Args:
owner: Repository owner.
repo: Repository name.
index: Pull request number.
"""
return _get(f"/repos/{owner}/{repo}/pulls/{index}/reviews")
@mcp.tool()
def get_pr_diff(owner: str, repo: str, index: int) -> dict | list | str:
"""Get the diff of a pull request.
Args:
owner: Repository owner.
repo: Repository name.
index: Pull request number.
"""
with _client() as c:
try:
resp = c.get(f"/repos/{owner}/{repo}/pulls/{index}.diff")
resp.raise_for_status()
return {"diff": resp.text[:50000]}
except httpx.HTTPStatusError as e:
return {
"error": f"HTTP {e.response.status_code}",
"detail": e.response.text[:500],
}
except httpx.RequestError as e:
return {"error": str(e)}
# ───────────────────────────────────────────────────────────────────
# Releases / Tags
# ───────────────────────────────────────────────────────────────────
@mcp.tool()
def list_releases(
owner: str, repo: str, limit: int = 10, page: int = 1
) -> dict | list | str:
"""List releases in a repository.
Args:
owner: Repository owner.
repo: Repository name.
limit: Results per page. Default 10.
page: Page number. Default 1.
"""
return _get(f"/repos/{owner}/{repo}/releases", limit=limit, page=page)
@mcp.tool()
def create_release(
owner: str,
repo: str,
tag_name: str,
name: str = "",
body: str = "",
draft: bool = False,
prerelease: bool = False,
target: str | None = None,
) -> dict | list | str:
"""Create a new release.
Args:
owner: Repository owner.
repo: Repository name.
tag_name: Tag for the release (e.g. "v1.0.0").
name: Release title.
body: Release notes.
draft: Whether this is a draft release.
prerelease: Whether this is a pre-release.
target: Branch or commit SHA to tag. Uses default branch if omitted.
"""
payload: dict = {
"tag_name": tag_name,
"name": name or tag_name,
"body": body,
"draft": draft,
"prerelease": prerelease,
}
if target:
payload["target_commitish"] = target
return _post(f"/repos/{owner}/{repo}/releases", payload)
@mcp.tool()
def list_tags(owner: str, repo: str) -> dict | list | str:
"""List tags in a repository.
Args:
owner: Repository owner.
repo: Repository name.
"""
return _get(f"/repos/{owner}/{repo}/tags")
# ───────────────────────────────────────────────────────────────────
# Organizations
# ───────────────────────────────────────────────────────────────────
@mcp.tool()
def list_orgs() -> dict | list | str:
"""List organizations the authenticated user belongs to."""
return _get("/user/orgs")
@mcp.tool()
def get_org(org: str) -> dict | list | str:
"""Get details of an organization.
Args:
org: Organization name.
"""
return _get(f"/orgs/{org}")
@mcp.tool()
def list_org_repos(org: str, limit: int = 20, page: int = 1) -> dict | list | str:
"""List repositories in an organization.
Args:
org: Organization name.
limit: Results per page. Default 20.
page: Page number. Default 1.
"""
return _get(f"/orgs/{org}/repos", limit=limit, page=page)
# ───────────────────────────────────────────────────────────────────
# Users
# ───────────────────────────────────────────────────────────────────
@mcp.tool()
def get_user(username: str) -> dict | list | str:
"""Get a user's profile.
Args:
username: The username to look up.
"""
return _get(f"/users/{username}")
@mcp.tool()
def list_user_repos(username: str, limit: int = 20, page: int = 1) -> dict | list | str:
"""List a user's repositories.
Args:
username: The username.
limit: Results per page. Default 20.
page: Page number. Default 1.
"""
return _get(f"/users/{username}/repos", limit=limit, page=page)
# ───────────────────────────────────────────────────────────────────
# Milestones
# ───────────────────────────────────────────────────────────────────
@mcp.tool()
def list_milestones(owner: str, repo: str, state: str = "open") -> dict | list | str:
"""List milestones in a repository.
Args:
owner: Repository owner.
repo: Repository name.
state: Filter by state: "open", "closed", or "all". Default "open".
"""
return _get(f"/repos/{owner}/{repo}/milestones", state=state)
@mcp.tool()
def create_milestone(
owner: str, repo: str, title: str, description: str = ""
) -> dict | list | str:
"""Create a milestone in a repository.
Args:
owner: Repository owner.
repo: Repository name.
title: Milestone title.
description: Milestone description.
"""
return _post(
f"/repos/{owner}/{repo}/milestones",
{
"title": title,
"description": description,
},
)
if __name__ == "__main__":
mcp.run()

212
mcps/websearch.py Normal file
View File

@@ -0,0 +1,212 @@
from __future__ import annotations
import re
from urllib.parse import quote_plus, urljoin
import httpx
from bs4 import BeautifulSoup
from markdownify import markdownify
from mcp.server.fastmcp import FastMCP
mcp = FastMCP(
"Web Search MCP",
instructions="MCP server for searching the web and fetching page content.",
)
_USER_AGENT = (
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36"
)
_HEADERS = {
"User-Agent": _USER_AGENT,
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.9",
}
_TIMEOUT = 15.0
def _get_client() -> httpx.Client:
return httpx.Client(
headers=_HEADERS,
timeout=_TIMEOUT,
follow_redirects=True,
)
# ---------------------------------------------------------------------------
# Search
# ---------------------------------------------------------------------------
def _parse_duckduckgo(html: str) -> list[dict]:
"""Parse DuckDuckGo HTML search results."""
soup = BeautifulSoup(html, "html.parser")
results: list[dict] = []
for result_div in soup.select(".result"):
title_tag = result_div.select_one(".result__a")
snippet_tag = result_div.select_one(".result__snippet")
if not title_tag:
continue
href = title_tag.get("href", "")
# DuckDuckGo wraps URLs in a redirect; extract the actual URL
if "uddg=" in str(href):
from urllib.parse import parse_qs, urlparse
parsed = urlparse(str(href))
qs = parse_qs(parsed.query)
href = qs.get("uddg", [str(href)])[0]
results.append(
{
"title": title_tag.get_text(strip=True),
"url": str(href),
"snippet": snippet_tag.get_text(strip=True) if snippet_tag else "",
}
)
return results
@mcp.tool()
def web_search(query: str, max_results: int = 10) -> list[dict]:
"""Search the web using DuckDuckGo and return results.
Args:
query: The search query string.
max_results: Maximum number of results to return. Default 10.
"""
url = f"https://html.duckduckgo.com/html/?q={quote_plus(query)}"
with _get_client() as client:
try:
resp = client.get(url)
resp.raise_for_status()
except httpx.HTTPStatusError as e:
return [
{"error": f"HTTP {e.response.status_code}: {e.response.reason_phrase}"}
]
except httpx.RequestError as e:
return [{"error": f"Request failed: {e}"}]
results = _parse_duckduckgo(resp.text)
return results[:max_results]
# ---------------------------------------------------------------------------
# Fetch page
# ---------------------------------------------------------------------------
_STRIP_TAGS = {
"script",
"style",
"nav",
"footer",
"header",
"noscript",
"svg",
"img",
"iframe",
}
def _clean_html(html: str, base_url: str) -> str:
"""Convert HTML to clean markdown."""
soup = BeautifulSoup(html, "html.parser")
# Remove unwanted tags
for tag in soup.find_all(_STRIP_TAGS):
tag.decompose()
# Resolve relative URLs
for a_tag in soup.find_all("a", href=True):
a_tag["href"] = urljoin(base_url, a_tag["href"])
# Convert to markdown
md = markdownify(str(soup), heading_style="ATX", strip=["img"])
# Collapse excessive blank lines
md = re.sub(r"\n{3,}", "\n\n", md)
return md.strip()
@mcp.tool()
def fetch_page(url: str, max_length: int = 20000) -> dict:
"""Fetch a web page and return its content as markdown.
Args:
url: The URL to fetch.
max_length: Maximum character length of returned content. Default 20000.
"""
with _get_client() as client:
try:
resp = client.get(url)
resp.raise_for_status()
except httpx.HTTPStatusError as e:
return {
"error": f"HTTP {e.response.status_code}: {e.response.reason_phrase}",
"url": url,
}
except httpx.RequestError as e:
return {"error": str(e), "url": url}
content_type = resp.headers.get("content-type", "")
if "text/html" not in content_type and "application/xhtml" not in content_type:
return {
"url": str(resp.url),
"content_type": content_type,
"content": resp.text[:max_length],
}
md = _clean_html(resp.text, str(resp.url))
truncated = len(md) > max_length
return {
"url": str(resp.url),
"title": _extract_title(resp.text),
"content": md[:max_length],
"truncated": truncated,
}
def _extract_title(html: str) -> str:
soup = BeautifulSoup(html, "html.parser")
tag = soup.find("title")
return tag.get_text(strip=True) if tag else ""
# ---------------------------------------------------------------------------
# Multi-search: search + fetch top results in one call
# ---------------------------------------------------------------------------
@mcp.tool()
def search_and_read(
query: str, num_pages: int = 3, max_page_length: int = 10000
) -> list[dict]:
"""Search the web and fetch the top results in one step.
This combines web_search and fetch_page: it searches for the query,
then fetches and converts the top results to markdown.
Args:
query: The search query string.
num_pages: Number of top results to fetch. Default 3.
max_page_length: Max characters per page. Default 10000.
"""
results = web_search(query, max_results=num_pages)
output: list[dict] = []
for r in results:
page = fetch_page(r["url"], max_length=max_page_length)
output.append(
{
"search_title": r["title"],
"search_snippet": r["snippet"],
**page,
}
)
return output
if __name__ == "__main__":
mcp.run()