@mrgeoffrich/claude-agent-sdk-tap
v0.1.9
Published
Transparent tap/observer for the Claude Agent SDK message stream with strongly-typed callbacks
Readme
claude-agent-sdk-tap
Transparent tap/observer for the Claude Agent SDK message stream. Calls strongly-typed callbacks for each message type without modifying the stream — useful for diagnostics, debugging, logging, and forwarding messages to a collection server.
Install
npm install @mrgeoffrich/claude-agent-sdk-tapRequires @anthropic-ai/claude-agent-sdk >=0.2.0 as a peer dependency.
Multi-turn agent with HTTP forwarding
The most common production pattern: a long-running agent (e.g. a sidecar service) that accepts user messages over an API, streams them to the SDK via an AsyncIterable, and forwards all tap messages to a collector.
import { tappedQuery } from "@mrgeoffrich/claude-agent-sdk-tap";
import { createHttpSink } from "@mrgeoffrich/claude-agent-sdk-tap/transport";
import type { SDKUserMessage } from "@mrgeoffrich/claude-agent-sdk-tap";
// --- Message queue: an async iterable that yields user messages on demand ---
class MessageQueue {
private pending: Array<{
resolve: (result: IteratorResult<SDKUserMessage>) => void;
}> = [];
private messages: SDKUserMessage[] = [];
private done = false;
/** Enqueue a user message (called when your API receives a request). */
push(text: string) {
const msg: SDKUserMessage = {
type: "user",
message: { role: "user", content: text } as any,
parent_tool_use_id: null,
session_id: "", // left empty — the tap library backfills this
};
if (this.pending.length > 0) {
this.pending.shift()!.resolve({ value: msg, done: false });
} else {
this.messages.push(msg);
}
}
/** Signal that no more messages will be sent. */
close() {
this.done = true;
for (const p of this.pending) {
p.resolve({ value: undefined as any, done: true });
}
this.pending = [];
}
[Symbol.asyncIterator](): AsyncIterator<SDKUserMessage> {
return {
next: () => {
if (this.messages.length > 0) {
return Promise.resolve({ value: this.messages.shift()!, done: false });
}
if (this.done) {
return Promise.resolve({ value: undefined as any, done: true });
}
return new Promise((resolve) => this.pending.push({ resolve }));
},
};
}
}
// --- Agent runner ---
async function runAgent(initialMessage: string) {
const queue = new MessageQueue();
queue.push(initialMessage);
// Set up HTTP sink for forwarding tap messages to a collector
const sink = createHttpSink("http://collector:8080/messages", {
batchSize: 10,
flushIntervalMs: 2000,
onError: (err) => console.error("Sink error:", err),
});
const stream = tappedQuery(
{
prompt: queue, // AsyncIterable — multi-turn input
options: {
model: "claude-sonnet-4-6",
systemPrompt: "You are a helpful assistant.",
},
},
{}, // handlers (optional — add typed handlers here)
{ onMessage: sink.send }, // every message is forwarded to the collector
);
// Consume the output stream
for await (const message of stream) {
switch (message.type) {
case "result":
if (message.subtype === "success") {
console.log("Turn complete:", message.result);
}
break;
case "assistant":
console.log("Response:", message.message.content);
break;
}
}
await sink.flush();
}Session ID handling
The SDK assigns a session_id in the system:init message, but the first user message is sent before system:init arrives — so it has an empty session_id. The tap library handles this automatically:
- Output stream: Messages with an empty
session_idare buffered until a message with a realsession_idarrives (typicallysystem:init). Buffered messages are backfilled and released in original order. - Input stream (
AsyncIterableprompt /streamInput()): Oncesystem:initpopulates the session ID, subsequent user messages are backfilled before handlers fire. tappedQuery.sessionId: Always reflects the latest captured session ID. Available as a property on the returned object.
This means your onMessage handler and collector always receive messages with a valid session_id — no special handling needed.
Forward all messages to your server
The simplest way to use this library is to forward every SDK message to an HTTP endpoint. Three steps:
- Create a sink pointed at your server
- Pass it as
onMessagewhen you call the SDK - Call
flush()when you're done to make sure everything is sent
import { tappedQuery } from "@mrgeoffrich/claude-agent-sdk-tap";
import { createHttpSink } from "@mrgeoffrich/claude-agent-sdk-tap/transport";
// 1. Point the sink at your server
const sink = createHttpSink("http://localhost:8080/messages");
// 2. Use tappedQuery instead of query — every message gets POSTed to your server
for await (const msg of tappedQuery(
{ prompt: "Hello", options: {} },
{},
{ onMessage: sink.send },
)) {
// your app logic here — messages pass through unchanged
}
// 3. Flush to ensure nothing is lost
await sink.flush();Your server receives a JSON POST for each message with this shape:
{
"sequence": 1,
"timestamp": "2026-03-19T08:00:00.000Z",
"type": "assistant",
"subtype": null,
"session_id": "abc-123",
"uuid": "msg-456",
"message": { /* the raw SDK message */ }
}That's it. Every message the SDK produces — assistant responses, tool calls, system events, results — gets forwarded to your endpoint in real time.
Quick start — typed callbacks
If you don't need to forward messages and just want to react to specific message types locally:
import { tappedQuery } from "@mrgeoffrich/claude-agent-sdk-tap";
for await (const msg of tappedQuery(
{ prompt: "Hello", options: {} },
{
assistant: (msg) => console.log("model:", msg.message.model),
result: (msg) => console.log("cost:", msg.total_cost_usd),
"system:init": (msg) => console.log("tools:", msg.tools),
},
)) {
// messages pass through unchanged
}You can also combine both — use typed handlers for local logging while forwarding everything to your server. See Combining handlers and sinks below.
API
tap(source, handlers?, options?)
Wraps any AsyncIterable<SDKMessage> (from query(), or any other source) and calls handlers for each message type. Returns an AsyncGenerator<SDKMessage> that yields every message unchanged.
import { query } from "@anthropic-ai/claude-agent-sdk";
import { tap } from "@mrgeoffrich/claude-agent-sdk-tap";
const stream = query({ prompt: "Hello", options: {} });
for await (const msg of tap(stream, {
assistant: (msg) => console.log(msg.message.content),
stream_event: (msg) => process.stdout.write("."),
})) {
// process messages as normal
}tappedQuery(params, handlers?, options?)
Convenience function that calls query() and tap() in one step. Equivalent to tap(query(params), handlers, options).
Handlers
All handlers are optional. Non-system messages use their type field as the key. System messages use system:<subtype> to disambiguate.
interface TapHandlers {
// Non-system types
assistant?: TapCallback<SDKAssistantMessage>;
user?: TapCallback<SDKUserMessage | SDKUserMessageReplay>;
result?: TapCallback<SDKResultSuccess | SDKResultError>;
stream_event?: TapCallback<SDKPartialAssistantMessage>;
tool_progress?: TapCallback<SDKToolProgressMessage>;
tool_use_summary?: TapCallback<SDKToolUseSummaryMessage>;
auth_status?: TapCallback<SDKAuthStatusMessage>;
rate_limit_event?: TapCallback<SDKRateLimitEvent>;
prompt_suggestion?: TapCallback<SDKPromptSuggestionMessage>;
// System subtypes
"system:init"?: TapCallback<SDKSystemMessage>;
"system:api_retry"?: TapCallback<SDKAPIRetryMessage>;
"system:compact_boundary"?: TapCallback<SDKCompactBoundaryMessage>;
"system:status"?: TapCallback<SDKStatusMessage>;
"system:hook_started"?: TapCallback<SDKHookStartedMessage>;
"system:hook_progress"?: TapCallback<SDKHookProgressMessage>;
"system:hook_response"?: TapCallback<SDKHookResponseMessage>;
"system:task_started"?: TapCallback<SDKTaskStartedMessage>;
"system:task_progress"?: TapCallback<SDKTaskProgressMessage>;
"system:task_notification"?: TapCallback<SDKTaskNotificationMessage>;
"system:local_command_output"?: TapCallback<SDKLocalCommandOutputMessage>;
"system:files_persisted"?: TapCallback<SDKFilesPersistedEvent>;
"system:elicitation_complete"?: TapCallback<SDKElicitationCompleteMessage>;
}Options
interface TapOptions {
/** Called for every message before the specific handler. */
onMessage?: TapCallback<SDKMessage>;
/** Called when a handler throws. Defaults to swallowing errors silently. */
onError?: (error: unknown, message: SDKMessage) => void;
/** When true, async callbacks are awaited before yielding. Default: false. */
awaitCallbacks?: boolean;
}Transport sinks
The transport module provides ready-made sinks for forwarding messages to a collection server over HTTP or gRPC.
HTTP sink
Zero additional dependencies. Supports batching.
import { tappedQuery } from "@mrgeoffrich/claude-agent-sdk-tap";
import { createHttpSink } from "@mrgeoffrich/claude-agent-sdk-tap/transport";
const sink = createHttpSink("http://localhost:8080/messages");
for await (const msg of tappedQuery(
{ prompt: "Hello", options: {} },
{},
{ onMessage: sink.send },
)) {
// process as normal
}
await sink.flush(); // ensure all messages are sentHTTP sink options
createHttpSink(url, {
headers: { Authorization: "Bearer ..." }, // extra headers
batchSize: 10, // buffer up to N messages before sending (default: 1)
flushIntervalMs: 500, // flush partial batches after this delay (default: 1000)
onError: (err) => {}, // error handler (default: console.error)
});When batchSize is 1 (default), each message is POSTed individually as a JSON object. When batchSize > 1, messages are POSTed as a JSON array.
gRPC sink
Requires @grpc/grpc-js as a peer dependency (optional — only needed if you use gRPC).
npm install @grpc/grpc-jsimport { tappedQuery } from "@mrgeoffrich/claude-agent-sdk-tap";
import { createGrpcSink } from "@mrgeoffrich/claude-agent-sdk-tap/transport";
const sink = await createGrpcSink("localhost:50051");
for await (const msg of tappedQuery(
{ prompt: "Hello", options: {} },
{},
{ onMessage: sink.send },
)) {
// process as normal
}
await sink.flush(); // end stream and close connectionThe gRPC sink streams messages over a client-side streaming call. Your server should implement:
service AgentMessages {
rpc StreamMessages (stream MessageEnvelope) returns (Ack);
}Message envelope
Both sinks wrap each message in an envelope:
interface MessageEnvelope {
sequence: number; // monotonically increasing per sink
timestamp: string; // ISO-8601
type: string; // e.g. "assistant", "system", "result"
subtype: string | null; // e.g. "init", "api_retry" (system messages only)
session_id: string;
uuid: string;
message: SDKMessage; // the raw message, unmodified
}Combining handlers and sinks
You can use typed handlers for local processing while simultaneously forwarding everything to a collection server:
import { tappedQuery } from "@mrgeoffrich/claude-agent-sdk-tap";
import { createHttpSink } from "@mrgeoffrich/claude-agent-sdk-tap/transport";
const sink = createHttpSink("http://collector:8080/messages", {
batchSize: 20,
flushIntervalMs: 2000,
});
for await (const msg of tappedQuery(
{ prompt: "Analyze this codebase", options: {} },
{
assistant: (msg) => console.log(`[${msg.message.model}]`, msg.message.content),
result: (msg) => {
if (msg.type === "result" && "total_cost_usd" in msg) {
console.log(`Done. Cost: $${msg.total_cost_usd}`);
}
},
"system:init": (msg) => console.log(`Session started with ${msg.tools.length} tools`),
},
{ onMessage: sink.send },
)) {
// your app logic here
}
await sink.flush();Receiving messages on your server
If you're building the server that receives tapped messages, here's what to expect and how to handle them.
What your endpoint receives
The HTTP sink sends a POST request with Content-Type: application/json for each message (or an array if batching is enabled). Your endpoint just needs to accept JSON and return a 2xx status.
Express example
import express from "express";
const app = express();
app.use(express.json());
app.post("/messages", (req, res) => {
const envelope = req.body;
console.log(`[${envelope.sequence}] ${envelope.type}${envelope.subtype ? `:${envelope.subtype}` : ""}`);
// Route by type
switch (envelope.type) {
case "assistant":
console.log("Model response:", envelope.message.message.content);
break;
case "result":
console.log("Session finished. Cost: $" + envelope.message.total_cost_usd);
break;
case "system":
if (envelope.subtype === "init") {
console.log("Session started with", envelope.message.tools.length, "tools");
}
break;
}
res.sendStatus(200);
});
app.listen(8080);Handling batched messages
If the sender uses batchSize > 1, your endpoint receives an array instead of a single object. Handle both:
app.post("/messages", (req, res) => {
const envelopes = Array.isArray(req.body) ? req.body : [req.body];
for (const envelope of envelopes) {
// process each envelope
console.log(`[${envelope.sequence}] ${envelope.type}`);
}
res.sendStatus(200);
});Storing messages
Each envelope includes a sequence number (monotonically increasing per session) and a timestamp, so you can reconstruct the full message timeline. A simple approach is to append to a JSONL file or insert into a database:
import { appendFile } from "fs/promises";
app.post("/messages", async (req, res) => {
const envelopes = Array.isArray(req.body) ? req.body : [req.body];
for (const envelope of envelopes) {
await appendFile(
`session-${envelope.session_id}.jsonl`,
JSON.stringify(envelope) + "\n",
);
}
res.sendStatus(200);
});Message types you'll see
Here's a quick reference for the most common type values and what they mean:
| type | subtype | What it is |
|---|---|---|
| assistant | — | A model response (contains message.content with text and tool use blocks) |
| user | — | A user message or replayed user message |
| result | — | Session complete — contains total_cost_usd, duration_ms, num_turns |
| stream_event | — | Partial streaming chunk from the model |
| tool_progress | — | Progress update from a running tool |
| tool_use_summary | — | Summary after a tool finishes |
| system | init | Session started — contains available tools and session info |
| system | status | Status update (e.g. "thinking", "running tool") |
| system | api_retry | The SDK is retrying an API call |
| system | hook_started | A hook began executing |
| system | hook_response | A hook finished |
| system | task_started | A sub-task was spawned |
| system | task_progress | Sub-task progress update |
Python receiver example
from flask import Flask, request
app = Flask(__name__)
@app.post("/messages")
def receive():
body = request.json
envelopes = body if isinstance(body, list) else [body]
for env in envelopes:
msg_type = env["type"]
subtype = env.get("subtype")
label = f"{msg_type}:{subtype}" if subtype else msg_type
print(f"[{env['sequence']}] {label}")
if msg_type == "result":
print(f" Cost: ${env['message']['total_cost_usd']}")
return "", 200Re-exports
For convenience, this package re-exports query and all SDK message types from @anthropic-ai/claude-agent-sdk, so you can import everything from one place:
import { tappedQuery, query, type SDKMessage } from "@mrgeoffrich/claude-agent-sdk-tap";License
MIT
