MCP uses JSON-RPC 2.0 as its wire protocol. Every message is a JSON object in one of three forms:
- Request: {"jsonrpc": "2.0", "id": 1, "method": "tools/call", "params": {...}}
- Response: {"jsonrpc": "2.0", "id": 1, "result": {...}}
- Notification: {"jsonrpc": "2.0", "method": "notifications/progress", "params": {...}} (no id, no response expected)

The protocol is transport-agnostic. The same JSON-RPC messages flow over stdio pipes, SSE streams, or HTTP requests.
┌───────────────┐     ┌──────────────────────┐     ┌─────────┐
│ Uninitialized │────→│     Initializing     │────→│  Ready  │
└───────────────┘     │                      │     └────┬────┘
                      │ Client sends         │          │
                      │ initialize request   │     ┌────┴────┐
                      │ Server responds with │     │ Serving │
                      │ capabilities         │     │ requests│
                      │ Client sends         │     └────┬────┘
                      │ initialized notif    │          │
                      └──────────────────────┘     ┌────┴────┐
                                                   │Shutdown │
                                                   └─────────┘
Client sends initialize with its capabilities and protocol version. Server responds with its own capabilities.
// Client → Server
{
"jsonrpc": "2.0",
"id": 1,
"method": "initialize",
"params": {
"protocolVersion": "2025-03-26",
"capabilities": {},
"clientInfo": { "name": "claude-code", "version": "1.0.0" }
}
}
// Server → Client
{
"jsonrpc": "2.0",
"id": 1,
"result": {
"protocolVersion": "2025-03-26",
"capabilities": {
"tools": { "listChanged": true },
"resources": { "subscribe": true, "listChanged": true },
"prompts": { "listChanged": true },
"logging": {}
},
"serverInfo": { "name": "my-server", "version": "1.0.0" }
}
}
Client sends notifications/initialized to confirm. Server transitions to ready state.
Server handles requests: tools/list, tools/call, resources/list, resources/read, prompts/list, prompts/get, completion/complete.
Transport closes (process exit for stdio, connection close for HTTP/SSE). Server cleans up resources.
Servers declare what they support during initialization:
| Capability | Meaning | Sub-capabilities |
|---|---|---|
| `tools` | Server offers tools | `listChanged` - notify on tool list changes |
| `resources` | Server offers resources | `subscribe` - clients can subscribe to changes; `listChanged` - notify on resource list changes |
| `prompts` | Server offers prompts | `listChanged` - notify on prompt list changes |
| `logging` | Server can emit log messages | (none) |
| `experimental` | Experimental features | Varies by implementation |
FastMCP is the recommended high-level API for Python MCP servers.
from mcp.server.fastmcp import FastMCP
# Create server with metadata
mcp = FastMCP(
"my-server",
version="1.0.0",
description="A server that does useful things",
)
@mcp.tool()
def greet(name: str) -> str:
    """Greet a user by name."""
    # Build the greeting first so the returned value has a name.
    greeting = "Hello, {}!".format(name)
    return greeting
if __name__ == "__main__":
mcp.run() # stdio by default
from mcp.server.fastmcp import FastMCP
import httpx
mcp = FastMCP("api-server")
# Dependencies are injected per-request via FastMCP's Context
@mcp.tool()
async def fetch_data(url: str) -> str:
    """Fetch data from a URL."""
    # A short-lived client is opened per call and closed when the block exits.
    async with httpx.AsyncClient() as http:
        response = await http.get(url, timeout=30.0)
        # Surface 4xx/5xx responses as exceptions instead of returning the body.
        response.raise_for_status()
        body = response.text
        return body
Use lifespan to manage resources that live for the server's entire lifetime:
import json
from contextlib import asynccontextmanager

from mcp.server.fastmcp import Context, FastMCP


@asynccontextmanager
async def lifespan(server: FastMCP):
    """Initialize and cleanup server resources.

    Yields a dict holding the resources created at startup. Tools read it
    back through ``ctx.request_context.lifespan_context``.
    """
    # Startup: create connection pools, load config
    db = await create_db_pool()
    try:
        # FastMCP has no ``state`` attribute; the yielded object is the
        # supported way to hand lifespan resources to request handlers.
        yield {"db": db}
    finally:
        # Shutdown: cleanup
        await db.close()


mcp = FastMCP("db-server", lifespan=lifespan)


