The MCP Inspector is an interactive debugging tool for testing MCP servers without a full client setup.
# Run directly with npx (no install needed)
npx @modelcontextprotocol/inspector
# Connect to a stdio server
npx @modelcontextprotocol/inspector -- uv run python server.py
# Connect to a stdio server with env vars
npx @modelcontextprotocol/inspector -e API_KEY=sk-key -- uv run python server.py
# Connect to an SSE server
npx @modelcontextprotocol/inspector --url http://localhost:8000/sse
# Connect to a Streamable HTTP server
npx @modelcontextprotocol/inspector --url http://localhost:8000/mcp
| Feature | How |
|---|---|
| List tools | Click "Tools" tab to see all registered tools |
| Call tools | Fill in arguments and click "Call" |
| List resources | Click "Resources" tab |
| Read resources | Click any resource to see its contents |
| List prompts | Click "Prompts" tab |
| Get prompts | Fill in arguments and see rendered prompt |
| View messages | "Messages" tab shows raw JSON-RPC traffic |
| Test notifications | See server notifications in real-time |
The Inspector shows raw JSON-RPC messages. Use this to verify that requests use the correct method names, that each response id matches its request, and that tool results have the expected content structure:
Example Inspector message log:
→ {"jsonrpc":"2.0","id":1,"method":"initialize","params":{...}}
← {"jsonrpc":"2.0","id":1,"result":{"capabilities":{"tools":{}},...}}
→ {"jsonrpc":"2.0","method":"notifications/initialized"}
→ {"jsonrpc":"2.0","id":2,"method":"tools/list"}
← {"jsonrpc":"2.0","id":2,"result":{"tools":[{"name":"search",...}]}}
→ {"jsonrpc":"2.0","id":3,"method":"tools/call","params":{"name":"search","arguments":{"query":"test"}}}
← {"jsonrpc":"2.0","id":3,"result":{"content":[{"type":"text","text":"..."}]}}
# test_tools.py
import pytest
import json
from unittest.mock import AsyncMock, patch, MagicMock
# Test tool functions directly (they're just functions)
from my_server.server import search_docs, create_ticket
class TestSearchDocs:
    """Unit tests for the search_docs tool function."""

    def test_basic_search(self):
        """A query with hits produces output containing each title."""
        with patch("my_server.server.perform_search") as fake_search:
            fake_search.return_value = [
                MagicMock(title="Doc 1", snippet="First result"),
                MagicMock(title="Doc 2", snippet="Second result"),
            ]
            output = search_docs("test query")
        assert "Doc 1" in output
        assert "Doc 2" in output
        fake_search.assert_called_once_with("test query")

    def test_empty_search(self):
        """No hits yields an empty string."""
        with patch("my_server.server.perform_search") as fake_search:
            fake_search.return_value = []
            assert search_docs("nonexistent") == ""

    def test_search_error(self):
        """Backend failures propagate to the caller as ConnectionError."""
        with patch("my_server.server.perform_search") as fake_search:
            fake_search.side_effect = ConnectionError("API down")
            with pytest.raises(ConnectionError):
                search_docs("test")
class TestCreateTicket:
    """Unit tests for the create_ticket tool function."""

    def test_create_ticket(self):
        """A successful creation echoes the ticket id and URL."""
        with patch("my_server.server.api") as fake_api:
            fake_api.create.return_value = MagicMock(
                id=42, url="https://example.com/42"
            )
            confirmation = create_ticket("Bug report", "Something broke", "high")
        assert "#42" in confirmation
        assert "https://example.com/42" in confirmation

    def test_create_ticket_validation(self):
        """An empty title is rejected by the function's own validation.

        FastMCP validates argument *types* before the function runs, so
        value-level checks like this must live in the function itself.
        """
        with pytest.raises(ValueError):
            create_ticket("", "body", "medium")
import pytest
import asyncio
@pytest.mark.asyncio
async def test_async_tool():
    """Test an async tool handler that fetches over HTTP.

    httpx.AsyncClient is used as an async context manager, so the mock
    must wire ``__aenter__`` to return the client instance whose
    ``.get()`` resolves to the fake response.
    """
    # The response object itself is not awaited by callers, so plain
    # MagicMock is correct here; only client methods need to be awaitable.
    mock_response = MagicMock()
    mock_response.text = '{"data": "test"}'
    mock_response.raise_for_status = MagicMock()

    mock_instance = AsyncMock()
    mock_instance.get.return_value = mock_response

    with patch("my_server.server.httpx.AsyncClient") as MockClient:
        MockClient.return_value.__aenter__.return_value = mock_instance
        MockClient.return_value.__aexit__.return_value = False
        result = await fetch_data("https://example.com/api")
        assert "test" in result
