MCP Server Architecture

Protocol Overview

MCP uses JSON-RPC 2.0 as its wire protocol. Every message is a JSON object of one of three kinds:

  • Requests: {"jsonrpc": "2.0", "id": 1, "method": "tools/call", "params": {...}}
  • Responses: {"jsonrpc": "2.0", "id": 1, "result": {...}}
  • Notifications: {"jsonrpc": "2.0", "method": "notifications/progress", "params": {...}} (no id, no response expected)

The protocol is transport-agnostic. The same JSON-RPC messages flow over stdio pipes, SSE streams, or HTTP requests.
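
As a rough illustration of the three message kinds above, the following Python sketch classifies a raw message by which fields are present. classify_message is a hypothetical helper, not part of any MCP SDK, and it checks only the fields listed in this section.

```python
import json

def classify_message(raw: str) -> str:
    """Return 'request', 'response', or 'notification' for a JSON-RPC 2.0 message."""
    msg = json.loads(raw)
    if not isinstance(msg, dict) or msg.get("jsonrpc") != "2.0":
        raise ValueError("not a JSON-RPC 2.0 message")
    if "method" in msg:
        # A method with an id expects a response; without an id it is a notification.
        return "request" if "id" in msg else "notification"
    if "id" in msg and ("result" in msg or "error" in msg):
        return "response"
    raise ValueError("unrecognized message shape")
```

Because the classification depends only on the JSON payload, the same function would apply regardless of which transport delivered the message.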

Server Lifecycle

┌──────────────┐     ┌──────────────────────┐     ┌─────────┐
│ Uninitialized │────→│    Initializing       │────→│  Ready  │
└──────────────┘     │                      │     └────┬────┘
                     │ Client sends          │          │
                     │ initialize request    │     ┌────┴────┐
                     │ Server responds with  │     │ Serving │
                     │ capabilities          │     │ requests│
                     │ Client sends          │     └────┬────┘
                     │ initialized notif     │          │
                     └──────────────────────┘     ┌────┴────┐
                                                  │Shutdown │
                                                  └─────────┘

Phase 1: Initialization

Client sends initialize with its capabilities and protocol version. Server responds with its own capabilities.

// Client → Server
{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "initialize",
  "params": {
    "protocolVersion": "2025-03-26",
    "capabilities": {},
    "clientInfo": { "name": "claude-code", "version": "1.0.0" }
  }
}

// Server → Client
{
  "jsonrpc": "2.0",
  "id": 1,
  "result": {
    "protocolVersion": "2025-03-26",
    "capabilities": {
      "tools": { "listChanged": true },
      "resources": { "subscribe": true, "listChanged": true },
      "prompts": { "listChanged": true },
      "logging": {}
    },
    "serverInfo": { "name": "my-server", "version": "1.0.0" }
  }
}

Phase 2: Initialized Notification

Client sends notifications/initialized to confirm. Server transitions to ready state.

Phase 3: Serving

Server handles requests: tools/list, tools/call, resources/list, resources/read, prompts/list, prompts/get, completion/complete.

Phase 4: Shutdown

Transport closes (process exit for stdio, connection close for HTTP/SSE). Server cleans up resources.
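
The four phases can be sketched as a small state machine. This is a simplified model for illustration only: LifecycleTracker and State are hypothetical names, and the sketch rejects every request that arrives before initialization completes, which may be stricter than a real server needs to be.

```python
from enum import Enum, auto

class State(Enum):
    UNINITIALIZED = auto()
    INITIALIZING = auto()
    READY = auto()
    SHUTDOWN = auto()

class LifecycleTracker:
    """Hypothetical helper that tracks which lifecycle phase a connection is in."""

    def __init__(self) -> None:
        self.state = State.UNINITIALIZED

    def on_message(self, method: str) -> None:
        if self.state is State.SHUTDOWN:
            raise RuntimeError("connection is closed")
        if method == "initialize":
            if self.state is not State.UNINITIALIZED:
                raise RuntimeError("initialize sent twice")
            self.state = State.INITIALIZING
        elif method == "notifications/initialized":
            if self.state is not State.INITIALIZING:
                raise RuntimeError("initialized sent before initialize")
            self.state = State.READY
        elif self.state is not State.READY:
            # Any other method is only served once the handshake has finished.
            raise RuntimeError(f"{method} received before initialization finished")

    def close(self) -> None:
        # Called when the transport closes (Phase 4).
        self.state = State.SHUTDOWN
```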

Capability Negotiation

Servers declare what they support during initialization:

| Capability | Meaning | Sub-capabilities |
|---|---|---|
| tools | Server offers tools | listChanged: notify on tool list changes |
| resources | Server offers resources | subscribe: clients can subscribe to changes; listChanged: notify on resource list changes |
| prompts | Server offers prompts | listChanged: notify on prompt list changes |
| logging | Server can emit log messages | (none) |
| experimental | Experimental features | Varies by implementation |
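
On the client side, checking a declared capability amounts to walking the capabilities object from the initialize response. A minimal sketch, assuming the dictionary shape shown in Phase 1; supports is a hypothetical helper and the dotted-path syntax is an invention of this example:

```python
def supports(capabilities: dict, path: str) -> bool:
    """Check a dotted capability path such as 'resources.subscribe'."""
    node = capabilities
    for part in path.split("."):
        if not isinstance(node, dict) or part not in node:
            return False
        node = node[part]
    # A capability object (even an empty one like "logging": {}) counts as declared;
    # a flag explicitly set to false or null does not.
    return node is not False and node is not None

caps = {
    "tools": {"listChanged": True},
    "resources": {"subscribe": True, "listChanged": False},
    "logging": {},
}
can_subscribe = supports(caps, "resources.subscribe")
has_prompts = supports(caps, "prompts")
```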

FastMCP Server Setup (Python)

FastMCP is the recommended high-level API for Python MCP servers.

Basic Server

from mcp.server.fastmcp import FastMCP

# Create the server; `instructions` is optional guidance passed to clients during initialization
mcp = FastMCP(
    "my-server",
    instructions="A server that does useful things",
)

@mcp.tool()
def greet(name: str) -> str:
    """Greet a user by name."""
    return f"Hello, {name}!"

if __name__ == "__main__":
    mcp.run()  # stdio by default

Server with Dependencies

from mcp.server.fastmcp import FastMCP
import httpx

mcp = FastMCP("api-server")

# This tool creates a short-lived httpx client on every call.
# For a client shared across calls, create it in a lifespan handler instead.
@mcp.tool()
async def fetch_data(url: str) -> str:
    """Fetch data from a URL."""
    async with httpx.AsyncClient() as client:
        resp = await client.get(url, timeout=30.0)
        resp.raise_for_status()
        return resp.text

Lifespan Handlers

Use lifespan to manage resources that live for the server's entire lifetime:

import json
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager

from mcp.server.fastmcp import Context, FastMCP

@asynccontextmanager
async def lifespan(server: FastMCP) -> AsyncIterator[dict]:
    """Initialize and clean up server resources."""
    # Startup: create connection pools, load config.
    # create_db_pool() is a placeholder for your database driver's pool factory.
    db = await create_db_pool()
    try:
        # The yielded object becomes the lifespan context available to every request.
        yield {"db": db}
    finally:
        # Shutdown: release the pool.
        await db.close()

mcp = FastMCP("db-server", lifespan=lifespan)

@mcp.tool()
async def query_db(sql: str, ctx: Context) -> str:
    """Run a read-only SQL query."""
    db = ctx.request_context.lifespan_context["db"]
    results = await db.fetch(sql)
    return json.dumps(results, default=str)

FastMCP Context Object

The Context parameter gives tools access to server internals:

import json

from mcp.server.fastmcp import FastMCP, Context

mcp = FastMCP("context-demo")

@mcp.tool()
async def long_operation(items: list[str], ctx: Context) -> str:
    """Process items with progress reporting."""
    results = []
    for i, item in enumerate(items, start=1):
        # process_item() is a placeholder for the real per-item work
        result = await process_item(item)
        results.append(result)
        # Report after each item so progress reaches the total on the last one
        await ctx.report_progress(i, len(items))
        await ctx.info(f"Processed {item}")  # Log to client
    return json.dumps(results)

Context provides:

  • ctx.report_progress(current, total) - send progress notifications
  • ctx.info(message), ctx.debug(message), ctx.warning(message), ctx.error(message) - logging
  • ctx.read_resource(uri) - read another resource from within a tool
  • ctx.fastmcp - access the FastMCP server instance
  • ctx.request_context - access the low-level request context, session, and lifespan context

Running with Different Transports

# stdio (default) - for Claude Desktop / Claude Code
mcp.run()  # same as mcp.run(transport="stdio")

# For network transports, host and port are server settings passed to the constructor
mcp = FastMCP("my-server", host="0.0.0.0", port=8000)

# SSE - for web clients
mcp.run(transport="sse")

# Streamable HTTP - for production
mcp.run(transport="streamable-http")

TypeScript SDK Server Setup

Basic Server with McpServer

import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import { z } from "zod";

const server = new McpServer({
  name: "my-server",
  version: "1.0.0",
});

// Tools, resources, and prompts registered via server methods
server.tool("greet", "Greet a user", { name: z.string() }, async ({ name }) => ({
  content: [{ type: "text", text: `Hello, ${name}!` }],
}));

const transport = new StdioServerTransport();
await server.connect(transport);

Low-Level Server API

For maximum control, use the Server class directly:

import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import {
  CallToolRequestSchema,
  ErrorCode,
  ListToolsRequestSchema,
  McpError,
} from "@modelcontextprotocol/sdk/types.js";

const server = new Server(
  { name: "low-level-server", version: "1.0.0" },
  { capabilities: { tools: { listChanged: true } } }
);

server.setRequestHandler(ListToolsRequestSchema, async () => ({
  tools: [
    {
      name: "greet",
      description: "Greet a user",
      inputSchema: {
        type: "object" as const,
        properties: {
          name: { type: "string", description: "User name" },
        },
        required: ["name"],
      },
    },
  ],
}));

server.setRequestHandler(CallToolRequestSchema, async (request) => {
  if (request.params.name === "greet") {
    const { name } = request.params.arguments as { name: string };
    return {
      content: [{ type: "text", text: `Hello, ${name}!` }],
    };
  }
  throw new McpError(ErrorCode.MethodNotFound, `Unknown tool: ${request.params.name}`);
});

McpError for Typed Errors

import { McpError, ErrorCode } from "@modelcontextprotocol/sdk/types.js";

// Standard JSON-RPC error codes
throw new McpError(ErrorCode.InvalidParams, "Missing required field: query");
throw new McpError(ErrorCode.MethodNotFound, "Unknown tool: foo");
throw new McpError(ErrorCode.InternalError, "Database connection failed");

// Custom error codes: JSON-RPC 2.0 reserves -32000 to -32099 for implementation-defined server errors
throw new McpError(-32001, "Rate limit exceeded");

Multi-Tool Server Organization

By Domain (Recommended)

# tools/files.py
from mcp.server.fastmcp import FastMCP

def register_file_tools(mcp: FastMCP):
    @mcp.tool()
    def read_file(path: str) -> str:
        """Read contents of a file."""
        with open(path) as f:
            return f.read()

    @mcp.tool()
    def write_file(path: str, content: str) -> str:
        """Write content to a file."""
        with open(path, "w") as f:
            f.write(content)
        return f"Wrote {len(content)} bytes to {path}"

    @mcp.tool()
    def list_files(directory: str) -> str:
        """List files in a directory."""
        import os
        entries = os.listdir(directory)
        return "\n".join(entries)

# tools/database.py
from mcp.server.fastmcp import FastMCP

def register_db_tools(mcp: FastMCP):
    @mcp.tool()
    def query(sql: str) -> str:
        """Execute a read-only SQL query."""
        ...

    @mcp.tool()
    def list_tables() -> str:
        """List all database tables."""
        ...

# server.py
from mcp.server.fastmcp import FastMCP
from tools.files import register_file_tools
from tools.database import register_db_tools

mcp = FastMCP("multi-tool-server")
register_file_tools(mcp)
register_db_tools(mcp)

if __name__ == "__main__":
    mcp.run()

Tool Namespacing

Prefix tool names to avoid collisions when multiple servers are active:

@mcp.tool(name="myapp_search")  # Not just "search"
def search(query: str) -> str:
    ...

@mcp.tool(name="myapp_create_item")  # Not just "create"
def create_item(title: str) -> str:
    ...
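
To make the collision problem concrete, the sketch below builds prefixed names and reports tool names that more than one server exposes. Both helpers are hypothetical, and the character rule in namespaced is an assumption of this example rather than a documented MCP constraint.

```python
import re

def namespaced(prefix: str, name: str) -> str:
    """Build a tool name like 'myapp_search' from a prefix and a base name."""
    full = f"{prefix}_{name}"
    # Assumed naming rule for this sketch: letters, digits, underscores, and hyphens only.
    if not re.fullmatch(r"[A-Za-z0-9_-]+", full):
        raise ValueError(f"invalid tool name: {full!r}")
    return full

def find_collisions(servers: dict[str, list[str]]) -> dict[str, list[str]]:
    """Map each tool name offered by more than one server to those servers."""
    owners: dict[str, list[str]] = {}
    for server, tools in servers.items():
        for tool in tools:
            owners.setdefault(tool, []).append(server)
    return {tool: names for tool, names in owners.items() if len(names) > 1}

# Two servers both exposing a bare "search" tool collide; prefixed names do not.
clashing = find_collisions({"myapp": ["search"], "other": ["search"]})
clean = find_collisions({
    "myapp": [namespaced("myapp", "search")],
    "other": [namespaced("other", "search")],
})
```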

Middleware Patterns

Request Logging

import asyncio
import functools
import logging
from contextlib import asynccontextmanager

from mcp.server.fastmcp import FastMCP

logger = logging.getLogger("mcp-server")

# Use lifespan for server-level startup/shutdown logging
@asynccontextmanager
async def lifespan(server: FastMCP):
    logger.info("Server starting up")
    try:
        yield
    finally:
        logger.info("Server shutting down")

# The lifespan only runs if it is passed to the constructor
mcp = FastMCP("logged-server", lifespan=lifespan)

# For per-tool logging, use a decorator
def logged_tool(func):
    # functools.wraps copies the name, docstring, and annotations, and sets
    # __wrapped__ so inspect.signature() still reports the original parameters.
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        logger.info("Tool called: %s with %s", func.__name__, kwargs)
        try:
            if asyncio.iscoroutinefunction(func):
                result = await func(*args, **kwargs)
            else:
                result = func(*args, **kwargs)
            logger.info("Tool %s succeeded", func.__name__)
            return result
        except Exception as e:
            logger.error("Tool %s failed: %s", func.__name__, e)
            raise
    return wrapper

Rate Limiting

import time
from collections import defaultdict

import httpx
from mcp.server.fastmcp import Context

class RateLimiter:
    def __init__(self, max_calls: int, window_seconds: int):
        self.max_calls = max_calls
        self.window = window_seconds
        self.calls: dict[str, list[float]] = defaultdict(list)

    def check(self, key: str) -> bool:
        now = time.time()
        self.calls[key] = [t for t in self.calls[key] if now - t < self.window]
        if len(self.calls[key]) >= self.max_calls:
            return False
        self.calls[key].append(now)
        return True

rate_limiter = RateLimiter(max_calls=60, window_seconds=60)

@mcp.tool()
async def rate_limited_api_call(endpoint: str, ctx: Context) -> str:
    """Call an API with rate limiting."""
    if not rate_limiter.check("api"):
        return "Rate limit exceeded. Please wait before making more requests."
    async with httpx.AsyncClient() as client:
        resp = await client.get(f"https://api.example.com/{endpoint}")
        return resp.text
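
The sliding-window check is easier to reason about when time can be controlled. The variant below is a sketch, not a replacement for the class above: SlidingWindowLimiter and its clock parameter are hypothetical, introduced so a test can substitute a fake clock, and it defaults to time.monotonic so wall-clock adjustments do not affect the window.

```python
import time
from collections import defaultdict
from typing import Callable

class SlidingWindowLimiter:
    """Allow at most max_calls per key within any window_seconds span."""

    def __init__(self, max_calls: int, window_seconds: float,
                 clock: Callable[[], float] = time.monotonic):
        self.max_calls = max_calls
        self.window = window_seconds
        self.clock = clock  # injectable so tests can control time
        self.calls: dict[str, list[float]] = defaultdict(list)

    def check(self, key: str) -> bool:
        now = self.clock()
        # Keep only timestamps that are still inside the window.
        recent = [t for t in self.calls[key] if now - t < self.window]
        allowed = len(recent) < self.max_calls
        if allowed:
            recent.append(now)
        self.calls[key] = recent
        return allowed

# Usage with a fake clock: two calls are allowed, the third is rejected,
# and the key is allowed again once the window has passed.
fake_now = [0.0]
limiter = SlidingWindowLimiter(max_calls=2, window_seconds=10, clock=lambda: fake_now[0])
burst = [limiter.check("api") for _ in range(3)]
fake_now[0] = 10.0
after_window = limiter.check("api")
```

Keys are independent, so passing a per-session or per-tool key instead of the constant "api" would give each caller its own budget.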

Caching Middleware

import asyncio
import time
from functools import wraps

import httpx

def cached(ttl_seconds: int = 300):
    """Cache tool results for the given TTL."""
    cache: dict[str, tuple[float, str]] = {}

    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            key = f"{func.__name__}:{args}:{kwargs}"
            if key in cache:
                cached_at, result = cache[key]
                if time.time() - cached_at < ttl_seconds:
                    return result
            result = await func(*args, **kwargs) if asyncio.iscoroutinefunction(func) else func(*args, **kwargs)
            cache[key] = (time.time(), result)
            return result
        return wrapper
    return decorator

@mcp.tool()
@cached(ttl_seconds=60)
async def get_weather(city: str) -> str:
    """Get current weather for a city (cached for 60s)."""
    async with httpx.AsyncClient() as client:
        # Pass the city via params so it is URL-encoded
        resp = await client.get("https://weather.api/current", params={"city": city})
        return resp.text

Error Handling

Python Error Handling

import json
import logging

import httpx
from mcp.server.fastmcp import FastMCP

logger = logging.getLogger("mcp-server")
mcp = FastMCP("robust-server")

@mcp.tool()
async def safe_operation(input: str) -> str:
    """Perform an operation with proper error handling."""
    try:
        # do_something() is a placeholder for the real work
        result = await do_something(input)
        return json.dumps({"status": "success", "data": result})
    except ValueError as e:
        # User-facing error: FastMCP turns an exception raised by a tool
        # into a tool result with isError=True
        raise ValueError(f"Invalid input: {e}") from e
    except httpx.HTTPError as e:
        raise RuntimeError(f"API request failed: {e}") from e
    except Exception:
        # Log details internally, expose only a safe message
        logger.exception("Unexpected error in safe_operation")
        raise RuntimeError("An unexpected error occurred. Check server logs.") from None

TypeScript Error Handling

server.tool("safe_operation", "Do something safely", { input: z.string() }, async ({ input }) => {
  try {
    const result = await doSomething(input);
    return { content: [{ type: "text", text: JSON.stringify(result) }] };
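  } catch (error) {
    // Return error as tool result (visible to LLM)
    // `error` is typed as unknown, so narrow it before reading .message
    const message = error instanceof Error ? error.message : String(error);
    return {
      content: [{ type: "text", text: `Error: ${message}` }],
      isError: true,
    };
  }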
});

Error Categories

| Error Type | Handling | Example |
|---|---|---|
| Invalid input | Return clear message, isError: true | "Missing required field: query" |
| Auth failure | Return message suggesting config check | "API key invalid. Check MY_API_KEY env var" |
| External API error | Return status + retry suggestion | "GitHub API returned 503. Try again in 30s" |
| Internal error | Log details, return safe message | "Internal error. Check server logs" |
| Timeout | Return partial results if available | "Operation timed out. Partial results: ..." |
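
One way to apply these categories consistently is a single function that turns an exception into a tool result. The sketch below is illustrative only: to_tool_result is a hypothetical helper, and mapping Python's built-in exception types onto the categories is an assumption of this example, not something the SDK prescribes.

```python
def to_tool_result(exc: Exception) -> dict:
    """Map an exception to a tool result dict with isError set."""
    if isinstance(exc, ValueError):
        text = f"Invalid input: {exc}"
    elif isinstance(exc, PermissionError):
        text = f"Auth failure: {exc}. Check the server's credential configuration."
    elif isinstance(exc, TimeoutError):
        text = "Operation timed out. Try again or narrow the request."
    elif isinstance(exc, ConnectionError):
        text = f"External service error: {exc}. Try again shortly."
    else:
        # Internal errors: keep details out of the client-visible message.
        text = "Internal error. Check server logs."
    return {"content": [{"type": "text", "text": text}], "isError": True}

result = to_tool_result(ValueError("Missing required field: query"))
```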

Logging

Python Logging

import json

from mcp.server.fastmcp import FastMCP, Context

mcp = FastMCP("logged-server")

@mcp.tool()
async def debug_tool(data: str, ctx: Context) -> str:
    """A tool with comprehensive logging."""
    await ctx.debug(f"Received data: {data[:100]}")
    await ctx.info("Processing started")

    try:
        result = process(data)
        await ctx.info(f"Processing complete: {len(result)} items")
        return json.dumps(result)
    except Exception as e:
        await ctx.error(f"Processing failed: {e}")
        raise

Log Levels

| Level | Use For |
|---|---|
| debug | Detailed diagnostic info, request/response bodies |
| info | Normal operations, progress updates |
| warning | Recoverable issues, deprecation notices |
| error | Failures that affect the current operation |
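
If server code already uses Python's logging module, its numeric levels can be folded onto the four levels in the table. A minimal sketch; mcp_level is a hypothetical helper, and collapsing CRITICAL into "error" is a simplification made for this example.

```python
import logging

def mcp_level(levelno: int) -> str:
    """Map a Python logging level number to one of the four levels in the table."""
    if levelno >= logging.ERROR:
        return "error"
    if levelno >= logging.WARNING:
        return "warning"
    if levelno >= logging.INFO:
        return "info"
    return "debug"

levels = [mcp_level(n) for n in (logging.DEBUG, logging.INFO, logging.WARNING, logging.CRITICAL)]
```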

Graceful Shutdown

import logging
from contextlib import asynccontextmanager

import httpx
from mcp.server.fastmcp import FastMCP

logger = logging.getLogger("mcp-server")

@asynccontextmanager
async def lifespan(server: FastMCP):
    # Startup. create_pool() is a placeholder for your database driver's pool factory.
    db_pool = await create_pool()
    http_client = httpx.AsyncClient()

    try:
        # Tools read these via ctx.request_context.lifespan_context
        yield {"db": db_pool, "http": http_client}
    finally:
        # Cleanup: close connections, flush buffers
        await http_client.aclose()
        await db_pool.close()
        logger.info("Server shutdown complete")

mcp = FastMCP("graceful-server", lifespan=lifespan)

Connection and Session Management

Session Isolation

Each client connection gets its own session. Do not share mutable state between sessions without synchronization:

# BAD: Shared mutable state without locks
results_cache = {}  # All sessions share this dict unsafely

# GOOD: Per-session state via context
@mcp.tool()
async def track_calls(ctx: Context) -> str:
    """Track how many times this session called this tool."""
    session = ctx.request_context.session
    if not hasattr(session, "call_count"):
        session.call_count = 0
    session.call_count += 1
    return f"This session has made {session.call_count} calls"

# GOOD: Shared state with proper locking
import asyncio
_lock = asyncio.Lock()
_shared_cache: dict = {}

@mcp.tool()
async def cached_lookup(key: str) -> str:
    async with _lock:
        if key not in _shared_cache:
            _shared_cache[key] = await expensive_lookup(key)
        return _shared_cache[key]
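
The effect of the lock can be seen by firing several concurrent lookups for the same key. The standalone sketch below uses only asyncio; LockedCache and slow_lookup are hypothetical stand-ins for the shared cache and the expensive backend call.

```python
import asyncio

class LockedCache:
    """Shared cache whose check-then-fill step is guarded by a lock."""

    def __init__(self, loader):
        self._loader = loader
        self._lock = asyncio.Lock()
        self._data: dict[str, str] = {}

    async def get(self, key: str) -> str:
        async with self._lock:
            if key not in self._data:
                self._data[key] = await self._loader(key)
            return self._data[key]

async def demo() -> tuple[list[str], int]:
    calls = 0

    async def slow_lookup(key: str) -> str:
        nonlocal calls
        calls += 1
        await asyncio.sleep(0.01)  # simulate a slow backend
        return key.upper()

    cache = LockedCache(slow_lookup)  # created inside the running event loop
    results = await asyncio.gather(*(cache.get("alpha") for _ in range(5)))
    return list(results), calls

results, calls = asyncio.run(demo())
```

Without the lock, all five coroutines would observe a cache miss before the first lookup finished, and the backend would be called five times instead of once. A single global lock also serializes lookups for unrelated keys, so a per-key lock may be worth considering when lookups are slow.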

Notifying Clients of Changes

When your server's available tools, resources, or prompts change at runtime:

@mcp.tool()
async def enable_advanced_tools(ctx: Context) -> str:
    """Dynamically register new tools and notify the client."""
    # register_advanced_tools() is a placeholder that adds tools to the server
    register_advanced_tools(ctx.fastmcp)
    # Notify client that the tool list has changed
    await ctx.request_context.session.send_tool_list_changed()
    return "Advanced tools enabled"

Project Structure

Python (Recommended Layout)

my-mcp-server/
├── pyproject.toml
├── src/
│   └── my_server/
│       ├── __init__.py
│       ├── server.py          # FastMCP instance + main()
│       ├── tools/
│       │   ├── __init__.py
│       │   ├── files.py       # File operation tools
│       │   └── api.py         # API wrapper tools
│       ├── resources/
│       │   ├── __init__.py
│       │   └── config.py      # Configuration resources
│       └── prompts/
│           ├── __init__.py
│           └── workflows.py   # Prompt templates
└── tests/
    ├── test_tools.py
    └── test_resources.py

TypeScript (Recommended Layout)

my-mcp-server/
├── package.json
├── tsconfig.json
├── src/
│   ├── index.ts               # Server setup + transport
│   ├── tools/
│   │   ├── files.ts
│   │   └── api.ts
│   ├── resources/
│   │   └── config.ts
│   └── prompts/
│       └── workflows.ts
└── tests/
    ├── tools.test.ts
    └── resources.test.ts

Claude Desktop Configuration

{
  "mcpServers": {
    "my-python-server": {
      "command": "uv",
      "args": ["run", "--directory", "/path/to/my-server", "python", "-m", "my_server"],
      "env": {
        "API_KEY": "your-key",
        "DATABASE_URL": "postgresql://localhost/mydb"
      }
    },
    "my-ts-server": {
      "command": "npx",
      "args": ["tsx", "/path/to/my-server/src/index.ts"],
      "env": {
        "API_KEY": "your-key"
      }
    }
  }
}
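
A malformed config file is a common reason a server never starts, so a quick shape check can save debugging time. The sketch below validates only the fields that appear in the example above; validate_mcp_config is a hypothetical helper, and the rules it enforces are inferred from that example rather than taken from an official schema.

```python
import json

def validate_mcp_config(raw: str) -> list[str]:
    """Return a list of problems; an empty list means the shape looks right."""
    config = json.loads(raw)
    servers = config.get("mcpServers") if isinstance(config, dict) else None
    if not isinstance(servers, dict):
        return ["top-level 'mcpServers' object is missing"]
    problems = []
    for name, entry in servers.items():
        if not isinstance(entry, dict):
            problems.append(f"{name}: entry must be an object")
            continue
        if not isinstance(entry.get("command"), str):
            problems.append(f"{name}: 'command' must be a string")
        args = entry.get("args", [])
        if not (isinstance(args, list) and all(isinstance(a, str) for a in args)):
            problems.append(f"{name}: 'args' must be a list of strings")
        env = entry.get("env", {})
        if not (isinstance(env, dict) and all(isinstance(v, str) for v in env.values())):
            problems.append(f"{name}: 'env' must map names to strings")
    return problems

sample = '{"mcpServers": {"my-server": {"command": "uv", "args": ["run"], "env": {"API_KEY": "your-key"}}}}'
problems = validate_mcp_config(sample)
```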

Claude Code Configuration

In .mcp.json at the project root (project scope):

{
  "mcpServers": {
    "my-server": {
      "command": "uv",
      "args": ["run", "--directory", "/path/to/server", "python", "server.py"],
      "env": {
        "API_KEY": "your-key"
      }
    }
  }
}

Or via CLI:

claude mcp add my-server -- uv run --directory /path/to/server python server.py