npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@loro-dev/streams-crdt

v0.11.1

Published

Transport/runtime layer for synchronizing CRDT state over Durable Streams.

Readme

@loro-dev/streams-crdt

Transport layer for synchronizing CRDT state over Loro Streams (Durable Streams).

Core Concepts

Two-class model

| Class | Responsibility | | --- | --- | | StreamsCrdt | Transport runtime bound to one stream URL. Handles auth, reconnect, bootstrap, SSE/long-poll, binary framing, and remote cursor persistence. | | CrdtAdapter | Binding for one local CRDT instance. Handles snapshot export/import, version tracking, local update export, remote update application, and optional isolated 410 recovery inference. |

Operations

| Method | Purpose | | --- | --- | | createStream() | Creates the stream on the server. Returns { created: true } on first call, { created: false } if it already exists. | | deleteStream() | Deletes the stream. Closes any active join() first. | | sync() | One-shot bootstrap/catch-up cycle. Syncs remote state into local CRDT and pushes any local changes to the server. Does not enter live mode. | | catchup() | Pull-only remote catch-up. Applies remote stream updates to the local CRDT but does not intentionally append new local updates. If live reads from join() are already healthy, returns immediately unless { force: true } is passed. | | join() | Initial sync plus live subscription. Internally performs the same sync as sync(), then enters a persistent SSE/long-poll read loop and starts forwarding local writes. No need to call sync() before join(). | | close() | Closes any active join and releases resources. If this instance was using appendWriteOnly(), that write-only path is also torn down and cannot be used again on the same instance. |

join() includes an initial sync. You do not need to call sync() before join(). When join() resolves, the local CRDT is already caught up with the server and the live subscription is active.

Error handling — Result, not exceptions

All transport operations return a Result<T, TransportError> instead of throwing. Callers must check .ok before accessing the value:

const result = await transport.join();
if (!result.ok) {
  // result.error is a TransportError with a discriminated `code` field.
  switch (result.error.code) {
    case "stream_not_found":
      // 404 — the stream doesn't exist yet. Create it first.
      break;
    case "auth_failed":
      // 401/403 from the server — refresh credentials and retry.
      break;
    case "auth_provider_error":
      // Your auth callback threw. The live loop will retry automatically.
      // For permanent logout, return undefined from the callback instead.
      break;
    case "server_error":
      // 5xx — transient backend failure. The live loop retries automatically;
      // for one-shot calls (sync/createStream/etc.) you decide whether to
      // retry. `retryAfterMs` honors the server's Retry-After hint.
      break;
    case "network_error":
    case "timeout":
      // Transient — safe to retry.
      break;
    case "protocol_error":
    case "internal_error":
      // Non-retryable: bad server response or local runtime/adapter failure.
      break;
    default:
      console.error("transport error:", result.error);
  }
  return;
}

const sub = result.value; // TransportSubscription

Every error carries a retryable flag so callers can decide whether to retry without hard-coding error codes:

if (!result.ok && result.error.retryable) {
  // safe to retry after a delay
}

Surfacing errors to end users

HTTP-derived variants (server_error, auth_failed, protocol_error, stream_not_found, gone) carry the response details so apps can show actionable messages without parsing the message string:

const err = result.error;
if (err.code === "server_error") {
  toast.error(`Server error ${err.status}${err.requestId ? ` (req: ${err.requestId})` : ""}`);
  console.error("backend body:", err.body);
  // err.retryAfterMs may suggest a backoff if the server set Retry-After.
}

| Code | status | message | body | Other | | ---------------------- | -------- | --------- | ------ | ------------------------------ | | stream_not_found | 404 | yes | yes | — | | auth_failed | 401/403 | yes | yes | — | | protocol_error | yes | yes | yes | — | | gone | 410 | yes | yes | — | | server_error | 5xx | yes | yes | requestId?, retryAfterMs? | | timeout | — | yes | — | phase: "connect" \| "poll" | | network_error | — | yes | — | — | | auth_provider_error | — | yes | — | cause? (original thrown value) | | internal_error | — | yes | — | — | | apply_incomplete | — | yes | — | reason: "loro_pending_updates" |

Operation-level retries vs. live-mode retries

  • sync(), catchup(), createStream(), deleteStream(), appendWriteOnly() return after a single attempt. The application is responsible for retrying retryable errors.
  • After join() succeeds, the live read/write loops automatically reconnect on every retryable: true error using exponential backoff ([0, 0.5s, 1s, 2s, 4s, 8s, 15s, 30s] with ±20% jitter; cap 30s). When a 5xx response includes Retry-After, the runtime waits at least that long. Non-retryable errors transition the subscription to status: "error"; call transport.rejoin() after fixing the cause (e.g., refreshing the auth token) to reset retry state and reconnect.

