npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@hs-uix/stream-relay

v0.2.0

Published

Resumable, pollable stream proxy for environments where the client can't hold long connections (HubSpot UI extensions, iframe sandboxes, serverless edges).

Readme

stream-relay

npm

Resumable, pollable stream proxy for clients that can't hold long connections.

If your frontend lives somewhere with a short fetch ceiling (HubSpot UI extensions, sandboxed iframes, embedded SaaS surfaces) and your backend takes longer than that ceiling allows (LLM calls, agent runs, slow APIs), this package sits between them. The client polls a small endpoint at 400ms; the server buffers the upstream response in memory; reloads pick up exactly where they left off.

Drop-in by design

You don't change how your client makes requests, you just point at the relay instead of the upstream and use a hook for the response. You don't change how your backend works either; the upstream handler receives a write function, you call it with chunks, the relay does the rest. Two endpoints (POST /streams, GET /streams/:id), one React hook, no SSE plumbing, no WebSocket configuration, no sticky load balancing.

When to use it

Reach for stream-relay when the client environment caps individual fetch calls and you can't switch transports. HubSpot extensions are the canonical case: holding an SSE connection through hubspot.fetch is unreliable, page reloads kill the stream, and you have no good answer for resume.

If you control both ends and can use plain SSE or streamed fetch responses, you don't need this. Use those instead.

How it works

+------------------+      poll every 400ms       +------------------+    long upstream     +------------------+
|   React client   | --------------------------> |   stream-relay   | -----------------> |   LLM / agent    |
|   useStream()    | <-- { append, events }----- |     (server)     | <-- tokens ------- |  / any slow API  |
+------------------+                             +------------------+                    +------------------+

The relay buffers upstream text and structured events in memory. The client polls GET /streams/:id?since=N&eventSince=M and asks for data past its last-known offsets. On reload, the client sends the persisted offsets and gets exactly the text/events it missed. No SSE, no WebSockets, no sticky load balancing required.

Install

Published on npm as @hs-uix/stream-relay:

npm install @hs-uix/stream-relay

One package, four entry points. Bundlers only pull in what you import.

Quickstart

Client: React

import { useStream } from "@hs-uix/stream-relay/client";
import { useState } from "react";

function MyCard() {
  const [streamId, setStreamId] = useState(null);
  const [text, setText] = useState("");

  // Polls while streamId is set; each response includes only new text.
  const { isStreaming, reconnecting } = useStream({
    proxyUrl: "https://your-relay.workers.dev",
    streamId,
    fetcher: hubspot.fetch,
    onChunk: (append) => setText((t) => t + append),
    onDone: ({ meta }) => console.log("usage:", meta),
  });

  const start = async () => {
    setText("");

    // Starts the upstream work and returns immediately with a streamId.
    const res = await hubspot.fetch(`${RELAY_URL}/streams`, {
      method: "POST",
      body: JSON.stringify({ payload: { prompt: "Tell me a joke" } }),
    });
    const { streamId } = await res.json();

    // Setting streamId kicks off the polling loop above.
    setStreamId(streamId);
  };

  return (
    <>
      <button onClick={start} disabled={isStreaming}>
        {reconnecting ? "Reconnecting..." : isStreaming ? "Running..." : "Run"}
      </button>
      <div>{text}</div>
    </>
  );
}

A complete HubSpot CRM card example with serverless-function-backed property persistence lives in examples/hubspot-extension/.

Server: Cloudflare Workers

// worker.ts
import { RelayBuffer, createRelayWorker } from "@hs-uix/stream-relay/worker";

export class MyRelay extends RelayBuffer {
  constructor(state, env) {
    super(state, env, {
      upstream: async ({ payload, write }) => {
        for await (const token of callYourLLM(payload.prompt)) {
          write(token);
        }
      },
    });
  }
}

export default createRelayWorker();
# wrangler.toml
[[durable_objects.bindings]]
name = "RELAY"
class_name = "MyRelay"

[[migrations]]
tag = "v1"
new_classes = ["MyRelay"]

wrangler deploy and the relay is live.

Server: Node, Bun, or Deno via Hono

import { createRelayApp } from "@hs-uix/stream-relay/hono";
import { serve } from "@hono/node-server";

const { app } = createRelayApp({
  upstream: async ({ payload, write }) => {
    for await (const token of callYourLLM(payload.prompt)) {
      write(token);
    }
  },
});

serve({ fetch: app.fetch, port: 8787 });

Bring your own stream IDs

If you already track a runId, jobId, or any other stable identifier in your app, pass it as streamId on the start call and the relay uses it directly:

