claude-stream-collector
v0.1.0
Collect Claude streaming events into a typed, structured result. Zero dependencies. Drop-in ergonomic wrapper for the Anthropic SDK stream.
Consume Anthropic Messages streaming events; return a structured result. Zero dependencies. Bring your own SDK or your own SSE parser.
import Anthropic from "@anthropic-ai/sdk";
import { collectStream } from "claude-stream-collector";

const client = new Anthropic();
const stream = client.messages.stream({
  model: "claude-opus-4-7",
  max_tokens: 1024,
  messages: [{ role: "user", content: "Say hello in markdown." }],
});

const result = await collectStream(stream, {
  onTextDelta: (d) => process.stdout.write(d),
  onToolUseComplete: (t) => console.log("[tool]", t.name, t.input),
});

console.log(result.text); // full assembled text
console.log(result.toolUses); // parsed tool_use blocks with typed input
console.log(result.usage); // { input_tokens, output_tokens, cache_* }
console.log(result.stopReason); // 'end_turn' | 'tool_use' | 'max_tokens' | ...

Why this exists
The Anthropic SDK's messages.stream() is great, but if you've used it in production you've hit one of these:
- Tool-use input arrives as JSON deltas: you have to manually accumulate input_json_delta partial strings, then parse at content_block_stop. Every Claude wrapper team rewrites the same loop.
- Cache token fields disappear in message_delta: the SDK overwrites instead of merges, so cache_read_input_tokens from message_start gets lost in the final usage.
- You want a clean "give me the final structured result" function without writing 80 lines of state machine boilerplate.
This package is ~120 lines of TypeScript that handles all that, returns a typed CollectedResult, and stays out of your way.
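For reference, the hand-written loop described in the first bullet looks roughly like the sketch below. It is an illustration, not this package's source: ToolInputAccumulator, PendingTool, ParsedTool, and ToolStreamEvent are hypothetical names, and the event types are trimmed to the fields the sketch touches (content_block_start, content_block_delta carrying input_json_delta, and content_block_stop, as in the Messages streaming format).

```typescript
// Minimal local event types covering only the fields this sketch reads.
type ToolStreamEvent =
  | { type: "content_block_start"; index: number; content_block: { type: string; id?: string; name?: string } }
  | { type: "content_block_delta"; index: number; delta: { type: string; partial_json?: string } }
  | { type: "content_block_stop"; index: number };

interface PendingTool { id: string; name: string; json: string }
interface ParsedTool { id: string; name: string; input: unknown }

// Hypothetical helper: keeps one JSON string buffer per content-block index.
class ToolInputAccumulator {
  private pending = new Map<number, PendingTool>();
  readonly completed: ParsedTool[] = [];

  push(event: ToolStreamEvent): void {
    if (event.type === "content_block_start") {
      const block = event.content_block;
      if (block.type === "tool_use") {
        this.pending.set(event.index, { id: block.id ?? "", name: block.name ?? "", json: "" });
      }
    } else if (event.type === "content_block_delta") {
      const tool = this.pending.get(event.index);
      if (tool && event.delta.type === "input_json_delta") {
        // Fragments are not valid JSON on their own, so only concatenate here.
        tool.json += event.delta.partial_json ?? "";
      }
    } else {
      // content_block_stop: parse the buffer if this index was a tool_use block.
      const tool = this.pending.get(event.index);
      if (!tool) return; // e.g. a text block, nothing to parse
      this.pending.delete(event.index);
      // A tool called with no arguments can leave the buffer empty.
      const input = tool.json === "" ? {} : JSON.parse(tool.json);
      this.completed.push({ id: tool.id, name: tool.name, input });
    }
  }
}
```

In a real consumer, push would be called once per event inside a for await loop over the stream, and JSON.parse would sit in a try/catch so a malformed buffer can be reported instead of aborting the whole read.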
Install
npm install claude-stream-collector
# or
pnpm add claude-stream-collector

API
collectStream(events, callbacks?)
Consumes any AsyncIterable<StreamEvent> and returns a CollectedResult.
CollectedResult
interface CollectedResult {
  text: string; // all text blocks concatenated
  toolUses: ToolUseBlock[]; // tool_use blocks with parsed input
  blocks: ContentBlock[]; // all blocks in original order
  usage: TokenUsage; // input/output + cache tokens, merged
  stopReason: string | null; // 'end_turn' | 'tool_use' | etc.
  stopSequence: string | null;
  model: string;
  messageId: string;
}

Callbacks (all optional)
interface CollectStreamCallbacks {
  onTextDelta?: (delta: string) => void;
  onToolUseStart?: (block: { id: string; name: string; index: number }) => void;
  onToolUseComplete?: (block: ToolUseBlock) => void; // input fully parsed
  onMessageStop?: (result: CollectedResult) => void;
  onError?: (err: Error) => void; // non-fatal (e.g. malformed tool JSON)
}

Tool-use example
const stream = client.messages.stream({
  model: "claude-opus-4-7",
  max_tokens: 1024,
  tools: [/* ... */],
  messages: [/* ... */],
});

const result = await collectStream(stream, {
  onToolUseComplete: async (toolCall) => {
    // toolCall.input is already JSON-parsed and typed as Record<string, unknown>
    if (toolCall.name === "search_web") {
      const { query } = toolCall.input as { query: string };
      const results = await searchWeb(query);
      // ...feed back to Claude in a follow-up turn
    }
  },
});

if (result.stopReason === "tool_use") {
  // Claude wants to call tools; handle them and continue the conversation
}

Cache token tracking
The Anthropic SDK currently overwrites cache token fields in the final usage object. This collector merges them across message_start and message_delta, so you get the full picture:
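The merge can be sketched as below. This is an illustration under stated assumptions rather than the package's actual code: mergeUsage is a hypothetical helper, the local TokenUsage shape is trimmed to the four fields shown in this README, and the sample numbers mirror the example output in this section.

```typescript
// Local stand-in for the usage shape; every field optional because
// message_delta typically carries only a subset of them.
interface TokenUsage {
  input_tokens?: number;
  output_tokens?: number;
  cache_creation_input_tokens?: number | null;
  cache_read_input_tokens?: number | null;
}

// Hypothetical helper: copy only the fields the update actually carries,
// so a field absent from message_delta never erases the earlier value.
function mergeUsage(base: TokenUsage, update: TokenUsage): TokenUsage {
  const merged: TokenUsage = { ...base };
  for (const key of Object.keys(update) as (keyof TokenUsage)[]) {
    const value = update[key];
    if (value !== undefined && value !== null) merged[key] = value;
  }
  return merged;
}

// Usage as reported on message_start, then on message_delta.
const fromMessageStart: TokenUsage = {
  input_tokens: 100,
  output_tokens: 1,
  cache_creation_input_tokens: 1500,
  cache_read_input_tokens: 8000,
};
const fromMessageDelta: TokenUsage = { output_tokens: 230 };

const finalUsage = mergeUsage(fromMessageStart, fromMessageDelta);
```

Here output_tokens is replaced by the later value (the count in message_delta is cumulative), while both cache fields survive from message_start. With collectStream, the merged totals are available directly on result.usage: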
const result = await collectStream(stream);
console.log(result.usage);
// {
//   input_tokens: 100,
//   output_tokens: 230,
//   cache_creation_input_tokens: 1500, ← preserved
//   cache_read_input_tokens: 8000, ← preserved
// }

Bring your own stream
You don't need @anthropic-ai/sdk. Any AsyncIterable<StreamEvent> works — useful if you're parsing raw SSE yourself, replaying a recorded stream for tests, or proxying through your own server.
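If you parse raw SSE yourself, the parsing step might look like the sketch below. It is one possible approach, not part of this package: extractEvents and sseEvents are hypothetical names, the code assumes a runtime with the standard Response, ReadableStream, and TextDecoder globals (Node 18+ or a browser), and it yields untyped values that you would cast to the package's StreamEvent type.

```typescript
// Split a text buffer into complete SSE frames (separated by a blank line)
// and JSON-parse each frame's `data:` payload. The trailing partial frame is
// returned as `rest` so it can be prepended to the next chunk.
function extractEvents(buffer: string): { events: unknown[]; rest: string } {
  const frames = buffer.split(/\r?\n\r?\n/);
  const rest = frames.pop() ?? ""; // last piece may be an incomplete frame
  const events: unknown[] = [];
  for (const frame of frames) {
    const data = frame
      .split(/\r?\n/)
      .filter((line) => line.startsWith("data:"))
      .map((line) => line.slice(5).replace(/^ /, "")) // drop the one optional space
      .join("\n");
    if (data) events.push(JSON.parse(data));
  }
  return { events, rest };
}

// Hypothetical generator: reads the body chunk by chunk and yields parsed events.
async function* sseEvents(response: Response): AsyncGenerator<unknown> {
  if (!response.body) throw new Error("response has no body");
  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = "";
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    // stream: true keeps multi-byte characters split across chunks intact.
    buffer += decoder.decode(value, { stream: true });
    const { events, rest } = extractEvents(buffer);
    buffer = rest;
    yield* events;
  }
}
```

Keeping the frame splitting in a synchronous function makes it easy to unit-test against recorded chunks. In condensed form, with the parsing elided, the pattern is: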
async function* fromSSE(response: Response): AsyncIterable<StreamEvent> {
  // parse `data: {...}\n\n` lines from response.body...
  yield event;
}

const result = await collectStream(fromSSE(response));

Tests & build
npm install
npm test # vitest run
npm run build # tsc

License
MIT — © 2026 xiangnuans
Part of a series
This package is the first in a series of small, focused Claude SDK helpers built in public. Follow along: ship-log-2026.
