
@loro-dev/streams-crdt

v0.6.1

Transport layer for synchronizing CRDT state over Loro Streams (Durable Streams).

Core Concepts

Two-class model

| Class | Responsibility |
| --- | --- |
| StreamsCrdt | Transport runtime bound to one stream URL. Handles auth, reconnect, bootstrap, SSE/long-poll, binary framing, and remote cursor persistence. |
| CrdtAdapter | Binding for one local CRDT instance. Handles snapshot export/import, version tracking, local update export, remote update application, and optional isolated 410 recovery inference. |

Operations

| Method | Purpose |
| --- | --- |
| createStream() | Creates the stream on the server. Returns { created: true } on the first call, { created: false } if it already exists. |
| deleteStream() | Deletes the stream. Closes any active join() first. |
| sync() | One-shot bootstrap/catch-up cycle. Syncs remote state into the local CRDT and pushes any local changes to the server. Does not enter live mode. |
| join() | Initial sync plus live subscription. Internally performs the same sync as sync(), then enters a persistent SSE/long-poll read loop and starts forwarding local writes. |
| close() | Closes any active join and releases resources. |

join() includes an initial sync. You do not need to call sync() before join(). When join() resolves, the local CRDT is already caught up with the server and the live subscription is active.

Error handling — Result, not exceptions

All transport operations return a Result<T, TransportError> instead of throwing. Callers must check .ok before accessing the value:

const result = await transport.join();
if (!result.ok) {
  // result.error is a TransportError with a discriminated `code` field.
  switch (result.error.code) {
    case "stream_not_found":
      // The stream doesn't exist yet — create it first.
      break;
    case "auth_failed":
      // Token expired or unauthorized.
      break;
    case "protocol_error":
    case "internal_error":
      // Non-retryable: bad server response or local runtime/adapter failure.
      break;
    case "network_error":
    case "timeout":
      // Transient — safe to retry.
      break;
    default:
      console.error("transport error:", result.error);
  }
  return;
}

const sub = result.value; // TransportSubscription

Every error carries a retryable flag so callers can decide whether to retry without hard-coding error codes:

if (!result.ok && result.error.retryable) {
  // safe to retry after a delay
}

Why Result instead of throw? Transport failures (network errors, auth expiry, missing streams) are expected at runtime, not exceptional. Wrapping them in Result makes the error path explicit in the type system so callers cannot accidentally ignore failures with a missing try/catch.

Quick Start (Loro)

import { LoroDoc } from "loro-crdt";
import {
  createStreamUrl,
  createLoroDocAdapter,
  StreamsCrdt,
} from "@loro-dev/streams-crdt/loro";

const doc = new LoroDoc();
const transport = new StreamsCrdt({
  streamUrl: createStreamUrl({
    bucketId: "my-bucket",
    streamId: "doc-1",
  }),
  auth: async () => "<gateway-jwt>",
  adapter: createLoroDocAdapter(doc),
  // Enable automatic snapshot upload so new readers bootstrap faster.
  snapshotUpload: { canUpload: async () => true },
});

// Create the stream (idempotent).
const created = await transport.createStream();
if (!created.ok) throw created.error;

// Join enters live collaborative editing mode.
// This performs an initial sync automatically before entering live mode.
const joined = await transport.join({
  onStatusChange(status) {
    console.log("room status:", status);
  },
});
if (!joined.ok) throw joined.error;

const sub = joined.value;

// The doc is now syncing in real time.
// Call sub.unsubscribe() or transport.close() to leave.

createLoroDocAdapter(doc) requires an attached LoroDoc. If the document was checked out or detached, call checkoutToLatest() or attach() before passing it to StreamsCrdt.

Quick Start (Flock)

import { Flock } from "@loro-dev/flock-wasm";
import {
  createStreamUrl,
  createFlockAdapter,
  StreamsCrdt,
} from "@loro-dev/streams-crdt/flock";

const flock = new Flock();
const transport = new StreamsCrdt({
  streamUrl: createStreamUrl({
    bucketId: "my-bucket",
    streamId: "room-1",
  }),
  auth: async () => "<gateway-jwt>",
  adapter: createFlockAdapter(flock),
});

await transport.createStream();
const joined = await transport.join();
if (!joined.ok) throw joined.error;

const sub = joined.value;

When to Use sync() vs join()

Use sync() when you need a one-shot catch-up without entering live mode:

// Offline-first: catch up on app start, then work offline.
const synced = await transport.sync();
if (!synced.ok) throw synced.error;
// synced.value.cursor contains the replay progress after sync.

Use join() for real-time collaboration. It does everything sync() does and then keeps the connection alive for continuous bidirectional sync:

const joined = await transport.join();

Delete Stream

const deleted = await transport.deleteStream();
if (!deleted.ok) throw deleted.error;

console.log(deleted.value.deleted); // true when deleted now, false when already missing

deleteStream() closes any active join first. When the configured remoteCursorStore implements delete(streamUrl) (the built-in stores do), the transport also clears the stored replay cursor for that stream.

Auth

auth may be:

  • a static string
  • a sync callback returning a string
  • an async callback returning a string

StreamsCrdt asks for a token before each request. If a request comes back with 401 or 403, it calls auth one more time with { reason: "unauthorized", status, previousToken } and retries that request once.

This lets the caller keep a cached token in the normal path, and only refresh after the server rejects it:

let token = readCachedGatewayJwt();

const transport = new StreamsCrdt({
  streamUrl,
  adapter,
  auth: async (context) => {
    if (context?.reason === "unauthorized") {
      token = await refreshGatewayJwt();
    }
    return token;
  },
});

If your callback already returns a fresh token every time, you can ignore the context argument.

Remote Cursor Store

The RemoteCursorStore persists replay progress metadata only — it does not store any CRDT update data. A remote cursor contains:

  • nextOffset — where the transport should resume reading from
  • serverLowerBoundVersion — the CRDT version that the server data represents
  • streamUrl — identity of the stream
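Taken together, the fields above suggest a cursor value of roughly this shape. This is a sketch for orientation only; the field types (in particular serverLowerBoundVersion) are assumptions, not the package's exact declaration:

```typescript
// Sketch of the replay-progress metadata a RemoteCursorStore persists.
// Note: no CRDT update data is stored here, only resume metadata.
interface RemoteCursorSketch {
  streamUrl: string;                    // identity of the stream
  nextOffset: number;                   // where the transport resumes reading
  serverLowerBoundVersion: Uint8Array;  // CRDT version the server data represents
}
```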

Important: cursor-only persistence is unsafe

If RemoteCursorStore.load() returns a saved cursor, the transport resumes from nextOffset and assumes the caller has already restored the matching local CRDT state represented by serverLowerBoundVersion.

If the local CRDT was reset to empty but the cursor was persisted, the transport will do delta-only catch-up from nextOffset — applying those deltas against an empty document will fail or produce an invalid state.

Rule of thumb

| Local CRDT state | Cursor store | Safe? |
| --- | --- | --- |
| Not persisted (ephemeral) | InMemoryRemoteCursorStore (default) | Yes |
| Persisted (IndexedDB, SQLite, etc.) | IndexedDbRemoteCursorStore + beforeRemoteCursorSave hook | Yes |
| Not persisted | IndexedDbRemoteCursorStore | No — data loss risk |

If you do not persist the local CRDT payload across reloads, keep the remote cursor ephemeral too. The default InMemoryRemoteCursorStore is the safe choice.

Hooking Local Durability

Use beforeRemoteCursorSave to couple local payload persistence with remote cursor advancement. The transport advances the cursor only after:

  1. Remote data was applied locally
  2. The durability hook succeeded
  3. The cursor store write succeeded

const transport = new StreamsCrdt({
  streamUrl: createStreamUrl({
    bucketId: "my-bucket",
    streamId: "doc-1",
  }),
  adapter: createLoroDocAdapter(doc),
  beforeRemoteCursorSave: async ({ cursor, source }) => {
    // Persist local CRDT state before cursor advances.
    // `source` is "local" | "remote" | "bootstrap".
    await persistLocalPayload(doc);
    console.log(source, cursor.nextOffset);
  },
});

persistLocalPayload(doc) must make the local durable state correspond to the cursor that is about to be stored. On the next app start, restore that local state before calling sync() or join().

Persistent Cursor Example

Only use a persisted RemoteCursorStore such as IndexedDbRemoteCursorStore when you also persist and restore the local CRDT state for the same stream.

import { LoroDoc } from "loro-crdt";
import {
  createStreamUrl,
  createLoroDocAdapter,
  IndexedDbRemoteCursorStore,
  StreamsCrdt,
} from "@loro-dev/streams-crdt/loro";

// Step 1: restore local CRDT state first.
const doc = (await restorePersistedDoc("doc-1")) ?? new LoroDoc();

const transport = new StreamsCrdt({
  streamUrl: createStreamUrl({
    bucketId: "my-bucket",
    streamId: "doc-1",
  }),
  auth: async () => "<gateway-jwt>",
  adapter: createLoroDocAdapter(doc),
  // Step 2: use a persistent cursor store.
  remoteCursorStore: new IndexedDbRemoteCursorStore({
    dbName: "my-app-ds-cursors",
  }),
  // Step 3: persist local state before cursor advances.
  beforeRemoteCursorSave: async () => {
    await persistLocalPayload("doc-1", doc);
  },
});

// join() handles initial sync + live mode.
const joined = await transport.join();

The required ordering is:

  1. Restore the local CRDT state first
  2. Let the transport apply remote updates
  3. Persist the updated local CRDT state (via beforeRemoteCursorSave)
  4. Only then persist the remote cursor

If you cannot provide steps 1 and 3, do not use a persisted remote cursor store.

Snapshot Codec Hooks

Use snapshotCodec when you want the transport to transform full snapshots during upload and bootstrap:

const transport = new StreamsCrdt({
  streamUrl,
  adapter: createLoroDocAdapter(doc),
  snapshotCodec: {
    // Sync or async are both allowed.
    compress: async (snapshot) => snapshot,
    decompress: (snapshot) => snapshot,
  },
});

Behavior

  • snapshotCodec.compress(snapshot) runs right before a snapshot is uploaded to Loro Streams
  • snapshotCodec.decompress(snapshot) runs right after a non-empty snapshot is downloaded from Loro Streams and right before adapter.applySnapshot(...)
  • Both hooks must be provided together
  • Both hooks may return either Uint8Array or Promise<Uint8Array>
  • These hooks affect full snapshots only. Incremental update batches are unchanged

Compatibility

The transport does not version or negotiate snapshot codecs for you.

If you enable snapshotCodec, you must make sure decompress can still read any older snapshots that are already stored for that stream. This is especially important when:

  • existing snapshots were uploaded without compression
  • you change the compression format later
  • you roll out the codec gradually across clients
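A common pattern for the first case (older snapshots stored uncompressed) is to detect the compression format before decoding. The sketch below assumes zstd-framed snapshots and falls back to treating unrecognized bytes as a raw snapshot; isZstdFrame and makeTolerantDecompress are hypothetical helpers, not part of the package:

```typescript
// zstd frames begin with the magic number 0xFD2FB528, stored little-endian.
const ZSTD_MAGIC = [0x28, 0xb5, 0x2f, 0xfd];

function isZstdFrame(bytes: Uint8Array): boolean {
  return bytes.length >= 4 && ZSTD_MAGIC.every((b, i) => bytes[i] === b);
}

// Wrap a real decompressor (e.g. the one from @loro-dev/streams-crdt/zstd)
// so that legacy, uncompressed snapshots pass through unchanged.
function makeTolerantDecompress(
  decompress: (s: Uint8Array) => Promise<Uint8Array>,
): (s: Uint8Array) => Promise<Uint8Array> {
  return async (snapshot) =>
    isZstdFrame(snapshot) ? decompress(snapshot) : snapshot;
}
```

The tolerant decoder would then be passed as snapshotCodec.decompress, while compress keeps producing the new format.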

@loro-dev/streams-crdt/zstd

@loro-dev/streams-crdt/zstd is a built-in helper entry point backed by @bokuweb/zstd-wasm.

pnpm add @loro-dev/streams-crdt

Then wire the exported hooks directly into snapshotCodec:

import { LoroDoc } from "loro-crdt";
import {
  createLoroDocAdapter,
  createStreamUrl,
  StreamsCrdt,
} from "@loro-dev/streams-crdt/loro";
import {
  compress,
  decompress,
} from "@loro-dev/streams-crdt/zstd";

const doc = new LoroDoc();
const transport = new StreamsCrdt({
  streamUrl: createStreamUrl({
    bucketId: "my-bucket",
    streamId: "doc-1",
  }),
  adapter: createLoroDocAdapter(doc),
  snapshotCodec: { compress, decompress },
});

@loro-dev/streams-crdt/zstd exports:

  • compress(snapshot) — async; uses an extra Worker when available and falls back to the current thread otherwise
  • compressInWorker(snapshot) — async; worker-only compression helper
  • compressOnCurrentThread(snapshot) — async; current-thread compression helper
  • decompress(snapshot) — async; current-thread decompression helper

Automatic Snapshot Upload

The transport can upload snapshots automatically during join() when callers opt in with snapshotUpload. This feature is disabled by default — if you do not pass snapshotUpload, no snapshots will ever be uploaded.

const transport = new StreamsCrdt({
  streamUrl,
  adapter: createLoroDocAdapter(doc),
  // Enable automatic snapshot upload:
  snapshotUpload: {
    canUpload: async () => true,
    // debounceMs: 10_000,                 // default: 10 seconds
    // minBytesSinceRemoteSnapshot: 102400, // default: 100 KB
  },
});

Configuration

| Option | Default | Description |
| --- | --- | --- |
| canUpload | (required) | Authorization gate called before each upload attempt. Return false to skip. |
| debounceMs | 10_000 (10 s) | Debounce window after the last local write before attempting upload. |
| minBytesSinceRemoteSnapshot | 102400 (100 KB) | Minimum byte delta between the last remote snapshot offset and the current stream tail. Upload is skipped when the delta is smaller. |

Behavior

  • Automatic snapshot upload is considered only during join(), never sync()
  • The feature is opt-in: callers must pass snapshotUpload with at least canUpload to enable it
  • Successful local appends while join() is active refresh a debounce window (default 10 s) for a possible snapshot upload check
  • If a remote apply advances the inferred remote version during that window, the transport treats that as another writer becoming active and skips that snapshot attempt
  • At debounce expiry, the transport may enqueue a best-effort maybeUploadSnapshot task onto the same serialized operation queue used for local append and remote apply
  • When that task executes, it fetches the latest remote snapshot offset, uses the latest confirmed local cursor.nextOffset as the candidate snapshot_offset, and uploads only if current_tail_offset - remote_snapshot_offset exceeds the configured byte threshold
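The offset check in the last step can be written as a pure predicate. This is a simplification of the described behavior; the function name and parameter names are illustrative, not the package's internals:

```typescript
// Only upload a new snapshot when enough bytes have accumulated in the
// stream past the last remote snapshot offset.
function shouldUploadSnapshot(
  currentTailOffset: number,
  remoteSnapshotOffset: number,
  minBytesSinceRemoteSnapshot = 102_400, // default threshold: 100 KB
): boolean {
  return currentTailOffset - remoteSnapshotOffset > minBytesSinceRemoteSnapshot;
}
```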

Entry Points

| Import path | What it adds |
| --- | --- |
| @loro-dev/streams-crdt | Transport core, cursor stores, ID helpers, all shared types |
| @loro-dev/streams-crdt/loro | Everything above + createLoroDocAdapter(doc) |
| @loro-dev/streams-crdt/flock | Everything above + createFlockAdapter(flock) |
| @loro-dev/streams-crdt/zstd | Snapshot compress / decompress hooks backed by @bokuweb/zstd-wasm |

Public API Reference

Root Entry Point

  • StreamsCrdt — transport runtime class
  • StreamsCrdtOptions — constructor options
  • CrdtAdapter — adapter contract interface
  • IsolatedCrdtAdapter — optional apply-only adapter used for safe 410 recovery
  • CrdtUpdateBatch — local update batch type
  • Result — Rust-style Ok | Err result
  • StreamsAuthProvider — auth callback type
  • TransportError — discriminated error union
  • TransportCreateStreamSuccess, TransportDeleteStreamSuccess, TransportSyncSuccess — result payloads
  • TransportJoinParams — join() options
  • TransportSubscription — active live subscription handle
  • TransportRoomStatus — "joined" | "reconnecting" | "disconnected" | "error"
  • SnapshotCodec, SnapshotTransformHook — full snapshot encode/decode hooks
  • SnapshotUploadOptions — snapshot upload configuration
  • RemoteCursor — replay progress metadata
  • RemoteCursorStore — abstract cursor storage interface
  • RemoteCursorSaveSource — "local" | "remote" | "bootstrap"
  • BeforeRemoteCursorSaveContext, BeforeRemoteCursorSaveHook — durability hook types
  • InMemoryRemoteCursorStore — ephemeral cursor store (default, always safe)
  • IndexedDbRemoteCursorStore — persistent cursor store (requires local CRDT persistence)
  • IndexedDbRemoteCursorStoreOptions — IndexedDB store config
  • createInitialRemoteCursor(...) — seed a cursor store
  • createStreamUrl(...) — build stream URL from bucket/stream IDs
  • isValidBucketId(...), isValidRillId(...) — ID validation helpers

Loro Entry Point

@loro-dev/streams-crdt/loro re-exports everything from the root plus:

  • createLoroDocAdapter(doc) — creates a CrdtAdapter for one LoroDoc

Flock Entry Point

@loro-dev/streams-crdt/flock re-exports everything from the root plus:

  • createFlockAdapter(flock) — creates a CrdtAdapter for one Flock replica
  • VersionVector, ExportBundle — re-exported from @loro-dev/flock-wasm

Zstd Entry Point

@loro-dev/streams-crdt/zstd exports:

  • compress(snapshot) — async snapshot compression helper
  • compressInWorker(snapshot) — async worker-only snapshot compression helper
  • compressOnCurrentThread(snapshot) — async current-thread snapshot compression helper
  • decompress(snapshot) — async snapshot decompression helper

Notes

  • createStreamIfMissing defaults to false and only affects the initial sync/join bootstrap
  • deleteStream() returns { deleted: false } when the target stream is already missing
  • sync() and join() return Result objects instead of implicitly creating missing streams
  • join() resolves only after the initial replay has succeeded; without a stored remote cursor it bootstraps once, then prefers live SSE and, on failure, falls back to long-poll and stays there for the rest of the session
  • One transport instance maps to one DS stream
  • One adapter instance maps to one local CRDT instance
  • Built-in remote cursor stores implement delete(streamUrl) so deleteStream() can reset replay state cleanly

Repository-only Scripts

The published package only includes built runtime artifacts under dist/. For repository verification against the hosted backend, run:

  • pnpm --dir packages/streams-crdt run test:e2e:streams-api