npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

y-durablestream

v0.3.0

Published

Yjs document synchronization between Cloudflare Durable Objects via broadcast buffer and pull-based ReadableStream with pluggable storage backends

Downloads

268

Readme

y-durablestream

npm version CI License: MIT

Yjs document synchronization between Cloudflare Durable Objects via TransformStream.

Overview

y-durablestream enables Durable Object-to-Durable Object Yjs document synchronization using the Cloudflare TransformStream API. An upstream provider Durable Object hosts the authoritative document and streams updates to any number of downstream subscriber Durable Objects over ReadableStream. Subscribers push local changes back via direct RPC.

┌──────────────────────┐                                ┌──────────────────────┐
│  Subscriber DO       │    RPC: subscribe()            │  Provider DO         │
│  (YStreamClient)     │ ──────────────────────────▶    │  (YStreamProvider)   │
│                      │   ◀── ReadableStream<Uint8Array>│                      │
│  local Y.Doc         │        (sync + updates)        │  authoritative Y.Doc │
│                      │                                │  pluggable storage   │
│                      │    RPC: update(Uint8Array)     │                      │
│                      │ ──────────────────────────▶    │  broadcasts to all   │
└──────────────────────┘        (local changes)         └──────────────────────┘

Features

  • Stream-based sync — provider streams Yjs protocol messages to subscribers via ReadableStream, subscribers send changes back via RPC
  • Full Yjs sync protocol — SyncStep1/SyncStep2 handshake on connect, incremental updates after
  • Pluggable storage — swap persistence backends by implementing the YDocStorage interface
  • Two built-in backends — async KV API (default) and synchronous SQLite API (cheaper, atomic)
  • Length-prefixed framing — reliable message delivery over arbitrarily chunked streams
  • No WebSockets required — pure DO-to-DO communication, no browser in the loop

Installation

npm install y-durablestream

yjs, y-protocols, and lib0 are included as dependencies and do not need to be installed separately.

Quick Start

1. Define the provider Durable Object

// src/provider.ts
import { YStreamProvider } from "y-durablestream";

export class DocProvider extends YStreamProvider<Env> {}

2. Define a subscriber Durable Object

// src/subscriber.ts
import { DurableObject } from "cloudflare:workers";
import { Doc } from "yjs";
import { YStreamClient } from "y-durablestream";

export class Subscriber extends DurableObject<Env> {
  private doc = new Doc();
  private client: YStreamClient | null = null;

  async connectToDoc(docName: string): Promise<void> {
    const stub = this.env.DOC_PROVIDER.get(
      this.env.DOC_PROVIDER.idFromName(docName),
    );
    this.client = new YStreamClient(this.doc, { stub });
    this.ctx.waitUntil(this.client.connect());
  }

  async getText(field: string): Promise<string> {
    return this.doc.getText(field).toString();
  }

  async disconnect(): Promise<void> {
    this.client?.disconnect();
    this.client = null;
  }
}

3. Configure wrangler

# wrangler.toml
name = "my-worker"
main = "src/index.ts"
compatibility_date = "2025-01-01"
compatibility_flags = ["nodejs_compat"]

[[durable_objects.bindings]]
name = "DOC_PROVIDER"
class_name = "DocProvider"

[[durable_objects.bindings]]
name = "SUBSCRIBER"
class_name = "Subscriber"

[[migrations]]
tag = "v1"
new_sqlite_classes = ["DocProvider"]
new_classes = ["Subscriber"]

Note: The provider class must be declared in new_sqlite_classes (not new_classes) to enable the Durable Object storage APIs used by both built-in storage backends.

API Reference

YStreamProvider<E>

A Durable Object base class that hosts an authoritative Yjs document and streams updates to subscribers.

import { YStreamProvider } from "y-durablestream";

export class DocProvider extends YStreamProvider<Env> {}

RPC Methods

| Method | Signature | Description | |---|---|---| | subscribe() | () => Promise<ReadableStream<Uint8Array>> | Subscribe to document updates. Returns a stream of length-framed Yjs sync messages. | | update(data) | (data: Uint8Array) => Promise<void> | Receive a Yjs sync protocol message from a subscriber. | | applyUpdate(update) | (update: Uint8Array) => Promise<void> | Apply a raw Yjs update directly (for server-side mutations). | | getYDoc() | () => Promise<Uint8Array> | Return the full document state as Y.encodeStateAsUpdate(). |

Protected Properties

| Property | Type | Default | Description | |---|---|---|---| | doc | Doc | new Doc({ gc: true }) | The authoritative Yjs document. | | storage | YDocStorage | DurableObjectKvStorage | The pluggable storage backend. | | maxBytes | number | 10240 | Max incremental update bytes before compaction. | | maxUpdates | number | 500 | Max incremental update count before compaction. |

createStorage()

Override this factory method in a subclass to use a different storage backend:

import { YStreamProvider, DurableObjectSqlStorage } from "y-durablestream";
import type { YDocStorage } from "y-durablestream";

export class SqlDocProvider extends YStreamProvider<Env> {
  protected override createStorage(): YDocStorage {
    return new DurableObjectSqlStorage(this.ctx.storage, {
      maxBytes: 20 * 1024,
      maxUpdates: 1000,
    });
  }
}

onStart()

Override this lifecycle hook (called inside blockConcurrencyWhile) to run additional initialization. Always call super.onStart():

export class DocProvider extends YStreamProvider<Env> {
  protected override async onStart(): Promise<void> {
    await super.onStart();
    // Custom initialization here
  }
}