import pytest
from mcp.server.fastmcp import FastMCP
# Create a test server
mcp = FastMCP("test-server")
@mcp.tool()
def add(a: int, b: int) -> str:
    """Add two numbers and return the sum as a string."""
    return f"{a + b}"
@mcp.resource("config://test")
def get_config() -> str:
    """Serve a static JSON document at config://test."""
    payload = '{"key": "value"}'
    return payload
@pytest.mark.asyncio
async def test_tool_via_mcp():
    """Test tools through the MCP protocol layer.

    NOTE(review): this relies on FastMCP exposing an in-memory
    ``test_client()`` helper — confirm it exists in the SDK version you
    target; some releases require wiring a client over an in-memory
    transport instead.
    """
    async with mcp.test_client() as client:
        # List tools
        tools = await client.list_tools()
        assert any(t.name == "add" for t in tools)
        # Call a tool: arguments are a dict matching the tool's schema.
        result = await client.call_tool("add", {"a": 2, "b": 3})
        # Tool results are a content list; the first item holds the text.
        assert result[0].text == "5"
@pytest.mark.asyncio
async def test_resource_via_mcp():
    """Test resources through the MCP protocol layer.

    NOTE(review): relies on FastMCP's ``test_client()`` helper — confirm
    it exists in your SDK version.
    """
    async with mcp.test_client() as client:
        resources = await client.list_resources()
        # NOTE(review): some SDK versions return URI objects rather than
        # strings — adjust the comparison if the equality fails.
        assert any(r.uri == "config://test" for r in resources)
        content = await client.read_resource("config://test")
        # read_resource returns a list of content parts; check the text.
        assert '"key"' in content[0].text
// tools.test.ts
import { describe, it, expect, vi, beforeEach } from "vitest";
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { InMemoryTransport } from "@modelcontextprotocol/sdk/inMemory.js";
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { z } from "zod";
// Exercises the "search" tool end-to-end over an in-memory transport,
// so no subprocess or network is needed.
describe("search tool", () => {
  let server: McpServer;
  let client: Client;
  beforeEach(async () => {
    server = new McpServer({ name: "test", version: "1.0.0" });
    server.tool("search", "Search docs", { query: z.string() }, async ({ query }) => {
      // In real code, this calls an external service
      // NOTE(review): mockSearch is assumed to be defined elsewhere in
      // this test file (e.g. a vi.fn() stub) — confirm before running.
      const results = await mockSearch(query);
      return {
        content: [{ type: "text", text: results.join("\n") }],
      };
    });
    // Connect via in-memory transport: both ends of a linked pair.
    const [clientTransport, serverTransport] = InMemoryTransport.createLinkedPair();
    await server.connect(serverTransport);
    client = new Client({ name: "test-client", version: "1.0.0" });
    await client.connect(clientTransport);
  });
  it("should return search results", async () => {
    const result = await client.callTool({ name: "search", arguments: { query: "test" } });
    // Tool results are a content array; this tool returns one text item.
    expect(result.content).toHaveLength(1);
    expect(result.content[0].type).toBe("text");
  });
  it("should list tools", async () => {
    const tools = await client.listTools();
    expect(tools.tools).toHaveLength(1);
    expect(tools.tools[0].name).toBe("search");
  });
});
// NOTE(review): `client` below is declared inside the previous describe's
// scope and is NOT visible here — this block needs its own beforeEach
// that registers a `query_db` tool and connects a client, mirroring the
// setup above.
describe("error handling", () => {
  it("should return isError for invalid input", async () => {
    const result = await client.callTool({
      name: "query_db",
      arguments: { sql: "DELETE FROM users" },
    });
    // Tool-level failures set isError on the result rather than rejecting.
    expect(result.isError).toBe(true);
    expect(result.content[0].text).toContain("Only SELECT queries");
  });
  it("should handle tool not found", async () => {
    // Unknown tool names are a protocol-level error: the promise rejects.
    await expect(
      client.callTool({ name: "nonexistent", arguments: {} })
    ).rejects.toThrow();
  });
});
import pytest
import asyncio
from mcp import ClientSession
from mcp.client.stdio import stdio_client
@pytest.mark.asyncio
async def test_server_integration():
    """Test the full server by spawning it as a subprocess.

    Exercises the real stdio transport end-to-end: initialize handshake,
    tool listing/calling, and resource listing/reading.
    """
    # stdio_client takes a StdioServerParameters object, not bare
    # keyword arguments.
    from mcp import StdioServerParameters

    server_params = StdioServerParameters(
        command="uv",
        args=["run", "python", "server.py"],
        env={"API_KEY": "test-key"},
    )
    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            # The handshake must complete before any other request.
            await session.initialize()
            # List tools
            tools = await session.list_tools()
            tool_names = [t.name for t in tools.tools]
            assert "search" in tool_names
            # Call a tool
            result = await session.call_tool("search", {"query": "test"})
            assert len(result.content) > 0
            assert result.content[0].type == "text"
            # List resources
            resources = await session.list_resources()
            assert len(resources.resources) > 0
            # Read a resource
            content = await session.read_resource("config://app")
            assert len(content.contents) > 0