Why Result instead of throw? Transport failures (network errors, auth expiry, missing streams) are expected at runtime, not exceptional. Wrapping them in Result makes the error path explicit in the type system so callers cannot accidentally ignore failures with a missing try/catch.

Migration notes

For most callers nothing changes — if (!result.ok) { handle(result.error); } keeps working, and result.error.retryable is still the right gate for "safe to retry?".

Only adjust if any of these apply:

  • Your auth callback throws on permanent failure (e.g., user is logged out). Throwing is now treated as a transient error and retried in live mode. Return undefined instead so the server returns 401 and you get the terminal auth_failed. See "What if the auth callback throws?" below.
  • You construct TransportError literals (e.g., in tests or fixtures). stream_not_found now requires status: 404 + message; gone now requires status: 410. TypeScript will flag these.
  • You exhaustively switch on code with a : never default. Add a case for auth_provider_error (and optionally server_error if you handled 5xx via unknown/internal_error before).
  • You used "message" in error to defensively narrow. All variants now carry message; the guard is no longer needed.

Quick Start (Loro)

import { LoroDoc } from "loro-crdt";
import {
  createStreamUrl,
  createLoroDocAdapter,
  StreamsCrdt,
} from "@loro-dev/streams-crdt/loro";

const doc = new LoroDoc();
const transport = new StreamsCrdt({
  streamUrl: createStreamUrl({
    bucketId: "my-bucket",
    streamId: "doc-1",
  }),
  auth: async () => "<gateway-jwt>",
  adapter: createLoroDocAdapter(doc),
  // Enable automatic snapshot upload so new readers bootstrap faster.
  snapshotUpload: { canUpload: async () => true },
});

// Create the stream (idempotent).
const created = await transport.createStream();
if (!created.ok) throw created.error;

// Join enters live collaborative editing mode.
// This performs an initial sync automatically before entering live mode.
const joined = await transport.join({
  onStatusChange(status) {
    console.log("room status:", status);
  },
});
if (!joined.ok) throw joined.error;

const sub = joined.value;

// The doc is now syncing in real time.
// Call sub.unsubscribe() or transport.close() to leave.

createLoroDocAdapter(doc) requires an attached LoroDoc. If the document was checked out or detached, call checkoutToLatest() or attach() before passing it to StreamsCrdt.

Request shard origins

Browsers can still show head-of-line blocking symptoms when large bootstrap/catch-up downloads and normal app requests contend on the same connection. shardUrls lets the transport replace only the request origin for selected operations while preserving the stream path and query:

const transport = new StreamsCrdt({
  streamUrl,
  auth: async () => "<gateway-jwt>",
  adapter: createLoroDocAdapter(doc),
  shardUrls: {
    bootstrap: [
      "https://control-a.streams-api-proxy.loro.dev",
      "https://control-b.streams-api-proxy.loro.dev",
    ],
    catchup: [
      "https://control-a.streams-api-proxy.loro.dev",
      "https://control-b.streams-api-proxy.loro.dev",
    ],
    largePost: [
      "https://write-a.streams-api-proxy.loro.dev",
      "https://write-b.streams-api-proxy.loro.dev",
    ],
    largePostMinBytes: 64 * 1024,
  },
});

Shard entries must be origin URLs with no path, query, or hash. Each transport instance rotates through the configured origins per operation, with a randomized starting point so multiple tabs are less likely to all begin on the same shard. Bootstrap applies to GET <stream>/bootstrap; catch-up applies to non-SSE offset reads, including long-poll fallback; SSE live reads stay on the original streamUrl. largePost applies to non-empty append POST bodies at or above largePostMinBytes bytes. The default threshold is 0, so every non-empty append POST can use largePost when that pool is configured. Verify actual connection separation with browser NetLog: browsers may coalesce compatible HTTP/2 or HTTP/3 origins when DNS, certificate, and transport settings allow it.

Loro pending imports and cursor safety

Loro can accept an update whose dependencies are missing and keep it as a pending import. Pending imports are not represented in doc.version() and are not included in snapshots exported from that document. For that reason, the Loro adapter must not persist a RemoteCursor until all pending imports are resolved and the local document is complete at the cursor offset.

The transport may continue reading with an in-memory volatile cursor to find the missing dependencies. If the server reports up-to-date while Loro still has unresolved imports, the client must bootstrap again and only save the bootstrap cursor after the batch import is clean. Snapshot upload is skipped while Loro has unresolved imports.

See specs/loro-pending.md for the full behavior and implementation checklist.

Quick Start (Loro EphemeralStore)

Use a separate EphemeralStreamCrdt for presence-like Loro EphemeralStore data. It bootstraps through the first SSE event, posts local ephemeral updates, and never republishes state imported from other peers. If a client needs to keep its own presence alive, update that client's own keys with store.set() so the store emits a fresh local update.

