@nebutra/collab
v0.2.1
Multi-tenant, transport-agnostic real-time collaborative sync layer: tenant-partitioned CRDT rooms (Yjs) with pluggable snapshot store + transport seams
Multi-tenant, transport-agnostic real-time collaborative sync layer for Nebutra-Sailor. Sailor already had Pusher pub/sub (fire-and-forget broadcast) but no conflict-free concurrent editing. This package fills that gap with tenant-partitioned CRDT rooms built on Yjs (MIT).
It is generic: a node-graph canvas, a rich-text document, or any shared
structure binds to a CollabRoom and gets convergence for free.
Zero-config quickstart
No env, no credentials — real (non-mock) CRDT behaviour out of the box:
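import { getCollab } from "@nebutra/collab";

// Build a hub with the defaults: in-memory snapshot store, in-process loopback transport.
const hub = await getCollab();
const room = hub.room("org_123", "doc-1"); // tenantId is mandatory
room.doc.getText("body").insert(0, "Hello");
// relayToPeers is a placeholder for your own fan-out function; this package does not export it.
room.onUpdate((update) => relayToPeers(update));
await room.snapshot(); // encode the doc and persist it via the snapshot store
Public API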
| Export | Purpose |
| --- | --- |
| getCollab(config?) / createCollab(config?) | Build a CollabHub (async / sync). |
| CollabHub.room(tenantId, roomId) | Get/create a tenant-scoped CRDT room. Hard-partitioned by tenant. |
| CollabHub.roomRestored(tenantId, roomId) | Like room, but hydrates from the snapshot store first. |
| CollabHub.doctor() | Structured health report (Yjs + store + transport); intended to complete in under 3 s. |
| CollabHub.destroy() | Destroy every live room. |
| CollabRoom | doc: Y.Doc, applyUpdate, encodeState, onUpdate (returns unsubscribe), snapshot, destroy. |
| SnapshotStore | Pluggable persistence interface. In-memory default. |
| CollabTransport | Pluggable fan-out interface. In-process loopback default. |
| CollabError | Every thrown error — carries mandatory .code and .suggestion. |
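The last row of the table suggests a simple handling pattern. The following is a minimal sketch, assuming only what the table states: that every thrown error carries a `.code` and a `.suggestion`. The names `CollabErrorLike`, `isCollabErrorLike`, and `describeFailure` are hypothetical helpers, not exports of this package, and treating both fields as strings is an assumption.

```typescript
// Structural view of the fields the table says every thrown error carries.
// Field types (string) are an assumption; the package may use narrower types.
interface CollabErrorLike {
  code: string;
  suggestion: string;
  message: string;
}

// Hypothetical type guard: checks shape rather than class identity, so it
// also works if the error crossed a bundling or package-version boundary.
function isCollabErrorLike(err: unknown): err is CollabErrorLike {
  if (typeof err !== "object" || err === null) return false;
  const e = err as Record<string, unknown>;
  return (
    typeof e.code === "string" &&
    typeof e.suggestion === "string" &&
    typeof e.message === "string"
  );
}

// Hypothetical formatter: surfaces the code and suggestion when present,
// and falls back to the plain message otherwise.
function describeFailure(err: unknown): string {
  if (isCollabErrorLike(err)) {
    return `[${err.code}] ${err.message} (${err.suggestion})`;
  }
  return err instanceof Error ? err.message : String(err);
}
```

A caller would typically wrap hub or room calls in `try`/`catch` and pass the caught value to `describeFailure` before logging it.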
Tenant isolation (security-critical)
Rooms are stored in a Map keyed by a composite tenantId<NUL>roomId
string. The separator is a real NUL (\0), which cannot occur in a
normal id, so ("a","bc") and ("ab","c") can never collide into the same
room key. There is no API that takes only a roomId — a room handle is
only ever produced by passing an explicit tenantId, and the snapshot store
and transport are likewise addressed by (tenantId, roomId). The default
in-memory store composes @nebutra/tenant-store's InMemoryTenantStore,
which adds a defense-in-depth tenantId equality check on top of its own
composite key. The isolation that a Prisma adapter would get from row-level
security (RLS) is provided here structurally by the composite key, rather
than by trusting a tenant id carried in the payload.
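The collision argument above can be checked with a small sketch. This is an illustration only: `roomKey`, `naiveKey`, and `getRoom` are hypothetical names, the package's internal key function is not shown in this README, and rejecting ids that contain NUL is an added assumption rather than documented behaviour.

```typescript
// NUL separator, as described in the section above.
const SEP = "\u0000";

// Hypothetical composite-key builder. Rejecting empty ids and ids that
// contain the separator is an assumption made so the key stays unambiguous.
function roomKey(tenantId: string, roomId: string): string {
  if (tenantId.length === 0 || roomId.length === 0) {
    throw new Error("tenantId and roomId are both required");
  }
  if (tenantId.includes(SEP) || roomId.includes(SEP)) {
    throw new Error("tenantId and roomId must not contain NUL");
  }
  return `${tenantId}${SEP}${roomId}`;
}

// Plain concatenation, shown only for contrast: ("a","bc") and ("ab","c")
// both produce "abc".
function naiveKey(tenantId: string, roomId: string): string {
  return tenantId + roomId;
}

// Minimal stand-in for a room registry keyed by the composite key.
interface RoomHandle {
  tenantId: string;
  roomId: string;
}

const rooms = new Map<string, RoomHandle>();

// Get-or-create: the same (tenantId, roomId) pair always yields the same
// handle, and no lookup path exists that takes a roomId alone.
function getRoom(tenantId: string, roomId: string): RoomHandle {
  const key = roomKey(tenantId, roomId);
  let room = rooms.get(key);
  if (!room) {
    room = { tenantId, roomId };
    rooms.set(key, room);
  }
  return room;
}
```

With this shape, `getRoom("a", "bc")` and `getRoom("ab", "c")` return distinct handles, whereas the naive key would have mapped both to one entry.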
Snapshot persistence
room.snapshot() encodes the doc and persists it via the injected
SnapshotStore, serialized through withTenantLock(tenantId, roomId, …)
borrowed from @nebutra/tenant-store (same primitive used by canvas/reel —
a future swap to a distributed lock changes one place).
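To make the serialization step concrete, here is a minimal in-process sketch of a per-room lock wrapped around a store write. It is not the implementation of `withTenantLock`, whose real signature in @nebutra/tenant-store may differ; `withRoomLock`, `MemorySnapshotStore`, and `persistSnapshot` are hypothetical names, and the `SnapshotStore` shape is inferred from the adapter examples in this README.

```typescript
// Interface shape inferred from the adapter examples in this README.
interface SnapshotStore {
  load(tenantId: string, roomId: string): Promise<Uint8Array | null>;
  save(tenantId: string, roomId: string, state: Uint8Array): Promise<void>;
}

const lockKey = (tenantId: string, roomId: string) => `${tenantId}\u0000${roomId}`;

// Tail of the pending-task chain for each (tenantId, roomId) pair.
const tails = new Map<string, Promise<void>>();

// Hypothetical stand-in for withTenantLock: tasks for the same room run one
// at a time in call order; tasks for different rooms do not wait on each other.
function withRoomLock<T>(
  tenantId: string,
  roomId: string,
  task: () => Promise<T>,
): Promise<T> {
  const key = lockKey(tenantId, roomId);
  const prev = tails.get(key) ?? Promise.resolve();
  const run = prev.then(() => task());
  // The stored tail never rejects, so one failed task does not block the queue.
  const tail = run.then(() => undefined, () => undefined);
  tails.set(key, tail);
  // Drop the entry once the queue drains so the map does not grow unbounded.
  void tail.then(() => {
    if (tails.get(key) === tail) tails.delete(key);
  });
  return run;
}

// In-memory store used here only to exercise the lock.
class MemorySnapshotStore implements SnapshotStore {
  private data = new Map<string, Uint8Array>();

  async load(tenantId: string, roomId: string): Promise<Uint8Array | null> {
    const state = this.data.get(lockKey(tenantId, roomId));
    return state ? state.slice() : null; // copy so callers cannot mutate stored bytes
  }

  async save(tenantId: string, roomId: string, state: Uint8Array): Promise<void> {
    this.data.set(lockKey(tenantId, roomId), state.slice());
  }
}

// Encode inside the lock so the bytes written reflect the doc at write time.
async function persistSnapshot(
  store: SnapshotStore,
  tenantId: string,
  roomId: string,
  encode: () => Uint8Array,
): Promise<void> {
  await withRoomLock(tenantId, roomId, async () => {
    await store.save(tenantId, roomId, encode());
  });
}
```

Because the lock is keyed by the same (tenantId, roomId) pair as the store, replacing it with a distributed lock would only change `withRoomLock`, which matches the "changes one place" intent described above.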
Prisma adapter shape (interface-only — no migration run here)
import type { PrismaClient } from "@prisma/client";
import type { SnapshotStore } from "@nebutra/collab";

// Suggested table: collab_snapshot(tenant_id, room_id, state Bytes,
// PRIMARY KEY (tenant_id, room_id)); RLS scoped by tenant_id.
class PrismaSnapshotStore implements SnapshotStore {
  constructor(private prisma: PrismaClient) {}

  // Return the stored bytes, or null when the room has no snapshot yet.
  async load(tenantId: string, roomId: string) {
    const row = await this.prisma.collabSnapshot.findUnique({
      where: { tenantId_roomId: { tenantId, roomId } },
    });
    return row ? new Uint8Array(row.state) : null;
  }

  // Insert or overwrite the single snapshot row for (tenantId, roomId).
  async save(tenantId: string, roomId: string, state: Uint8Array) {
    await this.prisma.collabSnapshot.upsert({
      where: { tenantId_roomId: { tenantId, roomId } },
      create: { tenantId, roomId, state: Buffer.from(state) },
      update: { state: Buffer.from(state) },
    });
  }
}
Redis adapter shape
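import type { Redis } from "ioredis"; // assumes ioredis, which provides getBuffer
import type { SnapshotStore } from "@nebutra/collab";

class RedisSnapshotStore implements SnapshotStore {
  constructor(private redis: Redis) {}

  // Key is unambiguous only if ids never contain ":"; encode them otherwise.
  private k(t: string, r: string) { return `collab:${t}:${r}`; }

  async load(t: string, r: string) {
    const buf = await this.redis.getBuffer(this.k(t, r));
    return buf ? new Uint8Array(buf) : null;
  }

  async save(t: string, r: string, s: Uint8Array) {
    await this.redis.set(this.k(t, r), Buffer.from(s));
  }
}
Transport seam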
CollabTransport is a transport-agnostic fan-out interface
(broadcast(tenantId, roomId, update) + subscribe(tenantId, roomId, cb)).
The default is an in-process loopback so zero-config single-process usage
converges immediately. A Pusher or WebSocket adapter implements the
same two methods and is injected via createCollab({ transport }) — channel
name MUST be derived from (tenantId, roomId), e.g.
collab-${tenantId}-${roomId}, so a subscriber for tenant A's room is never
reached by tenant B's broadcast. This package intentionally ships no
network code (interface + loopback only).
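The seam described above can be sketched as a small in-process loopback. This is an illustration under stated assumptions, not the package's loopback: the `CollabTransport` shape is inferred from the two method names, returning an unsubscribe function from `subscribe` is an assumption, and `channelName` and `LoopbackTransport` are hypothetical names. One caveat worth noting: a plain `collab-${tenantId}-${roomId}` name is only unambiguous if ids cannot contain "-", so the sketch escapes each id first. A real adapter would also need to respect its backend's channel-name rules.

```typescript
type UpdateHandler = (update: Uint8Array) => void;

// Shape inferred from the two methods named above; return types are assumptions.
interface CollabTransport {
  broadcast(tenantId: string, roomId: string, update: Uint8Array): void;
  subscribe(tenantId: string, roomId: string, cb: UpdateHandler): () => void;
}

// Derive the channel from (tenantId, roomId). Each id is escaped so that "-"
// can only appear as the separator: ("a-b","c") and ("a","b-c") stay distinct.
function channelName(tenantId: string, roomId: string): string {
  const enc = (s: string) => encodeURIComponent(s).replace(/-/g, "%2D");
  return `collab-${enc(tenantId)}-${enc(roomId)}`;
}

class LoopbackTransport implements CollabTransport {
  private channels = new Map<string, Set<UpdateHandler>>();

  // Deliver synchronously to every handler on this tenant's room channel.
  broadcast(tenantId: string, roomId: string, update: Uint8Array): void {
    const handlers = this.channels.get(channelName(tenantId, roomId));
    if (!handlers) return;
    // Iterate over a copy so a handler may unsubscribe during delivery.
    for (const cb of Array.from(handlers)) cb(update);
  }

  // Register a handler and return a function that removes it again.
  subscribe(tenantId: string, roomId: string, cb: UpdateHandler): () => void {
    const name = channelName(tenantId, roomId);
    const handlers = this.channels.get(name) ?? new Set<UpdateHandler>();
    this.channels.set(name, handlers);
    handlers.add(cb);
    return () => {
      handlers.delete(cb);
      if (handlers.size === 0 && this.channels.get(name) === handlers) {
        this.channels.delete(name);
      }
    };
  }
}
```

A network adapter would keep the same two methods and swap the Map for a real channel subscription, reusing a derivation like `channelName` so the tenant is always part of the address.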
Examples
Runnable under examples/:
- zero-config-convergence.ts: real CRDT convergence, no config (also the in-package caller keeping this module in the active tier).
- tenant-isolation.ts: proves the tenant partition holds.
- snapshot-restore.ts: persist then reload across hub lifetimes.
Scripts
pnpm --filter @nebutra/collab test # vitest run
pnpm --filter @nebutra/collab test:coverage # with coverage
pnpm --filter @nebutra/collab typecheck # tsc --noEmit