# @loro-dev/streams-client

v0.3.1
TypeScript client for a single Durable Stream URL.
Use this package when you already know the stream URL and want to:
- create or delete the stream
- append bytes or close the stream
- read from an offset
- tail new data with SSE or long-poll
- bootstrap from a snapshot, then continue from the returned offset
This package is deliberately protocol-level: it does not encode CRDT updates for you or manage CRDT state in memory.
## Install

```sh
pnpm add @loro-dev/streams-client
```

Runtime requirements:

- modern browsers and worker runtimes are supported
- Node.js `>=18` is supported
- if your runtime does not expose standard Web APIs like `fetch`, `Headers`, `ReadableStream`, and `DOMException`, provide compatible polyfills before using this package
## Start Here

The main entry is:

```ts
import { StreamsClient } from "@loro-dev/streams-client";
```

Create one client per stream URL:

```ts
const client = new StreamsClient({
  url: "https://streams-api.loro.dev/ds/my-bucket/my-stream",
  auth: async () => accessToken,
});
```

Notes:

- `url` is the full stream URL, not just `baseUrl` or `bucketId`.
- `auth` should return the raw token string. The client adds `Bearer` for you.
- Import entry: `@loro-dev/streams-client`
- Repo source entry: `src/index.ts`
- Main client implementation in this repo: `src/client.ts`
## 30-Second Example

```ts
import { StreamsClient } from "@loro-dev/streams-client";

const client = new StreamsClient({
  url: "https://streams-api.loro.dev/ds/my-bucket/my-stream",
});

const created = await client.create({
  contentType: "application/json",
});
if (!created.ok) throw created.result;

const appended = await client.append({
  part: {
    contentType: "application/json",
    body: JSON.stringify({ type: "hello", value: 1 }),
  },
});
if (!appended.ok) throw appended.result;

const read = await client.read({ offset: "-1" });
if (!read.ok) throw read.result;
console.log(read.result.payload.json());

for await (const event of client.live({ offset: appended.result.nextOffset })) {
  if (event.type === "data") {
    console.log(event.payload.json());
  }
  if (event.type === "eof") {
    break;
  }
}
```

## What You Usually Need
| Task | API | What it returns | Spec |
| -------------------------------- | ---------------------------------------- | --------------------------------------------------------------- | -------------------------------------------------------------------------------------------------------------------- |
| Create or ensure a stream exists | client.create() | created, contentType, nextOffset, closed | docs/ds-protocol.md -> 5.1 Create Stream |
| Append data | client.append() | nextOffset, closed | docs/ds-protocol.md -> 5.2 Append to Stream |
| Close the stream | client.close() | same shape as append | docs/ds-protocol.md -> 5.3 Close Stream |
| Read current stream metadata | client.head() | contentType, nextOffset, closed, optional snapshot info | docs/ds-protocol.md -> 5.5 Stream Metadata |
| Read once from an offset | client.read() | payload plus nextOffset | docs/ds-protocol.md -> 5.6 Read Stream - Catch-up |
| Do one long-poll round trip | client.readOnce({ live: "long-poll" }) | 200 with payload or 204 with no payload | docs/ds-protocol.md -> 5.7 Read Stream - Live (Long-poll) |
| Tail the stream continuously | client.live() | async iterable of data, up_to_date, eof, reconnect events | docs/ds-protocol.md -> 5.7 and 5.8 |
| Open a raw SSE session | client.openSseSession() | lower-level SSE session object | docs/ds-protocol.md -> 5.8 Read Stream - Live (SSE) |
| Delete the stream | client.delete() | deleted | docs/ds-protocol.md -> 5.4 Delete Stream |
For most callers:

- use `read()` for one catch-up read
- use `live()` for long-running tailing
- use `openSseSession()` only if you need raw SSE control
## Result Shape

Every network method returns:

```ts
type Result<T, E> = { ok: true; result: T } | { ok: false; result: E };
```

That means your normal flow is:

```ts
const result = await client.append(...);
if (!result.ok) {
  console.error(result.result);
  return;
}
console.log(result.result.nextOffset);
```

If you need request and response details for debugging, pass `includeMeta: true`.
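If you prefer exceptions over checking `ok` at every call site, a small helper is easy to write on top of this shape. The `unwrap` function below is a sketch, not something the package exports:

```ts
// Sketch: unwrap a Result or throw its error value. Not exported by the package.
type Result<T, E> = { ok: true; result: T } | { ok: false; result: E };

function unwrap<T, E>(r: Result<T, E>): T {
  if (!r.ok) throw r.result;
  return r.result;
}

// Plain values standing in for client responses:
const okRes: Result<{ nextOffset: string }, Error> = {
  ok: true,
  result: { nextOffset: "42" },
};
console.log(unwrap(okRes).nextOffset); // -> "42"
```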
## Payloads and Offsets

Read APIs return a `StreamPart`:

```ts
const read = await client.read({ offset: "-1" });
if (!read.ok) throw read.result;

read.result.payload.body;
read.result.payload.text();
read.result.payload.json();
```

Offset rules:

- use `-1` to read from the earliest retained data
- use `now` to start from the current tail
- otherwise, reuse the server-returned `nextOffset`
- treat offsets as opaque strings

Spec: docs/ds-protocol.md -> 6. Offsets
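The offset rules chain together: each read hands you the offset for the next one. A minimal in-memory stand-in (not the real client, with made-up offset values) shows the catch-up loop shape:

```ts
// In-memory stand-in for a catch-up read loop. Offsets here are fake values;
// the only rule we rely on is "reuse the server-returned nextOffset verbatim".
type FakeRead = { payload: string[]; nextOffset: string };

const server: Record<string, FakeRead> = {
  "-1": { payload: ["a", "b"], nextOffset: "off-2" },
  "off-2": { payload: ["c"], nextOffset: "off-3" },
  "off-3": { payload: [], nextOffset: "off-3" }, // caught up: offset stops moving
};

const seen: string[] = [];
let offset = "-1"; // earliest retained data
for (;;) {
  const r = server[offset];
  seen.push(...r.payload);
  if (r.nextOffset === offset) break; // no progress means we are caught up
  offset = r.nextOffset; // treat it as an opaque string, never parse it
}
console.log(seen); // ["a", "b", "c"]
```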
## Live Reads

`client.live()` is the high-level live API:

```ts
for await (const event of client.live({ offset: "now" })) {
  if (event.type === "data") {
    console.log(event.payload.text());
  }
}
```

Defaults:

- `mode` defaults to `"auto"`
- `"auto"` tries SSE first and falls back to long-poll when needed

Spec:

- docs/ds-protocol.md -> 5.7 Read Stream - Live (Long-poll)
- docs/ds-protocol.md -> 5.8 Read Stream - Live (SSE)
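Besides `data`, the iterator can yield `up_to_date`, `reconnect`, and `eof` events. Here is a handler sketch run over sample events; the event shapes below are assumptions based on the event names in the table above, not the package's exact types:

```ts
// Assumed event shapes for illustration; check the package types for the real ones.
type LiveEvent =
  | { type: "data"; payload: { text(): string } }
  | { type: "up_to_date" }
  | { type: "reconnect" }
  | { type: "eof" };

function handle(events: Iterable<LiveEvent>): string[] {
  const log: string[] = [];
  for (const e of events) {
    if (e.type === "data") log.push(e.payload.text());
    else if (e.type === "up_to_date") log.push("<caught up>");
    else if (e.type === "reconnect") log.push("<resubscribed>");
    else if (e.type === "eof") break; // stream closed: stop tailing
  }
  return log;
}

const sample: LiveEvent[] = [
  { type: "data", payload: { text: () => "x" } },
  { type: "up_to_date" },
  { type: "eof" },
];
console.log(handle(sample)); // ["x", "<caught up>"]
```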
## Snapshot and Bootstrap

If your stream supports snapshots, these are the key methods:
| Task | API | Spec |
| ------------------------------------------- | -------------------------------------- | ------------------------------------------------------------------------------------------------------ |
| Read latest visible snapshot | client.readLatestSnapshot() | docs/ds-extensions.md -> 2.3 Read Latest Snapshot |
| Read a specific snapshot | client.readSnapshot(offset) | docs/ds-extensions.md -> 2.5 Read Snapshot |
| Publish a snapshot | client.putSnapshot({ offset, part }) | docs/ds-extensions.md -> 2.2 Publish Snapshot |
| Delete a snapshot | client.deleteSnapshot(offset) | docs/ds-extensions.md -> 2.6 Delete Snapshot |
| Get snapshot + retained updates in one call | client.bootstrap() | docs/ds-extensions.md -> 3. Bootstrap |
Typical recovery flow:

- Call `client.bootstrap()`
- Apply `snapshot` if it exists
- Apply `updates` in order
- Continue with `client.live({ offset: bootstrap.result.nextOffset })`

If a normal read returns `410 Gone`, you usually need `bootstrap()` before you continue.

Spec: docs/ds-extensions.md -> 3.4 Compatibility
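The recovery steps compose into one small function. The `Bootstrap` shape below is an assumption for illustration (an optional snapshot, ordered updates, and a resume offset); the real `bootstrap()` result may differ:

```ts
// Assumed bootstrap result shape, for illustration only.
type Bootstrap = {
  snapshot?: string; // base state, if one is retained
  updates: string[]; // retained updates newer than the snapshot
  nextOffset: string; // where live tailing should resume
};

// Apply the snapshot first (if any), then every update in order.
function recover(b: Bootstrap): { applied: string[]; resumeFrom: string } {
  const applied: string[] = [];
  if (b.snapshot !== undefined) applied.push(b.snapshot);
  applied.push(...b.updates);
  return { applied, resumeFrom: b.nextOffset };
}

const out = recover({ snapshot: "snap", updates: ["u1", "u2"], nextOffset: "off-9" });
console.log(out.applied); // ["snap", "u1", "u2"]
// ...then continue with client.live({ offset: out.resumeFrom })
```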
## Producer Coordination

If you use idempotent writers, pass `producer` to `append()` or `close()`:

```ts
await client.append({
  part: {
    contentType: "application/octet-stream",
    body: bytes,
  },
  producer: {
    producerId: "writer-1",
    epoch: 1,
    seq: 42,
  },
});
```

Spec: docs/ds-protocol.md -> 5.2.1 Idempotent Producers
## Other Exports

These helpers are useful when you need them, but most users only need `StreamsClient`:

- `createStreamPart(contentType, body)`: build a `StreamPart` from bytes
- `normalizeStreamBody(body)`: normalize a string, `Uint8Array`, `ArrayBuffer`, or typed-array view into a `Uint8Array`
- `DEFAULT_RETRY_CONFIG`
- `DEFAULT_TIMEOUT_CONFIG`
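As a rough illustration of what the documented `normalizeStreamBody` contract means, here is a reimplementation of that behavior; this is a sketch, not the package source:

```ts
// Sketch of the documented contract: string | Uint8Array | ArrayBuffer | view -> Uint8Array.
function toUint8Array(
  body: string | Uint8Array | ArrayBuffer | ArrayBufferView,
): Uint8Array {
  if (typeof body === "string") return new TextEncoder().encode(body);
  if (body instanceof Uint8Array) return body;
  if (body instanceof ArrayBuffer) return new Uint8Array(body);
  // Any other view (DataView, Int32Array, ...): wrap its exact byte range.
  return new Uint8Array(body.buffer, body.byteOffset, body.byteLength);
}

console.log(toUint8Array("hi")); // Uint8Array [104, 105]
console.log(toUint8Array(new Int16Array([1]).buffer).byteLength); // 2
```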
## Spec Map

This package ships the protocol docs in docs/, so after install you can read:

- node_modules/@loro-dev/streams-client/docs/ds-protocol.md
- node_modules/@loro-dev/streams-client/docs/ds-extensions.md

Use this map when you need exact wire semantics:

- stream URL shape and bucket rules: docs/ds-extensions.md -> 1. Buckets
- create, append, close, delete, head, read, long-poll, SSE: docs/ds-protocol.md
- offsets and JSON mode: docs/ds-protocol.md -> 6. Offsets, docs/ds-protocol.md -> 7.1 JSON Mode
- snapshots and bootstrap: docs/ds-extensions.md -> 2. Snapshots, docs/ds-extensions.md -> 3. Bootstrap