Ephemeral live delivery is latest-state oriented. The server may keep only a tiny live fanout queue for current SSE subscribers and may disconnect slow subscribers instead of buffering every intermediate cursor or presence update. The client reconnects and receives a fresh bootstrap, so the local store converges to current state without requiring offset replay.

import { EphemeralStore } from "loro-crdt";
import {
  EphemeralStreamCrdt,
  EphemeralStoreAdaptor,
  createStreamUrl,
} from "@loro-dev/streams-crdt/loro";

const store = new EphemeralStore();
const docStreamUrl = createStreamUrl({
  bucketId: "my-bucket",
  streamId: "doc-1",
});

const presence = new EphemeralStreamCrdt({
  streamUrl: `${docStreamUrl}?ephemeral=presence`,
  auth: async () => "<gateway-jwt>",
  adaptor: EphemeralStoreAdaptor(store),
});

const joinedPresence = await presence.join();
if (!joinedPresence.ok) throw joinedPresence.error;

Ephemeral stream URL format

An ephemeral stream URL is the durable stream URL for the same document plus a channel query parameter:

{baseUrl}/ds/{bucketId}/{streamId}?ephemeral={channel}

For example, if the durable Flock document stream is:

https://streams-api.loro.dev/ds/my-bucket/flock-cm9vbS0x

then the matching presence stream is:

https://streams-api.loro.dev/ds/my-bucket/flock-cm9vbS0x?ephemeral=presence

Use the same bucketId and streamId as the durable StreamsCrdt room. The ephemeral value names a side-channel for that room, such as presence, cursor, or selection. Different channels under the same durable stream are separate in-memory rooms. The compatibility spelling ?awareness=presence is also accepted, but new code should use ?ephemeral=presence.

When an app exposes a room link such as /rooms/design-review-42, the link normally maps to one durable stream id first, and both durable and ephemeral sync derive from that same id:

const roomId = "design-review-42";
const durableStreamUrl = createStreamUrl({
  bucketId: "my-bucket",
  streamId: `flock-${roomId}`,
});

const presenceStreamUrl = new URL(durableStreamUrl);
presenceStreamUrl.searchParams.set("ephemeral", "presence");

const flockTransport = new StreamsCrdt({
  streamUrl: durableStreamUrl,
  auth,
  adapter: createFlockAdapter(flock),
});

const presenceTransport = new EphemeralStreamCrdt({
  streamUrl: presenceStreamUrl.toString(),
  auth,
  adaptor: EphemeralStoreAdaptor(store),
});

EphemeralStreamCrdt does not create or delete the durable stream. It posts updates to the ephemeral URL and opens SSE on the same URL with live=sse. Ephemeral state is not persisted with the Flock document stream; it is scoped by the URL path plus channel and is meant for current live state only.

Quick Start (Flock)

import { Flock } from "@loro-dev/flock-wasm";
import {
  createStreamUrl,
  createFlockAdapter,
  StreamsCrdt,
} from "@loro-dev/streams-crdt/flock";

const flock = new Flock();
const transport = new StreamsCrdt({
  streamUrl: createStreamUrl({
    bucketId: "my-bucket",
    streamId: "room-1",
  }),
  auth: async () => "<gateway-jwt>",
  adapter: createFlockAdapter(flock),
});

await transport.createStream();
const joined = await transport.join();
if (!joined.ok) throw joined.error;

const sub = joined.value;

When to Use catchup(), sync(), and join()

Use catchup() when you only want to pull remote updates into the local CRDT:

// Pull remote changes without uploading local pending changes.
const caughtUp = await transport.catchup();
if (!caughtUp.ok) throw caughtUp.error;
// caughtUp.value.cursor contains the replay progress after the pull.

catchup() is safe to call while a room is live. When join() has a healthy SSE/long-poll read loop, catchup() returns immediately because the live loop is already catching up continuously. Pass { force: true } to issue one explicit non-SSE offset read anyway, for example after app foreground or device wake:

await transport.catchup({ force: true });

If the live read side of the room is reconnecting, disconnected, or in error, catchup() skips the live backoff path and tries one immediate non-SSE catch-up read from the room's current cursor. A successful call updates the local cursor and nudges the live loops to reconnect, but it does not by itself guarantee that local pending writes have reached the server.

Use sync() when you need a one-shot bidirectional sync without entering live mode:

// Offline-first: pull remote changes, upload local changes, then work offline.
const synced = await transport.sync();
if (!synced.ok) throw synced.error;
// synced.value.cursor contains the replay progress after sync.

Use sync() or subscription.waitUntilSynced() when the caller needs local pending changes uploaded to the server.

Use join() for real-time collaboration. It does everything sync() does and then keeps the connection alive for continuous bidirectional sync:

const joined = await transport.join();

Delete Stream

const deleted = await transport.deleteStream();
if (!deleted.ok) throw deleted.error;