const res = await fetch(`${RELAY_URL}/streams`, {
  method: "POST",
  body: JSON.stringify({
    streamId: existingRunId,
    payload: { prompt: "..." },
  }),
});

The call is idempotent while the old record is still retained: re-posting the same streamId returns the existing stream's metadata without restarting the upstream. Saves you from persisting an extra ID alongside whatever you already track.

Resuming after reload

The hook returns the current string offset (a JavaScript UTF-16 string offset). Persist it alongside streamId (in localStorage, HubSpot card properties, your DB, anywhere). On the next mount, pass the saved offset back as resumeFrom and the hook delivers only the data that arrived while you were gone.

const { offset } = useStream({
  streamId: persisted.streamId,
  resumeFrom: persisted.offset,
  // ...
});

If the relay garbage-collected the buffer before you came back, onError fires with a StreamNotFoundError. Your app decides whether to restart the upstream or surrender.

If nobody polls a still-running stream for streamTtlMs (10 minutes by default), the relay aborts the upstream via ctx.signal and marks the stream as error. Upstream handlers should pass signal into fetch/SDK calls when possible or check signal.aborted in long loops.

Progress updates

Keep protocol lifecycle separate from app-level progress. status stays one of streaming, complete, error, or not_found; your upstream can publish custom progress through progress() without appending anything to the visible text buffer:

upstream: async ({ payload, write, progress }) => {
  progress({ phase: "retrieval", message: "Searching CRM records..." });
  const records = await searchCrm(payload.contactId);

  progress({
    phase: "generation",
    message: "Generating summary...",
    data: { recordsFound: records.length },
  });

  for await (const token of callYourLLM(records)) {
    write(token);
  }
}

The latest progress update is returned on polls and surfaced by the React hook:

const { progress } = useStream({
  streamId,
  onProgress: ({ message }) => setStatus(message ?? "Working..."),
});

Progress is also persisted by the bundled KV and Durable Object storage helpers, so reloads can restore the latest known phase/message alongside the text offset.

Structured events

Use emit() when your app needs ordered structured messages alongside the visible text stream: tool-call lifecycle events, JSON patches, usage metadata, questions, document snapshots, or app-specific UI updates.

write() and emit() are intentionally separate channels:

  • write(chunk) appends to the plain text buffer and is consumed with onChunk.
  • emit(event) appends to the structured event log and is consumed with onEvent.
  • progress(update) stores the latest progress snapshot and is consumed with onProgress.
upstream: async ({ payload, write, emit, progress }) => {
  progress({ phase: "retrieval", message: "Searching records..." });

  emit({ type: "tool.start", name: "search_knowledge" });
  const records = await searchKnowledge(payload.query);
  emit({ type: "tool.end", name: "search_knowledge", count: records.length });

  emit({
    type: "patch",
    op: { op: "add", path: "/sources", value: records },
  });

  write("Here is the answer: ");
  write(await summarize(records));

  emit({ type: "usage", inputTokens: 123, outputTokens: 45 });
}

The relay adds timestamp when an event omits it. Events are replayable with their own event offset, so reconnects do not duplicate already-consumed events:

type AppEvent =
  | { type: "tool.start"; name: string; timestamp?: number }
  | { type: "patch"; op: JsonPatchOperation; timestamp?: number };

const { offset, eventOffset } = useStream<unknown, unknown, AppEvent>({
  streamId,
  resumeFrom: persisted.offset,
  resumeEventsFrom: persisted.eventOffset,
  onChunk: (append, nextOffset) => {
    setText((text) => text + append);
    save({ offset: nextOffset });
  },
  onEvent: (event, nextEventOffset) => {
    switch (event.type) {
      case "tool.start":
        showTool(event.name);
        break;
      case "patch":
        applyPatch(event.op);
        break;
    }
    save({ eventOffset: nextEventOffset });
  },
});

Event ordering is guaranteed relative to other events, and text ordering is guaranteed relative to other text. Events are retained in memory until the stream is garbage-collected, so prefer write()/onChunk for high-frequency token text unless you specifically need text to be part of the structured event log.

If your UI requires text deltas to participate in the same ordered log as tool events, emit text as events yourself:

emit({ type: "text.delta", text: "Hello" });
emit({ type: "tool.start", name: "search" });
emit({ type: "text.delta", text: " world" });

Persistence (optional)

Default behavior is in-memory. If the server restarts, in-flight streams die with it. For most uses (single-region edge deploy, LLM proxying) this is fine. When you need completed stream output to survive deploys, evictions, or multi-day resume windows, opt into one of the bundled persistence helpers.

Cloudflare: Durable Object storage

import { RelayBuffer, withDurableStorage } from "@hs-uix/stream-relay/worker";

