testing-debugging.md 25 KB

Testing and Debugging

MCP Inspector

The MCP Inspector is an interactive debugging tool for testing MCP servers without a full client setup.

Installation and Usage

# Run directly with npx (no install needed)
npx @modelcontextprotocol/inspector

# Connect to a stdio server
npx @modelcontextprotocol/inspector -- uv run python server.py

# Connect to a stdio server with env vars
npx @modelcontextprotocol/inspector -e API_KEY=sk-key -- uv run python server.py

# Connect to an SSE server
npx @modelcontextprotocol/inspector --url http://localhost:8000/sse

# Connect to a Streamable HTTP server
npx @modelcontextprotocol/inspector --url http://localhost:8000/mcp

What You Can Do with Inspector

Feature How
List tools Click "Tools" tab to see all registered tools
Call tools Fill in arguments and click "Call"
List resources Click "Resources" tab
Read resources Click any resource to see its contents
List prompts Click "Prompts" tab
Get prompts Fill in arguments and see rendered prompt
View messages "Messages" tab shows raw JSON-RPC traffic
Test notifications See server notifications in real-time

Inspecting Protocol Messages

The Inspector shows raw JSON-RPC messages. Use this to verify:

  • Request format matches the MCP specification
  • Response content is properly structured
  • Error codes and messages are correct
  • Notifications are sent at the right times
Example Inspector message log:

→ {"jsonrpc":"2.0","id":1,"method":"initialize","params":{...}}
← {"jsonrpc":"2.0","id":1,"result":{"capabilities":{"tools":{}},...}}
→ {"jsonrpc":"2.0","method":"notifications/initialized"}
→ {"jsonrpc":"2.0","id":2,"method":"tools/list"}
← {"jsonrpc":"2.0","id":2,"result":{"tools":[{"name":"search",...}]}}
→ {"jsonrpc":"2.0","id":3,"method":"tools/call","params":{"name":"search","arguments":{"query":"test"}}}
← {"jsonrpc":"2.0","id":3,"result":{"content":[{"type":"text","text":"..."}]}}

Unit Testing Tools (Python)

Testing with pytest

# test_tools.py
import pytest
import json
from unittest.mock import AsyncMock, patch, MagicMock

# Test tool functions directly (they're just functions)
from my_server.server import search_docs, create_ticket

class TestSearchDocs:
    """Unit tests for the search_docs tool function."""

    def test_basic_search(self):
        """search_docs includes every backend hit in the returned text."""
        fake_hits = [
            MagicMock(title="Doc 1", snippet="First result"),
            MagicMock(title="Doc 2", snippet="Second result"),
        ]
        with patch("my_server.server.perform_search") as fake_search:
            fake_search.return_value = fake_hits
            output = search_docs("test query")
        fake_search.assert_called_once_with("test query")
        for title in ("Doc 1", "Doc 2"):
            assert title in output

    def test_empty_search(self):
        """No hits yields an empty string."""
        with patch("my_server.server.perform_search") as fake_search:
            fake_search.return_value = []
            assert search_docs("nonexistent") == ""

    def test_search_error(self):
        """Backend failures propagate unchanged to the caller."""
        with patch("my_server.server.perform_search") as fake_search:
            fake_search.side_effect = ConnectionError("API down")
            with pytest.raises(ConnectionError):
                search_docs("test")


class TestCreateTicket:
    """Unit tests for the create_ticket tool function."""

    def test_create_ticket(self):
        """A successful creation echoes the ticket id and URL."""
        ticket = MagicMock(id=42, url="https://example.com/42")
        with patch("my_server.server.api") as fake_api:
            fake_api.create.return_value = ticket
            message = create_ticket("Bug report", "Something broke", "high")
        assert "#42" in message
        assert "https://example.com/42" in message

    def test_create_ticket_validation(self):
        """An empty title is rejected by the tool's own value check.

        FastMCP validates argument *types* before the function runs;
        this exercises the function-level validation instead.
        """
        with pytest.raises(ValueError):
            create_ticket("", "body", "medium")