console.log(deleted.value.deleted); // true when deleted now, false when already missing

deleteStream() closes any active join first. When the configured remoteCursorStore implements delete(streamUrl) (the built-in stores do), the transport also clears the stored replay cursor for that stream.

Auth

auth may be:

  • a static string
  • a sync callback returning a string
  • an async callback returning a string

StreamsCrdt asks for a token before each request. If a request comes back with 401 or 403, it calls auth one more time with { reason: "unauthorized", status, previousToken } and retries that request once.

This lets the caller keep a cached token in the normal path, and only refresh after the server rejects it:

let token = readCachedGatewayJwt();

const transport = new StreamsCrdt({
  streamUrl,
  adapter,
  auth: async (context) => {
    if (context?.reason === "unauthorized") {
      token = await refreshGatewayJwt();
    }
    return token;
  },
});

If your callback already returns a fresh token every time, you can ignore the context argument.

What if the auth callback throws?

Throwing from the callback is treated as a transient failure. The runtime wraps the error as auth_provider_error (retryable: true) so a hiccupping auth backend doesn't terminate the live read/write loops — they retry under the same exponential backoff used for server_error and network_error.

For the "user is permanently logged out" case, return undefined (or null/empty string) instead of throwing. The request goes out without an Authorization header, the server returns 401, and you receive the non-retryable auth_failed instead — which the application typically routes to a sign-in page.

auth: async () => {
  try {
    return await fetchTokenFromAuthBackend(); // network → throw → retry
  } catch (e) {
    if (isPermanentLogoutError(e)) {
      return undefined; // → server 401 → auth_failed (terminal)
    }
    throw e; // → auth_provider_error (retryable)
  }
}

End-to-End Encryption (E2EE)

e2ee enables end-to-end encryption for CRDT update batches and snapshots before their bytes are appended to Durable Streams. Auth stays separate: gateway tokens authorize HTTP requests, while document keys protect document bytes from the server and unauthorized readers.

payloadProtection remains available as a deprecated alias for compatibility. Do not pass both e2ee and payloadProtection on the same StreamsCrdt instance.

When e2ee is omitted, streams-crdt keeps the current plaintext wire format. When it is present, the transport still uses application/octet-stream, but update batches and snapshots may carry an encrypted envelope inside the existing length-prefixed framing.

For the design rationale and wire format, see docs/plans/2026-04-09-streams-crdt-payload-protection-design.md.

const roomKey = new Uint8Array(32); // Load this from your own key store.
crypto.getRandomValues(roomKey);

const transport = new StreamsCrdt({
  streamUrl,
  adapter: createLoroDocAdapter(doc),
  e2ee: {
    // Defaults when e2ee is present:
    readPolicy: "encrypted-only",
    writePolicy: "encrypt",
    encryption: {
      // Stable application identity. Prefer bucket/stream identity over a full
      // gateway URL so proxy/host changes do not break decryption.
      scope: { bucketId: "my-bucket", streamId: "doc-1" },
      writeKey: { id: "room-key-v1", key: roomKey },
      // Optional. When omitted, writeKey is also the only read key.
      readKeys: async () => [{ id: "room-key-v1", key: roomKey }],
    },
  },
});

Encrypted streams still use application/octet-stream. The transport keeps a plaintext length-prefix frame around each encrypted envelope, so catch-up reads can concatenate several appends and clients can still recover payload boundaries.

E2EE Protocol Summary

E2EE wraps the existing transport framing; it does not replace it.

Encrypted update appends use this pipeline:

adapter.exportUpdates()
  -> encodeItems(clearUpdates)
  -> encrypt(kind = "update_batch")
  -> encodeItems([encryptedEnvelope])
  -> POST body

Encrypted snapshots use:

exportSnapshot -> snapshotCodec.compress -> encrypt(kind = "snapshot")
decrypt(kind = "snapshot") -> snapshotCodec.decompress -> applySnapshot

Envelope v1 is:

magic               4 bytes   "LSCE"
version             1 byte    0x01
kind                1 byte    0x01 update_batch, 0x02 snapshot
suite               1 byte    0x01 aes_256_gcm
key_id_len          1 byte
nonce_len           1 byte    expected 12
reserved            1 byte    0
key_id              key_id_len bytes, UTF-8
nonce               nonce_len bytes
ciphertext_and_tag  remaining bytes

AEAD additional authenticated data (AAD) is constructed from stable local context:

{
  "protocol": "loro-streams-crdt-payload-protection",
  "version": 1,
  "kind": "update_batch",
  "suite": "aes_256_gcm",
  "keyId": "room-key-v1",
  "scope": { "bucketId": "my-bucket", "streamId": "doc-1" }
}

AAD is not stored as a separate field on the wire. Parts of it live in the envelope header; scope comes from the local e2ee.encryption.scope config. Keep scope stable and avoid full gateway URLs, otherwise old encrypted history may become unreadable after host or proxy changes.