@mcp.tool()
async def query_db(sql: str, ctx: Context) -> str:
    """Run a read-only SQL query."""
    # NOTE(review): nothing in this function enforces read-only access; that
    # has to come from the database role/connection - confirm before exposing.
    db = ctx.request_context.lifespan_context["db"]
    results = await db.fetch(sql)
    # default=str stringifies values json cannot encode natively.
    return json.dumps(results, default=str)
The Context parameter gives tools access to server internals:
from mcp.server.fastmcp import FastMCP, Context
mcp = FastMCP("context-demo")
@mcp.tool()
async def long_operation(items: list[str], ctx: Context) -> str:
    """Process items with progress reporting.

    Each item is passed to ``process_item``; the collected results are
    returned as a JSON array string.
    """
    total = len(items)
    results = []
    for completed, item in enumerate(items, start=1):
        result = await process_item(item)
        results.append(result)
        await ctx.info(f"Processed {item}")  # Log to client
        # Report the number of items finished so far. Reporting the 0-based
        # index before the work started meant progress never reached total.
        await ctx.report_progress(completed, total)
    return json.dumps(results)
Context provides:
- ctx.report_progress(current, total) - send progress notifications
- ctx.info(message), ctx.debug(message), ctx.warning(message), ctx.error(message) - logging
- ctx.read_resource(uri) - read another resource from within a tool
- ctx.server - access the FastMCP server instance and its state
- ctx.request_context - access the low-level request context and session

# stdio (default) - for Claude Desktop / Claude Code
mcp.run()
mcp.run(transport="stdio")
# SSE - for web clients
mcp.run(transport="sse", host="0.0.0.0", port=8000)
# Streamable HTTP - for production
mcp.run(transport="streamable-http", host="0.0.0.0", port=8000)
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import { z } from "zod";
const server = new McpServer({
name: "my-server",
version: "1.0.0",
});
// Tools, resources, and prompts registered via server methods
// Register the "greet" tool: one string argument, one text content item back.
server.tool(
  "greet",
  "Greet a user",
  { name: z.string() },
  async ({ name }) => {
    const greeting = `Hello, ${name}!`;
    return { content: [{ type: "text", text: greeting }] };
  },
);
const transport = new StdioServerTransport();
await server.connect(transport);
For maximum control, use the Server class directly:
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import {
  CallToolRequestSchema,
  ErrorCode,
  ListToolsRequestSchema,
  McpError,
} from "@modelcontextprotocol/sdk/types.js";

// Low-level server: capabilities are declared explicitly and each request
// type gets its own handler.
const server = new Server(
  { name: "low-level-server", version: "1.0.0" },
  { capabilities: { tools: { listChanged: true } } }
);

// tools/list: advertise the single "greet" tool and its input schema.
server.setRequestHandler(ListToolsRequestSchema, async () => ({
  tools: [
    {
      name: "greet",
      description: "Greet a user",
      inputSchema: {
        type: "object" as const,
        properties: {
          name: { type: "string", description: "User name" },
        },
        required: ["name"],
      },
    },
  ],
}));

// tools/call: dispatch on the tool name; unknown names are a protocol error.
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  if (request.params.name === "greet") {
    // `arguments` is optional in the request, so validate before using it
    // instead of casting and destructuring a possibly undefined value.
    const name = request.params.arguments?.name;
    if (typeof name !== "string") {
      throw new McpError(ErrorCode.InvalidParams, "Missing required field: name");
    }
    return {
      content: [{ type: "text", text: `Hello, ${name}!` }],
    };
  }
  throw new McpError(ErrorCode.MethodNotFound, `Unknown tool: ${request.params.name}`);
});
import { McpError, ErrorCode } from "@modelcontextprotocol/sdk/types.js";
// Standard JSON-RPC error codes
throw new McpError(ErrorCode.InvalidParams, "Missing required field: query");
throw new McpError(ErrorCode.MethodNotFound, "Unknown tool: foo");
throw new McpError(ErrorCode.InternalError, "Database connection failed");
// Custom error codes (use negative numbers per JSON-RPC spec)
throw new McpError(-32001, "Rate limit exceeded");
# tools/files.py
from mcp.server.fastmcp import FastMCP
def register_file_tools(mcp: FastMCP):
    """Register the file tools (read, write, list) on the given server.

    Args:
        mcp: Server instance whose ``tool()`` decorator registers each function.
    """
    import os

    @mcp.tool()
    def read_file(path: str) -> str:
        """Read contents of a file."""
        # Explicit encoding: the platform default is not UTF-8 everywhere.
        with open(path, encoding="utf-8") as f:
            return f.read()

    @mcp.tool()
    def write_file(path: str, content: str) -> str:
        """Write content to a file."""
        with open(path, "w", encoding="utf-8") as f:
            f.write(content)
        # len(content) counts characters, not bytes; report the UTF-8 size
        # so the message is accurate for non-ASCII text.
        size = len(content.encode("utf-8"))
        return f"Wrote {size} bytes to {path}"

    @mcp.tool()
    def list_files(directory: str) -> str:
        """List files in a directory."""
        entries = os.listdir(directory)
        return "\n".join(entries)
