@loro-dev/streams-crdt
v0.6.1
Transport layer for synchronizing CRDT state over Loro Streams (Durable Streams).
Core Concepts
Two-class model
| Class | Responsibility |
| --- | --- |
| StreamsCrdt | Transport runtime bound to one stream URL. Handles auth, reconnect, bootstrap, SSE/long-poll, binary framing, and remote cursor persistence. |
| CrdtAdapter | Binding for one local CRDT instance. Handles snapshot export/import, version tracking, local update export, remote update application, and optional isolated 410 recovery inference. |
Operations
| Method | Purpose |
| --- | --- |
| createStream() | Creates the stream on the server. Returns { created: true } on first call, { created: false } if it already exists. |
| deleteStream() | Deletes the stream. Closes any active join() first. |
| sync() | One-shot bootstrap/catch-up cycle. Syncs remote state into local CRDT and pushes any local changes to the server. Does not enter live mode. |
| join() | Initial sync plus live subscription. Internally performs the same sync as sync(), then enters a persistent SSE/long-poll read loop and starts forwarding local writes. No need to call sync() before join(). |
| close() | Closes any active join and releases resources. |
join() includes an initial sync. You do not need to call sync() before join(). When join() resolves, the local CRDT is already caught up with the server and the live subscription is active.
Error handling — Result, not exceptions
All transport operations return a Result<T, TransportError> instead of
throwing. Callers must check .ok before accessing the value:
const result = await transport.join();
if (!result.ok) {
// result.error is a TransportError with a discriminated `code` field.
switch (result.error.code) {
case "stream_not_found":
// The stream doesn't exist yet — create it first.
break;
case "auth_failed":
// Token expired or unauthorized.
break;
case "protocol_error":
case "internal_error":
// Non-retryable: bad server response or local runtime/adapter failure.
break;
case "network_error":
case "timeout":
// Transient — safe to retry.
break;
default:
console.error("transport error:", result.error);
}
return;
}
const sub = result.value; // TransportSubscription

Every error carries a retryable flag so callers can decide whether to retry
without hard-coding error codes:
if (!result.ok && result.error.retryable) {
  // safe to retry after a delay
}

Why Result instead of throw? Transport failures (network errors, auth expiry, missing streams) are expected at runtime, not exceptional. Wrapping them in Result makes the error path explicit in the type system so callers cannot accidentally ignore failures with a missing try/catch.
Quick Start (Loro)
import { LoroDoc } from "loro-crdt";
import {
createStreamUrl,
createLoroDocAdapter,
StreamsCrdt,
} from "@loro-dev/streams-crdt/loro";
const doc = new LoroDoc();
const transport = new StreamsCrdt({
streamUrl: createStreamUrl({
bucketId: "my-bucket",
streamId: "doc-1",
}),
auth: async () => "<gateway-jwt>",
adapter: createLoroDocAdapter(doc),
// Enable automatic snapshot upload so new readers bootstrap faster.
snapshotUpload: { canUpload: async () => true },
});
// Create the stream (idempotent).
const created = await transport.createStream();
if (!created.ok) throw created.error;
// Join enters live collaborative editing mode.
// This performs an initial sync automatically before entering live mode.
const joined = await transport.join({
onStatusChange(status) {
console.log("room status:", status);
},
});
if (!joined.ok) throw joined.error;
const sub = joined.value;
// The doc is now syncing in real time.
// Call sub.unsubscribe() or transport.close() to leave.

createLoroDocAdapter(doc) requires an attached LoroDoc. If the document was
checked out or detached, call checkoutToLatest() or attach() before passing
it to StreamsCrdt.
Quick Start (Flock)
import { Flock } from "@loro-dev/flock-wasm";
import {
createStreamUrl,
createFlockAdapter,
StreamsCrdt,
} from "@loro-dev/streams-crdt/flock";
const flock = new Flock();
const transport = new StreamsCrdt({
streamUrl: createStreamUrl({
bucketId: "my-bucket",
streamId: "room-1",
}),
auth: async () => "<gateway-jwt>",
adapter: createFlockAdapter(flock),
});
await transport.createStream();
const joined = await transport.join();
if (!joined.ok) throw joined.error;
const sub = joined.value;

When to Use sync() vs join()
Use sync() when you need a one-shot catch-up without entering live mode:
// Offline-first: catch up on app start, then work offline.
const synced = await transport.sync();
if (!synced.ok) throw synced.error;
// synced.value.cursor contains the replay progress after sync.

Use join() for real-time collaboration. It does everything sync() does
and then keeps the connection alive for continuous bidirectional sync:
const joined = await transport.join();

Delete Stream
const deleted = await transport.deleteStream();
if (!deleted.ok) throw deleted.error;
console.log(deleted.value.deleted); // true when deleted now, false when already missing

deleteStream() closes any active join first. When the configured
remoteCursorStore implements delete(streamUrl) (the built-in stores do),
the transport also clears the stored replay cursor for that stream.
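A custom cursor store can opt into this cleanup by implementing delete. A minimal map-backed sketch, assuming the RemoteCursorStore contract is async load/save/delete keyed by stream URL (the real interface and cursor type may differ; CursorLike here is a placeholder):

```typescript
// Assumed shape of the cursor payload; the real RemoteCursor carries
// nextOffset, serverLowerBoundVersion, and streamUrl.
interface CursorLike {
  streamUrl: string;
  nextOffset: number;
}

// Minimal in-memory store sketch. Assumption: the store contract is
// async load/save/delete keyed by stream URL.
class MapCursorStore {
  private cursors = new Map<string, CursorLike>();

  async load(streamUrl: string): Promise<CursorLike | undefined> {
    return this.cursors.get(streamUrl);
  }

  async save(cursor: CursorLike): Promise<void> {
    this.cursors.set(cursor.streamUrl, cursor);
  }

  // Implementing delete lets deleteStream() clear replay state for the stream.
  async delete(streamUrl: string): Promise<void> {
    this.cursors.delete(streamUrl);
  }
}
```

Without delete, a later re-creation of a stream under the same URL could pick up a stale cursor, which is exactly the hazard the Remote Cursor Store section below describes.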
Auth
auth may be:
- a static string
- a sync callback returning a string
- an async callback returning a string
StreamsCrdt asks for a token before each request. If a request comes back
with 401 or 403, it calls auth one more time with
{ reason: "unauthorized", status, previousToken } and retries that request
once.
This lets the caller keep a cached token in the normal path, and only refresh after the server rejects it:
let token = readCachedGatewayJwt();
const transport = new StreamsCrdt({
streamUrl,
adapter,
auth: async (context) => {
if (context?.reason === "unauthorized") {
token = await refreshGatewayJwt();
}
return token;
},
});

If your callback already returns a fresh token every time, you can ignore the
context argument.
Remote Cursor Store
The RemoteCursorStore persists replay progress metadata only — it does
not store any CRDT update data. A remote cursor contains:
- nextOffset — where the transport should resume reading from
- serverLowerBoundVersion — the CRDT version that the server data represents
- streamUrl — identity of the stream
Important: cursor-only persistence is unsafe
If RemoteCursorStore.load() returns a saved cursor, the transport resumes
from nextOffset and assumes the caller has already restored the matching
local CRDT state represented by serverLowerBoundVersion.
If the local CRDT was reset to empty but the cursor was persisted, the
transport will do delta-only catch-up from nextOffset — applying those deltas
against an empty document will fail or produce an invalid state.
Rule of thumb
| Local CRDT state | Cursor store | Safe? |
| --- | --- | --- |
| Not persisted (ephemeral) | InMemoryRemoteCursorStore (default) | Yes |
| Persisted (IndexedDB, SQLite, etc.) | IndexedDbRemoteCursorStore + beforeRemoteCursorSave hook | Yes |
| Not persisted | IndexedDbRemoteCursorStore | No — data loss risk |
If you do not persist the local CRDT payload across reloads, keep the remote
cursor ephemeral too. The default InMemoryRemoteCursorStore is the safe
choice.
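One way to enforce this rule defensively is a startup guard that drops a persisted cursor whenever the matching local payload is missing. A sketch under assumed names (guardCursorAgainstLostLocalState and hasLocalPayload are app-defined, and the store is assumed to expose async load/delete keyed by stream URL):

```typescript
// Sketch of a startup guard for the "persisted cursor, lost local state"
// hazard. Assumption: the cursor store exposes load/delete keyed by
// stream URL; hasLocalPayload is an app-defined durability check.
async function guardCursorAgainstLostLocalState(
  streamUrl: string,
  cursorStore: {
    load(url: string): Promise<unknown>;
    delete(url: string): Promise<void>;
  },
  hasLocalPayload: () => Promise<boolean>,
): Promise<void> {
  const cursor = await cursorStore.load(streamUrl);
  if (cursor !== undefined && !(await hasLocalPayload())) {
    // Local CRDT state is gone: resuming from this cursor would apply
    // deltas against an empty document, so force a full bootstrap instead.
    await cursorStore.delete(streamUrl);
  }
}
```

Run the guard before constructing StreamsCrdt so the transport either sees a matched cursor/payload pair or no cursor at all.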
Hooking Local Durability
Use beforeRemoteCursorSave to couple local payload persistence with remote
cursor advancement. The transport advances the cursor only after:
- Remote data was applied locally
- The durability hook succeeded
- The cursor store write succeeded
const transport = new StreamsCrdt({
streamUrl: createStreamUrl({
bucketId: "my-bucket",
streamId: "doc-1",
}),
adapter: createLoroDocAdapter(doc),
beforeRemoteCursorSave: async ({ cursor, source }) => {
// Persist local CRDT state before cursor advances.
// `source` is "local" | "remote" | "bootstrap".
await persistLocalPayload(doc);
console.log(source, cursor.nextOffset);
},
});

persistLocalPayload(doc) must make the local durable state correspond to the
cursor that is about to be stored. On the next app start, restore that local
state before calling sync() or join().
Persistent Cursor Example
Only use a persisted RemoteCursorStore such as IndexedDbRemoteCursorStore
when you also persist and restore the local CRDT state for the same stream.
import { LoroDoc } from "loro-crdt";
import {
createStreamUrl,
createLoroDocAdapter,
IndexedDbRemoteCursorStore,
StreamsCrdt,
} from "@loro-dev/streams-crdt/loro";
// Step 1: restore local CRDT state first.
const doc = (await restorePersistedDoc("doc-1")) ?? new LoroDoc();
const transport = new StreamsCrdt({
streamUrl: createStreamUrl({
bucketId: "my-bucket",
streamId: "doc-1",
}),
auth: async () => "<gateway-jwt>",
adapter: createLoroDocAdapter(doc),
// Step 2: use a persistent cursor store.
remoteCursorStore: new IndexedDbRemoteCursorStore({
dbName: "my-app-ds-cursors",
}),
// Step 3: persist local state before cursor advances.
beforeRemoteCursorSave: async () => {
await persistLocalPayload("doc-1", doc);
},
});
// join() handles initial sync + live mode.
const joined = await transport.join();

The required ordering is:
- Restore the local CRDT state first
- Let the transport apply remote updates
- Persist the updated local CRDT state (via beforeRemoteCursorSave)
- Only then persist the remote cursor
If you cannot provide steps 1 and 3, do not use a persisted remote cursor store.
Snapshot Codec Hooks
Use snapshotCodec when you want the transport to transform full snapshots during
upload and bootstrap:
const transport = new StreamsCrdt({
streamUrl,
adapter: createLoroDocAdapter(doc),
snapshotCodec: {
// Sync or async are both allowed.
compress: async (snapshot) => snapshot,
decompress: (snapshot) => snapshot,
},
});

Behavior
- snapshotCodec.compress(snapshot) runs right before a snapshot is uploaded to Loro Streams
- snapshotCodec.decompress(snapshot) runs right after a non-empty snapshot is downloaded from Loro Streams and right before adapter.applySnapshot(...)
- Both hooks must be provided together
- Both hooks may return either Uint8Array or Promise<Uint8Array>
- These hooks affect full snapshots only. Incremental update batches are unchanged
Compatibility
Transport does not version or negotiate snapshot codecs for you.
If you enable snapshotCodec, you must make sure decompress can still read
any older snapshots that are already stored for that stream. This is especially
important when:
- existing snapshots were uploaded without compression
- you change the compression format later
- you roll out the codec gradually across clients
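A common way to stay compatible with legacy snapshots is to make the compressed format self-describing with a leading format byte. A toy sketch (the XOR "compressor", the FORMAT_V1 value, and the assumption that raw legacy snapshots never start with that byte are all illustrative; a real codec would use zstd or similar):

```typescript
// Sketch of a self-describing snapshot codec: prefix one format byte so
// decompress can still read legacy snapshots stored without the prefix.
// Assumptions: xor is a placeholder for a real compressor, and 0xF1
// never starts a raw (legacy) snapshot in your data.
const FORMAT_V1 = 0xf1;

const xor = (bytes: Uint8Array) => bytes.map((b) => b ^ 0x5a); // toy "compressor"

const snapshotCodec = {
  compress: (snapshot: Uint8Array): Uint8Array => {
    const out = new Uint8Array(snapshot.length + 1);
    out[0] = FORMAT_V1;
    out.set(xor(snapshot), 1);
    return out;
  },
  decompress: (stored: Uint8Array): Uint8Array =>
    stored[0] === FORMAT_V1
      ? xor(stored.subarray(1)) // new format: strip header, decode
      : stored, // legacy snapshot uploaded before the codec was enabled
};
```

With this shape you can roll the codec out gradually: old clients keep uploading raw snapshots, new clients read both, and once every client decodes the header you can start compressing everywhere.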
@loro-dev/streams-crdt/zstd
@loro-dev/streams-crdt/zstd is a built-in helper entry point backed by
@bokuweb/zstd-wasm.
pnpm add @loro-dev/streams-crdt

Then wire the exported hooks directly into snapshotCodec:
import { LoroDoc } from "loro-crdt";
import {
createLoroDocAdapter,
createStreamUrl,
StreamsCrdt,
} from "@loro-dev/streams-crdt/loro";
import {
compress,
decompress,
} from "@loro-dev/streams-crdt/zstd";
const doc = new LoroDoc();
const transport = new StreamsCrdt({
streamUrl: createStreamUrl({
bucketId: "my-bucket",
streamId: "doc-1",
}),
adapter: createLoroDocAdapter(doc),
snapshotCodec: { compress, decompress },
});

@loro-dev/streams-crdt/zstd exports:
- compress(snapshot) — async; uses an extra Worker when available and falls back to the current thread otherwise
- compressInWorker(snapshot) — async; worker-only compression helper
- compressOnCurrentThread(snapshot) — async; current-thread compression helper
- decompress(snapshot) — async; current-thread decompression helper
Automatic Snapshot Upload
The transport can upload snapshots automatically during join() when callers
opt in with snapshotUpload. This feature is disabled by default — if you
do not pass snapshotUpload, no snapshots will ever be uploaded.
const transport = new StreamsCrdt({
streamUrl,
adapter: createLoroDocAdapter(doc),
// Enable automatic snapshot upload:
snapshotUpload: {
canUpload: async () => true,
// debounceMs: 10_000, // default: 10 seconds
// minBytesSinceRemoteSnapshot: 102400, // default: 100 KB
},
});

Configuration
| Option | Default | Description |
| --- | --- | --- |
| canUpload | (required) | Authorization gate called before each upload attempt. Return false to skip. |
| debounceMs | 10_000 (10 s) | Debounce window after the last local write before attempting upload. |
| minBytesSinceRemoteSnapshot | 102400 (100 KB) | Minimum byte delta between the last remote snapshot offset and the current stream tail. Upload is skipped when the delta is smaller. |
Behavior
- Automatic snapshot upload is considered only during join(), never sync()
- The feature is opt-in: callers must pass snapshotUpload with at least canUpload to enable it
- Successful local appends while join() is active refresh a debounce window (default 10 s) for a possible snapshot upload check
- If a remote apply advances the inferred remote version during that window, the transport treats that as another writer becoming active and skips that snapshot attempt
- At debounce expiry, the transport may enqueue a best-effort maybeUploadSnapshot task onto the same serialized operation queue used for local append and remote apply
- When that task executes, it fetches the latest remote snapshot offset, uses the latest confirmed local cursor.nextOffset as the candidate snapshot_offset, and uploads only if current_tail_offset - remote_snapshot_offset exceeds the configured byte threshold
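The final byte-delta gate can be expressed as a one-line predicate. A sketch of that check (shouldUploadSnapshot is an illustrative name; the real transport reads these offsets from the server rather than taking them as parameters):

```typescript
// Sketch of the byte-delta gate: upload only when enough bytes have
// accumulated since the last remote snapshot. Assumption: offsets are
// plain byte counts along the stream.
function shouldUploadSnapshot(
  currentTailOffset: number,
  remoteSnapshotOffset: number,
  minBytesSinceRemoteSnapshot = 102_400, // default: 100 KB
): boolean {
  return currentTailOffset - remoteSnapshotOffset > minBytesSinceRemoteSnapshot;
}

console.log(shouldUploadSnapshot(250_000, 100_000)); // true
console.log(shouldUploadSnapshot(150_000, 100_000)); // false
```

Note the strict inequality: a delta exactly equal to the threshold is still skipped, matching the "exceeds" wording above.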
Entry Points
| Import path | What it adds |
| --- | --- |
| @loro-dev/streams-crdt | Transport core, cursor stores, ID helpers, all shared types |
| @loro-dev/streams-crdt/loro | Everything above + createLoroDocAdapter(doc) |
| @loro-dev/streams-crdt/flock | Everything above + createFlockAdapter(flock) |
| @loro-dev/streams-crdt/zstd | Snapshot compress / decompress hooks backed by @bokuweb/zstd-wasm |
Public API Reference
Root Entry Point
- StreamsCrdt — transport runtime class
- StreamsCrdtOptions — constructor options
- CrdtAdapter — adapter contract interface
- IsolatedCrdtAdapter — optional apply-only adapter used for safe 410 recovery
- CrdtUpdateBatch — local update batch type
- Result — Rust-style Ok | Err result
- StreamsAuthProvider — auth callback type
- TransportError — discriminated error union
- TransportCreateStreamSuccess, TransportDeleteStreamSuccess, TransportSyncSuccess — result payloads
- TransportJoinParams — join() options
- TransportSubscription — active live subscription handle
- TransportRoomStatus — "joined" | "reconnecting" | "disconnected" | "error"
- SnapshotCodec, SnapshotTransformHook — full snapshot encode/decode hooks
- SnapshotUploadOptions — snapshot upload configuration
- RemoteCursor — replay progress metadata
- RemoteCursorStore — abstract cursor storage interface
- RemoteCursorSaveSource — "local" | "remote" | "bootstrap"
- BeforeRemoteCursorSaveContext, BeforeRemoteCursorSaveHook — durability hook types
- InMemoryRemoteCursorStore — ephemeral cursor store (default, always safe)
- IndexedDbRemoteCursorStore — persistent cursor store (requires local CRDT persistence)
- IndexedDbRemoteCursorStoreOptions — IndexedDB store config
- createInitialRemoteCursor(...) — seed a cursor store
- createStreamUrl(...) — build stream URL from bucket/stream IDs
- isValidBucketId(...), isValidRillId(...) — ID validation helpers
Loro Entry Point
@loro-dev/streams-crdt/loro re-exports everything from the root plus:
- createLoroDocAdapter(doc) — creates a CrdtAdapter for one LoroDoc
Flock Entry Point
@loro-dev/streams-crdt/flock re-exports everything from the root plus:
- createFlockAdapter(flock) — creates a CrdtAdapter for one Flock replica
- VersionVector, ExportBundle — re-exported from @loro-dev/flock-wasm
Zstd Entry Point
@loro-dev/streams-crdt/zstd exports:
- compress(snapshot) — async snapshot compression helper
- compressInWorker(snapshot) — async worker-only snapshot compression helper
- compressOnCurrentThread(snapshot) — async current-thread snapshot compression helper
- decompress(snapshot) — async snapshot decompression helper
Notes
- createStreamIfMissing defaults to false and only affects the initial sync/join bootstrap
- deleteStream() returns { deleted: false } when the target stream is already missing
- sync() and join() return Result objects instead of implicitly creating missing streams
- join() resolves only after the initial replay has succeeded; without a stored remote cursor it bootstraps once, then prefers ordinary live SSE and falls back sticky to long-poll
- One transport instance maps to one DS stream
- One adapter instance maps to one local CRDT instance
- Built-in remote cursor stores implement delete(streamUrl) so deleteStream() can reset replay state cleanly
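The TransportRoomStatus values listed above map naturally onto UI states. A small sketch over the documented union (the message strings and describeStatus name are illustrative):

```typescript
// The status union as documented by the package.
type TransportRoomStatus = "joined" | "reconnecting" | "disconnected" | "error";

// Illustrative user-facing messages for each status.
const statusMessages: Record<TransportRoomStatus, string> = {
  joined: "live sync active",
  reconnecting: "connection lost, retrying",
  disconnected: "subscription closed",
  error: "unrecoverable transport error",
};

function describeStatus(status: TransportRoomStatus): string {
  return statusMessages[status];
}
```

A handler like this can be passed straight to join({ onStatusChange }) in place of the raw console.log shown in the Quick Start.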
Repository-only Scripts
The published package only includes built runtime artifacts under dist/.
For repository verification against the hosted backend, run:
pnpm --dir packages/streams-crdt run test:e2e:streams-api