Mode Matrix

e2ee supports a few distinct shapes. Choose one explicitly:

| Use case | readPolicy | writePolicy | Required encryption config | Remote plaintext accepted? | Local writes sent as | | --- | --- | --- | --- | --- | --- | | Existing plaintext stream | omitted | omitted | none | Yes | plaintext | | Private reader/writer | encrypted-only (default) | encrypt (default) | scope + writeKey | No | encrypted | | Mixed private reader/writer | allow-plaintext | encrypt | scope + writeKey | Yes | encrypted | | Public write-only writer | allow-plaintext | plaintext | none | Yes | plaintext via appendWriteOnly() | | Encrypted reader without a write key | encrypted-only | plaintext | scope + readKeys | No | plaintext if local writes are exported |

There is no separate transport-level "read-only mode" in v1. The last row is only appropriate when the local CRDT is treated as read-only or the caller's auth rejects writes. writePolicy: "plaintext" avoids requiring a write key; it does not suppress local export attempts by itself.

Adapter Compatibility: Loro and Flock

e2ee is adapter-agnostic at the transport layer: the same options shape works with createLoroDocAdapter(doc) and createFlockAdapter(flock). Encryption wraps the CRDT payload bytes that the adapter already exports.

That also means one stream must use exactly one CRDT family:

  • A stream written with the Loro adapter must be read with the Loro adapter.
  • A stream written with the Flock adapter must be read with the Flock adapter.
  • Do not mix Loro and Flock replicas against the same stream, whether payload protection is enabled or not. The decrypted inner update bytes and snapshots are CRDT-specific and are not cross-compatible.

Protected Loro room:

import { LoroDoc } from "loro-crdt";
import {
  createLoroDocAdapter,
  StreamsCrdt,
} from "@loro-dev/streams-crdt/loro";

const doc = new LoroDoc();
const loroTransport = new StreamsCrdt({
  streamUrl,
  adapter: createLoroDocAdapter(doc),
  e2ee: {
    encryption: {
      scope: { bucketId: "my-bucket", streamId: "doc-1" },
      writeKey: { id: "room-key-v1", key: roomKey },
    },
  },
});

Protected Flock room:

import { Flock } from "@loro-dev/flock-wasm";
import {
  createFlockAdapter,
  StreamsCrdt,
} from "@loro-dev/streams-crdt/flock";

const flock = new Flock();
const flockTransport = new StreamsCrdt({
  streamUrl,
  adapter: createFlockAdapter(flock),
  e2ee: {
    encryption: {
      scope: { bucketId: "my-bucket", streamId: "room-1" },
      writeKey: { id: "room-key-v1", key: roomKey },
    },
  },
});

If one application stack uses Loro and another uses Flock, give them separate streams. Reusing the same key material across those streams is an application decision; it does not make the CRDT payloads interoperable.

Private E2EE Room

This is the default protected mode: all retained updates and snapshots are encrypted, and readers reject any plaintext payload that appears in the stream.

const privateReaderWriter = new StreamsCrdt({
  streamUrl,
  adapter: createLoroDocAdapter(doc),
  e2ee: {
    encryption: {
      scope: { bucketId: "my-bucket", streamId: "doc-1" },
      writeKey: { id: "private-v1", key: roomKey },
      // Optional. Keep old keys here during rotation.
      readKeys: async () => [{ id: "private-v1", key: roomKey }],
    },
  },
});

Mixed Plaintext and Encrypted Updates

Some applications intentionally allow public writers to append CRDT updates but not read private history. Configure readers explicitly before accepting those plaintext updates:

const privateReaderWriter = new StreamsCrdt({
  streamUrl,
  adapter: createLoroDocAdapter(doc),
  e2ee: {
    readPolicy: "allow-plaintext",
    writePolicy: "encrypt",
    encryption: {
      scope: { bucketId: "my-bucket", streamId: "doc-1" },
      writeKey: { id: "private-v1", key: roomKey },
    },
  },
});

Plaintext writers should opt in just as explicitly:

const publicWriter = new StreamsCrdt({
  streamUrl,
  adapter: createLoroDocAdapter(publicDoc),
  auth: async () => getWriteOnlyToken(),
  e2ee: {
    readPolicy: "allow-plaintext",
    writePolicy: "plaintext",
  },
});

publicDoc.getText("comments").insert(0, "public note");
const appended = await publicWriter.appendWriteOnly();