# tools/database.py
def register_db_tools(mcp: FastMCP):
@mcp.tool()
def query(sql: str) -> str:
"""Execute a read-only SQL query."""
...
@mcp.tool()
def list_tables() -> str:
"""List all database tables."""
...
# server.py
from mcp.server.fastmcp import FastMCP
from tools.files import register_file_tools
from tools.database import register_db_tools
mcp = FastMCP("multi-tool-server")
register_file_tools(mcp)
register_db_tools(mcp)
if __name__ == "__main__":
mcp.run()
Prefix tool names to avoid collisions when multiple servers are active:
@mcp.tool(name="myapp_search") # Not just "search"
def search(query: str) -> str:
...
@mcp.tool(name="myapp_create_item") # Not just "create"
def create_item(title: str) -> str:
...
import logging
from contextlib import asynccontextmanager

from mcp.server.fastmcp import FastMCP

logger = logging.getLogger("mcp-server")


# Use lifespan for server-level middleware
@asynccontextmanager
async def lifespan(server: FastMCP):
    """Log server startup and shutdown around the server's lifetime."""
    logger.info("Server starting up")
    try:
        yield
    finally:
        # In finally so shutdown is logged even when the server exits on an error.
        logger.info("Server shutting down")


# The lifespan only runs if it is handed to the server.
mcp = FastMCP("logged-server", lifespan=lifespan)
# For per-tool logging, use a decorator
def logged_tool(func):
    """Wrap a tool so each call, success, and failure is logged.

    Works for both sync and async functions. The returned wrapper is always a
    coroutine function. Exceptions are logged and re-raised unchanged.

    Args:
        func: The tool function to wrap.

    Returns:
        An async wrapper carrying ``func``'s name, docstring, annotations and
        signature.
    """
    # Local imports keep the decorator self-contained.
    import functools
    import inspect

    is_coroutine = inspect.iscoroutinefunction(func)

    # functools.wraps sets __wrapped__, so inspect.signature() reports the
    # original parameters instead of (*args, **kwargs). Copying __name__,
    # __doc__ and __annotations__ by hand does not do that.
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        logger.info(f"Tool called: {func.__name__} with {kwargs}")
        try:
            result = func(*args, **kwargs)
            if is_coroutine:
                result = await result
            logger.info(f"Tool {func.__name__} succeeded")
            return result
        except Exception as e:
            logger.error(f"Tool {func.__name__} failed: {e}")
            raise

    return wrapper
import time
from collections import defaultdict
class RateLimiter:
    """Sliding-window rate limiter with an independent window per key.

    Args:
        max_calls: Maximum number of allowed calls per key inside one window.
        window_seconds: Length of the window in seconds.
    """

    def __init__(self, max_calls: int, window_seconds: int):
        self.max_calls = max_calls
        self.window = window_seconds
        # key -> time.monotonic() timestamps of the calls still inside the window
        self.calls: dict[str, list[float]] = defaultdict(list)

    def check(self, key: str) -> bool:
        """Record a call for ``key`` and report whether it is allowed.

        Returns:
            True if the call fits in the current window (and was recorded),
            False if the key has already used ``max_calls`` in the window.
        """
        # Monotonic clock: wall-clock adjustments (NTP, manual changes) must
        # not stretch or shrink the window.
        now = time.monotonic()
        recent = [t for t in self.calls[key] if now - t < self.window]
        allowed = len(recent) < self.max_calls
        if allowed:
            recent.append(now)
        self.calls[key] = recent
        return allowed
rate_limiter = RateLimiter(max_calls=60, window_seconds=60)
@mcp.tool()
async def rate_limited_api_call(endpoint: str, ctx: Context) -> str:
    """Call an API with rate limiting."""
    # All callers share the single "api" bucket.
    if not rate_limiter.check("api"):
        return "Rate limit exceeded. Please wait before making more requests."
    async with httpx.AsyncClient() as client:
        # Same explicit timeout and status check as fetch_data, so a hung or
        # failing upstream is not returned to the caller as a normal result.
        resp = await client.get(f"https://api.example.com/{endpoint}", timeout=30.0)
        resp.raise_for_status()
        return resp.text