YStreamClient

Synchronizes a local Y.Doc with an upstream YStreamProvider.

import { YStreamClient } from "y-durablestream";
import { Doc } from "yjs";

const doc = new Doc();
const client = new YStreamClient(doc, { stub });

Constructor

| Parameter | Type | Description | |---|---|---| | doc | Doc | The local Yjs document to synchronize. | | options.stub | YStreamProviderStub | The upstream provider's DO stub. |

Methods

| Method | Signature | Description | |---|---|---| | connect() | () => Promise<void> | Connect and start syncing. Resolves when the stream ends. Wrap in ctx.waitUntil(). | | disconnect() | () => void | Disconnect and clean up all resources. Safe to call multiple times. | | onStatusChange(handler) | (handler: StatusChangeHandler) => () => void | Register a status change listener. Returns an unsubscribe function. |

Properties

| Property | Type | Description | |---|---|---| | status | YStreamClientStatus | Current status: "disconnected" | "connecting" | "connected" | "synced" | | synced | boolean | Whether initial sync with the provider has completed. |


Storage Backends

DurableObjectKvStorage

Default backend using the Durable Object async KV API (ctx.storage.get/put/list/delete).

import { DurableObjectKvStorage } from "y-durablestream";

const storage = new DurableObjectKvStorage(ctx.storage, {
  maxBytes: 10 * 1024,  // optional, default 10KB
  maxUpdates: 500,       // optional, default 500
});

Storage layout:

| KV Key | Value | |---|---| | ydoc:state:doc | Compacted snapshot (Uint8Array) | | ydoc:state:bytes | Total incremental bytes (number) | | ydoc:state:count | Incremental update count (number) | | ydoc:update:<n> | Incremental update n (Uint8Array) |

maxBytes must not exceed 128 KB (the Durable Object KV per-value limit).

DurableObjectSqlStorage

Alternative backend using the Durable Object synchronous SQLite API (ctx.storage.sql).

import { DurableObjectSqlStorage } from "y-durablestream";

const storage = new DurableObjectSqlStorage(ctx.storage, {
  maxBytes: 10 * 1024,  // optional, default 10KB
  maxUpdates: 500,       // optional, default 500
});

Advantages over KV:

  • Lower cost — billed per-row, aggregate queries avoid reading every row
  • Synchronous transactions — uses transactionSync() for truly atomic operations
  • Efficient threshold checksCOUNT(*) / SUM() in a single pass

SQL tables created automatically:

| Table | Columns | Description | |---|---|---| | yjs_snapshot | id INTEGER PK, data BLOB | Single-row compacted snapshot | | yjs_updates | id INTEGER PK AUTOINCREMENT, data BLOB, byte_length INTEGER | Incremental updates |

Requires a SQLite-backed Durable Object (new_sqlite_classes in wrangler.toml).

Custom Storage

Implement the YDocStorage interface to create your own backend:

import type { YDocStorage } from "y-durablestream";
import { Doc } from "yjs";

class MyCustomStorage implements YDocStorage {
  async getYDoc(): Promise<Doc> {
    // Load and return a Doc with all persisted state
  }

  async storeUpdate(update: Uint8Array): Promise<void> {
    // Persist an incremental update, auto-compact when thresholds exceeded
  }

  async commit(doc: Doc): Promise<void> {
    // Force-compact all updates into a single snapshot using the given doc
  }
}

Protocol Utilities

Low-level framing utilities for the length-prefixed message protocol used over TransformStream. You only need these if building custom transport layers.

import { encodeFrame, encodeFrames, createFrameDecoder, FrameDecodeError } from "y-durablestream";

| Export | Description | |---|---| | encodeFrame(message) | Encode a single message with a 4-byte big-endian length header. | | encodeFrames(messages) | Encode multiple messages into a single concatenated buffer. | | createFrameDecoder() | Create a stateful decoder that reconstructs messages from arbitrarily chunked stream data. | | FrameDecodeError | Error class thrown when the decoder encounters invalid frame data. |

How It Works

Connection Flow

  1. Subscriber calls provider.subscribe() via RPC
  2. Provider creates a TransformStream, registers the writable side, returns the ReadableStream
  3. Provider asynchronously writes SyncStep1 + SyncStep2 to bootstrap the subscriber with full document state
  4. Subscriber processes the initial sync burst, transitions to "synced" status
  5. Subscriber sends its own SyncStep1 back via provider.update() so the provider can learn about any state it's missing

Steady-State Updates

  • Provider → Subscriber: when the provider's doc changes (via applyUpdate() or from another subscriber), the update is wrapped in a sync Update message, length-framed, and written to every active subscriber stream
  • Subscriber → Provider: when the subscriber's local doc changes, the update is wrapped in a sync Update message and sent via provider.update() RPC

Persistence

Updates are stored incrementally. When the cumulative byte size or count exceeds configurable thresholds, all incremental updates are compacted into a single snapshot. Compaction also runs automatically when the last subscriber disconnects.

Why Length-Prefixed Framing?

ReadableStream<Uint8Array> delivers data in arbitrary chunks — a single read() may return part of a message, exactly one message, or multiple messages concatenated together. The 4-byte big-endian length prefix on each frame allows the receiver to reconstruct complete Yjs protocol messages regardless of how the stream chunks the data.

TypeScript

The package ships with full TypeScript declarations (.d.ts and .d.cts). Cloudflare Workers types (DurableObjectState, etc.) are not included as a dependency — they come from your project's wrangler types output.

License

MIT © Sam McCord