export class MyRelay extends RelayBuffer {
  constructor(state, env) {
    super(state, env, withDurableStorage(state, {
      upstream: async ({ payload, write }) => { /* ... */ },
    }));
  }
}

Every chunk is debounced into the DO's storage. On rehydrate (eviction, deploy, cold start), completed or errored buffers are read back transparently. If eviction interrupts an actively-running upstream, the stream rehydrates as an error because arbitrary upstream work cannot be resumed generically.

Hono / Node / anywhere: KV interface

import { createRelayApp, withKvStorage } from "@hs-uix/stream-relay/hono";

const myKv = {
  get: (k) => redis.get(k),
  set: (k, v) => redis.set(k, v),
  delete: (k) => redis.del(k),
};

const { app } = createRelayApp(withKvStorage(myKv, {
  upstream: async ({ payload, write }) => { /* ... */ },
}));

withKvStorage accepts any { get, set, delete? } shape. Drop in Redis, Upstash, Cloudflare KV (via kvFromCloudflare), or your own database. Writes are debounced (default 500ms); set flushIntervalMs: 0 for synchronous-per-chunk durability. Use prefix to namespace keys when sharing a store.

Local SQLite

For a local Node/Hono relay (not Cloudflare Workers), use withSqliteStorage. The package does not bundle a sqlite driver; pass an opened database from better-sqlite3, node:sqlite, or an async wrapper that exposes run/get.

import Database from "better-sqlite3";
import { createRelayApp, withSqliteStorage } from "@hs-uix/stream-relay/hono";

const db = new Database("stream-relay.db");

const { app } = createRelayApp(withSqliteStorage(db, {
  tableName: "stream_relay",
  ttlSeconds: 60 * 60 * 24, // optional: expire rows after 1 day
  upstream: async ({ payload, write, emit }) => { /* ... */ },
}));

withSqliteStorage creates a simple table on first use by default and stores the same persisted stream snapshots as the KV helper. It supports the same flushIntervalMs, ttlSeconds, and deleteOnGc options. It is exported from /server and /hono; Workers should use withDurableStorage() instead.

Roll your own

The persistence helpers are thin wrappers over optional callbacks on RelayOptions. If you want different semantics (chunked storage, compression, multi-tenant key prefixing), wire onAppend, onProgress, onComplete, onError, and hydrate directly. The relay core never reaches into storage.

Auth

This package does not ship built-in auth. Every host has different requirements (HubSpot install verification, JWT, mTLS, API keys), so we leave the choice to you:

createRelayWorker({
  auth: ({ request, env, streamId, method }) => {
    if (!isValidHubSpotInstall(request, env, streamId, method)) {
      return new Response("unauthorized", { status: 401 });
    }
  },
});

The same auth option works on both the Worker and Hono adapters. Worker auth receives { request, env, streamId, method }; Hono auth receives the Hono Context.

API reference

useStream(options) from @hs-uix/stream-relay/client

| Option | Default | Purpose | |---|---|---| | proxyUrl | required | Base URL of your deployed relay | | streamId | required | Stream to subscribe to (null = inert) | | fetcher | global fetch | HTTP client (use hubspot.fetch in extensions) | | intervalMs | 400 | Poll cadence | | resumeFrom | 0 | JavaScript string offset, or "live" to skip text backlog | | resumeEventsFrom | 0 ("live" when resumeFrom is "live") | Structured event offset, or "live" to skip event backlog | | reconnect.serverStallMs | 90000 | Give-up threshold for silent server | | reconnect.staleResyncMs | 3000 | Reconnecting UI hint threshold | | onChunk | optional | Receives text appended since the previous text offset | | onEvent | optional | Receives structured events and their next event offset | | onProgress | optional | Receives latest app-level progress updates |

Full JSDoc on every option in packages/client/index.ts.

createRelay(options) from @hs-uix/stream-relay/server

Framework-agnostic core if you're rolling your own HTTP layer. Returns { handleStart, handlePoll } as pure async functions. streamTtlMs controls both finished-buffer retention and inactivity aborts for still-running streams.

createRelayApp(options) from @hs-uix/stream-relay/hono

Hono app with POST /streams and GET /streams/:id mounted. Runs on Node, Bun, Deno, Vercel, AWS Lambda, or Cloudflare Pages.

RelayBuffer, createRelayWorker(), withDurableStorage() from @hs-uix/stream-relay/worker

Cloudflare Workers + Durable Object. Subclass RelayBuffer to wire your upstream; createRelayWorker() returns the routing fetch handler; withDurableStorage() opts into DO-backed durability.