import pytest
import httpx
import subprocess
import time
@pytest.fixture(scope="module")
def server_process():
    """Start the MCP server as a subprocess for the whole test module.

    Yields the Popen handle and guarantees the process is terminated
    when the module's tests finish.
    """
    import os  # needed for os.environ below; not imported at the snippet top

    proc = subprocess.Popen(
        ["uv", "run", "python", "server.py",
         "--transport", "streamable-http", "--port", "8765"],
        # Inherit the parent environment but inject a test credential.
        env={**os.environ, "API_KEY": "test-key"},
    )
    try:
        time.sleep(2)  # crude wait; poll the port for a more robust check
        yield proc
    finally:
        # Always clean up, even if start-up or the tests fail.
        proc.terminate()
        proc.wait()
@pytest.mark.asyncio
async def test_http_server(server_process):
    """Test the server via the Streamable HTTP transport.

    The MCP spec requires clients to send an Accept header offering both
    application/json and text/event-stream; servers may reject requests
    (e.g. 406) without it.
    """
    headers = {
        "Accept": "application/json, text/event-stream",
        "Content-Type": "application/json",
    }
    async with httpx.AsyncClient(base_url="http://localhost:8765") as client:
        # Initialize: the handshake must precede other requests.
        resp = await client.post("/mcp", headers=headers, json={
            "jsonrpc": "2.0",
            "id": 1,
            "method": "initialize",
            "params": {
                "protocolVersion": "2025-03-26",
                "capabilities": {},
                "clientInfo": {"name": "test", "version": "1.0.0"},
            },
        })
        assert resp.status_code == 200
        # NOTE(review): servers may answer with an SSE stream instead of
        # plain JSON — if .json() fails, parse the event-stream body.
        result = resp.json()["result"]
        assert "capabilities" in result
        # List tools
        resp = await client.post("/mcp", headers=headers, json={
            "jsonrpc": "2.0",
            "id": 2,
            "method": "tools/list",
        })
        tools = resp.json()["result"]["tools"]
        assert len(tools) > 0
from unittest.mock import AsyncMock
from mcp.types import TextContent, CallToolResult
def create_mock_session():
    """Create a mock MCP session for testing client-side code.

    Returns an AsyncMock whose list_tools/call_tool behave like a server
    exposing a single "search" tool.
    """
    # Use the real SDK result types rather than undefined ad-hoc stand-ins
    # (the original MockToolList/MockTool were never defined), so client
    # code that inspects results sees the true shapes.
    from mcp.types import ListToolsResult, Tool

    session = AsyncMock()

    # Mock tool listing
    session.list_tools.return_value = ListToolsResult(tools=[
        Tool(
            name="search",
            description="Search docs",
            inputSchema={
                "type": "object",
                "properties": {"query": {"type": "string"}},
                "required": ["query"],
            },
        ),
    ])

    # Mock tool calls: only "search" is known; anything else raises.
    async def mock_call_tool(name, arguments):
        if name == "search":
            return CallToolResult(
                content=[TextContent(type="text",
                                     text="Mock result for: " + arguments["query"])]
            )
        raise ValueError(f"Unknown tool: {name}")

    session.call_tool = AsyncMock(side_effect=mock_call_tool)
    return session