Testing Async Tools

import pytest
import asyncio

@pytest.mark.asyncio
async def test_async_tool():
    """Test an async tool handler that fetches over HTTP.

    httpx.AsyncClient is used as an async context manager inside the
    tool, so the mock must provide __aenter__/__aexit__ that yield a
    client whose .get() resolves to a canned response.

    (The original example set up the AsyncClient mock twice; the first
    setup was dead code immediately shadowed by a nested patch.)
    """
    mock_response = AsyncMock()
    mock_response.text = '{"data": "test"}'
    # raise_for_status is a synchronous method on httpx responses.
    mock_response.raise_for_status = MagicMock()

    mock_instance = AsyncMock()
    mock_instance.get.return_value = mock_response

    with patch("my_server.server.httpx.AsyncClient") as MockClient:
        # `async with AsyncClient() as client:` yields our mock instance.
        MockClient.return_value.__aenter__.return_value = mock_instance
        MockClient.return_value.__aexit__.return_value = False

        result = await fetch_data("https://example.com/api")
        assert "test" in result

Testing with FastMCP Test Client

import pytest
from mcp.server.fastmcp import FastMCP

# Create a test server
mcp = FastMCP("test-server")

@mcp.tool()
def add(a: int, b: int) -> str:
    """Add two numbers."""
    return str(a + b)

@mcp.resource("config://test")
def get_config() -> str:
    return '{"key": "value"}'

@pytest.mark.asyncio
async def test_tool_via_mcp():
    """Test tools through the MCP protocol layer, not by direct call.

    NOTE(review): this relies on `mcp.test_client()` providing an
    in-memory client — confirm your FastMCP version exposes it under
    this name.
    """
    async with mcp.test_client() as client:
        # List tools: the registered "add" tool must be discoverable.
        tools = await client.list_tools()
        assert any(t.name == "add" for t in tools)

        # Call a tool: arguments go through schema validation and the
        # result comes back as a list of content items.
        result = await client.call_tool("add", {"a": 2, "b": 3})
        assert result[0].text == "5"

@pytest.mark.asyncio
async def test_resource_via_mcp():
    """Test resources through the MCP protocol layer."""
    async with mcp.test_client() as client:
        resources = await client.list_resources()
        assert any(r.uri == "config://test" for r in resources)

        content = await client.read_resource("config://test")
        assert '"key"' in content[0].text

Unit Testing Tools (TypeScript)

Testing with vitest

// tools.test.ts
import { describe, it, expect, vi, beforeEach } from "vitest";
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { InMemoryTransport } from "@modelcontextprotocol/sdk/inMemory.js";
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { z } from "zod";

// Spin up a real client/server pair over an in-memory transport so the
// tool is exercised through the actual MCP protocol layer.
describe("search tool", () => {
  let server: McpServer;
  let client: Client;

  beforeEach(async () => {
    server = new McpServer({ name: "test", version: "1.0.0" });

    // NOTE(review): mockSearch is not defined in this snippet — provide
    // a stub (e.g. vi.fn()) before these tests will actually run.
    server.tool("search", "Search docs", { query: z.string() }, async ({ query }) => {
      // In real code, this calls an external service
      const results = await mockSearch(query);
      return {
        content: [{ type: "text", text: results.join("\n") }],
      };
    });

    // Connect via in-memory transport
    const [clientTransport, serverTransport] = InMemoryTransport.createLinkedPair();
    await server.connect(serverTransport);

    client = new Client({ name: "test-client", version: "1.0.0" });
    await client.connect(clientTransport);
  });

  it("should return search results", async () => {
    const result = await client.callTool({ name: "search", arguments: { query: "test" } });
    expect(result.content).toHaveLength(1);
    expect(result.content[0].type).toBe("text");
  });

  it("should list tools", async () => {
    const tools = await client.listTools();
    expect(tools.tools).toHaveLength(1);
    expect(tools.tools[0].name).toBe("search");
  });
});

