npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@vibes.diy/call-ai-v2

v2.4.3

Published

A composable stream processing pipeline for parsing OpenRouter/OpenAI streaming API responses. Each transform stream handles one layer of parsing and emits typed messages that flow through the pipeline.

Readme

call-ai/v2 Stream Pipeline

A composable stream processing pipeline for parsing OpenRouter/OpenAI streaming API responses. Each transform stream handles one layer of parsing and emits typed messages that flow through the pipeline.

Architecture Overview

Uint8Array (HTTP response body)
    │
    ▼
┌─────────────────────┐
│  StatsCollector     │  Injects stats.collect triggers at intervals
└─────────────────────┘
    │
    ▼
┌─────────────────────┐
│  LineStream         │  Bytes → line.begin/line/end (SSE wire format)
└─────────────────────┘
    │
    ▼
┌─────────────────────┐
│  DataStream         │  Lines → data.begin/line/end (parses "data: {json}")
└─────────────────────┘
    │
    ▼
┌─────────────────────┐
│  SseStream          │  JSON → sse.begin/line/end (validates OpenRouter schema)
└─────────────────────┘
    │
    ▼
┌─────────────────────┐
│  DeltaStream        │  SSE chunks → delta.begin/line/image/usage/end
└─────────────────────┘
    │
    ▼
┌─────────────────────────────────────────────────────────┐
│  SectionsStream(streamId, createId)                     │
│  ┌────────────────────────────────────────────────────┐ │
│  │  On delta.begin, creates inner pipeline:           │ │
│  │                                                    │ │
│  │  delta.line content ──► LineStream ──► BlockStream │ │
│  │         (bytes)          (markdown)    (sections)  │ │
│  │                                                    │ │
│  │  BlockStream is not passthrough (prevents duplication) │
│  └────────────────────────────────────────────────────┘ │
│  Outputs: block.begin/toplevel/code/image/end           │
└─────────────────────────────────────────────────────────┘

createId: () => string  — any function returning unique IDs,
                          passed to DeltaStream, SectionsStream, BlockStream

CLI Tool

Test the pipeline with captured SSE files or live API calls.

cd call-ai/v2

# Capture raw SSE to file for replay/testing
pnpm cli --prompt "Write a React counter app" --model openai/gpt-4o-mini --raw > captured.sse

# Live API call with delta output
pnpm cli --prompt "Hello world" --model openai/gpt-4o-mini --delta

# From captured file
pnpm cli --src captured.sse --block

# All events unfiltered
pnpm cli --src captured.sse --all

# Save images from vision model responses
pnpm cli --src captured-image.sse --image --image-dir ./output

Flags:

  • --line - Show line-level events
  • --data - Show data-level events
  • --sse - Show SSE-level events
  • --delta - Show delta-level events
  • --block - Show block-level events
  • --full - Accumulate and show complete sections
  • --all - Show all events unfiltered
  • --stats - Show stats events
  • --image - Fetch and display image metadata
  • --image-dir <path> - Save images to directory

Pipeline Usage

import {
  createStatsCollector,
  createLineStream,
  createDataStream,
  createSseStream,
  createDeltaStream,
  createSectionsStream,
  isDeltaLine,
  isCodeLine,
  isBlockImage,
} from "call-ai/v2";
import { ensureSuperThis } from "@fireproof/core-runtime";

const sthis = ensureSuperThis();
const streamId = sthis.nextId().str;
const createId = () => sthis.nextId().str;

const pipeline = response.body
  .pipeThrough(createStatsCollector(streamId, 1000))
  .pipeThrough(createLineStream(streamId))
  .pipeThrough(createDataStream(streamId))
  .pipeThrough(createSseStream(streamId))
  .pipeThrough(createDeltaStream(streamId, createId))
  .pipeThrough(createSectionsStream(streamId, createId));

for await (const msg of pipeline) {
  if (isDeltaLine(msg)) {
    // Raw content delta from LLM
    process.stdout.write(msg.content);
  }
  if (isCodeLine(msg)) {
    // Code block line with language info
    console.log(`[${msg.lang}] ${msg.line}`);
  }
  if (isBlockImage(msg)) {
    // Image URL from vision model
    console.log(`Image: ${msg.url}`);
  }
}

Stream Modules

stats-stream.ts

Injects stats.collect trigger messages at configurable intervals. Each downstream stream responds by emitting its own *.stats message with current counts.

Messages: stats.collect

line-stream.ts

Converts raw bytes into newline-delimited lines.

Input: Uint8Array | string Messages: line.begin, line.line, line.end, line.stats

data-stream.ts

Parses SSE format, extracting JSON from data: {json} lines.

Input: LineStreamMsg Messages: data.begin, data.line, data.error, data.end, data.stats

sse-stream.ts

Validates JSON against OpenRouter's streaming chunk schema using Arktype.

Input: DataStreamMsg Messages: sse.begin, sse.line, sse.error, sse.end, sse.stats

Key types:

  • SseChunk - Validated OpenRouter response chunk
  • SseUsage - Token usage stats
  • SSeImage - Image URL from vision models

delta-stream.ts

Extracts content deltas, images, and usage from validated SSE chunks.