import time
from functools import wraps
def cached(ttl_seconds: int = 300):
    """Cache tool results for the given TTL.

    The decorated function may be sync or async; the wrapper is always a
    coroutine function. Entries are keyed on the function name plus the repr
    of its arguments, with keyword arguments in sorted order.

    Args:
        ttl_seconds: How long a stored result stays valid, in seconds.

    Returns:
        A decorator that adds caching to a function.
    """
    # Local import keeps the decorator self-contained.
    import inspect

    cache: dict[str, tuple[float, str]] = {}

    def decorator(func):
        is_coroutine = inspect.iscoroutinefunction(func)

        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Sort kwargs so f(a=1, b=2) and f(b=2, a=1) share one entry.
            key = f"{func.__name__}:{args!r}:{sorted(kwargs.items())!r}"
            entry = cache.get(key)
            if entry is not None:
                cached_at, result = entry
                # Monotonic clock: TTLs are unaffected by wall-clock changes.
                if time.monotonic() - cached_at < ttl_seconds:
                    return result
                # Drop the stale entry rather than leaving it until overwritten.
                del cache[key]
            result = func(*args, **kwargs)
            if is_coroutine:
                result = await result
            cache[key] = (time.monotonic(), result)
            return result

        return wrapper

    return decorator
@mcp.tool()
@cached(ttl_seconds=60)
async def get_weather(city: str) -> str:
    """Get current weather for a city (cached for 60s)."""
    async with httpx.AsyncClient() as client:
        # Pass the city through params= so httpx URL-encodes it; interpolating
        # it into the URL breaks on spaces, "&", "#" and non-ASCII names.
        resp = await client.get("https://weather.api/current", params={"city": city})
        # Raise on 4xx/5xx so an error body is not cached as a valid result.
        resp.raise_for_status()
        return resp.text
from mcp.server.fastmcp import FastMCP
mcp = FastMCP("robust-server")
@mcp.tool()
async def safe_operation(input: str) -> str:
    """Perform an operation with proper error handling."""
    try:
        result = await do_something(input)
        return json.dumps({"status": "success", "data": result})
    except ValueError as e:
        # User-facing error: return as tool result with error flag
        # FastMCP handles this by returning content with isError=True
        # "from e" keeps the original traceback attached for server-side debugging.
        raise ValueError(f"Invalid input: {e}") from e
    except httpx.HTTPError as e:
        raise RuntimeError(f"API request failed: {e}") from e
    except Exception as e:
        # Log internally, return safe message
        logger.exception("Unexpected error in safe_operation")
        raise RuntimeError("An unexpected error occurred. Check server logs.") from e
// Register "safe_operation": failures are returned as an error result
// (isError: true) rather than thrown.
server.tool("safe_operation", "Do something safely", { input: z.string() }, async ({ input }) => {
  try {
    const result = await doSomething(input);
    return { content: [{ type: "text", text: JSON.stringify(result) }] };
  } catch (error) {
    // The catch variable is `unknown` under strict TypeScript, and code can
    // throw non-Error values, so narrow before reading `.message`.
    const message = error instanceof Error ? error.message : String(error);
    // Return error as tool result (visible to LLM)
    return {
      content: [{ type: "text", text: `Error: ${message}` }],
      isError: true,
    };
  }
});
| Error Type | Handling | Example |
|---|---|---|
| Invalid input | Return clear message, isError: true | "Missing required field: query" |
| Auth failure | Return message suggesting config check | "API key invalid. Check MY_API_KEY env var" |
| External API error | Return status + retry suggestion | "GitHub API returned 503. Try again in 30s" |
| Internal error | Log details, return safe message | "Internal error. Check server logs" |
| Timeout | Return partial results if available | "Operation timed out. Partial results: ..." |
from mcp.server.fastmcp import FastMCP, Context
mcp = FastMCP("logged-server")
@mcp.tool()
async def debug_tool(data: str, ctx: Context) -> str:
    """A tool with comprehensive logging."""
    # Only the first 100 characters of the input go into the debug message.
    preview = data[:100]
    await ctx.debug(f"Received data: {preview}")
    await ctx.info("Processing started")
    try:
        items = process(data)
        await ctx.info(f"Processing complete: {len(items)} items")
        payload = json.dumps(items)
    except Exception as exc:
        # Report the failure to the client, then let the exception propagate.
        await ctx.error(f"Processing failed: {exc}")
        raise
    return payload