Testing Error Handling

// These tests reuse the `client` connected in a beforeEach like the one
// above, and assume a "query_db" tool that rejects non-SELECT statements.
describe("error handling", () => {
  it("should return isError for invalid input", async () => {
    // Tool-level failures come back as a normal result with isError: true,
    // not as a rejected promise.
    const result = await client.callTool({
      name: "query_db",
      arguments: { sql: "DELETE FROM users" },
    });
    expect(result.isError).toBe(true);
    expect(result.content[0].text).toContain("Only SELECT queries");
  });

  it("should handle tool not found", async () => {
    // Unknown tool names are protocol-level errors and reject the promise.
    await expect(
      client.callTool({ name: "nonexistent", arguments: {} })
    ).rejects.toThrow();
  });
});

Integration Testing

Full Server Integration Test (Python)

import pytest
import asyncio
from mcp import ClientSession
from mcp.client.stdio import stdio_client

@pytest.mark.asyncio
async def test_server_integration():
    """Test the full server by spawning it as a subprocess over stdio."""
    # stdio_client takes a StdioServerParameters object, not bare
    # command=/args=/env= keyword arguments.
    from mcp import StdioServerParameters

    server_params = StdioServerParameters(
        command="uv",
        args=["run", "python", "server.py"],
        env={"API_KEY": "test-key"},
    )
    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            # The initialize handshake must complete before any other call.
            await session.initialize()

            # Tools are discoverable and include the one we registered.
            tools = await session.list_tools()
            tool_names = [t.name for t in tools.tools]
            assert "search" in tool_names

            # A tool call returns at least one content item, of type text.
            result = await session.call_tool("search", {"query": "test"})
            assert len(result.content) > 0
            assert result.content[0].type == "text"

            # Resources are listed and readable.
            resources = await session.list_resources()
            assert len(resources.resources) > 0

            content = await session.read_resource("config://app")
            assert len(content.contents) > 0

Integration Test with HTTP Transport

import pytest
import httpx
import subprocess
import time

@pytest.fixture(scope="module")
def server_process():
    """Start the MCP server as a subprocess for the whole test module."""
    import os  # os.environ is used below but `os` isn't imported at snippet top

    proc = subprocess.Popen(
        ["uv", "run", "python", "server.py", "--transport", "streamable-http", "--port", "8765"],
        # Inherit the caller's environment so PATH etc. still resolve.
        env={**os.environ, "API_KEY": "test-key"},
    )
    time.sleep(2)  # Crude startup wait; poll the port in a real suite.
    yield proc
    # Always tear the server down, even if the tests failed.
    proc.terminate()
    proc.wait()

@pytest.mark.asyncio
async def test_http_server(server_process):
    """Exercise the server via HTTP by POSTing raw JSON-RPC to /mcp.

    NOTE(review): real Streamable HTTP servers may also require an
    Accept header covering application/json and text/event-stream, and
    a session id after initialize — confirm against your SDK version.
    """
    async with httpx.AsyncClient(base_url="http://localhost:8765") as client:
        # Initialize: the handshake must succeed before any other call.
        resp = await client.post("/mcp", json={
            "jsonrpc": "2.0",
            "id": 1,
            "method": "initialize",
            "params": {
                "protocolVersion": "2025-03-26",
                "capabilities": {},
                "clientInfo": {"name": "test", "version": "1.0.0"},
            },
        })
        assert resp.status_code == 200
        result = resp.json()["result"]
        assert "capabilities" in result

        # List tools over the same endpoint with a fresh request id.
        resp = await client.post("/mcp", json={
            "jsonrpc": "2.0",
            "id": 2,
            "method": "tools/list",
        })
        tools = resp.json()["result"]["tools"]
        assert len(tools) > 0