import logging
import json
# Enable protocol-level logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("mcp.protocol")
# Custom message logger
class MessageLogger:
    """Log JSON-RPC traffic in a human-readable arrow notation."""

    def log_request(self, method: str, params: dict, id: int):
        """Log an outgoing request with its id and pretty-printed params."""
        pretty = json.dumps(params, indent=2)
        logger.debug(f"→ [{id}] {method}: {pretty}")

    def log_response(self, id: int, result: dict):
        """Log an incoming result, truncated so logs stay readable."""
        pretty = json.dumps(result, indent=2, default=str)
        logger.debug(f"← [{id}] Result: {pretty[:500]}")

    def log_notification(self, method: str, params: dict):
        """Log an outgoing notification (no id, no response expected)."""
        pretty = json.dumps(params, indent=2)
        logger.debug(f"→ (notif) {method}: {pretty}")

    def log_error(self, id: int, error: dict):
        """Log an incoming JSON-RPC error object at ERROR level."""
        pretty = json.dumps(error, indent=2)
        logger.error(f"← [{id}] Error: {pretty}")
import json
from datetime import datetime, timezone


class MessageCapture:
    """Capture MCP messages for debugging and replay.

    Each message is appended to a JSON Lines file as it is captured, so a
    crash still leaves the traffic recorded up to that point.
    """

    def __init__(self, output_file: str = "mcp_capture.jsonl"):
        self.output_file = output_file
        self.messages = []  # in-memory copy of everything captured

    def capture(self, direction: str, message: dict):
        """Record one message. direction is "sent" or "received"."""
        entry = {
            # Timezone-aware UTC: datetime.utcnow() is deprecated and
            # produces naive timestamps that are easy to misinterpret.
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "direction": direction,
            "message": message,
        }
        self.messages.append(entry)
        with open(self.output_file, "a") as f:
            f.write(json.dumps(entry) + "\n")

    def replay(self) -> list[dict]:
        """Load captured messages back from the file for analysis."""
        with open(self.output_file) as f:
            return [json.loads(line) for line in f]
| Issue | Symptom | Diagnosis |
|---|---|---|
| Missing `jsonrpc` field | Server returns parse error | Check all messages include `"jsonrpc": "2.0"` |
| Wrong method name | Method not found error | Verify against spec: tools/call not tool/call |
| Missing `id` on request | No response received | All requests need a unique `id`; notifications don't |
| `params` vs `arguments` | Tool gets empty args | `tools/call` passes tool args in `params.arguments` |
| Content format wrong | Client shows raw object | Must be [{"type": "text", "text": "..."}] |
| Protocol version mismatch | Initialize fails | Use "2025-03-26" (check spec for latest) |
# Check 1: Can you run the server directly?
uv run python server.py
# If this fails, fix the Python/dependency issues first
# Check 2: Does the command path exist?
which uv # or: which python, which npx
# Ensure the command is on PATH
# Check 3: Are dependencies installed?
cd /path/to/server && uv pip list | grep mcp
# Check 4: Check stderr for errors (stdio servers)
# Add to server.py:
import sys
print("Server starting...", file=sys.stderr)
# Check 1: Does list_tools work?
# Use MCP Inspector to verify
npx @modelcontextprotocol/inspector -- uv run python server.py
# Check 2: Is the tool registered correctly?
# Verify with a simple test:
@mcp.tool()
def test_tool() -> str:
    """A trivial tool that always succeeds; call it to confirm registration works."""
    message = "Tool is working!"
    return message
# Check 3: Invalid inputSchema
# Ensure schema is valid JSON Schema
# Common mistake: using Python types instead of JSON Schema types
# BAD: {"type": "str"}
# GOOD: {"type": "string"}
# Check 1: Are env vars set in the CLIENT config, not just your shell?
# Claude Desktop reads env from claude_desktop_config.json, NOT your shell profile
# Check 2: Verify env vars are accessible
@mcp.tool()
def debug_env() -> str:
    """Show environment variables (for debugging only).

    Values are truncated to their first five characters so secrets never
    appear in full in tool output.
    """
    import json  # json was used but never imported in this snippet
    import os

    return json.dumps({
        k: v[:5] + "..." if len(v) > 5 else v
        for k, v in os.environ.items()
        if k.startswith("MY_")  # Only show your app's vars
    })
# Check 3: Token expiration
# Add logging to token refresh:
import sys
print(f"Token expires at: {expires_at}", file=sys.stderr)
# Check 1: Add timeouts to all external calls
async with httpx.AsyncClient(timeout=30.0) as client:
resp = await client.get(url)
# Check 2: Break long operations into steps
@mcp.tool()
async def process_large_dataset(dataset_id: str, ctx: Context) -> str:
    """Process a large dataset in chunks with progress.

    Reports progress through the MCP context after each chunk so the
    client can show a progress indicator instead of timing out.
    NOTE(review): get_chunks/process_chunk are placeholders assumed to be
    defined elsewhere — adapt to your data layer.
    """
    chunks = get_chunks(dataset_id)
    results = []
    for i, chunk in enumerate(chunks):
        # (current, total) lets the client render a percentage.
        await ctx.report_progress(i, len(chunks))
        results.append(await process_chunk(chunk))
    return json.dumps({"processed": len(results)})