Behavior and Validation Rules

  • If e2ee is present and no policies are specified, the defaults are readPolicy: "encrypted-only" and writePolicy: "encrypt".
  • writePolicy: "encrypt" requires e2ee.encryption.scope and e2ee.encryption.writeKey.
  • Any client that needs to decrypt encrypted payloads must provide e2ee.encryption.scope plus at least one read key. If readKeys is omitted, the current writeKey is also used as the only read key.
  • payloadProtection is a deprecated alias for e2ee. Passing both is an error.
  • appendWriteOnly() is the write-only path. It only posts local CRDT batches that this transport observed after construction. It does not bootstrap, catch up, join SSE, upload snapshots, apply remote updates, or persist a remote cursor.
  • appendWriteOnly() must not be mixed with sync() or join() on the same transport instance.
  • After close(), appendWriteOnly() is permanently unavailable on that instance because the write-only subscription has been released. Create a new StreamsCrdt instance for later write-only use.
  • Snapshot uploads are encrypted when writePolicy is "encrypt". Snapshot upload is rejected when writePolicy is "plaintext"; a public writer that cannot read private state should not publish full-document snapshots.
  • Protected read failures stop before applySnapshot(), applyRemoteUpdates(), local durability hooks, or remote cursor persistence.
  • Writers always encrypt with the current writeKey. Keep old keys in readKeys until retained encrypted history and encrypted snapshots no longer reference them.
  • Encryption adds envelope overhead. The transport enforces the 256 KB batch cap against the encoded on-wire bytes, so a batch near the limit may split earlier under encryption than it would in plaintext mode.

If e2ee is configured and readPolicy is left at the default "encrypted-only", remote plaintext updates fail with code: "payload_protection_error" and reason: "plaintext_forbidden".

E2EE Errors

E2EE-related transport failures surface as code: "payload_protection_error" with one of these reasons:

| Reason | Meaning | | --- | --- | | plaintext_forbidden | A reader configured as encrypted-only saw plaintext data in the stream. | | missing_read_key | The envelope keyId does not exist in the configured readKeys. | | decrypt_failed | AES-GCM authentication failed. This usually means the wrong key, wrong scope, or tampered/corrupted bytes. | | invalid_envelope | The payload was not a supported streams-crdt encrypted envelope. | | wrong_payload_kind | An encrypted snapshot was used where an update batch was expected, or the reverse. | | encrypt_failed | Local write-side encryption could not proceed because the key/config/input was invalid. |

Callers usually treat these as operator or configuration errors, not transient network failures. They are always returned as retryable: false.

E2EE does not hide stream URLs, offsets, payload sizes, timing or auth metadata, and it does not detect a malicious server rolling back or forking history.

Remote Cursor Store

The RemoteCursorStore persists replay progress metadata only — it does not store any CRDT update data. A remote cursor contains:

  • nextOffset — where the transport should resume reading from
  • serverLowerBoundVersion — the CRDT version that the server data represents
  • streamUrl — identity of the stream

Important: cursor-only persistence is unsafe

If RemoteCursorStore.load() returns a saved cursor, the transport resumes from nextOffset and assumes the caller has already restored the matching local CRDT state represented by serverLowerBoundVersion.

If the local CRDT was reset to empty but the cursor was persisted, the transport will do delta-only catch-up from nextOffset — applying those deltas against an empty document will fail or produce an invalid state.

Rule of thumb

| Local CRDT state | Cursor store | Safe? | | --- | --- | --- | | Not persisted (ephemeral) | InMemoryRemoteCursorStore (default) | Yes | | Persisted (IndexedDB, SQLite, etc.) | IndexedDbRemoteCursorStore + beforeRemoteCursorSave hook | Yes | | Not persisted | IndexedDbRemoteCursorStore | No — data loss risk |

If you do not persist the local CRDT payload across reloads, keep the remote cursor ephemeral too. The default InMemoryRemoteCursorStore is the safe choice.

Hooking Local Durability

Use beforeRemoteCursorSave to couple local payload persistence with remote cursor advancement. The transport advances the cursor only after:

  1. Remote data was applied locally
  2. The durability hook succeeded
  3. The cursor store write succeeded
const transport = new StreamsCrdt({
  streamUrl: createStreamUrl({
    bucketId: "my-bucket",
    streamId: "doc-1",
  }),
  adapter: createLoroDocAdapter(doc),
  beforeRemoteCursorSave: async ({ cursor, source }) => {
    // Persist local CRDT state before cursor advances.
    // `source` is "local" | "remote" | "bootstrap".
    await persistLocalPayload(doc);
    console.log(source, cursor.nextOffset);
  },
});

persistLocalPayload(doc) must make the local durable state correspond to the cursor that is about to be stored. On the next app start, restore that local state before calling sync() or join().

Persistent Cursor Example

Only use a persisted RemoteCursorStore such as IndexedDbRemoteCursorStore when you also persist and restore the local CRDT state for the same stream.

import { LoroDoc } from "loro-crdt";
import {
  createStreamUrl,
  createLoroDocAdapter,
  IndexedDbRemoteCursorStore,
  StreamsCrdt,
} from "@loro-dev/streams-crdt/loro";