Mock Clients

Python Mock Client

from unittest.mock import AsyncMock
from mcp.types import TextContent, CallToolResult

def create_mock_session():
    """Create a mock MCP session for testing.

    Returns an AsyncMock mimicking the parts of ClientSession the code
    under test touches: list_tools() and call_tool().

    (The original example referenced undefined MockToolList/MockTool
    helpers; SimpleNamespace stands in for the SDK result types.)
    """
    from types import SimpleNamespace

    session = AsyncMock()

    # Mock tool listing: one "search" tool with a minimal JSON Schema.
    session.list_tools.return_value = SimpleNamespace(tools=[
        SimpleNamespace(
            name="search",
            description="Search docs",
            inputSchema={
                "type": "object",
                "properties": {"query": {"type": "string"}},
                "required": ["query"],
            },
        ),
    ])

    # Mock tool calls: echo the query back; unknown names raise.
    async def mock_call_tool(name, arguments):
        if name == "search":
            return CallToolResult(
                content=[TextContent(type="text", text="Mock result for: " + arguments["query"])]
            )
        raise ValueError(f"Unknown tool: {name}")

    session.call_tool = AsyncMock(side_effect=mock_call_tool)
    return session

Protocol Debugging

Logging JSON-RPC Messages

import logging
import json

# Enable protocol-level logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("mcp.protocol")

# Custom message logger
class MessageLogger:
    """Log JSON-RPC traffic in a human-readable arrow notation."""

    def log_request(self, method: str, params: dict, id: int):
        """Log an outgoing request with its id and pretty-printed params."""
        body = json.dumps(params, indent=2)
        logger.debug(f"→ [{id}] {method}: {body}")

    def log_response(self, id: int, result: dict):
        """Log an incoming result, truncated to 500 chars to keep logs sane."""
        body = json.dumps(result, indent=2, default=str)[:500]
        logger.debug(f"← [{id}] Result: {body}")

    def log_notification(self, method: str, params: dict):
        """Log an outgoing notification (no id — no response expected)."""
        body = json.dumps(params, indent=2)
        logger.debug(f"→ (notif) {method}: {body}")

    def log_error(self, id: int, error: dict):
        """Log an incoming JSON-RPC error object at ERROR level."""
        body = json.dumps(error, indent=2)
        logger.error(f"← [{id}] Error: {body}")

Capturing Messages for Replay

import json
from datetime import datetime

class MessageCapture:
    """Capture MCP messages for debugging and replay.

    Each message is appended to a JSONL file as soon as it is captured,
    so a crash mid-session still leaves a usable recording on disk.
    """

    def __init__(self, output_file: str = "mcp_capture.jsonl"):
        self.output_file = output_file
        self.messages = []  # in-memory copy of everything captured

    def capture(self, direction: str, message: dict):
        """Record one message. `direction` is "sent" or "received"."""
        # Timezone-aware timestamp; datetime.utcnow() is naive and
        # deprecated as of Python 3.12.
        from datetime import timezone

        entry = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "direction": direction,
            "message": message,
        }
        self.messages.append(entry)
        with open(self.output_file, "a", encoding="utf-8") as f:
            f.write(json.dumps(entry) + "\n")

    def replay(self) -> list[dict]:
        """Load captured messages from disk for analysis."""
        with open(self.output_file, encoding="utf-8") as f:
            return [json.loads(line) for line in f]

Common Protocol Issues

Issue Symptom Diagnosis
Missing jsonrpc field Server returns parse error Check all messages include "jsonrpc": "2.0"
Wrong method name Method not found error Verify against spec: tools/call not tool/call
Missing id on request No response received All requests need unique id; notifications don't
params vs arguments Tool gets empty args tools/call uses params.arguments for tool args
Content format wrong Client shows raw object Must be [{"type": "text", "text": "..."}]
Protocol version mismatch Initialize fails Use "2025-03-26" (check spec for latest)

