mcp-lens
v1.0.0
Published
Observability and instrumentation layer for the Model Context Protocol π€π
Downloads
113
Maintainers
Readme
mcp-lens π
Observability middleware for the Model Context Protocol. Intercepts MCP traffic automatically and forwards structured records to any backend (such as console, file, Datadog, OpenTelemetry) without modifying your server or client code.
βοΈ How it works
mcp-lens wraps the internal Protocol object of an McpServer or Client instance. Every inbound or outbound request, notification, and tool call passes through the instrumentation layer, which produces a LensRecord. That record is run through a configurable processor pipeline (for redaction, truncation, classification, etc.) and then dispatched to one or more adapters that forward it to your backend.
Adapters are fault-isolated: if one throws, the error is caught and logged internally, and all other adapters continue to run. Your application is never affected.
π¦ Installation
npm install mcp-lensRequires @modelcontextprotocol/sdk as a peer dependency (already installed if you are building an MCP application).
π Quick start
The minimal setup: instrument a server and print every event to the console.
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import { createLens, adapters, instrumentServer } from "mcp-lens";
const lens = createLens({
serviceName: "my-mcp-server",
adapters: [adapters.console()]
});
const server = new McpServer({ name: "my-server", version: "1.0.0" });
instrumentServer(server, { lens });
server.tool("ping", {}, async () => ({
content: [{ type: "text", text: "pong" }]
}));
const transport = new StdioServerTransport();
await server.connect(transport);From this point on every tools/call, resources/list, and other MCP requests will appear in the console:
[mcp-lens] β±οΈ my-mcp-server | mcp.request (trace: abc123) [4ms]
[mcp-lens] β±οΈ my-mcp-server | tool.execution (trace: abc123) [2ms]π₯οΈ Instrumenting a client
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js";
import { createLens, adapters, instrumentClient } from "mcp-lens";
const lens = createLens({
serviceName: "my-mcp-client",
adapters: [adapters.console()]
});
const client = new Client({ name: "my-client", version: "1.0.0" }, { capabilities: {} });
instrumentClient(client, { lens });
const transport = new StdioClientTransport({ command: "node", args: ["./server.js"] });
await client.connect(transport);
const result = await client.request(
{ method: "tools/call", params: { name: "ping", arguments: {} } },
z.any()
);Every outbound request and incoming notification is automatically captured.
π¬ Tool-level tracing
instrumentServer captures a span at the MCP protocol boundary (tools/call). If you also want a child span that captures what happens inside your tool handler, including its own duration and result, use withObservedTool.
import { withObservedTool } from "mcp-lens";
server.tool(
"search",
{ query: z.string(), limit: z.number().optional() },
withObservedTool("search", async ({ query, limit = 10 }) => {
const results = await db.search(query, limit);
return {
content: [{ type: "text", text: JSON.stringify(results) }]
};
}, { lens })
);The resulting spans are linked by traceId, so you can reconstruct the full call tree in your backend:
tools/call (mcp.request, incoming, traceId: abc123)
βββ search (tool.execution, traceId: abc123, durationMs: 38)π Data privacy and processing
Processors intercept LensRecord objects before they reach adapters. Add them to the pipeline in order of execution.
import { processing } from "mcp-lens";π Key redaction
Strips values from any field whose key matches a list (case-insensitive). Works recursively through nested objects and arrays.
lens.pipeline.addProcessor(
new processing.RedactionProcessor([
new processing.KeyRedactor(["password", "token", "api_key", "secret"])
])
);Input:
{ "user": "alice", "password": "hunter2", "nested": { "token": "abc" } }After redaction:
{ "user": "alice", "password": "[REDACTED]", "nested": { "token": "[REDACTED]" } }π§Ή Regex redaction
Replaces pattern matches inside string values. Useful for emails, credit card numbers, IPs, etc.
lens.pipeline.addProcessor(
new processing.RedactionProcessor([
new processing.RegexRedactor(
/\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b/g,
"[EMAIL]"
),
new processing.RegexRedactor(
/\b\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}\b/g,
"[CARD]"
)
])
);βοΈ Payload truncation
Prevents oversized payloads from bloating your logs. Truncates strings over the character limit and arrays over the item limit.
lens.pipeline.addProcessor(
new processing.TruncationProcessor(
4096, // max string length in characters
50 // max array items
)
);Strings become "first 4096 chars... [TRUNCATED]". Arrays become [item1, item2, β¦, "... [N MORE ITEMS TRUNCATED]"].
π·οΈ Error classification
Normalizes error codes into a standard structure with category and retryable fields.
lens.pipeline.addProcessor(new processing.ErrorClassifierProcessor());| Error code pattern | category | retryable |
|---|---|---|
| ECONNREFUSED, ETIMEDOUT | network_error | true |
| Starts with mcp. | protocol_error | - |
| Anything else | internal_error | - |
π€ Attribute normalization
Sanitizes attribute keys so they are safe for most observability backends, converting spaces and special characters to underscores and lowercasing everything.
lens.pipeline.addProcessor(new processing.AttributeNormalizerProcessor());"HTTP Status" β "http_status", "user@domain" β "user_domain".
π§© Combining processors
Processors run in the order they are added. A typical production setup:
const { RedactionProcessor, KeyRedactor, RegexRedactor, TruncationProcessor, ErrorClassifierProcessor, AttributeNormalizerProcessor } = processing;
lens.pipeline.addProcessor(new RedactionProcessor([
new KeyRedactor(["password", "token", "api_key", "secret", "authorization"]),
new RegexRedactor(/\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b/g, "[EMAIL]"),
]));
lens.pipeline.addProcessor(new TruncationProcessor(4096, 50));
lens.pipeline.addProcessor(new ErrorClassifierProcessor());
lens.pipeline.addProcessor(new AttributeNormalizerProcessor());π‘ Built-in adapters
π¨οΈ Console adapter
Prints a one-line summary per record to stdout. Errors are printed with a second line to stderr. Intended for development and debugging.
import { adapters } from "mcp-lens";
const lens = createLens({
serviceName: "my-server",
adapters: [adapters.console()]
});Output:
[mcp-lens] β±οΈ my-server | tools/call (trace: abc123) [12ms]
[mcp-lens] β my-server | tools/call
βββ Error: ERR_TOOL_NOT_FOUND - Tool "foo" not foundπ JSON adapter
Buffers records in memory and appends them as NDJSON to a file. Flushes every 5 seconds or when 100 records accumulate, whichever comes first. Creates parent directories automatically.
import { adapters } from "mcp-lens";
const lens = createLens({
serviceName: "my-server",
adapters: [
adapters.console(),
adapters.json({ filePath: "./logs/mcp.ndjson" })
]
});Each line in the file is a JSON-serialized LensRecord:
{"id":"span-1","kind":"span","timestamp":"2026-03-06T12:00:00.000Z","traceId":"abc123","operation":"mcp.request","status":"ok","durationMs":4,"attributes":{"mcp.method":"tools/call","mcp.direction":"incoming"}}π Custom adapters
Use createAdapter to connect any backend without implementing the full LensAdapter interface manually.
import { createLens, createAdapter } from "mcp-lens";
const webhookAdapter = createAdapter({
name: "my-webhook",
onHandle: async (record) => {
await fetch("https://hooks.example.com/mcp", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(record)
});
}
});
const lens = createLens({
serviceName: "my-server",
adapters: [webhookAdapter]
});All hooks are optional:
| Hook | When called |
|---|---|
| onInit(context) | Once when the adapter is added to the engine |
| onHandle(record) | For every emitted LensRecord |
| onFlush() | When lens.flush() is called |
| onShutdown() | When lens.shutdown() is called |
π Datadog example:
const datadogAdapter = createAdapter({
name: "datadog",
onInit: ({ serviceName }) => {
console.log(`Datadog adapter started for service: ${serviceName}`);
},
onHandle: async (record) => {
await fetch("https://http-intake.logs.datadoghq.com/api/v2/logs", {
method: "POST",
headers: {
"DD-API-KEY": process.env.DD_API_KEY!,
"Content-Type": "application/json"
},
body: JSON.stringify({
ddsource: "mcp-lens",
service: record.attributes["service.name"],
message: JSON.stringify(record)
})
});
}
});π Loki example:
const lokiAdapter = createAdapter({
name: "loki",
onHandle: async (record) => {
await fetch("http://localhost:3100/loki/api/v1/push", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
streams: [{
stream: { service: "my-server", operation: record.operation },
values: [[`${Date.now()}000000`, JSON.stringify(record)]]
}]
})
});
}
});π The LensRecord format
Every event emitted by the engine has this shape:
type LensRecord = {
id: string; // unique record ID
kind: "span" | "event" | "error";
timestamp: string; // ISO 8601
traceId?: string; // shared across a call chain
spanId?: string; // this record's span ID
parentSpanId?: string; // set when nested inside another span
operation: string; // e.g. "mcp.request", "tool.execution"
status?: "ok" | "error";
durationMs?: number;
attributes: Record<string, unknown>;
payload?: unknown; // raw request/response (stripped by redactors)
error?: {
code: string;
message: string;
category: string; // "network_error" | "protocol_error" | "internal_error" | "tool_error"
retryable?: boolean;
};
};Standard attributes added by the instrumentation layer:
| Attribute | Value |
|---|---|
| mcp.method | "tools/call", "resources/list", etc. |
| mcp.direction | "incoming" or "outgoing" |
| mcp.type | "request" or "notification" |
| mcp.result | The raw response object |
| tool.name | Set by withObservedTool |
| tool.result | Set by withObservedTool |
π§ Advanced use cases
π’ Multi-tenant tracing
If you handle requests from multiple tenants, inject a tenant ID into every record via a custom processor:
import { LensProcessor, LensRecord } from "mcp-lens";
class TenantProcessor implements LensProcessor {
name = "tenant";
constructor(private getTenantId: () => string | undefined) {}
process(record: LensRecord): LensRecord {
const tenantId = this.getTenantId();
if (tenantId) {
record.attributes["tenant.id"] = tenantId;
}
return record;
}
}
lens.pipeline.addProcessor(new TenantProcessor(() => currentTenant.get()));π² Dynamic sampling
Drop records you don't need before they reach any adapter. Return null from a processor to discard the record entirely:
import { LensProcessor, LensRecord } from "mcp-lens";
class SamplingProcessor implements LensProcessor {
name = "sampler";
constructor(private rate: number) {}
process(record: LensRecord): LensRecord | null {
if (record.status === "error") return record;
return Math.random() < this.rate ? record : null;
}
}
lens.pipeline.addProcessor(new SamplingProcessor(0.1)); // keep 10% of ok spansπ Sending to multiple backends
Adapters receive every record independently. Configure as many as you need:
const lens = createLens({
serviceName: "my-server",
adapters: [
adapters.console(),
adapters.json({ filePath: "./logs/mcp.ndjson" }),
datadogAdapter,
metricsAdapter,
]
});If datadogAdapter throws on a particular record, the others still receive it.
π Graceful shutdown
Call lens.shutdown() when your process is about to exit. This triggers onShutdown() on every adapter (and flushes any pending buffers in the JSON adapter).
process.on("SIGTERM", async () => {
await lens.shutdown();
process.exit(0);
});You can also flush manually without shutting down, useful in serverless environments between invocations:
await lens.flush();π API Reference
createLens(config)
Creates and returns a LensEngine. Automatically initializes all provided adapters.
createLens(config: LensConfig): LensEngine
interface LensConfig {
serviceName: string; // required, identifies this service in all records
environment?: string; // optional, e.g. "production", "staging"
adapters?: LensAdapter[];
}| Parameter | Type | Required | Description |
|---|---|---|---|
| serviceName | string | β
| Included in every LensRecord to identify the origin service |
| environment | string | β | Forwarded to adapters via LensRuntimeContext |
| adapters | LensAdapter[] | β | List of adapters to receive records. More can be added later via engine.addAdapter() |
instrumentServer(server, options)
Patches an McpServer (high-level) or a raw Server (low-level) instance. Safe to call before or after tools and resources are registered.
instrumentServer(
server: McpServer | Server,
options: InstrumentOptions
): void
interface InstrumentOptions {
lens: LensEngine;
}| Parameter | Type | Description |
|---|---|---|
| server | McpServer \| Server | The MCP server instance to instrument |
| options.lens | LensEngine | The engine that will receive the emitted records |
What gets patched:
setRequestHandler- wraps every incoming request handler with a spansetNotificationHandler- wraps every incoming notification handler with an eventnotification- wraps outgoing server notifications
instrumentClient(client, options)
Patches a Client instance to capture outgoing requests and incoming notifications.
instrumentClient(
client: Client,
options: InstrumentOptions
): void| Parameter | Type | Description |
|---|---|---|
| client | Client | The MCP client instance to instrument |
| options.lens | LensEngine | The engine that will receive the emitted records |
What gets patched:
request- wraps every outgoing request with a span, tracking duration and errorssetNotificationHandler- wraps incoming notification handlers
withObservedTool(name, handler, options)
Wraps a tool handler function with a child span. The span is linked to the outer tools/call span via traceId.
withObservedTool<TArgs extends any[], TResult>(
name: string,
handler: (...args: TArgs) => TResult | Promise<TResult>,
options: InstrumentOptions
): (...args: TArgs) => Promise<TResult>| Parameter | Type | Description |
|---|---|---|
| name | string | Stored as tool.name in the record's attributes |
| handler | function | Your tool implementation |
| options.lens | LensEngine | The engine that will receive the emitted span |
Records produced:
| Field | Value |
|---|---|
| operation | "tool.execution" |
| attributes["tool.name"] | The name argument |
| attributes["tool.result"] | The return value (before redactors run) |
| status | "ok" or "error" |
| error.category | "tool_error" on failure |
createAdapter(hooks)
Creates a LensAdapter from simple callback hooks. All hooks are optional.
createAdapter(hooks: {
name: string;
onInit?: (context: LensRuntimeContext) => void | Promise<void>;
onHandle?: (record: LensRecord) => void | Promise<void>;
onFlush?: () => void | Promise<void>;
onShutdown?: () => void | Promise<void>;
}): LensAdapter
interface LensRuntimeContext {
serviceName: string;
environment?: string;
}| Hook | When invoked |
|---|---|
| onInit(context) | Once, immediately when the adapter is added to the engine |
| onHandle(record) | On every LensRecord after the processor pipeline |
| onFlush() | When lens.flush() is called |
| onShutdown() | When lens.shutdown() is called |
adapters.console()
Returns a ConsoleAdapter. Prints one line per record to stdout. Errors add a second line with code and message to stderr. No configuration parameters.
adapters.json(options)
Returns a JSONAdapter that writes records as NDJSON (newline-delimited JSON).
adapters.json(options: JSONAdapterOptions): JSONAdapter
interface JSONAdapterOptions {
filePath: string; // path to the output file, created if it doesn't exist
}| Behaviour | Details |
|---|---|
| Buffering | Records are buffered in memory |
| Auto-flush | Flushes every 5 seconds or when the buffer reaches 100 records |
| Directory creation | Parent directories are created automatically |
| Error handling | On write failure, records are re-queued and retried on the next flush |
| Shutdown | Calling lens.shutdown() flushes any remaining records before exit |
LensEngine
The object returned by createLens.
class LensEngine {
readonly pipeline: ProcessorPipeline;
readonly spanManager: SpanManager;
readonly contextManager: ContextManager;
addAdapter(adapter: LensAdapter): void;
emit(record: LensRecord): Promise<void>;
flush(): Promise<void>;
shutdown(): Promise<void>;
}| Member | Description |
|---|---|
| pipeline.addProcessors(...processors) | Add one or more processors to the pipeline |
| pipeline.process(record) | Manually run a record through the pipeline (returns null if dropped) |
| spanManager.createRecord(kind, data) | Create a LensRecord with context propagation applied |
| contextManager.run(context, fn) | Run fn inside a trace context (uses AsyncLocalStorage) |
| contextManager.getCurrent() | Returns the active TraceContext or undefined |
| addAdapter(adapter) | Dynamically add an adapter after initialization |
| emit(record) | Run a record through the pipeline and dispatch to all adapters |
| flush() | Call flush() on all adapters that support it |
| shutdown() | Call shutdown() on all adapters that support it |
Interfaces and types
interface LensAdapter {
name: string;
init?(context: LensRuntimeContext): void | Promise<void>;
handle(record: LensRecord): void | Promise<void>;
flush?(): void | Promise<void>;
shutdown?(): void | Promise<void>;
}
interface LensProcessor {
name: string;
process(record: LensRecord): LensRecord | null | Promise<LensRecord | null>;
// Return null to drop the record - it will not reach any adapter.
}
type LensRecord = {
id: string;
kind: "span" | "event" | "error";
timestamp: string;
traceId?: string;
spanId?: string;
parentSpanId?: string;
operation: string;
status?: "ok" | "error";
durationMs?: number;
attributes: Record<string, unknown>;
payload?: unknown;
error?: {
code: string;
message: string;
category: string;
retryable?: boolean;
};
};π€ Contributing
We love contributions! Feel free to open issues or PRs. Make sure you run tests:
npm run build
npm run build:types
npm run test
npm run lintπ License
MIT License. See LICENSE for more details. Build safely! π°β¨