// Step 1: restore local CRDT state first.
const doc = (await restorePersistedDoc("doc-1")) ?? new LoroDoc();

const transport = new StreamsCrdt({
  streamUrl: createStreamUrl({
    bucketId: "my-bucket",
    streamId: "doc-1",
  }),
  auth: async () => "<gateway-jwt>",
  adapter: createLoroDocAdapter(doc),
  // Step 2: use a persistent cursor store.
  remoteCursorStore: new IndexedDbRemoteCursorStore({
    dbName: "my-app-ds-cursors",
  }),
  // Step 3: persist local state before cursor advances.
  beforeRemoteCursorSave: async () => {
    await persistLocalPayload("doc-1", doc);
  },
});

// join() handles initial sync + live mode.
const joined = await transport.join();

The required ordering is:

  1. Restore the local CRDT state first
  2. Let the transport apply remote updates
  3. Persist the updated local CRDT state (via beforeRemoteCursorSave)
  4. Only then persist the remote cursor

If you cannot provide steps 1 and 3, do not use a persisted remote cursor store.

Snapshot Codec Hooks

Use snapshotCodec when you want transport to transform full snapshots during upload and bootstrap:

const transport = new StreamsCrdt({
  streamUrl,
  adapter: createLoroDocAdapter(doc),
  snapshotCodec: {
    // Sync or async are both allowed.
    compress: async (snapshot) => snapshot,
    decompress: (snapshot) => snapshot,
  },
});

Behavior

  • snapshotCodec.compress(snapshot) runs right before a snapshot is uploaded to Loro Streams
  • snapshotCodec.decompress(snapshot) runs right after a non-empty snapshot is downloaded from Loro Streams and right before adapter.applySnapshot(...)
  • Both hooks must be provided together
  • Both hooks may return either Uint8Array or Promise<Uint8Array>
  • These hooks affect full snapshots only. Incremental update batches are unchanged

Compatibility

Transport does not version or negotiate snapshot codecs for you.

If you enable snapshotCodec, you must make sure decompress can still read any older snapshots that are already stored for that stream. This is especially important when:

  • existing snapshots were uploaded without compression
  • you change the compression format later
  • you roll out the codec gradually across clients

@loro-dev/streams-crdt/zstd

@loro-dev/streams-crdt/zstd is a built-in helper entry point backed by @bokuweb/zstd-wasm.

pnpm add @loro-dev/streams-crdt

Then wire the exported hooks directly into snapshotCodec:

import { LoroDoc } from "loro-crdt";
import {
  createLoroDocAdapter,
  createStreamUrl,
  StreamsCrdt,
} from "@loro-dev/streams-crdt/loro";
import {
  compress,
  decompress,
} from "@loro-dev/streams-crdt/zstd";

const doc = new LoroDoc();
const transport = new StreamsCrdt({
  streamUrl: createStreamUrl({
    bucketId: "my-bucket",
    streamId: "doc-1",
  }),
  adapter: createLoroDocAdapter(doc),
  snapshotCodec: { compress, decompress },
});

@loro-dev/streams-crdt/zstd exports:

  • compress(snapshot) — async; uses an extra Worker when available and falls back to the current thread otherwise
  • compressInWorker(snapshot) — async; worker-only compression helper
  • compressOnCurrentThread(snapshot) — async; current-thread compression helper
  • decompress(snapshot) — async; current-thread decompression helper

Automatic Snapshot Upload

The transport can upload snapshots automatically during join() when callers opt in with snapshotUpload. This feature is disabled by default — if you do not pass snapshotUpload, no snapshots will ever be uploaded.

const transport = new StreamsCrdt({
  streamUrl,
  adapter: createLoroDocAdapter(doc),
  // Enable automatic snapshot upload:
  snapshotUpload: {
    canUpload: async () => true,
    // debounceMs: 10_000,                 // default: 10 seconds
    // minBytesSinceRemoteSnapshot: 102400, // default: 100 KB
  },
});

Configuration

| Option | Default | Description | | --- | --- | --- | | canUpload | (required) | Authorization gate called before each upload attempt. Return false to skip. | | debounceMs | 10_000 (10 s) | Debounce window after the last local write before attempting upload. | | minBytesSinceRemoteSnapshot | 102400 (100 KB) | Minimum byte delta between the last remote snapshot offset and the current stream tail. Upload is skipped when the delta is smaller. |

Behavior

  • Automatic snapshot upload is considered only during join(), never sync()
  • The feature is opt-in: callers must pass snapshotUpload with at least canUpload to enable it
  • Successful local appends while join() is active refresh a debounce window (default 10 s) for a possible snapshot upload check
  • If a remote apply advances the inferred remote version during that window, the transport treats that as another writer becoming active and skips that snapshot attempt
  • At debounce expiry, the transport may enqueue a best-effort maybeUploadSnapshot task onto the same serialized operation queue used for local append and remote apply
  • When that task executes, it fetches the latest remote snapshot offset, uses the latest confirmed local cursor.nextOffset as the candidate snapshot_offset, and uploads only if current_tail_offset - remote_snapshot_offset exceeds the configured byte threshold

