@loro-dev/streams-crdt
v0.11.1
Transport/runtime layer for synchronizing CRDT state over Durable Streams.
Transport layer for synchronizing CRDT state over Loro Streams (Durable Streams).
Core Concepts
Two-class model
| Class | Responsibility |
| --- | --- |
| StreamsCrdt | Transport runtime bound to one stream URL. Handles auth, reconnect, bootstrap, SSE/long-poll, binary framing, and remote cursor persistence. |
| CrdtAdapter | Binding for one local CRDT instance. Handles snapshot export/import, version tracking, local update export, remote update application, and optional isolated 410 recovery inference. |
Operations
| Method | Purpose |
| --- | --- |
| createStream() | Creates the stream on the server. Returns { created: true } on first call, { created: false } if it already exists. |
| deleteStream() | Deletes the stream. Closes any active join() first. |
| sync() | One-shot bootstrap/catch-up cycle. Syncs remote state into local CRDT and pushes any local changes to the server. Does not enter live mode. |
| catchup() | Pull-only remote catch-up. Applies remote stream updates to the local CRDT but does not intentionally append new local updates. If live reads from join() are already healthy, returns immediately unless { force: true } is passed. |
| join() | Initial sync plus live subscription. Internally performs the same sync as sync(), then enters a persistent SSE/long-poll read loop and starts forwarding local writes. No need to call sync() before join(). |
| close() | Closes any active join and releases resources. If this instance was using appendWriteOnly(), that write-only path is also torn down and cannot be used again on the same instance. |
join() includes an initial sync. You do not need to call sync() before join(). When join() resolves, the local CRDT is already caught up with the server and the live subscription is active.
Error handling — Result, not exceptions
All transport operations return a Result<T, TransportError> instead of
throwing. Callers must check .ok before accessing the value:
const result = await transport.join();
if (!result.ok) {
// result.error is a TransportError with a discriminated `code` field.
switch (result.error.code) {
case "stream_not_found":
// 404 — the stream doesn't exist yet. Create it first.
break;
case "auth_failed":
// 401/403 from the server — refresh credentials and retry.
break;
case "auth_provider_error":
// Your auth callback threw. The live loop will retry automatically.
// For permanent logout, return undefined from the callback instead.
break;
case "server_error":
// 5xx — transient backend failure. The live loop retries automatically;
// for one-shot calls (sync/createStream/etc.) you decide whether to
// retry. `retryAfterMs` honors the server's Retry-After hint.
break;
case "network_error":
case "timeout":
// Transient — safe to retry.
break;
case "protocol_error":
case "internal_error":
// Non-retryable: bad server response or local runtime/adapter failure.
break;
default:
console.error("transport error:", result.error);
}
return;
}
const sub = result.value; // TransportSubscription
Every error carries a retryable flag so callers can decide whether to retry
without hard-coding error codes:
if (!result.ok && result.error.retryable) {
// safe to retry after a delay
}
Surfacing errors to end users
HTTP-derived variants (server_error, auth_failed, protocol_error,
stream_not_found, gone) carry the response details so apps can show
actionable messages without parsing the message string:
const err = result.error;
if (err.code === "server_error") {
toast.error(`Server error ${err.status}${err.requestId ? ` (req: ${err.requestId})` : ""}`);
console.error("backend body:", err.body);
// err.retryAfterMs may suggest a backoff if the server set Retry-After.
}
| Code | status | message | body | Other |
| ---------------------- | -------- | --------- | ------ | ------------------------------ |
| stream_not_found | 404 | yes | yes | — |
| auth_failed | 401/403 | yes | yes | — |
| protocol_error | yes | yes | yes | — |
| gone | 410 | yes | yes | — |
| server_error | 5xx | yes | yes | requestId?, retryAfterMs? |
| timeout | — | yes | — | phase: "connect" \| "poll" |
| network_error | — | yes | — | — |
| auth_provider_error | — | yes | — | cause? (original thrown value) |
| internal_error | — | yes | — | — |
| apply_incomplete | — | yes | — | reason: "loro_pending_updates" |
Operation-level retries vs. live-mode retries
- sync(), catchup(), createStream(), deleteStream(), and appendWriteOnly() return after a single attempt. The application is responsible for retrying retryable errors.
- After join() succeeds, the live read/write loops automatically reconnect on every retryable: true error using exponential backoff ([0, 0.5s, 1s, 2s, 4s, 8s, 15s, 30s] with ±20% jitter; cap 30s). When a 5xx response includes Retry-After, the runtime waits at least that long. Non-retryable errors transition the subscription to status: "error"; call transport.rejoin() after fixing the cause (e.g., refreshing the auth token) to reset retry state and reconnect.
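For one-shot calls, the application has to supply its own retry loop. The following is a minimal sketch of one, reusing the backoff schedule quoted above. The names `backoffDelayMs` and `retryOneShot` are hypothetical helpers, not part of the package; the local `Result` and error types only model the `ok`, `retryable`, and `retryAfterMs` fields described in this README. Whether the runtime applies the 30 s cap before or after jitter is not stated, so capping after jitter is an assumption here.

```typescript
// Hypothetical helpers for illustration; not exported by @loro-dev/streams-crdt.
type RetryableError = { code: string; retryable: boolean; retryAfterMs?: number };
type Result<T, E> = { ok: true; value: T } | { ok: false; error: E };

// Base delays in milliseconds, taken from the schedule quoted above.
const BACKOFF_SCHEDULE_MS = [0, 500, 1_000, 2_000, 4_000, 8_000, 15_000, 30_000];
const MAX_DELAY_MS = 30_000;

/**
 * Delay before retry number `attempt` (0-based index into the schedule).
 * `random` returns a value in [0, 1) and is injectable so tests are deterministic.
 */
function backoffDelayMs(
  attempt: number,
  random: () => number = Math.random,
  retryAfterMs?: number,
): number {
  const index = Math.min(Math.max(attempt, 0), BACKOFF_SCHEDULE_MS.length - 1);
  const base = BACKOFF_SCHEDULE_MS[index] ?? MAX_DELAY_MS;
  // Map random() from [0, 1) onto a jitter factor in [0.8, 1.2).
  const jitterFactor = 0.8 + random() * 0.4;
  // Assumption: the cap is applied after jitter.
  const jittered = Math.min(Math.round(base * jitterFactor), MAX_DELAY_MS);
  // Assumption: a Retry-After hint acts as a lower bound on the wait.
  return Math.max(jittered, retryAfterMs ?? 0);
}

/** Re-run a one-shot operation while it keeps failing with a retryable error. */
async function retryOneShot<T>(
  operation: () => Promise<Result<T, RetryableError>>,
  maxAttempts = 5,
  sleep: (ms: number) => Promise<void> = (ms) =>
    new Promise<void>((resolve) => setTimeout(resolve, ms)),
): Promise<Result<T, RetryableError>> {
  let result = await operation();
  for (let attempt = 1; attempt < maxAttempts; attempt++) {
    if (result.ok || !result.error.retryable) break;
    await sleep(backoffDelayMs(attempt, Math.random, result.error.retryAfterMs));
    result = await operation();
  }
  return result;
}
```

A caller would wrap a one-shot call such as `() => transport.sync()` in `retryOneShot` and still inspect the final result, since a non-retryable error or an exhausted attempt budget is returned as-is.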
Why Result instead of throw? Transport failures (network errors, auth expiry, missing streams) are expected at runtime, not exceptional. Wrapping them in Result makes the error path explicit in the type system so callers cannot accidentally ignore failures with a missing try/catch.
Migration notes
For most callers nothing changes — if (!result.ok) { handle(result.error); }
keeps working, and result.error.retryable is still the right gate for
"safe to retry?".
Only adjust if any of these apply:
- Your auth callback throws on permanent failure (e.g., user is logged out). Throwing is now treated as a transient error and retried in live mode. Return undefined instead so the server returns 401 and you get the terminal auth_failed. See "What if the auth callback throws?" below.
- You construct TransportError literals (e.g., in tests or fixtures). stream_not_found now requires status: 404 + message; gone now requires status: 410. TypeScript will flag these.
- You exhaustively switch on code with a : never default. Add a case for auth_provider_error (and optionally server_error if you handled 5xx via unknown/internal_error before).
- You used "message" in error to defensively narrow. All variants now carry message; the guard is no longer needed.
Quick Start (Loro)
import { LoroDoc } from "loro-crdt";
import {
createStreamUrl,
createLoroDocAdapter,
StreamsCrdt,
} from "@loro-dev/streams-crdt/loro";
const doc = new LoroDoc();
const transport = new StreamsCrdt({
streamUrl: createStreamUrl({
bucketId: "my-bucket",
streamId: "doc-1",
}),
auth: async () => "<gateway-jwt>",
adapter: createLoroDocAdapter(doc),
// Enable automatic snapshot upload so new readers bootstrap faster.
snapshotUpload: { canUpload: async () => true },
});
// Create the stream (idempotent).
const created = await transport.createStream();
if (!created.ok) throw created.error;
// Join enters live collaborative editing mode.
// This performs an initial sync automatically before entering live mode.
const joined = await transport.join({
onStatusChange(status) {
console.log("room status:", status);
},
});
if (!joined.ok) throw joined.error;
const sub = joined.value;
// The doc is now syncing in real time.
// Call sub.unsubscribe() or transport.close() to leave.
createLoroDocAdapter(doc) requires an attached LoroDoc. If the document was
checked out or detached, call checkoutToLatest() or attach() before passing
it to StreamsCrdt.
Request shard origins
Browsers can still show head-of-line blocking symptoms when large
bootstrap/catch-up downloads and normal app requests contend on the same
connection. shardUrls lets the transport replace only the request origin for
selected operations while preserving the stream path and query:
const transport = new StreamsCrdt({
streamUrl,
auth: async () => "<gateway-jwt>",
adapter: createLoroDocAdapter(doc),
shardUrls: {
bootstrap: [
"https://control-a.streams-api-proxy.loro.dev",
"https://control-b.streams-api-proxy.loro.dev",
],
catchup: [
"https://control-a.streams-api-proxy.loro.dev",
"https://control-b.streams-api-proxy.loro.dev",
],
largePost: [
"https://write-a.streams-api-proxy.loro.dev",
"https://write-b.streams-api-proxy.loro.dev",
],
largePostMinBytes: 64 * 1024,
},
});
Shard entries must be origin URLs with no path, query, or hash. Each transport
instance rotates through the configured origins per operation, with a randomized
starting point so multiple tabs are less likely to all begin on the same shard.
Bootstrap applies to GET <stream>/bootstrap; catch-up applies to non-SSE
offset reads, including long-poll fallback; SSE live reads stay on the original
streamUrl. largePost applies to non-empty append POST bodies at or above
largePostMinBytes bytes. The default threshold is 0, so every non-empty
append POST can use largePost when that pool is configured. Verify actual
connection separation with browser NetLog: browsers may coalesce compatible
HTTP/2 or HTTP/3 origins when DNS, certificate, and transport settings allow it.
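The origin-only rule and the per-operation rotation can be sketched as follows. These helpers (`isBareOrigin`, `withShardOrigin`, `createShardPicker`) are hypothetical and only illustrate the behavior described above; the transport performs its own validation and rotation internally, and its exact rules may differ.

```typescript
// Hypothetical helpers for illustration; not exported by @loro-dev/streams-crdt.

/** True when `candidate` is a bare http(s) origin: no path, query, hash, or credentials. */
function isBareOrigin(candidate: string): boolean {
  if (candidate.includes("?") || candidate.includes("#")) return false;
  let url: URL;
  try {
    url = new URL(candidate);
  } catch {
    return false; // not a parseable absolute URL
  }
  return (
    (url.protocol === "https:" || url.protocol === "http:") &&
    url.pathname === "/" &&
    url.username === "" &&
    url.password === ""
  );
}

/** Swap the origin of `streamUrl` for `shardOrigin`, keeping the stream path and query. */
function withShardOrigin(streamUrl: string, shardOrigin: string): string {
  if (!isBareOrigin(shardOrigin)) {
    throw new Error(`shard entry must be a bare origin: ${shardOrigin}`);
  }
  const source = new URL(streamUrl);
  const target = new URL(shardOrigin);
  target.pathname = source.pathname;
  target.search = source.search;
  return target.toString();
}

/** Round-robin picker. `start` would normally be randomized once per transport instance. */
function createShardPicker(origins: readonly string[], start: number): () => string {
  if (origins.length === 0) throw new Error("at least one shard origin is required");
  let next = ((start % origins.length) + origins.length) % origins.length;
  return () => {
    const origin = origins[next] as string;
    next = (next + 1) % origins.length;
    return origin;
  };
}
```

Passing `Math.floor(Math.random() * origins.length)` as `start` gives the randomized starting point, so two tabs with the same configuration are less likely to hit the same shard first.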
Loro pending imports and cursor safety
Loro can accept an update whose dependencies are missing and keep it as a
pending import. Pending imports are not represented in doc.version() and are
not included in snapshots exported from that document. For that reason, the Loro
adapter must not persist a RemoteCursor until all pending imports are resolved
and the local document is complete at the cursor offset.
The transport may continue reading with an in-memory volatile cursor to find the missing dependencies. If the server reports up-to-date while Loro still has unresolved imports, the client must bootstrap again and only save the bootstrap cursor after the batch import is clean. Snapshot upload is skipped while Loro has unresolved imports.
See specs/loro-pending.md for the full behavior
and implementation checklist.
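The cursor rule above boils down to a small decision table. The sketch below models it with hypothetical names (`ReadState`, `decideCursorAction`, `canUploadSnapshot`); it is a reading aid for the prose, not the adapter's actual implementation.

```typescript
// Hypothetical model of the pending-import rule; names are illustrative only.
type CursorAction =
  | "persist-cursor" // document is complete at the cursor offset
  | "keep-volatile" // keep reading with an in-memory cursor to find dependencies
  | "bootstrap-again"; // server has nothing more, yet imports are unresolved

interface ReadState {
  /** Number of updates Loro is currently holding as pending imports. */
  pendingImportCount: number;
  /** True when the server reported that this reader is up to date. */
  serverUpToDate: boolean;
}

function decideCursorAction(state: ReadState): CursorAction {
  if (state.pendingImportCount === 0) return "persist-cursor";
  return state.serverUpToDate ? "bootstrap-again" : "keep-volatile";
}

/** Snapshot upload is skipped while any import is unresolved. */
function canUploadSnapshot(state: ReadState): boolean {
  return state.pendingImportCount === 0;
}
```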
Quick Start (Loro EphemeralStore)
Use a separate EphemeralStreamCrdt for presence-like Loro EphemeralStore
data. It bootstraps through the first SSE event, posts local ephemeral updates,
and never republishes state imported from other peers. If a client needs to keep
its own presence alive, update that client's own keys with store.set() so the
store emits a fresh local update.
Ephemeral live delivery is latest-state oriented. The server may keep only a tiny live fanout queue for current SSE subscribers and may disconnect slow subscribers instead of buffering every intermediate cursor or presence update. The client reconnects and receives a fresh bootstrap, so the local store converges to current state without requiring offset replay.
import { EphemeralStore } from "loro-crdt";
import {
EphemeralStreamCrdt,
EphemeralStoreAdaptor,
createStreamUrl,
} from "@loro-dev/streams-crdt/loro";
const store = new EphemeralStore();
const docStreamUrl = createStreamUrl({
bucketId: "my-bucket",
streamId: "doc-1",
});
const presence = new EphemeralStreamCrdt({
streamUrl: `${docStreamUrl}?ephemeral=presence`,
auth: async () => "<gateway-jwt>",
adaptor: EphemeralStoreAdaptor(store),
});
const joinedPresence = await presence.join();
if (!joinedPresence.ok) throw joinedPresence.error;
Ephemeral stream URL format
An ephemeral stream URL is the durable stream URL for the same document plus a channel query parameter:
{baseUrl}/ds/{bucketId}/{streamId}?ephemeral={channel}
For example, if the durable Flock document stream is:
https://streams-api.loro.dev/ds/my-bucket/flock-cm9vbS0x
then the matching presence stream is:
https://streams-api.loro.dev/ds/my-bucket/flock-cm9vbS0x?ephemeral=presence
Use the same bucketId and streamId as the durable StreamsCrdt room. The
ephemeral value names a side-channel for that room, such as presence,
cursor, or selection. Different channels under the same durable stream are
separate in-memory rooms. The compatibility spelling ?awareness=presence is
also accepted, but new code should use ?ephemeral=presence.
When an app exposes a room link such as /rooms/design-review-42, the link
normally maps to one durable stream id first, and both durable and ephemeral
sync derive from that same id:
const roomId = "design-review-42";
const durableStreamUrl = createStreamUrl({
bucketId: "my-bucket",
streamId: `flock-${roomId}`,
});
const presenceStreamUrl = new URL(durableStreamUrl);
presenceStreamUrl.searchParams.set("ephemeral", "presence");
const flockTransport = new StreamsCrdt({
streamUrl: durableStreamUrl,
auth,
adapter: createFlockAdapter(flock),
});
const presenceTransport = new EphemeralStreamCrdt({
streamUrl: presenceStreamUrl.toString(),
auth,
adaptor: EphemeralStoreAdaptor(store),
});
EphemeralStreamCrdt does not create or delete the durable stream. It posts
updates to the ephemeral URL and opens SSE on the same URL with live=sse.
Ephemeral state is not persisted with the Flock document stream; it is scoped by
the URL path plus channel and is meant for current live state only.
Quick Start (Flock)
import { Flock } from "@loro-dev/flock-wasm";
import {
createStreamUrl,
createFlockAdapter,
StreamsCrdt,
} from "@loro-dev/streams-crdt/flock";
const flock = new Flock();
const transport = new StreamsCrdt({
streamUrl: createStreamUrl({
bucketId: "my-bucket",
streamId: "room-1",
}),
auth: async () => "<gateway-jwt>",
adapter: createFlockAdapter(flock),
});
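// createStream() returns a Result; check it instead of discarding it.
const created = await transport.createStream();
if (!created.ok) throw created.error;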
const joined = await transport.join();
if (!joined.ok) throw joined.error;
const sub = joined.value;
When to Use catchup(), sync(), and join()
Use catchup() when you only want to pull remote updates into the local
CRDT:
// Pull remote changes without uploading local pending changes.
const caughtUp = await transport.catchup();
if (!caughtUp.ok) throw caughtUp.error;
// caughtUp.value.cursor contains the replay progress after the pull.
catchup() is safe to call while a room is live. When join() has a healthy
SSE/long-poll read loop, catchup() returns immediately because the live loop
is already catching up continuously. Pass { force: true } to issue one
explicit non-SSE offset read anyway, for example after app foreground or device
wake:
await transport.catchup({ force: true });
If the live read side of the room is reconnecting, disconnected, or in error,
catchup() skips the live backoff path and tries one immediate non-SSE
catch-up read from the room's current cursor. A successful call updates the
local cursor and nudges the live loops to reconnect, but it does not by itself
guarantee that local pending writes have reached the server.
Use sync() when you need a one-shot bidirectional sync without entering
live mode:
// Offline-first: pull remote changes, upload local changes, then work offline.
const synced = await transport.sync();
if (!synced.ok) throw synced.error;
// synced.value.cursor contains the replay progress after sync.
Use sync() or subscription.waitUntilSynced() when the caller needs local
pending changes uploaded to the server.
Use join() for real-time collaboration. It does everything sync() does
and then keeps the connection alive for continuous bidirectional sync:
const joined = await transport.join();
Delete Stream
const deleted = await transport.deleteStream();
if (!deleted.ok) throw deleted.error;
console.log(deleted.value.deleted); // true when deleted now, false when already missing
deleteStream() closes any active join first. When the configured
remoteCursorStore implements delete(streamUrl) (the built-in stores do),
the transport also clears the stored replay cursor for that stream.
Auth
auth may be:
- a static string
- a sync callback returning a string
- an async callback returning a string
StreamsCrdt asks for a token before each request. If a request comes back
with 401 or 403, it calls auth one more time with
{ reason: "unauthorized", status, previousToken } and retries that request
once.
This lets the caller keep a cached token in the normal path, and only refresh after the server rejects it:
let token = readCachedGatewayJwt();
const transport = new StreamsCrdt({
streamUrl,
adapter,
auth: async (context) => {
if (context?.reason === "unauthorized") {
token = await refreshGatewayJwt();
}
return token;
},
});
If your callback already returns a fresh token every time, you can ignore the
context argument.
What if the auth callback throws?
Throwing from the callback is treated as a transient failure. The runtime
wraps the error as auth_provider_error (retryable: true) so a hiccupping
auth backend doesn't terminate the live read/write loops — they retry under
the same exponential backoff used for server_error and network_error.
For the "user is permanently logged out" case, return undefined (or
null/empty string) instead of throwing. The request goes out without an
Authorization header, the server returns 401, and you receive the
non-retryable auth_failed instead — which the application typically routes
to a sign-in page.
auth: async () => {
try {
return await fetchTokenFromAuthBackend(); // network → throw → retry
} catch (e) {
if (isPermanentLogoutError(e)) {
return undefined; // → server 401 → auth_failed (terminal)
}
throw e; // → auth_provider_error (retryable)
}
}
End-to-End Encryption (E2EE)
e2ee enables end-to-end encryption for CRDT update batches and snapshots
before their bytes are appended to Durable Streams. Auth stays separate:
gateway tokens authorize HTTP requests, while document keys protect document
bytes from the server and unauthorized readers.
payloadProtection remains available as a deprecated alias for compatibility.
Do not pass both e2ee and payloadProtection on the same StreamsCrdt
instance.
When e2ee is omitted, streams-crdt keeps the current plaintext
wire format. When it is present, the transport still uses
application/octet-stream, but update batches and snapshots may carry an
encrypted envelope inside the existing length-prefixed framing.
For the design rationale and wire format, see
docs/plans/2026-04-09-streams-crdt-payload-protection-design.md.
// Demo only: a freshly generated random 32-byte key. In production, load the room key from your own key store.
const roomKey = crypto.getRandomValues(new Uint8Array(32));
const transport = new StreamsCrdt({
streamUrl,
adapter: createLoroDocAdapter(doc),
e2ee: {
// Defaults when e2ee is present:
readPolicy: "encrypted-only",
writePolicy: "encrypt",
encryption: {
// Stable application identity. Prefer bucket/stream identity over a full
// gateway URL so proxy/host changes do not break decryption.
scope: { bucketId: "my-bucket", streamId: "doc-1" },
writeKey: { id: "room-key-v1", key: roomKey },
// Optional. When omitted, writeKey is also the only read key.
readKeys: async () => [{ id: "room-key-v1", key: roomKey }],
},
},
});
Encrypted streams still use application/octet-stream. The transport keeps a
plaintext length-prefix frame around each encrypted envelope, so catch-up reads
can concatenate several appends and clients can still recover payload
boundaries.
E2EE Protocol Summary
E2EE wraps the existing transport framing; it does not replace it.
Encrypted update appends use this pipeline:
adapter.exportUpdates()
-> encodeItems(clearUpdates)
-> encrypt(kind = "update_batch")
-> encodeItems([encryptedEnvelope])
-> POST body
Encrypted snapshots use:
exportSnapshot -> snapshotCodec.compress -> encrypt(kind = "snapshot")
decrypt(kind = "snapshot") -> snapshotCodec.decompress -> applySnapshot
Envelope v1 is:
magic 4 bytes "LSCE"
version 1 byte 0x01
kind 1 byte 0x01 update_batch, 0x02 snapshot
suite 1 byte 0x01 aes_256_gcm
key_id_len 1 byte
nonce_len 1 byte expected 12
reserved 1 byte 0
key_id key_id_len bytes, UTF-8
nonce nonce_len bytes
ciphertext_and_tag remaining bytes
AEAD additional authenticated data (AAD) is constructed from stable local context:
{
"protocol": "loro-streams-crdt-payload-protection",
"version": 1,
"kind": "update_batch",
"suite": "aes_256_gcm",
"keyId": "room-key-v1",
"scope": { "bucketId": "my-bucket", "streamId": "doc-1" }
}
AAD is not stored as a separate field on the wire. Parts of it live in the
envelope header; scope comes from the local e2ee.encryption.scope
config. Keep scope stable and avoid full gateway URLs, otherwise old encrypted
history may become unreadable after host or proxy changes.
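To make the envelope v1 layout concrete, here is a minimal header parser sketch. It is not the library's own code: `parseEnvelopeV1` and `ParsedEnvelope` are hypothetical names, the error strings merely echo the `invalid_envelope` reason, and rejecting a nonce length other than 12 is an assumption based on the "expected 12" note. Decryption itself (AES-256-GCM with the AAD described above) is deliberately left out.

```typescript
// Illustrative parser for the envelope v1 byte layout; not the library's implementation.
interface ParsedEnvelope {
  version: number;
  kind: "update_batch" | "snapshot";
  suite: "aes_256_gcm";
  keyId: string;
  nonce: Uint8Array;
  ciphertextAndTag: Uint8Array;
}

const MAGIC = [0x4c, 0x53, 0x43, 0x45]; // ASCII "LSCE"
// magic(4) + version + kind + suite + key_id_len + nonce_len + reserved
const FIXED_HEADER_BYTES = 10;

function parseEnvelopeV1(bytes: Uint8Array): ParsedEnvelope {
  if (bytes.length < FIXED_HEADER_BYTES) throw new Error("invalid_envelope: too short");
  const view = new DataView(bytes.buffer, bytes.byteOffset, bytes.byteLength);
  if (!MAGIC.every((byte, i) => view.getUint8(i) === byte)) {
    throw new Error("invalid_envelope: bad magic");
  }
  const version = view.getUint8(4);
  const kindByte = view.getUint8(5);
  const suiteByte = view.getUint8(6);
  const keyIdLen = view.getUint8(7);
  const nonceLen = view.getUint8(8);
  // Offset 9 is the reserved byte; this sketch does not interpret it.
  if (version !== 0x01) throw new Error("invalid_envelope: unsupported version");
  if (suiteByte !== 0x01) throw new Error("invalid_envelope: unsupported suite");
  if (kindByte !== 0x01 && kindByte !== 0x02) throw new Error("invalid_envelope: unknown kind");
  // Assumption: any nonce length other than the expected 12 is rejected.
  if (nonceLen !== 12) throw new Error("invalid_envelope: unexpected nonce length");

  const keyIdEnd = FIXED_HEADER_BYTES + keyIdLen;
  const nonceEnd = keyIdEnd + nonceLen;
  if (bytes.length < nonceEnd) throw new Error("invalid_envelope: truncated header");

  return {
    version,
    kind: kindByte === 0x01 ? "update_batch" : "snapshot",
    suite: "aes_256_gcm",
    keyId: new TextDecoder("utf-8", { fatal: true }).decode(
      bytes.subarray(FIXED_HEADER_BYTES, keyIdEnd),
    ),
    nonce: bytes.subarray(keyIdEnd, nonceEnd),
    ciphertextAndTag: bytes.subarray(nonceEnd),
  };
}
```

The parsed `kind`, `suite`, and `keyId` are the header-derived parts of the AAD; `scope` still has to come from local configuration, which is why it must stay stable.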
Mode Matrix
e2ee supports a few distinct shapes. Choose one explicitly:
| Use case | readPolicy | writePolicy | Required encryption config | Remote plaintext accepted? | Local writes sent as |
| --- | --- | --- | --- | --- | --- |
| Existing plaintext stream | omitted | omitted | none | Yes | plaintext |
| Private reader/writer | encrypted-only (default) | encrypt (default) | scope + writeKey | No | encrypted |
| Mixed private reader/writer | allow-plaintext | encrypt | scope + writeKey | Yes | encrypted |
| Public write-only writer | allow-plaintext | plaintext | none | Yes | plaintext via appendWriteOnly() |
| Encrypted reader without a write key | encrypted-only | plaintext | scope + readKeys | No | plaintext if local writes are exported |
There is no separate transport-level "read-only mode" in v1. The last row is
only appropriate when the local CRDT is treated as read-only or the caller's
auth rejects writes. writePolicy: "plaintext" avoids requiring a write key; it
does not suppress local export attempts by itself.
Adapter Compatibility: Loro and Flock
e2ee is adapter-agnostic at the transport layer: the same
options shape works with createLoroDocAdapter(doc) and createFlockAdapter(flock).
Encryption wraps the CRDT payload bytes that the adapter already exports.
That also means one stream must use exactly one CRDT family:
- A stream written with the Loro adapter must be read with the Loro adapter.
- A stream written with the Flock adapter must be read with the Flock adapter.
- Do not mix Loro and Flock replicas against the same stream, whether payload protection is enabled or not. The decrypted inner update bytes and snapshots are CRDT-specific and are not cross-compatible.
Protected Loro room:
import { LoroDoc } from "loro-crdt";
import {
createLoroDocAdapter,
StreamsCrdt,
} from "@loro-dev/streams-crdt/loro";
const doc = new LoroDoc();
const loroTransport = new StreamsCrdt({
streamUrl,
adapter: createLoroDocAdapter(doc),
e2ee: {
encryption: {
scope: { bucketId: "my-bucket", streamId: "doc-1" },
writeKey: { id: "room-key-v1", key: roomKey },
},
},
});
Protected Flock room:
import { Flock } from "@loro-dev/flock-wasm";
import {
createFlockAdapter,
StreamsCrdt,
} from "@loro-dev/streams-crdt/flock";
const flock = new Flock();
const flockTransport = new StreamsCrdt({
streamUrl,
adapter: createFlockAdapter(flock),
e2ee: {
encryption: {
scope: { bucketId: "my-bucket", streamId: "room-1" },
writeKey: { id: "room-key-v1", key: roomKey },
},
},
});
If one application stack uses Loro and another uses Flock, give them separate streams. Reusing the same key material across those streams is an application decision; it does not make the CRDT payloads interoperable.
Private E2EE Room
This is the default protected mode: all retained updates and snapshots are encrypted, and readers reject any plaintext payload that appears in the stream.
const privateReaderWriter = new StreamsCrdt({
streamUrl,
adapter: createLoroDocAdapter(doc),
e2ee: {
encryption: {
scope: { bucketId: "my-bucket", streamId: "doc-1" },
writeKey: { id: "private-v1", key: roomKey },
// Optional. Keep old keys here during rotation.
readKeys: async () => [{ id: "private-v1", key: roomKey }],
},
},
});
Mixed Plaintext and Encrypted Updates
Some applications intentionally allow public writers to append CRDT updates but not read private history. Configure readers explicitly before accepting those plaintext updates:
const privateReaderWriter = new StreamsCrdt({
streamUrl,
adapter: createLoroDocAdapter(doc),
e2ee: {
readPolicy: "allow-plaintext",
writePolicy: "encrypt",
encryption: {
scope: { bucketId: "my-bucket", streamId: "doc-1" },
writeKey: { id: "private-v1", key: roomKey },
},
},
});
Plaintext writers should opt in just as explicitly:
const publicWriter = new StreamsCrdt({
streamUrl,
adapter: createLoroDocAdapter(publicDoc),
auth: async () => getWriteOnlyToken(),
e2ee: {
readPolicy: "allow-plaintext",
writePolicy: "plaintext",
},
});
publicDoc.getText("comments").insert(0, "public note");
const appended = await publicWriter.appendWriteOnly();
Behavior and Validation Rules
- If e2ee is present and no policies are specified, the defaults are readPolicy: "encrypted-only" and writePolicy: "encrypt".
- writePolicy: "encrypt" requires e2ee.encryption.scope and e2ee.encryption.writeKey.
- Any client that needs to decrypt encrypted payloads must provide e2ee.encryption.scope plus at least one read key. If readKeys is omitted, the current writeKey is also used as the only read key.
- payloadProtection is a deprecated alias for e2ee. Passing both is an error.
- appendWriteOnly() is the write-only path. It only posts local CRDT batches that this transport observed after construction. It does not bootstrap, catch up, join SSE, upload snapshots, apply remote updates, or persist a remote cursor.
- appendWriteOnly() must not be mixed with sync() or join() on the same transport instance.
- After close(), appendWriteOnly() is permanently unavailable on that instance because the write-only subscription has been released. Create a new StreamsCrdt instance for later write-only use.
- Snapshot uploads are encrypted when writePolicy is "encrypt". Snapshot upload is rejected when writePolicy is "plaintext"; a public writer that cannot read private state should not publish full-document snapshots.
- Protected read failures stop before applySnapshot(), applyRemoteUpdates(), local durability hooks, or remote cursor persistence.
- Writers always encrypt with the current writeKey. Keep old keys in readKeys until retained encrypted history and encrypted snapshots no longer reference them.
- Encryption adds envelope overhead. The transport enforces the 256 KB batch cap against the encoded on-wire bytes, so a batch near the limit may split earlier under encryption than it would in plaintext mode.
If e2ee is configured and readPolicy is left at the default
"encrypted-only", remote plaintext updates fail with
code: "payload_protection_error" and reason: "plaintext_forbidden".
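The defaulting and validation rules in this section can be summarized in a small sketch. `resolveE2ee` and the option types below are hypothetical and only mirror the option names used in this README's examples; the real constructor may validate more, and in a different order.

```typescript
// Hypothetical validation sketch; not the library's implementation.
type ReadPolicy = "encrypted-only" | "allow-plaintext";
type WritePolicy = "encrypt" | "plaintext";

interface KeyRef {
  id: string;
  key: Uint8Array;
}

interface E2eeOptions {
  readPolicy?: ReadPolicy;
  writePolicy?: WritePolicy;
  encryption?: {
    scope?: { bucketId: string; streamId: string };
    writeKey?: KeyRef;
    readKeys?: () => Promise<KeyRef[]>;
  };
}

interface ResolvedE2ee {
  readPolicy: ReadPolicy;
  writePolicy: WritePolicy;
  /** True when scope plus at least one read key source is configured. */
  canDecrypt: boolean;
}

function resolveE2ee(options: {
  e2ee?: E2eeOptions;
  payloadProtection?: E2eeOptions;
}): ResolvedE2ee | undefined {
  if (options.e2ee && options.payloadProtection) {
    throw new Error("pass either e2ee or payloadProtection, not both");
  }
  const config = options.e2ee ?? options.payloadProtection;
  if (!config) return undefined; // plaintext wire format, no protection configured

  const readPolicy = config.readPolicy ?? "encrypted-only";
  const writePolicy = config.writePolicy ?? "encrypt";
  const scope = config.encryption?.scope;
  const writeKey = config.encryption?.writeKey;
  const readKeys = config.encryption?.readKeys;

  if (writePolicy === "encrypt" && !(scope && writeKey)) {
    throw new Error('writePolicy "encrypt" requires encryption.scope and encryption.writeKey');
  }
  // When readKeys is omitted, writeKey doubles as the only read key.
  const canDecrypt = Boolean(scope && (readKeys || writeKey));
  return { readPolicy, writePolicy, canDecrypt };
}
```

Note that the "public write-only writer" shape (`allow-plaintext` plus `plaintext`, no encryption config) resolves without error but with `canDecrypt: false`.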
E2EE Errors
E2EE-related transport failures surface as
code: "payload_protection_error" with one of these reasons:
| Reason | Meaning |
| --- | --- |
| plaintext_forbidden | A reader configured as encrypted-only saw plaintext data in the stream. |
| missing_read_key | The envelope keyId does not exist in the configured readKeys. |
| decrypt_failed | AES-GCM authentication failed. This usually means the wrong key, wrong scope, or tampered/corrupted bytes. |
| invalid_envelope | The payload was not a supported streams-crdt encrypted envelope. |
| wrong_payload_kind | An encrypted snapshot was used where an update batch was expected, or the reverse. |
| encrypt_failed | Local write-side encryption could not proceed because the key/config/input was invalid. |
Callers usually treat these as operator or configuration errors, not transient
network failures. They are always returned as retryable: false.
E2EE does not hide stream URLs, offsets, payload sizes, timing or auth metadata, and it does not detect a malicious server rolling back or forking history.
Remote Cursor Store
The RemoteCursorStore persists replay progress metadata only — it does
not store any CRDT update data. A remote cursor contains:
- nextOffset: where the transport should resume reading from
- serverLowerBoundVersion: the CRDT version that the server data represents
- streamUrl: identity of the stream
Important: cursor-only persistence is unsafe
If RemoteCursorStore.load() returns a saved cursor, the transport resumes
from nextOffset and assumes the caller has already restored the matching
local CRDT state represented by serverLowerBoundVersion.
If the local CRDT was reset to empty but the cursor was persisted, the
transport will do delta-only catch-up from nextOffset — applying those deltas
against an empty document will fail or produce an invalid state.
Rule of thumb
| Local CRDT state | Cursor store | Safe? |
| --- | --- | --- |
| Not persisted (ephemeral) | InMemoryRemoteCursorStore (default) | Yes |
| Persisted (IndexedDB, SQLite, etc.) | IndexedDbRemoteCursorStore + beforeRemoteCursorSave hook | Yes |
| Not persisted | IndexedDbRemoteCursorStore | No — data loss risk |
If you do not persist the local CRDT payload across reloads, keep the remote
cursor ephemeral too. The default InMemoryRemoteCursorStore is the safe
choice.
Hooking Local Durability
Use beforeRemoteCursorSave to couple local payload persistence with remote
cursor advancement. The transport advances the cursor only after:
- Remote data was applied locally
- The durability hook succeeded
- The cursor store write succeeded
const transport = new StreamsCrdt({
streamUrl: createStreamUrl({
bucketId: "my-bucket",
streamId: "doc-1",
}),
adapter: createLoroDocAdapter(doc),
beforeRemoteCursorSave: async ({ cursor, source }) => {
// Persist local CRDT state before cursor advances.
// `source` is "local" | "remote" | "bootstrap".
await persistLocalPayload(doc);
console.log(source, cursor.nextOffset);
},
});
persistLocalPayload(doc) must make the local durable state correspond to the
cursor that is about to be stored. On the next app start, restore that local
state before calling sync() or join().
Persistent Cursor Example
Only use a persisted RemoteCursorStore such as IndexedDbRemoteCursorStore
when you also persist and restore the local CRDT state for the same stream.
import { LoroDoc } from "loro-crdt";
import {
createStreamUrl,
createLoroDocAdapter,
IndexedDbRemoteCursorStore,
StreamsCrdt,
} from "@loro-dev/streams-crdt/loro";
// Step 1: restore local CRDT state first.
const doc = (await restorePersistedDoc("doc-1")) ?? new LoroDoc();
const transport = new StreamsCrdt({
streamUrl: createStreamUrl({
bucketId: "my-bucket",
streamId: "doc-1",
}),
auth: async () => "<gateway-jwt>",
adapter: createLoroDocAdapter(doc),
// Step 2: use a persistent cursor store.
remoteCursorStore: new IndexedDbRemoteCursorStore({
dbName: "my-app-ds-cursors",
}),
// Step 3: persist local state before cursor advances.
beforeRemoteCursorSave: async () => {
await persistLocalPayload("doc-1", doc);
},
});
// join() handles initial sync + live mode.
const joined = await transport.join();
The required ordering is:
1. Restore the local CRDT state first
2. Let the transport apply remote updates
3. Persist the updated local CRDT state (via beforeRemoteCursorSave)
4. Only then persist the remote cursor
If you cannot provide steps 1 and 3, do not use a persisted remote cursor store.
Snapshot Codec Hooks
Use snapshotCodec when you want transport to transform full snapshots during
upload and bootstrap:
const transport = new StreamsCrdt({
streamUrl,
adapter: createLoroDocAdapter(doc),
snapshotCodec: {
// Sync or async are both allowed.
compress: async (snapshot) => snapshot,
decompress: (snapshot) => snapshot,
},
});
Behavior
- snapshotCodec.compress(snapshot) runs right before a snapshot is uploaded to Loro Streams
- snapshotCodec.decompress(snapshot) runs right after a non-empty snapshot is downloaded from Loro Streams and right before adapter.applySnapshot(...)
- Both hooks must be provided together
- Both hooks may return either Uint8Array or Promise<Uint8Array>
- These hooks affect full snapshots only. Incremental update batches are unchanged
Compatibility
Transport does not version or negotiate snapshot codecs for you.
If you enable snapshotCodec, you must make sure decompress can still read
any older snapshots that are already stored for that stream. This is especially
important when:
- existing snapshots were uploaded without compression
- you change the compression format later
- you roll out the codec gradually across clients
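One way to handle the "existing snapshots were uploaded without compression" case is to sniff the payload before decompressing. The sketch below assumes the codec is zstd, whose frames start with the magic number 0xFD2FB528 (little-endian bytes 28 B5 2F FD). `tolerantDecompress` and `looksLikeZstdFrame` are hypothetical names, and `zstdDecompress` stands in for whatever decompress hook you actually use.

```typescript
// Hypothetical compatibility wrapper; not exported by @loro-dev/streams-crdt.
type SnapshotHook = (snapshot: Uint8Array) => Uint8Array | Promise<Uint8Array>;

// Zstandard frame magic number 0xFD2FB528, as it appears on the wire (little-endian).
const ZSTD_MAGIC = [0x28, 0xb5, 0x2f, 0xfd];

function looksLikeZstdFrame(bytes: Uint8Array): boolean {
  return bytes.length >= 4 && ZSTD_MAGIC.every((byte, i) => bytes[i] === byte);
}

/**
 * Decompress payloads that look like zstd frames; pass anything else through
 * unchanged, treating it as a legacy uncompressed snapshot.
 */
function tolerantDecompress(zstdDecompress: SnapshotHook): SnapshotHook {
  return (snapshot) => (looksLikeZstdFrame(snapshot) ? zstdDecompress(snapshot) : snapshot);
}
```

The returned hook can be passed as `snapshotCodec.decompress`. This heuristic would misfire on an uncompressed snapshot that happens to begin with the same four bytes, so treat it as a migration aid rather than a format negotiation.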
@loro-dev/streams-crdt/zstd
@loro-dev/streams-crdt/zstd is a built-in helper entry point backed by
@bokuweb/zstd-wasm.
pnpm add @loro-dev/streams-crdt
Then wire the exported hooks directly into snapshotCodec:
import { LoroDoc } from "loro-crdt";
import {
createLoroDocAdapter,
createStreamUrl,
StreamsCrdt,
} from "@loro-dev/streams-crdt/loro";
import {
compress,
decompress,
} from "@loro-dev/streams-crdt/zstd";
const doc = new LoroDoc();
const transport = new StreamsCrdt({
streamUrl: createStreamUrl({
bucketId: "my-bucket",
streamId: "doc-1",
}),
adapter: createLoroDocAdapter(doc),
snapshotCodec: { compress, decompress },
});
@loro-dev/streams-crdt/zstd exports:
- compress(snapshot): async; uses an extra Worker when available and falls back to the current thread otherwise
- compressInWorker(snapshot): async; worker-only compression helper
- compressOnCurrentThread(snapshot): async; current-thread compression helper
- decompress(snapshot): async; current-thread decompression helper
Automatic Snapshot Upload
The transport can upload snapshots automatically during join() when callers
opt in with snapshotUpload. This feature is disabled by default — if you
do not pass snapshotUpload, no snapshots will ever be uploaded.
const transport = new StreamsCrdt({
streamUrl,
adapter: createLoroDocAdapter(doc),
// Enable automatic snapshot upload:
snapshotUpload: {
canUpload: async () => true,
// debounceMs: 10_000, // default: 10 seconds
// minBytesSinceRemoteSnapshot: 102400, // default: 100 KB
},
});
Configuration
| Option | Default | Description |
| --- | --- | --- |
| canUpload | (required) | Authorization gate called before each upload attempt. Return false to skip. |
| debounceMs | 10_000 (10 s) | Debounce window after the last local write before attempting upload. |
| minBytesSinceRemoteSnapshot | 102400 (100 KB) | Minimum byte delta between the last remote snapshot offset and the current stream tail. Upload is skipped when the delta is smaller. |
Behavior
- Automatic snapshot upload is considered only during join(), never sync()
- The feature is opt-in: callers must pass snapshotUpload with at least canUpload to enable it
- Successful local appends while join() is active refresh a debounce window (default 10 s) for a possible snapshot upload check
- If a remote apply advances the inferred remote version during that window, the transport treats that as another writer becoming active and skips that snapshot attempt
- At debounce expiry, the transport may enqueue a best-effort maybeUploadSnapshot task onto the same serialized operation queue used for local append and remote apply
- When that task executes, it fetches the latest remote snapshot offset, uses the latest confirmed local cursor.nextOffset as the candidate snapshot_offset, and uploads only if current_tail_offset - remote_snapshot_offset exceeds the configured byte threshold
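The rules above can be read as a small decision function. The sketch below is a hypothetical model of that documented behavior, not the transport's internal code: the type and function names are invented for illustration, offsets are assumed to be plain byte counts, and both the order of the checks and the treatment of an exactly equal delta are assumptions.

```typescript
// Hypothetical model of the documented upload rules; not the library's code.
interface UploadCheckInput {
  currentTailOffset: number; // assumed: byte offset of the stream tail
  remoteSnapshotOffset: number; // assumed: byte offset of the last remote snapshot
  minBytesSinceRemoteSnapshot: number; // configured threshold (default 102400)
  remoteAdvancedDuringWindow: boolean; // another writer was seen during the debounce window
  canUpload: boolean; // result of the caller's canUpload() gate
}

type UploadDecision =
  | "upload"
  | "skip:other-writer"
  | "skip:not-authorized"
  | "skip:below-threshold";

function decideSnapshotUpload(input: UploadCheckInput): UploadDecision {
  // A remote apply during the window means another writer is active.
  if (input.remoteAdvancedDuringWindow) return "skip:other-writer";
  // The authorization gate can veto any attempt.
  if (!input.canUpload) return "skip:not-authorized";
  const delta = input.currentTailOffset - input.remoteSnapshotOffset;
  // "Exceeds" is modeled as strictly greater; equality handling is an assumption.
  return delta > input.minBytesSinceRemoteSnapshot
    ? "upload"
    : "skip:below-threshold";
}
```

For example, with the default threshold of 102400 bytes, a tail offset of 300000 and a remote snapshot offset of 100000 give a delta of 200000, so this model would upload; a tail offset of 150000 gives a delta of 50000 and would skip.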
Entry Points
| Import path | What it adds |
| --- | --- |
| @loro-dev/streams-crdt | Transport core, cursor stores, ID helpers, all shared types |
| @loro-dev/streams-crdt/loro | Everything above + createLoroDocAdapter(doc) |
| @loro-dev/streams-crdt/flock | Everything above + createFlockAdapter(flock) |
| @loro-dev/streams-crdt/zstd | Snapshot compress / decompress hooks backed by @bokuweb/zstd-wasm |
Public API Reference
Root Entry Point
- StreamsCrdt: transport runtime class
- StreamsCrdtOptions: constructor options
- CrdtAdapter: adapter contract interface
- IsolatedCrdtAdapter: optional apply-only adapter used for safe 410 recovery
- CrdtUpdateBatch: local update batch type
- CrdtApplyOutcome, CrdtApplyReturn, CrdtUnresolvedSpan: adapter apply result types used to report incomplete imports such as Loro pending updates
- Result: Rust-style Ok | Err result
- StreamsAuthProvider: auth callback type
- TransportError: discriminated error union
  - apply_incomplete is returned when remote data was read but the CRDT adapter still has unresolved dependencies and the cursor cannot be saved.
- TransportCatchupParams: catchup() options
- TransportCreateStreamSuccess, TransportDeleteStreamSuccess, TransportCatchupSuccess, TransportSyncSuccess: result payloads
- TransportJoinParams: join() options
- TransportSubscription: active live subscription handle
- TransportRoomStatus: "joined" | "reconnecting" | "disconnected" | "error"
- SnapshotCodec, SnapshotTransformHook: full snapshot encode/decode hooks
- WriteOnlyAppendResult: result returned by appendWriteOnly()
- E2eeError: local error class thrown by low-level E2EE helpers before conversion into TransportError
- E2eeOptions, E2eeKey, E2eeScope, E2eeEncryptionOptions: E2EE configuration types
- PayloadProtectionError, PayloadProtectionOptions, PayloadProtectionKey, PayloadProtectionScope: deprecated compatibility aliases
- SnapshotUploadOptions: snapshot upload configuration
- RemoteCursor: replay progress metadata
- RemoteCursorStore: abstract cursor storage interface
- RemoteCursorSaveSource: "local" | "remote" | "bootstrap"
- BeforeRemoteCursorSaveContext, BeforeRemoteCursorSaveHook: durability hook types
- InMemoryRemoteCursorStore: ephemeral cursor store (default, always safe)
- IndexedDbRemoteCursorStore: persistent cursor store (requires local CRDT persistence)
- IndexedDbRemoteCursorStoreOptions: IndexedDB store config
- createInitialRemoteCursor(...): seed a cursor store
- createStreamUrl(...): build stream URL from bucket/stream IDs
- isValidBucketId(...), isValidRillId(...): ID validation helpers
Loro Entry Point
@loro-dev/streams-crdt/loro re-exports everything from the root plus:
- createLoroDocAdapter(doc): creates a CrdtAdapter for one LoroDoc
Flock Entry Point
@loro-dev/streams-crdt/flock re-exports everything from the root plus:
- createFlockAdapter(flock): creates a CrdtAdapter for one Flock replica
- VersionVector, ExportBundle: re-exported from @loro-dev/flock-wasm
Zstd Entry Point
@loro-dev/streams-crdt/zstd exports:
- compress(snapshot): async snapshot compression helper
- compressInWorker(snapshot): async worker-only snapshot compression helper
- compressOnCurrentThread(snapshot): async current-thread snapshot compression helper
- decompress(snapshot): async snapshot decompression helper
Notes
- createStreamIfMissing defaults to false and only affects the initial sync/join bootstrap
- deleteStream() returns { deleted: false } when the target stream is already missing
- sync() and join() return Result objects instead of implicitly creating missing streams
- join() resolves only after the initial replay has succeeded; without a stored remote cursor it bootstraps once, then prefers ordinary live SSE and falls back sticky to long-poll
- One transport instance maps to one DS stream
- One adapter instance maps to one local CRDT instance
- Built-in remote cursor stores implement delete(streamUrl) so deleteStream() can reset replay state cleanly
Repository-only Scripts
The published package only includes built runtime artifacts under dist/.
For repository verification against the hosted backend, run:
pnpm --dir packages/streams-crdt run test:e2e:streams-api