Common Issues and Solutions

Server Not Starting

# Check 1: Can you run the server directly?
uv run python server.py
# If this fails, fix the Python/dependency issues first

# Check 2: Does the command path exist?
which uv    # or: which python, which npx
# Ensure the command is on PATH

# Check 3: Are dependencies installed?
cd /path/to/server && uv pip list | grep mcp

# Check 4: Check stderr for errors (stdio servers)
# Add to server.py:
import sys
print("Server starting...", file=sys.stderr)

Tool Not Appearing in Client

# Check 1: Does list_tools work?
# Use MCP Inspector to verify
npx @modelcontextprotocol/inspector -- uv run python server.py

# Check 2: Is the tool registered correctly?
# Verify with a simple test:
@mcp.tool()
def test_tool() -> str:
    """A test tool that always works."""
    return "Tool is working!"

# Check 3: Invalid inputSchema
# Ensure schema is valid JSON Schema
# Common mistake: using Python types instead of JSON Schema types
# BAD: {"type": "str"}
# GOOD: {"type": "string"}

Auth Failures

# Check 1: Are env vars set in the CLIENT config, not just your shell?
# Claude Desktop reads env from claude_desktop_config.json, NOT your shell profile

# Check 2: Verify env vars are accessible
@mcp.tool()
def debug_env() -> str:
    """Show environment variables (for debugging only).

    Values are truncated to their first 5 characters so full secrets
    never appear in tool output.
    """
    import os
    import json  # `json` is not imported at the top of this snippet

    return json.dumps({
        k: v[:5] + "..." if len(v) > 5 else v
        for k, v in os.environ.items()
        if k.startswith("MY_")  # Only show your app's vars
    })

# Check 3: Token expiration
# Add logging to token refresh:
import sys
print(f"Token expires at: {expires_at}", file=sys.stderr)

Timeout Errors

# Check 1: Add timeouts to all external calls
async with httpx.AsyncClient(timeout=30.0) as client:
    resp = await client.get(url)

# Check 2: Break long operations into steps
@mcp.tool()
async def process_large_dataset(dataset_id: str, ctx: Context) -> str:
    """Process a large dataset in chunks with progress."""
    chunks = get_chunks(dataset_id)
    results = []
    for i, chunk in enumerate(chunks):
        await ctx.report_progress(i, len(chunks))
        results.append(await process_chunk(chunk))
    return json.dumps({"processed": len(results)})

# Check 3: Use streaming for large responses
# Return a summary instead of full data
@mcp.tool()
def query_large_table(table: str) -> str:
    """Query a table, returning a summary plus a 10-row sample.

    Raises:
        ValueError: if `table` is not a plain identifier.
    """
    # `table` is interpolated into SQL below, so reject anything that is
    # not a bare identifier to prevent SQL injection.
    if not table.isidentifier():
        raise ValueError(f"Invalid table name: {table!r}")
    count = db.count(table)
    sample = db.query(f"SELECT * FROM {table} LIMIT 10")
    # default=str handles dates and other non-JSON-native column types.
    return json.dumps({
        "total_rows": count,
        "sample": sample,
        "message": f"Showing 10 of {count} rows. Use pagination for more.",
    }, default=str)

JSON Parse Errors

# Check 1: Don't print to stdout in stdio servers!
# stdout IS the protocol channel. Use stderr for logging.
import sys
print("Debug info", file=sys.stderr)  # Correct
# print("Debug info")  # WRONG - corrupts protocol stream

# Check 2: Ensure tool results are serializable
@mcp.tool()
def get_data() -> str:
    result = db.query("SELECT * FROM users")
    # BAD: datetime objects aren't JSON serializable by default
    # return json.dumps(result)

    # GOOD: handle non-serializable types
    return json.dumps(result, default=str)

CI Testing

