@mrgeoffrich/claude-agent-sdk-tap

v0.1.9

Transparent tap/observer for the Claude Agent SDK message stream with strongly-typed callbacks

claude-agent-sdk-tap

Transparent tap/observer for the Claude Agent SDK message stream. Calls strongly-typed callbacks for each message type without modifying the stream — useful for diagnostics, debugging, logging, and forwarding messages to a collection server.

Install

npm install @mrgeoffrich/claude-agent-sdk-tap

Requires @anthropic-ai/claude-agent-sdk >=0.2.0 as a peer dependency.

Multi-turn agent with HTTP forwarding

The most common production pattern: a long-running agent (e.g. a sidecar service) that accepts user messages over an API, streams them to the SDK via an AsyncIterable, and forwards all tap messages to a collector.

import { tappedQuery } from "@mrgeoffrich/claude-agent-sdk-tap";
import { createHttpSink } from "@mrgeoffrich/claude-agent-sdk-tap/transport";
import type { SDKUserMessage } from "@mrgeoffrich/claude-agent-sdk-tap";

// --- Message queue: an async iterable that yields user messages on demand ---

class MessageQueue {
  private pending: Array<{
    resolve: (result: IteratorResult<SDKUserMessage>) => void;
  }> = [];
  private messages: SDKUserMessage[] = [];
  private done = false;

  /** Enqueue a user message (called when your API receives a request). */
  push(text: string) {
    const msg: SDKUserMessage = {
      type: "user",
      message: { role: "user", content: text } as any,
      parent_tool_use_id: null,
      session_id: "", // left empty — the tap library backfills this
    };
    if (this.pending.length > 0) {
      this.pending.shift()!.resolve({ value: msg, done: false });
    } else {
      this.messages.push(msg);
    }
  }

  /** Signal that no more messages will be sent. */
  close() {
    this.done = true;
    for (const p of this.pending) {
      p.resolve({ value: undefined as any, done: true });
    }
    this.pending = [];
  }

  [Symbol.asyncIterator](): AsyncIterator<SDKUserMessage> {
    return {
      next: () => {
        if (this.messages.length > 0) {
          return Promise.resolve({ value: this.messages.shift()!, done: false });
        }
        if (this.done) {
          return Promise.resolve({ value: undefined as any, done: true });
        }
        return new Promise((resolve) => this.pending.push({ resolve }));
      },
    };
  }
}

// --- Agent runner ---

async function runAgent(initialMessage: string) {
  const queue = new MessageQueue();
  queue.push(initialMessage);

  // Set up HTTP sink for forwarding tap messages to a collector
  const sink = createHttpSink("http://collector:8080/messages", {
    batchSize: 10,
    flushIntervalMs: 2000,
    onError: (err) => console.error("Sink error:", err),
  });

  const stream = tappedQuery(
    {
      prompt: queue,              // AsyncIterable — multi-turn input
      options: {
        model: "claude-sonnet-4-6",
        systemPrompt: "You are a helpful assistant.",
      },
    },
    {},                           // handlers (optional — add typed handlers here)
    { onMessage: sink.send },     // every message is forwarded to the collector
  );

  // Consume the output stream
  for await (const message of stream) {
    switch (message.type) {
      case "result":
        if (message.subtype === "success") {
          console.log("Turn complete:", message.result);
        }
        break;
      case "assistant":
        console.log("Response:", message.message.content);
        break;
    }
  }

  await sink.flush();
}

Session ID handling

The SDK assigns a session_id in the system:init message, but the first user message is sent before system:init arrives — so it has an empty session_id. The tap library handles this automatically:

  • Output stream: Messages with an empty session_id are buffered until a message with a real session_id arrives (typically system:init). Buffered messages are backfilled and released in original order.
  • Input stream (AsyncIterable prompt / streamInput()): Once system:init populates the session ID, subsequent user messages are backfilled before handlers fire.
  • tappedQuery.sessionId: Always reflects the latest captured session ID. Available as a property on the returned object.

This means your onMessage handler and collector always receive messages with a valid session_id — no special handling needed.
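
The output-stream behaviour can be pictured with a small async generator. This is only a sketch of the idea under simplifying assumptions, not the library's actual code: `Msg` and `backfillSessionId` are hypothetical names, and real SDK messages carry many more fields.

```typescript
// Hypothetical minimal message shape; real SDK messages have more fields.
interface Msg {
  type: string;
  session_id: string;
}

// Hold back messages with an empty session_id until a real one is seen,
// then backfill the held messages and release them in original order.
async function* backfillSessionId<T extends Msg>(
  source: AsyncIterable<T>,
): AsyncGenerator<T> {
  let sessionId = "";
  const buffered: T[] = [];

  for await (const msg of source) {
    if (msg.session_id) sessionId = msg.session_id;

    if (!sessionId) {
      buffered.push(msg); // no ID known yet, keep the message for later
      continue;
    }

    // Release anything held back, stamped with the ID we now know.
    while (buffered.length > 0) {
      yield { ...buffered.shift()!, session_id: sessionId };
    }
    yield msg.session_id ? msg : { ...msg, session_id: sessionId };
  }

  // The source ended without ever providing an ID: release messages as-is.
  yield* buffered;
}
```

Feeding this generator a user message with an empty ID followed by a system message carrying `"abc"` yields both messages in that order, each with `session_id` set to `"abc"`.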

Forward all messages to your server

The simplest way to use this library is to forward every SDK message to an HTTP endpoint. Three steps:

  1. Create a sink pointed at your server
  2. Pass it as onMessage when you call the SDK
  3. Call flush() when you're done to make sure everything is sent

import { tappedQuery } from "@mrgeoffrich/claude-agent-sdk-tap";
import { createHttpSink } from "@mrgeoffrich/claude-agent-sdk-tap/transport";

// 1. Point the sink at your server
const sink = createHttpSink("http://localhost:8080/messages");

// 2. Use tappedQuery instead of query — every message gets POSTed to your server
for await (const msg of tappedQuery(
  { prompt: "Hello", options: {} },
  {},
  { onMessage: sink.send },
)) {
  // your app logic here — messages pass through unchanged
}

// 3. Flush to ensure nothing is lost
await sink.flush();

Your server receives a JSON POST for each message with this shape:

{
  "sequence": 1,
  "timestamp": "2026-03-19T08:00:00.000Z",
  "type": "assistant",
  "subtype": null,
  "session_id": "abc-123",
  "uuid": "msg-456",
  "message": { /* the raw SDK message */ }
}

That's it. Every message the SDK produces — assistant responses, tool calls, system events, results — gets forwarded to your endpoint in real time.

Quick start — typed callbacks

If you don't need to forward messages and just want to react to specific message types locally:

import { tappedQuery } from "@mrgeoffrich/claude-agent-sdk-tap";

for await (const msg of tappedQuery(
  { prompt: "Hello", options: {} },
  {
    assistant: (msg) => console.log("model:", msg.message.model),
    result: (msg) => console.log("cost:", msg.total_cost_usd),
    "system:init": (msg) => console.log("tools:", msg.tools),
  },
)) {
  // messages pass through unchanged
}

You can also combine both — use typed handlers for local logging while forwarding everything to your server. See Combining handlers and sinks below.

API

tap(source, handlers?, options?)

Wraps any AsyncIterable<SDKMessage> (from query(), or any other source) and calls handlers for each message type. Returns an AsyncGenerator<SDKMessage> that yields every message unchanged.

import { query } from "@anthropic-ai/claude-agent-sdk";
import { tap } from "@mrgeoffrich/claude-agent-sdk-tap";

const stream = query({ prompt: "Hello", options: {} });

for await (const msg of tap(stream, {
  assistant: (msg) => console.log(msg.message.content),
  stream_event: (msg) => process.stdout.write("."),
})) {
  // process messages as normal
}

tappedQuery(params, handlers?, options?)

Convenience function that calls query() and tap() in one step. Equivalent to tap(query(params), handlers, options).

Handlers

All handlers are optional. Non-system messages use their type field as the key. System messages use system:<subtype> to disambiguate.
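
The keying rule can be written as a one-line function. This is an illustrative sketch, not part of the package: `KeyedMessage` and `handlerKey` are hypothetical names.

```typescript
// Hypothetical minimal shape: only the fields needed to pick a handler.
interface KeyedMessage {
  type: string;
  subtype?: string;
}

// "system:<subtype>" for system messages, the plain type for everything else.
function handlerKey(msg: KeyedMessage): string {
  return msg.type === "system" && msg.subtype
    ? `system:${msg.subtype}`
    : msg.type;
}
```

Under this rule a result message is keyed as `result` even though it carries its own subtype (such as `success`), because only system messages are disambiguated by subtype.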

interface TapHandlers {
  // Non-system types
  assistant?: TapCallback<SDKAssistantMessage>;
  user?: TapCallback<SDKUserMessage | SDKUserMessageReplay>;
  result?: TapCallback<SDKResultSuccess | SDKResultError>;
  stream_event?: TapCallback<SDKPartialAssistantMessage>;
  tool_progress?: TapCallback<SDKToolProgressMessage>;
  tool_use_summary?: TapCallback<SDKToolUseSummaryMessage>;
  auth_status?: TapCallback<SDKAuthStatusMessage>;
  rate_limit_event?: TapCallback<SDKRateLimitEvent>;
  prompt_suggestion?: TapCallback<SDKPromptSuggestionMessage>;

  // System subtypes
  "system:init"?: TapCallback<SDKSystemMessage>;
  "system:api_retry"?: TapCallback<SDKAPIRetryMessage>;
  "system:compact_boundary"?: TapCallback<SDKCompactBoundaryMessage>;
  "system:status"?: TapCallback<SDKStatusMessage>;
  "system:hook_started"?: TapCallback<SDKHookStartedMessage>;
  "system:hook_progress"?: TapCallback<SDKHookProgressMessage>;
  "system:hook_response"?: TapCallback<SDKHookResponseMessage>;
  "system:task_started"?: TapCallback<SDKTaskStartedMessage>;
  "system:task_progress"?: TapCallback<SDKTaskProgressMessage>;
  "system:task_notification"?: TapCallback<SDKTaskNotificationMessage>;
  "system:local_command_output"?: TapCallback<SDKLocalCommandOutputMessage>;
  "system:files_persisted"?: TapCallback<SDKFilesPersistedEvent>;
  "system:elicitation_complete"?: TapCallback<SDKElicitationCompleteMessage>;
}

Options

interface TapOptions {
  /** Called for every message before the specific handler. */
  onMessage?: TapCallback<SDKMessage>;

  /** Called when a handler throws. Defaults to swallowing errors silently. */
  onError?: (error: unknown, message: SDKMessage) => void;

  /** When true, async callbacks are awaited before yielding. Default: false. */
  awaitCallbacks?: boolean;
}
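
To make the interplay of these options concrete, here is a minimal pass-through generator that follows the documented semantics. It is a sketch written for illustration, assuming simplified types; `Msg`, `Callback`, `SketchOptions` and `tapSketch` are hypothetical names and the real implementation may differ.

```typescript
interface Msg {
  type: string;
  subtype?: string;
}
type Callback = (msg: Msg) => void | Promise<void>;

interface SketchOptions {
  onMessage?: Callback;
  onError?: (error: unknown, message: Msg) => void;
  awaitCallbacks?: boolean;
}

async function* tapSketch(
  source: AsyncIterable<Msg>,
  handlers: Record<string, Callback | undefined> = {},
  options: SketchOptions = {},
): AsyncGenerator<Msg> {
  for await (const msg of source) {
    const key =
      msg.type === "system" && msg.subtype ? `system:${msg.subtype}` : msg.type;

    // onMessage runs first, then the type-specific handler (if any).
    for (const cb of [options.onMessage, handlers[key]]) {
      if (!cb) continue;
      try {
        const pending = Promise.resolve(cb(msg));
        if (options.awaitCallbacks) {
          await pending; // block the stream until the callback settles
        } else {
          // Fire and forget, but still report late rejections.
          pending.catch((err) => options.onError?.(err, msg));
        }
      } catch (err) {
        // Without an onError callback the error is silently swallowed.
        options.onError?.(err, msg);
      }
    }

    yield msg; // the message itself is passed through untouched
  }
}
```

The design point to notice is that a throwing handler never interrupts the stream: the consumer still receives every message, and failures surface only through `onError`.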

Transport sinks

The transport module provides ready-made sinks for forwarding messages to a collection server over HTTP or gRPC.

HTTP sink

Zero additional dependencies. Supports batching.

import { tappedQuery } from "@mrgeoffrich/claude-agent-sdk-tap";
import { createHttpSink } from "@mrgeoffrich/claude-agent-sdk-tap/transport";

const sink = createHttpSink("http://localhost:8080/messages");

for await (const msg of tappedQuery(
  { prompt: "Hello", options: {} },
  {},
  { onMessage: sink.send },
)) {
  // process as normal
}

await sink.flush(); // ensure all messages are sent

HTTP sink options

createHttpSink(url, {
  headers: { Authorization: "Bearer ..." },  // extra headers
  batchSize: 10,        // buffer up to N messages before sending (default: 1)
  flushIntervalMs: 500, // flush partial batches after this delay (default: 1000)
  onError: (err) => {},  // error handler (default: console.error)
});

When batchSize is 1 (default), each message is POSTed individually as a JSON object. When batchSize > 1, messages are POSTed as a JSON array.
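
The batching rule can be sketched independently of HTTP. The following is an assumption-laden illustration rather than the sink's real code: `createBatcher` and its `post` parameter are hypothetical, with `post` standing in for whatever performs the actual request.

```typescript
// Hypothetical stand-in for the function that performs the HTTP POST.
type Post = (body: unknown) => Promise<void>;

function createBatcher(
  post: Post,
  batchSize = 1,
  flushIntervalMs = 1000,
  onError: (err: unknown) => void = (err) => console.error(err),
) {
  let buffer: unknown[] = [];
  let timer: ReturnType<typeof setTimeout> | undefined;
  let chain: Promise<void> = Promise.resolve(); // serialises posts in order

  // Send whatever is buffered; resolves once every queued post has settled.
  function flush(): Promise<void> {
    if (timer !== undefined) {
      clearTimeout(timer);
      timer = undefined;
    }
    if (buffer.length > 0) {
      const batch = buffer;
      buffer = [];
      // batchSize 1: a single JSON object; otherwise a JSON array.
      const body = batchSize === 1 ? batch[0] : batch;
      chain = chain.then(() => post(body)).catch(onError);
    }
    return chain;
  }

  function send(item: unknown): void {
    buffer.push(item);
    if (buffer.length >= batchSize) {
      void flush(); // batch is full, send it now
    } else if (timer === undefined) {
      // Partial batch: send it after the flush interval at the latest.
      timer = setTimeout(() => void flush(), flushIntervalMs);
    }
  }

  return { send, flush };
}
```

Chaining the posts keeps them in order and lets the final `flush()` wait for requests that are still in flight, which is why the examples end with an awaited flush.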

gRPC sink

Requires @grpc/grpc-js as a peer dependency (optional — only needed if you use gRPC).

npm install @grpc/grpc-js

import { tappedQuery } from "@mrgeoffrich/claude-agent-sdk-tap";
import { createGrpcSink } from "@mrgeoffrich/claude-agent-sdk-tap/transport";

const sink = await createGrpcSink("localhost:50051");

for await (const msg of tappedQuery(
  { prompt: "Hello", options: {} },
  {},
  { onMessage: sink.send },
)) {
  // process as normal
}

await sink.flush(); // end stream and close connection

The gRPC sink streams messages over a client-side streaming call. Your server should implement:

service AgentMessages {
  rpc StreamMessages (stream MessageEnvelope) returns (Ack);
}

Message envelope

Both sinks wrap each message in an envelope:

interface MessageEnvelope {
  sequence: number;       // monotonically increasing per sink
  timestamp: string;      // ISO-8601
  type: string;           // e.g. "assistant", "system", "result"
  subtype: string | null; // e.g. "init", "api_retry" (system messages only)
  session_id: string;
  uuid: string;
  message: SDKMessage;    // the raw message, unmodified
}
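
As a rough illustration of how such an envelope could be built, the sketch below wraps a message using a per-sink counter. `RawMessage`, `Envelope` and `createEnveloper` are hypothetical names, and starting the counter at 1 and using an empty string for missing IDs are assumptions, not documented behaviour.

```typescript
// Hypothetical minimal message shape; real SDK messages have more fields.
interface RawMessage {
  type: string;
  subtype?: string;
  session_id?: string;
  uuid?: string;
}

interface Envelope {
  sequence: number;
  timestamp: string;
  type: string;
  subtype: string | null;
  session_id: string;
  uuid: string;
  message: RawMessage;
}

// Each call to createEnveloper() owns its own counter, mirroring
// "monotonically increasing per sink".
function createEnveloper(): (message: RawMessage) => Envelope {
  let sequence = 0;
  return (message) => ({
    sequence: ++sequence, // assumed to start at 1
    timestamp: new Date().toISOString(),
    type: message.type,
    // Only system messages expose their subtype on the envelope.
    subtype: message.type === "system" ? (message.subtype ?? null) : null,
    session_id: message.session_id ?? "",
    uuid: message.uuid ?? "",
    message, // the raw message is attached as-is
  });
}
```

Because the routing fields are copied to the top level, a receiver can filter or index on `type` and `subtype` without parsing the nested `message`.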

Combining handlers and sinks

You can use typed handlers for local processing while simultaneously forwarding everything to a collection server:

import { tappedQuery } from "@mrgeoffrich/claude-agent-sdk-tap";
import { createHttpSink } from "@mrgeoffrich/claude-agent-sdk-tap/transport";

const sink = createHttpSink("http://collector:8080/messages", {
  batchSize: 20,
  flushIntervalMs: 2000,
});

for await (const msg of tappedQuery(
  { prompt: "Analyze this codebase", options: {} },
  {
    assistant: (msg) => console.log(`[${msg.message.model}]`, msg.message.content),
    result: (msg) => {
      if (msg.type === "result" && "total_cost_usd" in msg) {
        console.log(`Done. Cost: $${msg.total_cost_usd}`);
      }
    },
    "system:init": (msg) => console.log(`Session started with ${msg.tools.length} tools`),
  },
  { onMessage: sink.send },
)) {
  // your app logic here
}

await sink.flush();

Receiving messages on your server

If you're building the server that receives tapped messages, here's what to expect and how to handle them.

What your endpoint receives

The HTTP sink sends a POST request with Content-Type: application/json for each message (or an array if batching is enabled). Your endpoint just needs to accept JSON and return a 2xx status.

Express example

import express from "express";

const app = express();
app.use(express.json());

app.post("/messages", (req, res) => {
  const envelope = req.body;

  console.log(`[${envelope.sequence}] ${envelope.type}${envelope.subtype ? `:${envelope.subtype}` : ""}`);

  // Route by type
  switch (envelope.type) {
    case "assistant":
      console.log("Model response:", envelope.message.message.content);
      break;
    case "result":
      console.log("Session finished. Cost: $" + envelope.message.total_cost_usd);
      break;
    case "system":
      if (envelope.subtype === "init") {
        console.log("Session started with", envelope.message.tools.length, "tools");
      }
      break;
  }

  res.sendStatus(200);
});

app.listen(8080);

Handling batched messages

If the sender uses batchSize > 1, your endpoint receives an array instead of a single object. Handle both:

app.post("/messages", (req, res) => {
  const envelopes = Array.isArray(req.body) ? req.body : [req.body];

  for (const envelope of envelopes) {
    // process each envelope
    console.log(`[${envelope.sequence}] ${envelope.type}`);
  }

  res.sendStatus(200);
});
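
If the receiver is written in TypeScript, it can help to validate the body before trusting it. The following framework-independent sketch normalises single and batched bodies and drops anything that does not look like an envelope; `ReceivedEnvelope`, `isEnvelope` and `parseEnvelopes` are hypothetical helpers, not exports of this package.

```typescript
interface ReceivedEnvelope {
  sequence: number;
  timestamp: string;
  type: string;
  subtype: string | null;
  session_id: string;
  uuid: string;
  message: unknown;
}

// Runtime check that a parsed JSON value has the envelope's fields.
function isEnvelope(value: unknown): value is ReceivedEnvelope {
  if (typeof value !== "object" || value === null) return false;
  const v = value as Record<string, unknown>;
  return (
    typeof v.sequence === "number" &&
    typeof v.timestamp === "string" &&
    typeof v.type === "string" &&
    (v.subtype === null || typeof v.subtype === "string") &&
    typeof v.session_id === "string" &&
    typeof v.uuid === "string" &&
    "message" in v
  );
}

// Accept either one envelope or an array of them; keep only valid entries.
function parseEnvelopes(body: unknown): ReceivedEnvelope[] {
  const items: unknown[] = Array.isArray(body) ? body : [body];
  return items.filter(isEnvelope);
}
```

In an Express handler this would be called as `parseEnvelopes(req.body)`; whether invalid entries are dropped silently or answered with a 4xx status is a choice for the receiver.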

Storing messages

Each envelope includes a sequence number (monotonically increasing per sink, as noted in the envelope definition) and a timestamp, so you can reconstruct the full message timeline. A simple approach is to append to a JSONL file or insert into a database:

import { appendFile } from "fs/promises";

app.post("/messages", async (req, res) => {
  const envelopes = Array.isArray(req.body) ? req.body : [req.body];

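  for (const envelope of envelopes) {
    // session_id comes from the request body, so keep only filename-safe
    // characters before using it in a path.
    const safeId = String(envelope.session_id).replace(/[^A-Za-z0-9_-]/g, "_");
    await appendFile(
      `session-${safeId}.jsonl`,
      JSON.stringify(envelope) + "\n",
    );
  }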

  res.sendStatus(200);
});
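
Reading the timeline back is the reverse operation. The sketch below parses the contents of one such JSONL file and orders the entries by sequence number, assuming all lines in the file were written by a single sink; `StoredEnvelope` and `parseTimeline` are hypothetical names.

```typescript
// Only the fields this sketch relies on; stored envelopes contain more.
interface StoredEnvelope {
  sequence: number;
  type: string;
  subtype: string | null;
}

// Parse JSONL text (one envelope per line) and sort it by sequence number.
function parseTimeline(jsonl: string): StoredEnvelope[] {
  return jsonl
    .split("\n")
    .filter((line) => line.trim() !== "") // skip the trailing empty line
    .map((line) => JSON.parse(line) as StoredEnvelope)
    .sort((a, b) => a.sequence - b.sequence);
}
```

The input string would typically come from `readFile(path, "utf8")` in `fs/promises`. Sorting matters when batches arrive out of order, for example after a retry.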

Message types you'll see

Here's a quick reference for the most common type values and what they mean:

| type | subtype | What it is |
|---|---|---|
| assistant | - | A model response (contains message.content with text and tool use blocks) |
| user | - | A user message or replayed user message |
| result | - | Session complete; contains total_cost_usd, duration_ms, num_turns |
| stream_event | - | Partial streaming chunk from the model |
| tool_progress | - | Progress update from a running tool |
| tool_use_summary | - | Summary after a tool finishes |
| system | init | Session started; contains available tools and session info |
| system | status | Status update (e.g. "thinking", "running tool") |
| system | api_retry | The SDK is retrying an API call |
| system | hook_started | A hook began executing |
| system | hook_response | A hook finished |
| system | task_started | A sub-task was spawned |
| system | task_progress | Sub-task progress update |

Python receiver example

from flask import Flask, request

app = Flask(__name__)

@app.post("/messages")
def receive():
    body = request.json
    envelopes = body if isinstance(body, list) else [body]

    for env in envelopes:
        msg_type = env["type"]
        subtype = env.get("subtype")
        label = f"{msg_type}:{subtype}" if subtype else msg_type
        print(f"[{env['sequence']}] {label}")

        if msg_type == "result":
            print(f"  Cost: ${env['message']['total_cost_usd']}")

    return "", 200

Re-exports

For convenience, this package re-exports query and all SDK message types from @anthropic-ai/claude-agent-sdk, so you can import everything from one place:

import { tappedQuery, query, type SDKMessage } from "@mrgeoffrich/claude-agent-sdk-tap";

License

MIT