# Check 3: Use streaming for large responses
# Return a summary instead of full data
@mcp.tool()
def query_large_table(table: str) -> str:
    """Query a table, returning a row count plus a 10-row sample.

    Raises:
        ValueError: if *table* is not a plain SQL identifier.
    """
    import re

    # The table name is interpolated into SQL and arrives from an
    # untrusted caller (the model), so restrict it to a bare identifier
    # to prevent SQL injection.
    if not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", table):
        raise ValueError(f"Invalid table name: {table!r}")

    count = db.count(table)
    sample = db.query(f"SELECT * FROM {table} LIMIT 10")
    # default=str handles non-JSON types (datetimes, Decimals) in rows.
    return json.dumps({
        "total_rows": count,
        "sample": sample,
        "message": f"Showing 10 of {count} rows. Use pagination for more.",
    }, default=str)
# Check 1: Don't print to stdout in stdio servers!
# stdout IS the protocol channel. Use stderr for logging.
import sys
print("Debug info", file=sys.stderr) # Correct
# print("Debug info") # WRONG - corrupts protocol stream
# Check 2: Ensure tool results are serializable
@mcp.tool()
def get_data() -> str:
    # Query results may contain values (datetimes, Decimals) that
    # json.dumps cannot serialize on its own.
    result = db.query("SELECT * FROM users")
    # BAD: datetime objects aren't JSON serializable by default
    # return json.dumps(result)
    # GOOD: default=str stringifies any non-serializable value
    return json.dumps(result, default=str)
# .github/workflows/test-mcp.yml
name: Test MCP Server
on: [push, pull_request]
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Install uv
uses: astral-sh/setup-uv@v4
- name: Install dependencies
run: uv sync
- name: Run unit tests
run: uv run pytest tests/ -v
- name: Run integration tests
run: uv run pytest tests/integration/ -v
env:
API_KEY: ${{ secrets.TEST_API_KEY }}
- name: Test server starts
run: |
uv run python server.py &
SERVER_PID=$!
sleep 3
# Verify server is running
kill -0 $SERVER_PID 2>/dev/null && echo "Server started successfully"
kill $SERVER_PID
- name: Test with MCP Inspector
run: |
npx @modelcontextprotocol/inspector --test -- uv run python server.py
# Dockerfile.test
FROM python:3.12-slim
WORKDIR /app
COPY . .
RUN pip install uv && uv sync
RUN uv run pytest tests/ -v
# docker-compose.test.yml
services:
test:
build:
context: .
dockerfile: Dockerfile.test
environment:
- API_KEY=test-key
- DATABASE_URL=postgresql://postgres:test@db:5432/testdb
depends_on:
- db
db:
image: postgres:16
environment:
POSTGRES_PASSWORD: test
POSTGRES_DB: testdb
docker compose -f docker-compose.test.yml up --build --abort-on-container-exit
import pytest
import asyncio
import time
@pytest.mark.asyncio
async def test_concurrent_tool_calls():
    """The server must answer 20 simultaneous tool calls correctly."""
    async with mcp.test_client() as client:
        start = time.time()
        # Launch all 20 calls before awaiting any of them.
        pending = [
            client.call_tool("search", {"query": f"test-{i}"})
            for i in range(20)
        ]
        outcomes = await asyncio.gather(*pending)
        elapsed = time.time() - start
        assert len(outcomes) == 20
        # Every call must yield non-empty content.
        assert all(len(item) > 0 for item in outcomes)
        print(f"20 concurrent calls completed in {elapsed:.2f}s")
@pytest.mark.asyncio
async def test_large_response():
    """Test server handles large responses without issues."""
    import json  # json is not imported at the top of this snippet

    async with mcp.test_client() as client:
        result = await client.call_tool("generate_report", {
            "size": "large",  # generates a multi-KB response
        })
        # Large payloads must arrive intact...
        assert len(result[0].text) > 10000
        # ...and still be valid JSON end-to-end.
        data = json.loads(result[0].text)
        assert "report" in data