Input: SseStreamMsg Messages: delta.begin, delta.line, delta.image, delta.usage, delta.end, delta.stats

sections-stream.ts

Parses markdown structure from accumulated content, detecting code fences and images.

Input: DeltaStreamMsg Messages: block.begin, block.toplevel.begin/line/end, block.code.begin/line/end, block.image, block.end, block.stats

block.code.* carries an optional path field, derived aider-style from the most-recent non-blank toplevel line preceding the fence (if it looks like a relative path with a recognized extension). Falls back to App.jsx otherwise.

fence-body-parser.ts

Pure function that turns the lines inside a code fence into Edit[]. A body with no markers is a single create. A body with <<<<<<< SEARCH / ======= / >>>>>>> REPLACE markers becomes one or more replace edits (multiple sections allowed in one fence).

apply-edits.ts

Pure helpers applyReplace and applyEdits. applyReplace first tries an exact match; on failure it falls back to a trailing-whitespace-tolerant match. Result reports matchKind (exact | trailing-ws | ellipsis).

SEARCH blocks support ... shortcuts: a line ending in ... is a single-line prefix match, a line that is just ... (or starts with ...) is a multi-line skip, and ... appearing mid-line is treated as literal text. REPLACE bodies are always literal — ... carries no special meaning there.

filesystem-stream.ts

Aider-style virtual filesystem stage. Sits after sections-stream. Owns a VirtualFS = Map<path, string> for the life of one streamed turn, seeded from the caller-supplied seed (typically the saved App.jsx). Each block.code.end is parsed via parseFenceBody; the resulting edits are applied with applyEdits. On success, emits fs.file.snapshot. Failed sections (parse errors, missing or ambiguous SEARCH) emit fs.apply.error and leave the VFS unchanged. On block.end, emits fs.turn.end with the final files map.

Input: BlockStreamMsg Messages added: fs.file.snapshot, fs.apply.error, fs.turn.end

The Passthrough Pattern

Streams use passthrough() to automatically forward all upstream messages while adding their own:

import { passthrough } from "./passthrough.js";

new TransformStream({
  transform: passthrough((msg, controller) => {
    // Input is already enqueued by passthrough()
    // Only emit NEW messages for events you handle
    if (isSomeTrigger(msg)) {
      controller.enqueue({ type: "my.event", ... });
    }
  }),
});

This means consumers see ALL messages from every layer. Use type guards to filter:

// See only delta-level events
if (isDeltaMsg(msg)) { ... }

// See only code blocks
if (isCodeLine(msg)) { ... }

Type Guards

Every message type has a corresponding type guard with optional streamId filtering:

// Check message type
if (isDeltaLine(msg)) {
  console.log(msg.content); // TypeScript knows msg is DeltaLineMsg
}

// Filter by streamId (for multiplexed streams)
if (isDeltaLine(msg, "stream-123")) {
  // Only matches DeltaLineMsg where streamId === "stream-123"
}

Message Naming Conventions

  • *Msg suffix for message types: DeltaLineMsg, SseLineMsg
  • *Seq suffix for sequence numbers: deltaSeq, choiceSeq, imageSeq
  • *Id suffix for identifiers: streamId, imageId, sectionId
  • *Nr suffix for counts: lineNr, chunkNr, blockNr

Stats Collection

Stats flow through the pipeline:

  1. StatsCollector injects stats.collect at intervals
  2. Each stream responds with its own *.stats message
  3. Final stats.collect emitted on stream close
if (isLineStats(msg)) console.log("Line stats:", msg.stats);
if (isDataStats(msg)) console.log("Data stats:", msg.stats);
if (isSseStats(msg)) console.log("SSE stats:", msg.stats);
if (isDeltaStats(msg)) console.log("Delta stats:", msg.stats);
if (isBlockStats(msg)) console.log("Block stats:", msg.stats);

Filesystem stage usage

import { createSectionsStream, createFileSystemStream, isFsFileSnapshot, isFsTurnEnd } from "call-ai/v2";

const seed = new Map([["App.jsx", priorAppJsx]]);

const pipeline = response.body
  // …line/data/sse/delta stages…
  .pipeThrough(createSectionsStream(streamId, createId))
  .pipeThrough(createFileSystemStream({ streamId, createId, seed }));

for await (const msg of pipeline) {
  if (isFsFileSnapshot(msg)) {
    // Update live preview with msg.content for msg.path
  }
  if (isFsTurnEnd(msg)) {
    // Persist msg.files to the session doc
  }
}

Current Status

Production: this pipeline is the live streaming path for vibes.diy chat (prompt-chat-section.ts pipes the LLM response body through line → data → sse → delta → sections, and the client reducer consumes the typed block messages directly).

Also used by:

  • CLI tool (cli.ts) for replay/debugging captured SSE files
  • Unit tests across the v2 modules

TODO

  • [ ] Chunked image decoding: Add createImageDecodeStream that fetches image URLs, decodes to bytes, and emits image.begin/image.fragment/image.end with shared imageId for streaming large images in fixed-size chunks
  • [ ] Production worker: Deploy pipeline to Cloudflare Worker with events as network transport, client consumes typed events directly instead of raw SSE