y-multiplex-websocket-server
v0.3.0
y-websocket-multiplex-server
A multiplex-first Yjs websocket server and client provider.
- One physical websocket can carry multiple routed Yjs docs (docName) in the same namespace.
- Works with awareness, optional BroadcastChannel sync, optional persistence, and optional NATS cluster sync.
- Compatible with the bundled CLI server (y-websocket-multiplex-server) and custom server wiring.
Install
npm i y-multiplex-websocket-server yjs
Quick Start
Start the bundled server:
HOST=localhost PORT=1234 npx y-websocket-multiplex-server
PORT defaults to 1234, HOST defaults to localhost.
Client (single routed doc):
import * as Y from 'yjs'
import { MultiplexProvider } from 'y-multiplex-websocket-server/multiplex-provider'
const doc = new Y.Doc()
const provider = new MultiplexProvider('ws://localhost:1234/connect/doc', 'ticket', {
params: { token: 'demo-token' }
})
const binding = provider.attach('version', doc, { awareness: true })
binding.on('status', evt => {
console.log(evt.status)
})
Client API (MultiplexProvider)
MultiplexProvider is the supported client protocol.
new MultiplexProvider(serverUrl, namespace, opts?)
- serverUrl: base websocket URL (example: ws://localhost:1234/connect/doc)
- namespace: logical namespace segment appended to the URL
- opts.connect (default true): provider-level auto connect
- opts.params: URL query params
- opts.protocols: websocket subprotocol(s)
- opts.WebSocketPolyfill: custom websocket constructor (Node/non-browser)
- opts.maxBackoffTime (default 10000)
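The README does not spell out how opts.maxBackoffTime is applied. A minimal sketch, assuming the provider follows the reconnect formula used by y-websocket (delay doubles from 100 ms per failed attempt and is capped at maxBackoffTime); reconnectDelay is a hypothetical helper, not part of this package:

```javascript
// Hypothetical helper: delay in ms before the next reconnect attempt.
// Assumption: exponential backoff starting at 100 ms, capped at maxBackoffTime.
function reconnectDelay (failedAttempts, maxBackoffTime = 10000) {
  return Math.min(Math.pow(2, failedAttempts) * 100, maxBackoffTime)
}

// Delays for 0, 1, 2, 3 and 7 consecutive failures.
console.log([0, 1, 2, 3, 7].map(n => reconnectDelay(n)))
```

Under this assumption the cap only starts to matter after several consecutive failures, so a lower maxBackoffTime mainly shortens recovery after long outages.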
Attach docs:
const binding = provider.attach(docName, ydoc, {
awareness: true, // false by default
connect: true,
disableBc: false, // BroadcastChannel + localStorage fallback via lib0
resyncInterval: -1 // ms; -1 disables periodic resubscribe
})
Useful methods:
- provider.getWebSocket() -> shared physical websocket (or null before connected)
- provider.detach(docNameOrBinding)
- provider.connect() / provider.disconnect()
- provider.destroy()
- binding.connect() / binding.disconnect() / binding.destroy()
Common binding events:
- status (connected / disconnected)
- sync (boolean)
- connection-error
- connection-close
Namespace and docName
There are two levels:
- namespace: provider/server isolation scope
- docName: multiplex sub-route inside a namespace
If the client calls:
provider.attach('version', doc)
under namespace ticket, the server-side doc identity is ticket + version.
Bundled Server
The package ships with a CLI runtime (src/server.js):
- It uses setupWSConnection(...) internally and serves a basic HTTP 200 health response.
- It can also send a debounced HTTP callback on document updates.
Environment variables:
- NATS_SERVERS: comma-separated servers
- NATS_NODE_ID: optional node id (defaults to host:port:pid)
- NATS_RESYNC_INTERVAL: periodic resync interval in ms (default -1, disabled)
- CALLBACK_URL
- CALLBACK_DEBOUNCE_WAIT (default 2000)
- CALLBACK_DEBOUNCE_MAXWAIT (default 10000)
- CALLBACK_TIMEOUT (default 5000)
- CALLBACK_OBJECTS (JSON map of shared object name -> type)
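The two CALLBACK_DEBOUNCE_* variables suggest a trailing debounce with an upper bound. A minimal sketch, assuming lodash-style semantics (the callback fires once updates pause for the wait period, but no later than the max wait after the first pending update); this is not confirmed by the README, and callbackFireAt is a hypothetical helper:

```javascript
// Hypothetical pure helper: timestamp (ms) at which the pending callback fires.
// firstPendingAt: time of the first update since the last callback was sent
// lastUpdateAt:   time of the most recent update
function callbackFireAt (firstPendingAt, lastUpdateAt, wait = 2000, maxWait = 10000) {
  // Fire after a quiet period of `wait`, but never later than `maxWait`
  // after the first pending update.
  return Math.min(lastUpdateAt + wait, firstPendingAt + maxWait)
}

console.log(callbackFireAt(0, 0))    // single update: quiet period applies
console.log(callbackFireAt(0, 9500)) // continuous editing: max wait applies
```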
Example:
npx y-websocket-multiplex-server

CALLBACK_URL=http://localhost:3000/ \
CALLBACK_OBJECTS='{"prosemirror":"XmlFragment"}' \
npx y-websocket-multiplex-server

NATS_SERVERS=nats://127.0.0.1:4222 \
NATS_NODE_ID=node-a \
npx y-websocket-multiplex-server
Custom Server Wiring
import http from 'http'
import WebSocket from 'ws'
import { setupWSConnection } from 'y-multiplex-websocket-server/utils-connection'
import { getDoc, getConnectionsForDoc } from 'y-multiplex-websocket-server/utils-docs'
const server = http.createServer((_req, res) => {
res.writeHead(200, { 'Content-Type': 'text/plain' })
res.end('okay')
})
const wss = new WebSocket.Server({ noServer: true })
wss.on('connection', (ws, req) => {
setupWSConnection('ticket', ws, req)
const doc = getDoc('ticket', 'version')
console.log(doc.docName)
console.log(getConnectionsForDoc('ticket', 'version').length)
})
server.on('upgrade', (req, socket, head) => {
wss.handleUpgrade(req, socket, head, ws => {
wss.emit('connection', ws, req)
})
})
server.listen(1234)
Cluster Sync with NATS
This project includes a server-to-server ydoc sync feature.
Set up cluster mode when the host project boots:
import { setupYdocCluster } from 'y-multiplex-websocket-server/cluster'
setupYdocCluster({
nodeId: hostNodeId,
nats: {
connection: hostNatsConnection,
subjectTemplate: {
broadcast: 'myapp.ydoc.broadcast.{topic}.{channel}.{event}',
unicast: 'myapp.ydoc.unicast.{nodeId}.{method}'
}
},
resyncInterval: 30000, // ms; default is -1 (disabled), enable as an anti-entropy fallback for missed sync
chooseSyncNode: (docKey, aliveNodes, currentSyncNode) => host.pickSyncNode(docKey, aliveNodes, currentSyncNode)
});
The host project then forwards membership changes to the cluster from its own membership-changed event:
import { getYdocCluster } from 'y-multiplex-websocket-server/cluster'
host.onMembershipChanged(({ aliveNodeIds, leaderNodeId, removedNodeIds }) => {
const ydocCluster = getYdocCluster();
if (!ydocCluster) return;
removedNodeIds.forEach(nodeId => ydocCluster.removeNode(nodeId));
ydocCluster.setNodes(leaderNodeId, aliveNodeIds);
})
setupYdocCluster(options) configurable fields
- nodeId: string (required): current cluster node id.
- bus?: ClusterBus: custom cluster bus implementation (when set, built-in NATS bus creation is skipped).
- nats?: { ... }: built-in NATS bus options.
  - connection?: reuse an existing NATS connection.
  - connectOptions?: NATS connection options when creating a new connection.
  - subjectTemplate?: custom subject templates for broadcast/unicast.
  - requestTimeoutMs?: request timeout for unicast sync RPC.
  - maxRetries?: retry count for unicast sync RPC.
  - closeNatsOnClose?: whether to close shared NATS connection on cluster close (connection mode only).
- resyncInterval?: number: periodic cluster anti-entropy interval in milliseconds. Default -1 (disabled).
- chooseSyncNode?: (docKey, aliveNodes, currentSyncNode) => string | null: custom sync target picker per doc.
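A custom chooseSyncNode only has to honor the documented signature. The sketch below is one possible policy, not the package default: it keeps the current sync node while it is alive and otherwise picks deterministically by hashing the doc key. It assumes aliveNodes is an array of node id strings, which the README does not state explicitly.

```javascript
// Hypothetical picker matching (docKey, aliveNodes, currentSyncNode) => string | null.
// Assumption: aliveNodes is an array of node id strings.
function chooseSyncNode (docKey, aliveNodes, currentSyncNode) {
  if (aliveNodes.length === 0) return null
  // Stay on the current sync node while it is alive to avoid churn.
  if (currentSyncNode && aliveNodes.includes(currentSyncNode)) return currentSyncNode
  // Otherwise spread docs across nodes with a simple deterministic string hash.
  let hash = 0
  for (const ch of docKey) hash = (hash * 31 + ch.codePointAt(0)) >>> 0
  const sorted = [...aliveNodes].sort()
  return sorted[hash % sorted.length]
}

console.log(chooseSyncNode('ticket-version', ['node-a', 'node-b'], 'node-b'))
// prints "node-b"
```

Sorting before indexing makes every node pick the same target for a given doc even if each node receives the membership list in a different order.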
NATS Subjects and Methods
Logical names used by cluster sync:
- doc.{namespace}-{docName}.update
- doc.{namespace}-{docName}.awareness
- doc.{namespace}-{docName}.anti-entropy
- bus.awareness.anti-entropy (shared by awareness anti-entropy solicit + shard payloads)
Purpose of each logical name:
- doc.{namespace}-{docName}.update: broadcasts Yjs document updates between nodes.
- doc.{namespace}-{docName}.awareness: broadcasts real-time awareness deltas (presence/cursor/user state).
- doc.{namespace}-{docName}.anti-entropy: unicast request/reply method for document anti-entropy catch-up (state-vector diff pull).
- bus.awareness.anti-entropy: cluster-wide awareness anti-entropy bus; carries both solicitation signals and awareness shard payloads for reconciliation.
Default NATS subjects (without subjectTemplate):
- broadcast.doc.{namespace}-{docName}.update
- broadcast.doc.{namespace}-{docName}.awareness
- unicast.doc.{nodeId}.{namespace}-{docName}.anti-entropy
- broadcast.bus.awareness.anti-entropy
With subjectTemplate configured:
- broadcast template maps {topic}.{channel}.{event} from logical names
- unicast template maps {nodeId} + {method}, where method is doc.{namespace}-{docName}.anti-entropy
Subject template validation:
- broadcast must include {topic}, {channel}, {event}
- unicast must include {nodeId}, {method}
- unknown tokens are rejected
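The validation rules above can be sketched as follows. validateTemplate and renderSubject are hypothetical helpers written for illustration, not the package's internals, and the split of a logical name into topic, channel, and event (doc / ticket-version / update) is an assumption inferred from the logical names listed earlier.

```javascript
// Required tokens per template kind, as listed in the validation rules.
const REQUIRED = {
  broadcast: ['topic', 'channel', 'event'],
  unicast: ['nodeId', 'method']
}

// Throws when a required token is missing or an unknown token is present.
function validateTemplate (kind, template) {
  const allowed = REQUIRED[kind]
  const found = [...template.matchAll(/\{(\w+)\}/g)].map(m => m[1])
  for (const token of allowed) {
    if (!found.includes(token)) throw new Error(`${kind} template is missing {${token}}`)
  }
  for (const token of found) {
    if (!allowed.includes(token)) throw new Error(`${kind} template has unknown token {${token}}`)
  }
}

// Substitutes each {token} placeholder with the matching value.
function renderSubject (template, values) {
  return template.replace(/\{(\w+)\}/g, (_, token) => values[token])
}

const broadcast = 'myapp.ydoc.broadcast.{topic}.{channel}.{event}'
validateTemplate('broadcast', broadcast)
console.log(renderSubject(broadcast, { topic: 'doc', channel: 'ticket-version', event: 'update' }))
// prints "myapp.ydoc.broadcast.doc.ticket-version.update"
```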
Persistence
Set persistence once at startup using setPersistence(...):
import { RedisPersistence } from 'y-redis'
import { setPersistence } from 'y-multiplex-websocket-server/utils-docs'
const redisPersistence = new RedisPersistence({
redisOpts: { host: '127.0.0.1', port: 6379 }
})
setPersistence({
bindState: async (docName, ydoc) => redisPersistence.bindState(docName, ydoc),
unbindState: async (docName) => redisPersistence.closeDoc(docName)
})
Adapter shape:
- bindState(name, doc)
- unbindState(name, doc)
Origin Conventions
ydoc.on('update', (update, origin) => { ... }) forwards the Yjs origin value.
This project normalizes all update origins to a shared shape:
origin = {
source,
meta: { docId, receivedAt, updateId }
}
To reliably receive the structured { source, meta } origin in ydoc.on('update') for local changes, wrap local mutations in doc.transact(...).
The transaction origin is then normalized by this project.
If you call Y.applyUpdate(doc, update, origin) directly, Yjs forwards that origin value as-is.
For example, Y.applyUpdate(doc, update, 'persister') will emit 'persister' (not a structured origin object).
If you need structured origin on direct apply, pass an object origin explicitly.
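For direct applies, an object in the documented shape can be built by hand. A minimal sketch: makeOrigin is a hypothetical helper, and the field types (millisecond timestamp for receivedAt, counter-based updateId) are assumptions, since the README only names the fields.

```javascript
// Hypothetical helper that builds an origin in the documented
// { source, meta: { docId, receivedAt, updateId } } shape.
let updateCounter = 0
function makeOrigin (source, docId) {
  return {
    source,
    meta: {
      docId,
      receivedAt: Date.now(),               // assumption: epoch milliseconds
      updateId: `${docId}:${++updateCounter}` // assumption: any unique string
    }
  }
}

const origin = makeOrigin('replay_from_db', 'ticket-version')
console.log(origin.source)
// prints "replay_from_db"
// Usage with Yjs: Y.applyUpdate(doc, update, origin)
```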
source values by path:
- Cluster sync updates: source: 'cluster'
- Cluster catch-up updates: source: 'catchup'
- Websocket client sync updates: source: 'client'
- Local updates without explicit origin (doc.transact(fn)): source: 'local'
- Local updates with explicit origin (doc.transact(fn, origin)): source is set to the passed origin value as-is
Cluster mode keeps extra metadata in meta:
- senderNodeId
- receiverNodeId
Persistence guidance for host applications:
- Persist only business/live updates (for example source: 'cluster' and your local business sources).
- Do not persist source: 'catchup' updates.
- In other words, catch-up updates are transport diffs and should be ignored by persistence in the host project.
For example, you can use source: 'replay_from_db' when replaying persisted updates and skip re-persisting those updates in your adapter.
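The guidance above can be expressed as a small filter inside the host's update handler. This is a sketch under stated assumptions: shouldPersist is a hypothetical helper, and it uses a deny-list containing only the two sources the text names; which other sources count as business updates remains the host's decision.

```javascript
// Sources that the persistence adapter should never write back.
const SKIP_SOURCES = new Set(['catchup', 'replay_from_db'])

// Hypothetical filter: returns true when an update should be persisted.
function shouldPersist (origin) {
  // A direct Y.applyUpdate(doc, update, 'persister') yields a plain string origin,
  // so accept both the structured object and a raw value.
  const source = origin !== null && typeof origin === 'object' ? origin.source : origin
  return !SKIP_SOURCES.has(source)
}

console.log(shouldPersist({ source: 'cluster', meta: {} })) // prints true
console.log(shouldPersist({ source: 'catchup', meta: {} })) // prints false
// Usage with Yjs:
// ydoc.on('update', (update, origin) => { if (shouldPersist(origin)) save(update) })
```

Here save stands for whatever write path the host's adapter already has.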
Cluster Synchronization Mechanism
In cluster mode, NATS carries synchronization between nodes. Two sync paths run together:
- ydoc content sync
- awareness presence sync
Realtime broadcast timing:
- On local Yjs doc update, broadcast immediately to doc.{namespace}-{docName}.update.
- On local awareness change, broadcast immediately to doc.{namespace}-{docName}.awareness.
Anti-entropy timing:
- ydoc anti-entropy:
  - runs once in background after bindDoc (eager catch-up),
  - runs periodically when resyncInterval > 0,
  - can be triggered by host via resyncDoc / resyncAllDocs.
- awareness anti-entropy:
  - runs after bindDoc,
  - runs periodically when resyncInterval > 0,
  - runs again when setNodes(...) detects remote node join.
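The timing rules above (one eager pass after binding, periodic passes only when resyncInterval > 0) can be sketched as a tiny scheduler. startAntiEntropy and its run argument are hypothetical names for illustration; the package's own scheduling code may differ.

```javascript
// Hypothetical scheduler: `run` stands in for one anti-entropy pass.
function startAntiEntropy (run, resyncInterval = -1) {
  // Eager catch-up: one pass in the background right after binding.
  Promise.resolve().then(run).catch(err => console.error(err))
  // Periodic passes only when resyncInterval > 0; -1 disables them.
  const timer = resyncInterval > 0 ? setInterval(run, resyncInterval) : null
  return {
    periodic: timer !== null,
    stop () { if (timer !== null) clearInterval(timer) }
  }
}

const handle = startAntiEntropy(() => console.log('anti-entropy pass'), -1)
console.log(handle.periodic)
// prints false
```

Calling stop() on unbind keeps a periodic timer from outliving its doc.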
Host/runtime responsibility:
- Host is responsible for membership detection (alive nodes, removed nodes, optional sync node/leader).
- Host should push topology updates into this project by calling:
  - setNodes(syncNode, aliveNodeIds)
  - removeNode(nodeId) for explicit removals.
What this project does after host topology updates:
- removes awareness owned by down nodes,
- drops stale snapshots for removed nodes,
- runs awareness reconciliation when needed to converge presence state.
bindDoc in cluster mode:
- bindDoc(namespace, docName, doc, awareness) attaches one doc to cluster sync.
- After binding, this doc starts participating in cross-node update / awareness / anti-entropy.
- Without bindDoc, a doc does not join server-to-server cluster synchronization.
- In non-cluster mode, bindDoc will NOT be called.
Awareness ownership map:
- Each bound doc keeps ownedAwarenessClientsByNode: Map<nodeId, Set<clientId>>.
- On awareness changes, changed clients are reassigned to the sender node (and removed from other owners).
- On topology down events (setNodes / removeNode), awareness owned by removed nodes is cleaned up.
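The ownership bookkeeping described above can be sketched with a plain Map of Sets. assignClients and releaseNode are hypothetical names for illustration; only the ownedAwarenessClientsByNode structure comes from the text.

```javascript
// nodeId -> Set of awareness clientIds owned by that node
const ownedAwarenessClientsByNode = new Map()

// Reassign changed clients to the sender node and drop them from other owners.
function assignClients (senderNodeId, clientIds) {
  for (const [nodeId, owned] of ownedAwarenessClientsByNode) {
    if (nodeId !== senderNodeId) clientIds.forEach(id => owned.delete(id))
  }
  if (!ownedAwarenessClientsByNode.has(senderNodeId)) {
    ownedAwarenessClientsByNode.set(senderNodeId, new Set())
  }
  clientIds.forEach(id => ownedAwarenessClientsByNode.get(senderNodeId).add(id))
}

// On node removal, return the clientIds whose awareness should be cleaned up.
function releaseNode (nodeId) {
  const owned = ownedAwarenessClientsByNode.get(nodeId) ?? new Set()
  ownedAwarenessClientsByNode.delete(nodeId)
  return [...owned]
}

assignClients('node-a', [1, 2])
assignClients('node-b', [2, 3]) // client 2 moves from node-a to node-b
console.log(releaseNode('node-b'))
// prints [ 2, 3 ]
```

In a real server the returned ids would presumably be handed to an awareness cleanup call such as removeAwarenessStates from y-protocols.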
Package Exports
- y-multiplex-websocket-server/server
- y-multiplex-websocket-server/utils-docs
- y-multiplex-websocket-server/utils-connection
- y-multiplex-websocket-server/multiplex-provider
- y-multiplex-websocket-server/callback
- y-multiplex-websocket-server/cluster
License
MIT © Cyrus NG
