@actuate-media/realtime
v0.1.1
Transport-agnostic Yjs sync + awareness primitives that power Actuate CMS real-time collaboration.
Transport-agnostic Yjs collaboration primitives that power Actuate CMS' real-time editing surface (Phase 3 of the roadmap).
The package is intentionally narrow: it owns the wire protocol, the
room state machine, and the persistence contract. It knows nothing
about HTTP, WebSocket, or any specific server framework — those concerns
live in @actuate-media/cms-core (the /realtime/sync endpoint) and the
admin TipTap collaboration extension.
What you get
- Protocol layer (./protocol). Pure encode/decode helpers for the binary envelope used by y-websocket. Read inbound frames with readMessage; produce outbound ones with encodeSyncStep1, encodeSyncStep2, encodeUpdateMessage, encodeAwarenessUpdate, encodeQueryAwareness. The envelope is byte-compatible with the upstream y-websocket provider, so standard browser clients (including @tiptap/extension-collaboration's default transport) work without any custom wire shimming.
- Room (./room). createRoom({ documentId, initialState }) returns a Room that owns a Y.Doc and an Awareness instance and routes bytes between attached Connections. The transport layer creates one Connection per peer (a thin wrapper around a WebSocket / fetch stream / postMessage channel) and calls addConnection, handleMessage, and removeConnection. The room handles the sync handshake, broadcast fan-out, and awareness propagation.
- Persistence (./persistence). A narrow DocumentPersistence interface (load / save) plus bindPersistence(room, persistence), which debounces writes off the room's update event. A reference in-memory implementation (createMemoryPersistence) ships in the box and is used by the test suite.
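The debounced write path mentioned for bindPersistence can be pictured with a small, dependency-free sketch. The name createDebouncedSaver and its signature are hypothetical and not part of this package; the real helper listens to the room's update event and writes through a DocumentPersistence, and its timing details may differ.

```typescript
// Hypothetical helper: coalesces a burst of updates into a single save call.
type SaveFn = (state: Uint8Array) => void

function createDebouncedSaver(save: SaveFn, delayMs: number) {
  let pending: Uint8Array | null = null
  let timer: ReturnType<typeof setTimeout> | null = null

  // Write the newest pending snapshot right away and cancel the timer.
  const flush = (): void => {
    if (timer !== null) {
      clearTimeout(timer)
      timer = null
    }
    if (pending !== null) {
      const state = pending
      pending = null
      save(state)
    }
  }

  // Remember the newest snapshot and restart the quiet-period timer.
  const schedule = (state: Uint8Array): void => {
    pending = state
    if (timer !== null) clearTimeout(timer)
    timer = setTimeout(flush, delayMs)
  }

  return { schedule, flush }
}

// Usage: three rapid updates followed by an explicit flush.
const savedSnapshots: Uint8Array[] = []
const saver = createDebouncedSaver((state) => {
  savedSnapshots.push(state)
}, 500)
saver.schedule(new Uint8Array([1]))
saver.schedule(new Uint8Array([1, 2]))
saver.schedule(new Uint8Array([1, 2, 3]))
saver.flush()
```

Only the last snapshot reaches the save callback, which is the property that keeps write volume low during bursts of typing.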
Quick start
```typescript
import {
  createRoom,
  bindPersistence,
  createMemoryPersistence,
  type Connection,
} from '@actuate-media/realtime'

const persistence = createMemoryPersistence()

// Bootstrap a room with whatever state we have on disk.
const initialState = (await persistence.load('doc-1')) ?? undefined
const room = createRoom({ documentId: 'doc-1', initialState })
const { flush, unbind } = bindPersistence(room, persistence)

// In your WebSocket onConnection handler:
const conn: Connection = {
  id: socketId,
  send: (bytes) => socket.send(bytes),
  close: () => socket.close(),
}
room.addConnection(conn)
socket.on('message', (bytes) => room.handleMessage(conn.id, bytes))
socket.on('close', () => room.removeConnection(conn.id))

// On graceful shutdown:
await flush()
unbind()
room.destroy()
```

The room sends a SyncStep1 immediately on addConnection and an
awareness snapshot if any peers are already present, so the client only
needs to reply with its own state — the standard y-websocket handshake.
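To make the routing role of the room concrete, here is a minimal sketch of broadcast fan-out only. FanOutRoom and Peer are hypothetical stand-ins written for illustration; the real Room also owns the Y.Doc, runs the sync handshake, and tracks awareness, none of which is modelled here.

```typescript
// Hypothetical stand-in for a connection: an id plus a way to push bytes.
interface Peer {
  id: string
  send: (bytes: Uint8Array) => void
}

class FanOutRoom {
  private peers = new Map<string, Peer>()

  addConnection(peer: Peer): void {
    this.peers.set(peer.id, peer)
  }

  removeConnection(id: string): void {
    this.peers.delete(id)
  }

  // Relay a frame to every attached peer except the one that sent it.
  handleMessage(senderId: string, bytes: Uint8Array): void {
    this.peers.forEach((peer, id) => {
      if (id !== senderId) peer.send(bytes)
    })
  }
}

// Usage: peer "a" sends one frame; only peer "b" should receive it.
const received: { a: number[]; b: number[] } = { a: [], b: [] }
const relay = new FanOutRoom()
relay.addConnection({ id: 'a', send: (bytes) => { received.a.push(bytes[0]) } })
relay.addConnection({ id: 'b', send: (bytes) => { received.b.push(bytes[0]) } })
relay.handleMessage('a', new Uint8Array([42]))
```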
Persistence model
The companion Prisma model is named DocumentCRDT (see
packages/cms-core/prisma/cms-schema.prisma). The schema fragment lives
in cms-core so consumers wire it into a single migration. The shape:
| Field | Type | Purpose |
| ----------------- | -------- | ------------------------------------------------------ |
| documentId | String | PK — references Document.id at the app layer |
| state | Bytes | Y.encodeStateAsUpdate(doc) blob |
| stateVector | Bytes | Y.encodeStateVector(doc) — speeds up cold late joins |
| version | Int | Monotonic counter (optimistic locking) |
| lastUpdatedAt | DateTime | Wall-clock of most recent flush |
| lastUpdatedById | String? | Editor id if known |
| createdAt | DateTime | First-write timestamp |
@actuate-media/realtime is storage-agnostic — the cms-core layer
implements DocumentPersistence against Prisma, but you can plug
in S3, Redis, or anything else by implementing two methods.
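The version column is described as a monotonic counter for optimistic locking. A minimal in-memory sketch of that idea follows; StoredRow, SaveOutcome, and saveIfVersionMatches are hypothetical names, and the actual Prisma adapter in cms-core may enforce the check differently (for example inside a conditional UPDATE).

```typescript
// Hypothetical row shape: just the two columns the check needs.
interface StoredRow {
  state: Uint8Array
  version: number
}

type SaveOutcome =
  | { ok: true; version: number }
  | { ok: false; currentVersion: number }

// Write only if the caller saw the latest version; otherwise report a conflict.
function saveIfVersionMatches(
  table: Map<string, StoredRow>,
  documentId: string,
  expectedVersion: number,
  state: Uint8Array,
): SaveOutcome {
  const existing = table.get(documentId)
  const currentVersion = existing ? existing.version : 0
  if (currentVersion !== expectedVersion) {
    return { ok: false, currentVersion }
  }
  const nextVersion = currentVersion + 1
  table.set(documentId, { state, version: nextVersion })
  return { ok: true, version: nextVersion }
}

// Usage: the second writer still holds version 0 and is rejected.
const crdtTable = new Map<string, StoredRow>()
const firstWrite = saveIfVersionMatches(crdtTable, 'doc-1', 0, new Uint8Array([1]))
const staleWrite = saveIfVersionMatches(crdtTable, 'doc-1', 0, new Uint8Array([2]))
const secondWrite = saveIfVersionMatches(crdtTable, 'doc-1', 1, new Uint8Array([3]))
```

A rejected writer would typically reload the row, merge its pending Yjs update into the fresh state, and retry with the new version.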
Wire-protocol compatibility
The envelope follows the standard y-websocket layout:
```
[messageType: varUint][payload: bytes]
messageType 0 → MESSAGE_SYNC (y-protocols sync sub-message)
messageType 1 → MESSAGE_AWARENESS (varUint8Array awareness update)
messageType 3 → MESSAGE_QUERY_AWARENESS (no payload; reply with snapshot)
```

Keeping this stable means any client that already speaks y-websocket
(e.g. browsers using @tiptap/extension-collaboration with the default
provider) can connect to the Actuate realtime endpoint without custom
client-side code.
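For readers unfamiliar with the varUint prefix, the sketch below shows how the leading message type can be written and read using the 7-bits-per-byte variable-length encoding that Yjs tooling uses. The function names are illustrative only; in practice the package's readMessage and encode helpers cover this, and their signatures are not reproduced here.

```typescript
// Message type ids as listed in the envelope layout.
const MESSAGE_SYNC = 0
const MESSAGE_AWARENESS = 1
const MESSAGE_QUERY_AWARENESS = 3

// Encode an unsigned integer: 7 payload bits per byte, high bit = "more follows".
function encodeVarUint(value: number): number[] {
  const out: number[] = []
  let rest = value
  while (rest > 0x7f) {
    out.push(0x80 | (rest % 128))
    rest = Math.floor(rest / 128)
  }
  out.push(rest)
  return out
}

// Decode a varUint starting at `offset`; returns the value and the next offset.
function decodeVarUint(bytes: Uint8Array, offset = 0): { value: number; next: number } {
  let value = 0
  let multiplier = 1
  let pos = offset
  while (pos < bytes.length) {
    const byte = bytes[pos]
    pos += 1
    value += (byte & 0x7f) * multiplier
    if (byte < 0x80) return { value, next: pos }
    multiplier *= 128
  }
  throw new Error('truncated varUint')
}

// Split a frame into its message type and the remaining payload bytes.
function splitEnvelope(frame: Uint8Array): { messageType: number; payload: Uint8Array } {
  const header = decodeVarUint(frame)
  return { messageType: header.value, payload: frame.subarray(header.next) }
}

// Usage: a query-awareness frame is just the type byte with no payload.
const queryFrame = new Uint8Array(encodeVarUint(MESSAGE_QUERY_AWARENESS))
const parsedQuery = splitEnvelope(queryFrame)
```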
Test coverage
42 unit tests across four files:
| File | Coverage |
| ------------------ | ----------------------------------------------------------------- |
| protocol.test | envelope shape, sync roundtrip, awareness, query, malformed input |
| room.test | handshake, two-client convergence, disconnect, awareness fan-out |
| persistence.test | debounce, flush, error handling, round trip with the memory impl |
| index.test | barrel exports, constant stability |
Run them with pnpm --filter @actuate-media/realtime test.
Status
- Slice 1 (this package, foundation): ✅ shipped.
- Slice 2 (cms-core WebSocket gateway + Prisma adapter): ✅ shipped. See @actuate-media/cms-core/realtime for the consumer-facing createRealtimeGateway and createPrismaDocumentPersistence helpers.
- Slice 3 (admin TipTap collaboration extension): ✅ shipped. See @actuate-media/cms-admin: createCollaborationProvider, <PresenceChips />, and the collaboration prop on <TipTapEditor />.
- Slice 4 (DocumentComment model + REST endpoints): ✅ shipped. See @actuate-media/cms-core/realtime for the service helpers (createComment, listComments, resolveComment, …) and the HTTP endpoints under /api/cms/documents/:id/comments and /api/cms/comments/:id.
- Slice 5 (comments side panel + anchor binding in admin): ✅ shipped. See @actuate-media/cms-admin: the comments prop on <TipTapEditor />, the CommentSidePanel component, the CommentMark TipTap mark, and the anchor helpers in lib/comment-anchor.ts.
- Slice 6 (offline drafts + notifications): ✅ shipped. See @actuate-media/cms-admin: <OfflineStatus />, the offline flag on createCollaborationProvider, and <NotificationBell />. Server side: DocumentNotification Prisma model + REST endpoints under /api/cms/notifications in @actuate-media/cms-core.

Wiring the gateway (slice 2 cheat sheet)
The cms-core gateway is transport-agnostic — it takes a WebSocketLike
adapter so it works with ws, uWebSockets.js, Bun, Cloudflare Workers,
or anything else. A minimal ws integration looks like this:
```typescript
import { createServer } from 'node:http'
import { WebSocketServer } from 'ws'
import {
  createRealtimeGateway,
  createPrismaDocumentPersistence,
  type WebSocketLike,
} from '@actuate-media/cms-core/realtime'

const httpServer = createServer()
const wss = new WebSocketServer({ noServer: true })

const gateway = createRealtimeGateway({
  persistence: createPrismaDocumentPersistence(prisma),
  authenticate: async (req) => {
    const session = await verifySessionFromHeaders(req.headers)
    if (!session) return null
    const url = new URL(`http://x${req.url}`)
    return {
      documentId: url.searchParams.get('documentId')!,
      connectionId: req.headers['sec-websocket-key']!,
      userId: session.userId,
    }
  },
})

httpServer.on('upgrade', (req, socket, head) => {
  wss.handleUpgrade(req, socket, head, (ws) => {
    const adapter: WebSocketLike = {
      send: (d) => ws.send(d),
      close: (code, reason) => ws.close(code, reason),
      on: (event, handler) => {
        if (event === 'message')
          ws.on('message', (d: Buffer) => (handler as (b: Uint8Array) => void)(new Uint8Array(d)))
        else if (event === 'close') ws.on('close', () => (handler as () => void)())
        else if (event === 'error') ws.on('error', (e: Error) => (handler as (e: Error) => void)(e))
      },
    }
    void gateway.handleConnection(adapter, {
      url: req.url ?? '/',
      headers: req.headers as Record<string, string | undefined>,
    })
  })
})

process.on('SIGTERM', () => {
  void gateway.shutdown()
})
```

The gateway handles per-document room creation, debounced persistence, idle reaping, and graceful shutdown automatically.
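Per-document room creation and idle reaping can be sketched as a small registry keyed by document id. RoomRegistry, ManagedRoom, and the injected clock are hypothetical and exist only to illustrate the bookkeeping; the gateway's real thresholds, timers, and teardown steps are not documented here.

```typescript
// Hypothetical bookkeeping record for one live room.
interface ManagedRoom {
  documentId: string
  connections: number
  lastActiveAt: number
}

class RoomRegistry {
  private rooms = new Map<string, ManagedRoom>()
  private idleMs: number
  private now: () => number

  constructor(idleMs: number, now: () => number = () => Date.now()) {
    this.idleMs = idleMs
    this.now = now
  }

  // Return the room for a document, creating it on first use.
  join(documentId: string): ManagedRoom {
    let room = this.rooms.get(documentId)
    if (!room) {
      room = { documentId, connections: 0, lastActiveAt: this.now() }
      this.rooms.set(documentId, room)
    }
    room.connections += 1
    room.lastActiveAt = this.now()
    return room
  }

  leave(documentId: string): void {
    const room = this.rooms.get(documentId)
    if (!room) return
    room.connections = Math.max(0, room.connections - 1)
    room.lastActiveAt = this.now()
  }

  // Drop rooms that have no connections and have been idle for at least idleMs.
  reapIdle(): string[] {
    const reaped: string[] = []
    this.rooms.forEach((room, id) => {
      const idleFor = this.now() - room.lastActiveAt
      if (room.connections === 0 && idleFor >= this.idleMs) reaped.push(id)
    })
    reaped.forEach((id) => this.rooms.delete(id))
    return reaped
  }

  get size(): number {
    return this.rooms.size
  }
}

// Usage with a fake clock so the behaviour is deterministic.
let fakeTime = 0
const registry = new RoomRegistry(1000, () => fakeTime)
registry.join('doc-1')
registry.leave('doc-1')
fakeTime = 500
const reapedEarly = registry.reapIdle()
fakeTime = 1500
const reapedLate = registry.reapIdle()
```

A real gateway would flush persistence before discarding a reaped room, so the last edits are written out first.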
Wiring the editor (slice 3 cheat sheet)
The admin editor accepts an optional collaboration prop. When set, the
TipTapEditor swaps StarterKit's history for the Yjs-driven history,
mounts the collaboration + collaboration-cursor extensions, and renders
<PresenceChips /> above the toolbar.
```tsx
import { TipTapEditor } from '@actuate-media/cms-admin/components/TipTapEditor'

;<TipTapEditor
  content={initialHtml}
  onChange={() => {
    /* still fires for autosave; gateway is source of truth */
  }}
  collaboration={{
    documentId: doc.id,
    url: 'wss://your.app/api/cms/realtime/sync',
    user: { id: currentUser.id, name: currentUser.name, color: '#22c55e' },
    // Optional: extra query params (e.g. auth tokens for cross-origin deploys).
    params: { token: previewToken },
  }}
/>
```

Behind the scenes:

- createCollaborationProvider({ documentId, url, user }) builds a Y.Doc, opens a WebsocketProvider, and seeds Awareness.localState with the user info.
- The editor wires Collaboration.configure({ document }) and CollaborationCursor.configure({ provider, user }), and disables StarterKit's history (Yjs owns undo/redo when collaboration is active).
- PresenceChips subscribes to the awareness instance and renders an avatar strip with overflow + connection status.

The provider exposes status (connecting | connected | disconnected)
and lifecycle callbacks (onStatusChange, onError) so callers can
surface their own connection chrome if needed.
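Callers building their own presence UI need to turn awareness states into a short avatar list with an overflow count. The sketch below shows one way to do that over a map keyed by client id, which is the shape Yjs awareness exposes. summarizePresence, the user field layout, the de-duplication by user id, and the exclusion of the local client are assumptions for illustration, not a description of how PresenceChips is implemented.

```typescript
interface PresenceUser {
  id: string
  name: string
  color: string
}

// Assumed per-client awareness payload: an optional `user` entry.
interface AwarenessState {
  user?: PresenceUser
}

function summarizePresence(
  states: Map<number, AwarenessState>,
  localClientId: number,
  maxVisible: number,
): { visible: PresenceUser[]; overflow: number } {
  const seen: { [userId: string]: boolean } = {}
  const users: PresenceUser[] = []
  states.forEach((state, clientId) => {
    const user = state.user
    // Skip ourselves, clients without user info, and extra tabs of the same user.
    if (clientId === localClientId || !user || seen[user.id]) return
    seen[user.id] = true
    users.push(user)
  })
  return {
    visible: users.slice(0, maxVisible),
    overflow: Math.max(0, users.length - maxVisible),
  }
}

// Usage: five remote clients, one duplicate user, one without user info.
const presenceStates = new Map<number, AwarenessState>()
presenceStates.set(1, { user: { id: 'u1', name: 'Ada', color: '#22c55e' } })
presenceStates.set(2, { user: { id: 'u2', name: 'Ben', color: '#3b82f6' } })
presenceStates.set(3, { user: { id: 'u2', name: 'Ben', color: '#3b82f6' } })
presenceStates.set(4, { user: { id: 'u3', name: 'Cy', color: '#f97316' } })
presenceStates.set(5, {})
presenceStates.set(6, { user: { id: 'u4', name: 'Di', color: '#a855f7' } })
const presenceSummary = summarizePresence(presenceStates, 1, 2)
```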
Comments REST API (slice 4 cheat sheet)
Comments live in a separate REST surface (not the Yjs wire) so plugins,
mobile clients, and notification workers can consume them without
speaking the CRDT protocol. The Prisma model is DocumentComment
(threaded, anchored, soft-deletable); the service helpers in
@actuate-media/cms-core/realtime own validation and permissions, and
the HTTP handlers in cms-core map those to status codes.
Endpoints (under /api/cms)
| Method | Path | Purpose | Auth |
| ------ | --------------------------------- | ---------------------------------------- | ------------------- |
| POST | /documents/:documentId/comments | Create a top-level comment or a reply | write role |
| GET | /documents/:documentId/comments | List active comments (filters available) | any auth |
| PATCH | /comments/:id | Edit a comment body | author or admin |
| POST | /comments/:id/resolve | Mark a thread as resolved | write role / author |
| POST | /comments/:id/reopen | Re-open a resolved thread | write role / author |
| DELETE | /comments/:id | Soft-delete a comment | author or admin |
GET supports ?includeResolved=true (any authenticated user) and ?includeDeleted=true
(admin only). Successful responses always use the envelope { data: CommentDTO } or
{ data: CommentDTO[] }; errors map to 400 (validation), 403
(forbidden), 404 (not found), or 409 (conflict).
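The split between service helpers and HTTP handlers can be illustrated with a small mapping from a service result to a status code and body. The ok / value / error.message result shape mirrors the service example in this README; the error code names, the error body shape, and the use of 200 for every success are assumptions made for this sketch.

```typescript
// Assumed error categories; the real service may name them differently.
type ServiceErrorCode = 'validation' | 'forbidden' | 'not_found' | 'conflict'

type ServiceResult<T> =
  | { ok: true; value: T }
  | { ok: false; error: { code: ServiceErrorCode; message: string } }

const STATUS_BY_CODE: Record<ServiceErrorCode, number> = {
  validation: 400,
  forbidden: 403,
  not_found: 404,
  conflict: 409,
}

// Wrap successes in the { data } envelope and translate failures to a status.
function toHttpResponse<T>(
  result: ServiceResult<T>,
): { status: number; body: { data: T } | { error: string } } {
  if (result.ok) {
    return { status: 200, body: { data: result.value } }
  }
  return {
    status: STATUS_BY_CODE[result.error.code],
    body: { error: result.error.message },
  }
}

// Usage
const createdResponse = toHttpResponse({ ok: true, value: { id: 'c1' } })
const forbiddenResponse = toHttpResponse<{ id: string }>({
  ok: false,
  error: { code: 'forbidden', message: 'Only the author or an admin can edit' },
})
```

Keeping the mapping in one table makes it easy to audit that every service error category has exactly one status code.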
Anchor format
The anchor field is opaque to the API. The client encodes a pair of
Yjs relative positions with Y.encodeRelativePosition(...) and Base64s
each side:
```typescript
import * as Y from 'yjs'

function makeAnchor(doc: Y.Doc, from: number, to: number) {
  const type = doc.getXmlFragment('default') // or whichever ytype your editor binds to
  const relFrom = Y.createRelativePositionFromTypeIndex(type, from)
  const relTo = Y.createRelativePositionFromTypeIndex(type, to)
  // Buffer is a Node.js global; browser bundles need a polyfill or a btoa-based encoder.
  return {
    from: Buffer.from(Y.encodeRelativePosition(relFrom)).toString('base64'),
    to: Buffer.from(Y.encodeRelativePosition(relTo)).toString('base64'),
  }
}
```

Slice 5 reverses the operation in the comments side panel to paint live highlights that survive concurrent edits; see the slice 5 cheat sheet below.
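The listing above relies on Buffer, which is only a global in Node.js. For browser clients, a Base64 round trip over a Uint8Array can be written with btoa and atob instead. The helper names below are illustrative and are not exports of any Actuate package.

```typescript
// Encode raw bytes as Base64 without relying on Node's Buffer.
function bytesToBase64(bytes: Uint8Array): string {
  let binary = ''
  for (let i = 0; i < bytes.length; i++) {
    binary += String.fromCharCode(bytes[i])
  }
  return btoa(binary)
}

// Decode a Base64 string back into the original bytes.
function base64ToBytes(encoded: string): Uint8Array {
  const binary = atob(encoded)
  const bytes = new Uint8Array(binary.length)
  for (let i = 0; i < binary.length; i++) {
    bytes[i] = binary.charCodeAt(i)
  }
  return bytes
}

// Usage: round-trip a few bytes, including values above 127.
const sampleBytes = new Uint8Array([0, 255, 128])
const sampleEncoded = bytesToBase64(sampleBytes)
const sampleDecoded = base64ToBytes(sampleEncoded)
```

The output of Y.encodeRelativePosition is a Uint8Array, so it can be passed to bytesToBase64 directly.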
Service layer (server-side)
Apps that need to manage comments outside the HTTP layer (e.g. a background worker that emits notifications on resolve) can call the service helpers directly:
```typescript
import { createComment, resolveComment, type CommentsDB } from '@actuate-media/cms-core/realtime'

const db: CommentsDB = prisma // PrismaClient satisfies the narrow type

const created = await createComment(db, {
  documentId: 'doc-1',
  userId: 'user-1',
  body: 'Looks great!',
})
if (!created.ok) throw new Error(created.error.message)

await resolveComment(db, created.value.id, {
  userId: 'editor-1',
  canResolve: true,
})
```

The service is intentionally storage-agnostic: CommentsDB is the
exact subset of Prisma we use, so tests can substitute an in-memory
fake (see comments.test.ts).
Comments side panel (slice 5 cheat sheet)
Slice 5 layers a typed REST client, anchor helpers, a TipTap mark, and a side-panel React component on top of slice 4 so editors can comment on selections directly inside the admin.
```tsx
import { TipTapEditor } from '@actuate-media/cms-admin/components/TipTapEditor'

;<TipTapEditor
  content={initialHtml}
  onChange={onAutoSave}
  collaboration={{
    documentId: doc.id,
    url: 'wss://your.app/api/cms/realtime/sync',
    user: { id: currentUser.id, name: currentUser.name, color: '#22c55e' },
  }}
  comments={{
    documentId: doc.id,
    currentUserId: currentUser.id,
    isAdmin: currentUser.role === 'ADMIN',
    onError: (msg) => toast.error(msg),
  }}
/>
```

The editor reacts to the comments prop by:

- Loading the CommentMark extension so saved comment ranges are highlighted in the document (yellow band on hover, dotted underline when resolved, outlined when the panel selects them).
- Mounting <CommentSidePanel /> next to the editor pane. The panel talks to the slice-4 REST API through lib/comments-client.ts and shows threads, replies, edit/resolve/delete affordances, and an includes-resolved toggle.
- Encoding the active editor selection into a CRDT-relative anchor when "Comment" is submitted, so the highlight survives concurrent edits. Multi-paragraph selections fall back to a doc-level comment (anchor null).

Composing anchors manually
lib/comment-anchor.ts is the single place that interprets the wire
format. Consumers building their own UI can use it directly:
```typescript
import * as Y from 'yjs'
import { encodeAnchor, decodeAnchor } from '@actuate-media/cms-admin/lib/comment-anchor'

const yText = paragraph.firstChild as Y.XmlText
const anchor = encodeAnchor({ doc, yType: yText, from: 6, to: 11 })
// → POST /documents/:id/comments { anchor }

// Later, after sync:
const resolved = decodeAnchor({ doc, anchor })
// → { yType, from, to } or null when the anchored text was deleted.
```

Driving the panel from a custom shell
<CommentSidePanel /> is exported standalone — the editor wires it
through the comments prop for convenience, but a different host shell
(e.g. an inbox-style review queue) can render the same component:
```tsx
import { CommentSidePanel } from '@actuate-media/cms-admin/components/CommentSidePanel'

;<CommentSidePanel
  documentId={doc.id}
  currentUserId={user.id}
  isAdmin={user.role === 'ADMIN'}
  onComposeAnchor={() => null /* doc-level only */}
  onError={notify}
/>
```

All actions (create, reply, edit, resolve / reopen, delete) flow
through the same cmsApi client that powers the rest of the admin, so
CSRF tokens, locale headers, and credential handling are inherited
without extra wiring.
Offline drafts + notifications (slice 6 cheat sheet)
Slice 6 closes Phase 3 with two ergonomics features that ride on top of slices 3-5: per-browser offline drafts via IndexedDB, and a top-bar notification bell that reacts to comment events.
Offline drafts
Opt-in via the new offline flag on createCollaborationProvider (or
the collaboration.offline shape on <TipTapEditor />):
```typescript
import { createCollaborationProvider } from '@actuate-media/cms-admin/lib/collaboration-provider'

const collab = createCollaborationProvider({
  documentId: doc.id,
  url: 'wss://your.app/api/cms/realtime/sync',
  user: { id: currentUser.id, name: currentUser.name, color: '#22c55e' },
  offline: true,
  onOfflineStatus: (status) => {
    // 'unsupported' | 'loading' | 'ready' | 'error' | 'pending'
  },
})
```

The provider lazy-loads y-indexeddb (so the dependency stays out of
the bundle when offline drafts are disabled), seeds the local doc from
the IndexedDB snapshot before the socket opens, and writes every Yjs
update through to disk. On reconnect the y-websocket transport flushes
the pending operations and the in-memory doc stays authoritative.
<OfflineStatus connection={status} offline={offlineStatus} /> renders
the merged state as a single pill — "Saved & synced", "Saved
locally — reconnecting…", or "Local drafts failed". The editor mounts
it next to the presence chips automatically when collaboration is
set.
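Hosts that render their own status chrome need to fold the connection status and the offline-draft status into one value. The sketch below shows the general shape of such a merge. The function name, the PillKind values, and in particular the mapping rules are guesses made for illustration; the README does not specify how OfflineStatus treats each combination, and the meaning of the pending state is not documented here, so it falls into the neutral bucket.

```typescript
type ConnectionStatus = 'connecting' | 'connected' | 'disconnected'
type OfflineDraftStatus = 'unsupported' | 'loading' | 'ready' | 'error' | 'pending'

// Hypothetical merged states: synced, saved locally only, failed, or still settling.
type PillKind = 'synced' | 'local-only' | 'failed' | 'working'

function mergeStatus(connection: ConnectionStatus, offline: OfflineDraftStatus): PillKind {
  // A broken local store is reported first, whatever the socket is doing.
  if (offline === 'error') return 'failed'
  // Connected with a healthy (or absent) local store counts as fully synced.
  if (connection === 'connected' && (offline === 'ready' || offline === 'unsupported')) {
    return 'synced'
  }
  // Socket down but the local store is ready: edits are safe on disk.
  if (connection !== 'connected' && offline === 'ready') return 'local-only'
  // Everything else (loading, pending, connecting without a store) is in flux.
  return 'working'
}

// Usage
const pillOnline = mergeStatus('connected', 'ready')
const pillOffline = mergeStatus('disconnected', 'ready')
const pillBroken = mergeStatus('connected', 'error')
```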
Notification bell
Server side, a new DocumentNotification Prisma model captures
per-user events (comment_reply, comment_resolved, comment_mention).
The comments lifecycle hooks fan out into notifications:
- Replying to a comment notifies the root author unless the replier authored the root.
- Resolving a comment notifies the root author unless the resolver authored the root.
- @mentions (currently parsed from comment bodies as @user-id) emit a comment_mention row.
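The three fan-out rules above can be expressed as pure functions that take a comment event and return the rows to create. The function names, the NotificationDraft shape, and the exact mention pattern (an @ followed by letters, digits, underscores, or hyphens) are assumptions for illustration; the server-side hooks may differ, for example in how they treat self-mentions.

```typescript
type NotificationKind = 'comment_reply' | 'comment_resolved' | 'comment_mention'

// Hypothetical row-to-be: who is notified and why.
interface NotificationDraft {
  userId: string
  kind: NotificationKind
}

// Collect distinct user ids mentioned as "@user-id" in a comment body.
function parseMentions(body: string): string[] {
  const ids: string[] = []
  const pattern = /@([A-Za-z0-9_-]+)/g
  let match: RegExpExecArray | null = pattern.exec(body)
  while (match !== null) {
    if (ids.indexOf(match[1]) === -1) ids.push(match[1])
    match = pattern.exec(body)
  }
  return ids
}

// A reply notifies the root author (unless they wrote it) plus anyone mentioned.
function notificationsForReply(
  rootAuthorId: string,
  replierId: string,
  body: string,
): NotificationDraft[] {
  const drafts: NotificationDraft[] = []
  if (rootAuthorId !== replierId) {
    drafts.push({ userId: rootAuthorId, kind: 'comment_reply' })
  }
  parseMentions(body).forEach((userId) => {
    drafts.push({ userId, kind: 'comment_mention' })
  })
  return drafts
}

// Resolving notifies the root author unless they resolved it themselves.
function notificationsForResolve(rootAuthorId: string, resolverId: string): NotificationDraft[] {
  if (rootAuthorId === resolverId) return []
  return [{ userId: rootAuthorId, kind: 'comment_resolved' }]
}

// Usage
const replyDrafts = notificationsForReply('user-1', 'user-2', 'Thanks, looping in @user-3')
const selfResolveDrafts = notificationsForResolve('user-1', 'user-1')
```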
The REST surface (under /api/cms):
| Method | Path | Purpose | Auth |
| ------ | ----------------------------- | -------------------------- | -------- |
| GET | /notifications | List the caller's rows | any auth |
| GET | /notifications/unread-count | Cheap badge counter | any auth |
| POST | /notifications/:id/read | Mark one notification read | owner |
| POST | /notifications/read-all | Mark every unread row read | any auth |
Client side, drop <NotificationBell /> into the admin top bar:
```tsx
import { NotificationBell } from '@actuate-media/cms-admin'

;<NotificationBell
  onSelect={(notification) => navigateToDocument(notification.documentId)}
  onError={(msg) => toast.error(msg)}
/>
```

The bell polls /notifications/unread-count every 30s by default
(pollIntervalMs={0} disables polling) and re-fetches the full list
when the dropdown opens. Clicks are optimistically marked read with
automatic rollback on server failures.