GitHub Actions

# .github/workflows/test-mcp.yml
name: Test MCP Server
on: [push, pull_request]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Install uv
        uses: astral-sh/setup-uv@v4

      - name: Install dependencies
        run: uv sync

      - name: Run unit tests
        run: uv run pytest tests/ -v

      - name: Run integration tests
        run: uv run pytest tests/integration/ -v
        env:
          API_KEY: ${{ secrets.TEST_API_KEY }}

      - name: Test server starts
        run: |
          uv run python server.py &
          SERVER_PID=$!
          sleep 3
          # Verify server is running
          kill -0 $SERVER_PID 2>/dev/null && echo "Server started successfully"
          kill $SERVER_PID

      - name: Test with MCP Inspector
        run: |
          npx @modelcontextprotocol/inspector --test -- uv run python server.py

Docker-Based Testing

# Dockerfile.test
FROM python:3.12-slim
WORKDIR /app
COPY . .
RUN pip install uv && uv sync
RUN uv run pytest tests/ -v
# docker-compose.test.yml
services:
  test:
    build:
      context: .
      dockerfile: Dockerfile.test
    environment:
      - API_KEY=test-key
      - DATABASE_URL=postgresql://postgres:test@db:5432/testdb
    depends_on:
      - db
  db:
    image: postgres:16
    environment:
      POSTGRES_PASSWORD: test
      POSTGRES_DB: testdb
docker compose -f docker-compose.test.yml up --build --abort-on-container-exit

Performance Testing

Concurrent Tool Calls

import pytest
import asyncio
import time

@pytest.mark.asyncio
async def test_concurrent_tool_calls():
    """Test server handles concurrent requests correctly."""
    async with mcp.test_client() as client:
        start = time.time()

        # Fire 20 concurrent tool calls
        tasks = [
            client.call_tool("search", {"query": f"test-{i}"})
            for i in range(20)
        ]
        results = await asyncio.gather(*tasks)
        elapsed = time.time() - start

        assert len(results) == 20
        assert all(len(r) > 0 for r in results)
        print(f"20 concurrent calls completed in {elapsed:.2f}s")

Large Payload Handling

@pytest.mark.asyncio
async def test_large_response():
    """Test server handles large responses without issues."""
    async with mcp.test_client() as client:
        result = await client.call_tool("generate_report", {
            "size": "large",  # Generates a multi-KB response
        })
        assert len(result[0].text) > 10000
        # Verify it's valid JSON
        data = json.loads(result[0].text)
        assert "report" in data

@pytest.mark.asyncio
async def test_response_size_limit():
    """Verify server truncates oversized responses."""
    async with mcp.test_client() as client:
        result = await client.call_tool("get_all_data", {})
        text = result[0].text
        # Server should paginate or truncate
        assert len(text) < 1_000_000  # Under 1MB

Memory Usage

import tracemalloc

@pytest.mark.asyncio
async def test_memory_usage():
    """Test that tool calls don't leak memory."""
    tracemalloc.start()

    async with mcp.test_client() as client:
        snapshot1 = tracemalloc.take_snapshot()

        # Run 100 tool calls
        for i in range(100):
            await client.call_tool("search", {"query": f"test-{i}"})

        snapshot2 = tracemalloc.take_snapshot()
        stats = snapshot2.compare_to(snapshot1, "lineno")

        # Check no single allocation grew more than 10MB
        for stat in stats[:5]:
            assert stat.size_diff < 10 * 1024 * 1024, f"Memory leak detected: {stat}"

    tracemalloc.stop()

Debugging Tools

Claude Desktop Logs

# macOS
tail -f ~/Library/Logs/Claude/mcp-server-*.log

# Windows
Get-Content "$env:APPDATA\Claude\Logs\mcp-server-*.log" -Wait

# Look for:
# - Server startup errors
# - Tool call failures
# - Transport disconnections

Custom Debug Server