Entry Points

| Import path | What it adds | | --- | --- | | @loro-dev/streams-crdt | Transport core, cursor stores, ID helpers, all shared types | | @loro-dev/streams-crdt/loro | Everything above + createLoroDocAdapter(doc) | | @loro-dev/streams-crdt/flock | Everything above + createFlockAdapter(flock) | | @loro-dev/streams-crdt/zstd | Snapshot compress / decompress hooks backed by @bokuweb/zstd-wasm |

Public API Reference

Root Entry Point

  • StreamsCrdt — transport runtime class
  • StreamsCrdtOptions — constructor options
  • CrdtAdapter — adapter contract interface
  • IsolatedCrdtAdapter — optional apply-only adapter used for safe 410 recovery
  • CrdtUpdateBatch — local update batch type
  • CrdtApplyOutcome, CrdtApplyReturn, CrdtUnresolvedSpan — adapter apply result types used to report incomplete imports such as Loro pending updates
  • Result — Rust-style Ok | Err result
  • StreamsAuthProvider — auth callback type
  • TransportError — discriminated error union apply_incomplete is returned when remote data was read but the CRDT adapter still has unresolved dependencies and the cursor cannot be saved.
  • TransportCatchupParamscatchup() options
  • TransportCreateStreamSuccess, TransportDeleteStreamSuccess, TransportCatchupSuccess, TransportSyncSuccess — result payloads
  • TransportJoinParamsjoin() options
  • TransportSubscription — active live subscription handle
  • TransportRoomStatus"joined" | "reconnecting" | "disconnected" | "error"
  • SnapshotCodec, SnapshotTransformHook — full snapshot encode/decode hooks
  • WriteOnlyAppendResult — result returned by appendWriteOnly()
  • E2eeError — local error class thrown by low-level E2EE helpers before conversion into TransportError
  • E2eeOptions, E2eeKey, E2eeScope, E2eeEncryptionOptions — E2EE configuration types
  • PayloadProtectionError, PayloadProtectionOptions, PayloadProtectionKey, PayloadProtectionScope — deprecated compatibility aliases
  • SnapshotUploadOptions — snapshot upload configuration
  • RemoteCursor — replay progress metadata
  • RemoteCursorStore — abstract cursor storage interface
  • RemoteCursorSaveSource"local" | "remote" | "bootstrap"
  • BeforeRemoteCursorSaveContext, BeforeRemoteCursorSaveHook — durability hook types
  • InMemoryRemoteCursorStore — ephemeral cursor store (default, always safe)
  • IndexedDbRemoteCursorStore — persistent cursor store (requires local CRDT persistence)
  • IndexedDbRemoteCursorStoreOptions — IndexedDB store config
  • createInitialRemoteCursor(...) — seed a cursor store
  • createStreamUrl(...) — build stream URL from bucket/stream IDs
  • isValidBucketId(...), isValidRillId(...) — ID validation helpers

Loro Entry Point

@loro-dev/streams-crdt/loro re-exports everything from the root plus:

  • createLoroDocAdapter(doc) — creates a CrdtAdapter for one LoroDoc

Flock Entry Point

@loro-dev/streams-crdt/flock re-exports everything from the root plus:

  • createFlockAdapter(flock) — creates a CrdtAdapter for one Flock replica
  • VersionVector, ExportBundle — re-exported from @loro-dev/flock-wasm

Zstd Entry Point

@loro-dev/streams-crdt/zstd exports:

  • compress(snapshot) — async snapshot compression helper
  • compressInWorker(snapshot) — async worker-only snapshot compression helper
  • compressOnCurrentThread(snapshot) — async current-thread snapshot compression helper
  • decompress(snapshot) — async snapshot decompression helper

Notes

  • createStreamIfMissing defaults to false and only affects the initial sync/join bootstrap
  • deleteStream() returns { deleted: false } when the target stream is already missing
  • sync() and join() return Result objects instead of implicitly creating missing streams
  • join() resolves only after the initial replay has succeeded; without a stored remote cursor it bootstraps once, then prefers ordinary live SSE and falls back sticky to long-poll
  • One transport instance maps to one DS stream
  • One adapter instance maps to one local CRDT instance
  • Built-in remote cursor stores implement delete(streamUrl) so deleteStream() can reset replay state cleanly

Repository-only Scripts

The published package only includes built runtime artifacts under dist/. For repository verification against the hosted backend, run:

  • pnpm --dir packages/streams-crdt run test:e2e:streams-api