@pytest.mark.asyncio
async def test_response_size_limit():
    """The server must cap oversized responses."""
    async with mcp.test_client() as client:
        result = await client.call_tool("get_all_data", {})
        payload = result[0].text
        # The server is expected to paginate or truncate rather than
        # ship an unbounded payload.
        assert len(payload) < 1_000_000  # stay under 1 MB
import tracemalloc
@pytest.mark.asyncio
async def test_memory_usage():
    """Repeated tool calls must not leak memory."""
    tracemalloc.start()
    async with mcp.test_client() as client:
        before = tracemalloc.take_snapshot()
        # Hammer the server with 100 sequential calls.
        for i in range(100):
            await client.call_tool("search", {"query": f"test-{i}"})
        after = tracemalloc.take_snapshot()
        deltas = after.compare_to(before, "lineno")
        # No single allocation site may grow by more than 10 MB.
        for stat in deltas[:5]:
            assert stat.size_diff < 10 * 1024 * 1024, f"Memory leak detected: {stat}"
    tracemalloc.stop()
# macOS
tail -f ~/Library/Logs/Claude/mcp-server-*.log
# Windows
Get-Content "$env:APPDATA\Claude\Logs\mcp-server-*.log" -Wait
# Look for:
# - Server startup errors
# - Tool call failures
# - Transport disconnections
Add a debug tool to your server for troubleshooting:
import sys
import os
import platform
@mcp.tool()
def server_debug_info() -> str:
"""Return server diagnostic information (remove in production)."""
return json.dumps({
"python_version": sys.version,
"platform": platform.platform(),
"cwd": os.getcwd(),
"env_vars": {k: "***" for k in os.environ if k.startswith("MY_")},
"pid": os.getpid(),
"argv": sys.argv,
}, indent=2)
Since stdout is the protocol channel, use stderr for all debugging:
import sys
import logging
# Configure logging to stderr
logging.basicConfig(
stream=sys.stderr,
level=logging.DEBUG,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger("my-server")
@mcp.tool()
def my_tool(query: str) -> str:
logger.debug(f"my_tool called with query={query!r}")
try:
result = process(query)
logger.info(f"my_tool succeeded: {len(result)} chars")
return result
except Exception:
logger.exception("my_tool failed")
raise
import json
import time  # time.time() was used below but never imported


class ProtocolRecorder:
    """Record MCP protocol messages to a JSON Lines file for reproduction."""

    def __init__(self, output_path: str = "mcp_recording.jsonl"):
        self.output_path = output_path
        self._file = open(output_path, "w")

    def record(self, direction: str, message: dict):
        """Append one message ("sent"/"received") with a wall-clock timestamp."""
        self._file.write(json.dumps({
            "direction": direction,
            "message": message,
            "timestamp": time.time(),
        }) + "\n")
        # Flush immediately so a crash doesn't lose buffered messages.
        self._file.flush()

    def close(self):
        """Close the underlying file handle."""
        self._file.close()
class ProtocolReplayer:
    """Replay recorded messages against a server."""

    def __init__(self, recording_path: str):
        # Load the whole JSONL recording up front.
        with open(recording_path) as f:
            self.entries = [json.loads(line) for line in f]

    async def replay(self, session):
        """Replay recorded requests and compare responses to the recording."""
        outgoing = [e for e in self.entries if e["direction"] == "sent"]
        incoming = [e for e in self.entries if e["direction"] == "received"]
        for index, item in enumerate(outgoing):
            request = item["message"]
            if "id" not in request:
                continue  # notifications carry no id and get no response
            method = request["method"]
            params = request.get("params", {})
            if method == "tools/call":
                result = await session.call_tool(
                    params["name"],
                    params.get("arguments", {}),
                )
                # Pair with the recorded response at the same position.
                expected = incoming[index] if index < len(incoming) else None
                if expected:
                    print(f"[{method}] Match: {result == expected['message']['result']}")
When filing bug reports, create a minimal reproduction:
#!/usr/bin/env python3
"""Minimal reproduction for MCP issue #XXX.
Run: uv run python repro.py
Test: npx @modelcontextprotocol/inspector -- uv run python repro.py
"""
from mcp.server.fastmcp import FastMCP
mcp = FastMCP("repro-server")
@mcp.tool()
def trigger_bug(input: str) -> str:
    """This tool demonstrates the bug."""
    # Minimal code that triggers the issue — replace problematic_operation
    # with the smallest expression that reproduces it.
    result = problematic_operation(input)
    return result
if __name__ == "__main__":
    # Defaults to the stdio transport, which is what the Inspector expects.
    mcp.run()