| Level | Use For |
|---|---|
| `debug` | Detailed diagnostic info, request/response bodies |
| `info` | Normal operations, progress updates |
| `warning` | Recoverable issues, deprecation notices |
| `error` | Failures that affect the current operation |
import signal
from contextlib import asynccontextmanager
from mcp.server.fastmcp import FastMCP
@asynccontextmanager
async def lifespan(server: FastMCP):
    """Create the DB pool and HTTP client at startup and close both at shutdown.

    Yields a dict with keys ``"db"`` and ``"http"``. Tools read it back through
    ``ctx.request_context.lifespan_context``.
    """
    # Startup
    db_pool = await create_pool()
    try:
        http_client = httpx.AsyncClient()
        try:
            # FastMCP has no ``state`` attribute; the yielded object is the
            # supported way to share lifespan resources with handlers.
            yield {"db": db_pool, "http": http_client}
        finally:
            # Cleanup: close connections, flush buffers
            await http_client.aclose()
    finally:
        # Nested finally: the pool is closed even if closing the HTTP client
        # (or creating it) raised.
        await db_pool.close()
        logger.info("Server shutdown complete")


mcp = FastMCP("graceful-server", lifespan=lifespan)
Each client connection gets its own session. Do not share mutable state between sessions without synchronization:
# BAD: Shared mutable state without locks
results_cache = {} # All sessions share this dict unsafely
# GOOD: Per-session state via context
@mcp.tool()
async def track_calls(ctx: Context) -> str:
    """Track how many times this session called this tool."""
    session = ctx.request_context.session
    # The counter lives on the session object; a missing attribute means
    # this is the first call.
    count = getattr(session, "call_count", 0) + 1
    session.call_count = count
    return f"This session has made {count} calls"
# GOOD: Shared state with proper locking
import asyncio
_lock = asyncio.Lock()
_shared_cache: dict = {}
@mcp.tool()
async def cached_lookup(key: str) -> str:
    # The lock is held across both the check and the fill, so concurrent
    # callers never run expensive_lookup twice for one missing key.
    async with _lock:
        try:
            return _shared_cache[key]
        except KeyError:
            value = await expensive_lookup(key)
            _shared_cache[key] = value
            return value
When your server's available tools, resources, or prompts change at runtime:
@mcp.tool()
async def enable_advanced_tools(ctx: Context) -> str:
    """Dynamically register new tools and notify the client."""
    register_advanced_tools(ctx.server)
    # Notify client that tool list has changed. This must be the tool-list
    # notification: the resource-list variant does not prompt a client to
    # re-fetch tools/list, so the new tools would stay invisible.
    await ctx.request_context.session.send_tool_list_changed()
    return "Advanced tools enabled"
my-mcp-server/
├── pyproject.toml
├── src/
│   └── my_server/
│       ├── __init__.py
│       ├── server.py          # FastMCP instance + main()
│       ├── tools/
│       │   ├── __init__.py
│       │   ├── files.py       # File operation tools
│       │   └── api.py         # API wrapper tools
│       ├── resources/
│       │   ├── __init__.py
│       │   └── config.py      # Configuration resources
│       └── prompts/
│           ├── __init__.py
│           └── workflows.py   # Prompt templates
└── tests/
    ├── test_tools.py
    └── test_resources.py
my-mcp-server/
├── package.json
├── tsconfig.json
├── src/
│   ├── index.ts               # Server setup + transport
│   ├── tools/
│   │   ├── files.ts
│   │   └── api.ts
│   ├── resources/
│   │   └── config.ts
│   └── prompts/
│       └── workflows.ts
└── tests/
    ├── tools.test.ts
    └── resources.test.ts
{
"mcpServers": {
"my-python-server": {
"command": "uv",
"args": ["run", "--directory", "/path/to/my-server", "python", "-m", "my_server"],
"env": {
"API_KEY": "your-key",
"DATABASE_URL": "postgresql://localhost/mydb"
}
},
"my-ts-server": {
"command": "npx",
"args": ["tsx", "/path/to/my-server/src/index.ts"],
"env": {
"API_KEY": "your-key"
}
}
}
}
In .claude/settings.json or project settings:
{
"mcpServers": {
"my-server": {
"command": "uv",
"args": ["run", "--directory", "/path/to/server", "python", "server.py"],
"env": {
"API_KEY": "your-key"
}
}
}
}
Or via CLI:
claude mcp add my-server -- uv run --directory /path/to/server python server.py