withKvStorage(kv, options) from @hs-uix/stream-relay/server

Persistence helper for any KV-shaped store. Accepts { get, set, delete? }. Use directly with createRelay or wrap a Hono app's options.

withSqliteStorage(db, options) from @hs-uix/stream-relay/server

Local SQLite persistence helper for Node/Hono relays. Accepts a driver-neutral database object with prepare() (better-sqlite3 / node:sqlite) or run/get methods. Use directly with createRelay or wrap a Hono app's options. Not exported from the Worker entry point.

Wire protocol

The contract is small enough to reimplement in any language:

POST /streams          { streamId?, payload? }
                       -> { protocolVersion: 2, streamId, startedAt }

GET  /streams/:id?since=N&eventSince=M
                       -> { protocolVersion: 2, streamId, status, complete,
                            completed_at?, progress?, append, nextOffset,
                            events, nextEventOffset,
                            lastEventAt, serverNow,
                            final?: { text, events?, meta? }, error?,
                            errorInfo?: { code, message } }

status is one of streaming, complete, error, not_found. complete is true only after the upstream resolves successfully; completed_at is populated at that point with server time in ms since epoch. progress is the latest app-level progress update, shaped as { phase?, message?, data?, updated_at }. since and nextOffset are JavaScript string offsets (UTF-16 code units), matching the package's string-in/string-out API. eventSince and nextEventOffset are event-log offsets (counts, not byte/string offsets). Clients use serverNow - lastEventAt > serverStallMs to detect dead streams without trusting their own clock.

What we tested

The package was built test-driven. Validation runs through five layers, each catching a different class of bug.

Unit tests against the framework-agnostic core

Direct calls into handleStart and handlePoll, no HTTP. Covers happy-path streaming, upstream throws surfaced as status=error, unknown streamId returning not_found, out-of-range since clamped to buffer length, idempotent start (second POST with same id returns existing metadata, upstream runs once), heartbeat advancing lastEventAt without polluting the buffer, persistence hooks firing in order with correct chunk and offset arguments, rehydrate hook supplying state when a stream is missing from memory, resume from arbitrary offset returning only new bytes, and concurrent polls returning identical state.

Hono integration tests over real HTTP

A real @hono/node-server on a random port, real fetch calls. Validates: end-to-end POST plus poll loop until complete with byte-perfect accumulated content, not_found returned for unknown stream id, mid-stream disconnect-then-resume yielding zero gaps and zero overlap, three concurrent clients on the same stream observing identical content, and idempotent start where the second POST reuses the existing stream rather than restarting the upstream.

Stress tests

Bursty 25-token stream with periodic 800ms silent phases protected by server heartbeats, completing without false stall detection. 50 concurrent streams started in parallel, each verified for byte-perfect content and unique stream id. Five-cycle reload chaos on a single stream (poll for 50ms, pause 40ms, repeat five times, then drain to completion) reconstructing the full expected output.

Live test against Hono

Spawns the built dist/hono.mjs artifact, runs basic streaming, mid-stream resume, and 10 concurrent streams. Catches packaging bugs that pre-build tests miss.

Live test against Cloudflare Workers

Against wrangler dev --local with real Durable Objects. Same three scenarios as the Hono live test, plus DO routing via idFromName (10 concurrent streams across 10 distinct DO instances, each with isolated buffer state).

Soak test

A single long-running stream of 720 tokens at 1-second cadence, with 4-second silent pauses every 10 tokens (during which the server emits heartbeats), targeting roughly 12 minutes of total runtime. The client polls at 400ms throughout, persists its offset at the 5-minute mark, simulates a 3-second reload, then resumes. Cross-checked locally past the 5-minute reload mark with withDurableStorage enabled and zero data loss across the resume; server-reported lastEventAt gap stayed under 2 seconds throughout, well below the 90-second client stall threshold. Longer runtimes (10+ minute windows, real LLM upstreams) have been validated in deployed environments outside wrangler dev.

Comparisons

A few things in adjacent territory and why they don't fit this problem.

Vercel's resumable-stream package solves resume after a dropped SSE connection using Redis pub/sub. It assumes the client environment can hold an SSE connection in the first place, which doesn't help when the fetch surface caps you at 15 seconds.

Inngest, Trigger.dev, Defer, and similar workflow engines durably execute long-running jobs. They're full backends with their own opinions, and their clients are built for "render the final result" rather than token-by-token UI streaming.

Convex, Liveblocks, and Replicache are sync engines. They solve a superset of this problem at the cost of adopting an entire backend paradigm.

Status

Pre-alpha. The wire protocol and hook surface may shift before 1.0. Issues and feedback welcome.

License

MIT