Add a debug tool to your server for troubleshooting:

import sys
import os
import platform

@mcp.tool()
def server_debug_info() -> str:
    """Return server diagnostic information (remove in production)."""
    import json  # `json` is not among this snippet's top-level imports

    return json.dumps({
        "python_version": sys.version,
        "platform": platform.platform(),
        "cwd": os.getcwd(),
        # Redact values; the variable names alone are enough for debugging.
        "env_vars": {k: "***" for k in os.environ if k.startswith("MY_")},
        "pid": os.getpid(),
        "argv": sys.argv,
    }, indent=2)

Stderr Logging for stdio Servers

Since stdout is the protocol channel, use stderr for all debugging:

import sys
import logging

# Configure logging to stderr
logging.basicConfig(
    stream=sys.stderr,
    level=logging.DEBUG,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger("my-server")

@mcp.tool()
def my_tool(query: str) -> str:
    """Example tool showing the stderr-logging pattern for stdio servers."""
    # DEBUG level keeps normal runs quiet; bump the level when diagnosing.
    logger.debug(f"my_tool called with query={query!r}")
    try:
        result = process(query)
        logger.info(f"my_tool succeeded: {len(result)} chars")
        return result
    except Exception:
        # logger.exception records the traceback to stderr; re-raise so
        # the client still sees the failure.
        logger.exception("my_tool failed")
        raise

Error Reproduction

Capturing and Replaying Protocol Messages

import json

class ProtocolRecorder:
    """Record MCP protocol messages for reproduction.

    Writes one JSON object per line (JSONL) and flushes after every
    message so the recording survives a crash.
    """

    def __init__(self, output_path: str = "mcp_recording.jsonl"):
        self.output_path = output_path
        self._file = open(output_path, "w", encoding="utf-8")

    def record(self, direction: str, message: dict):
        """Append one message. `direction` is "sent" or "received"."""
        import time  # `time` is not imported at the top of this snippet

        self._file.write(json.dumps({
            "direction": direction,
            "message": message,
            "timestamp": time.time(),
        }) + "\n")
        self._file.flush()

    def close(self):
        self._file.close()

    # Context-manager support so the file handle is always released.
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        return False


class ProtocolReplayer:
    """Replay recorded messages against a live server session."""

    def __init__(self, recording_path: str):
        # Load the entire JSONL recording into memory up front.
        with open(recording_path) as f:
            self.entries = [json.loads(line) for line in f]

    async def replay(self, session):
        """Replay recorded messages and compare responses.

        NOTE(review): this pairs sent[i] with received[i] purely by
        position, which assumes exactly one response per request in
        strict order — confirm before trusting the Match output.
        """
        sent = [e for e in self.entries if e["direction"] == "sent"]
        received = [e for e in self.entries if e["direction"] == "received"]

        for i, entry in enumerate(sent):
            msg = entry["message"]
            if "id" in msg:
                # It's a request (notifications have no id and are skipped).
                method = msg["method"]
                params = msg.get("params", {})
                if method == "tools/call":
                    # Only tool calls are replayed; other request types are ignored.
                    result = await session.call_tool(
                        params["name"],
                        params.get("arguments", {}),
                    )
                    # Compare with recorded response
                    expected = received[i] if i < len(received) else None
                    if expected:
                        print(f"[{method}] Match: {result == expected['message']['result']}")

Minimal Reproduction Script

When filing bug reports, create a minimal reproduction:

#!/usr/bin/env python3
"""Minimal reproduction for MCP issue #XXX.

Run: uv run python repro.py
Test: npx @modelcontextprotocol/inspector -- uv run python repro.py
"""
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("repro-server")

@mcp.tool()
def trigger_bug(input: str) -> str:
    """This tool demonstrates the bug."""
    # Minimal code that triggers the issue
    result = problematic_operation(input)
    return result

if __name__ == "__main__":
    mcp.run()