@vuer-ai/vuer-rtc-server
v0.4.1
Published
Vuer RTC Server — RoomBroker, WebSocket transport, MongoDB persistence
Downloads
73
Keywords
Readme
@vuer-ai/vuer-rtc-server
Real-time collaborative library for vuer.ai. Server is built with MongoDB persistence and Fastify.
The CRDT logic is in @vuer-ai/vuer-rtc. This package provides server-specific functionality.
Setup
Prerequisites
MongoDB with replica set support (required for Prisma transactions).
MongoDB Setup with Docker
Start MongoDB and Redis containers:
cd ../../docker docker compose up -dThis starts:
- MongoDB 7 with replica set
rs0on port 27017 - Redis 7 on port 6379
- MongoDB 7 with replica set
Verify replica set is initialized:
docker exec vuer-rtc-mongo mongosh --quiet --eval "rs.status().set"Should output:
rs0Configure database connection:
Create
.envfile inpackages/vuer-rtc-server/:DATABASE_URL="mongodb://localhost:27017/vuer?replicaSet=rs0"Push Prisma schema to MongoDB:
cd packages/vuer-rtc-server npx prisma db pushRun tests:
pnpm test
Stopping Services
cd ../../docker
docker compose down # Stop containers (keep data)
docker compose down -v # Stop containers and remove volumes (delete all data)Note for macOS users: If you have MongoDB installed via Homebrew, stop it first to avoid port conflicts:
brew services stop mongodb-communityHow CRDT Works
The Core Concept
CRDT (Conflict-free Replicated Data Types) allows multiple users to edit the same data concurrently without conflicts. The key insight: instead of locking or resolving conflicts, design operations that always merge correctly.
Architecture
┌─────────────┐ CRDTMessage ┌─────────────┐
│ Client A │ ──────────────────▶ │ Server │
│ (Alice) │ │ │
└─────────────┘ │ 1. Append │
│ to │
┌─────────────┐ CRDTMessage │ Journal │
│ Client B │ ──────────────────▶ │ │
│ (Bob) │ │ 2. Apply │
└─────────────┘ │ to │
│ Graph │
└─────────────┘
│
▼
Broadcast to allState Model
The server maintains two data structures:
interface ServerState {
graph: SceneGraph; // Materialized view (fast reads)
journal: CRDTMessage[]; // Append-only log (source of truth)
snapshot?: Snapshot; // Periodic checkpoint (optional)
}| Component | Purpose | Persistence |
|-----------|---------|-------------|
| graph | Current state for queries | In-memory (rebuilt from journal) |
| journal | Complete history of all messages | MongoDB / disk |
| snapshot | Checkpoint to speed up recovery | MongoDB / disk |
Why both?
graph= fast reads, but lost on restartjournal= durable, enables replay, sync, and debugging
Key Components
CRDTMessage (Envelope) - Contains metadata for ordering:
lamportTime- Logical clock for total ordering (LWW)clock- Vector clock for causal orderingsessionId- Who sent this message
Operations - Explicit
otypedetermines merge behavior:vector3.set→ Last-Write-Wins (absolute)vector3.add→ Sum values (relative/additive)number.add→ Sum values (counters)
Why Explicit otype Matters
Scenario: Two users drag the same object simultaneously
❌ Without explicit otype (ambiguous):
Alice: position = [5, 0, 0] // Did she SET or ADD?
Bob: position = [0, 3, 0] // Did he SET or ADD?
// Result: ??? (unpredictable)✅ With explicit otype (unambiguous):
Alice: { otype: 'vector3.add', value: [5, 0, 0] } // += [5,0,0]
Bob: { otype: 'vector3.add', value: [0, 3, 0] } // += [0,3,0]
// Result: position += [5, 3, 0] ✅ (both movements apply)Merge Rules by otype
| otype | Merge | Use Case |
|-------|-------|----------|
| *.set | Last-Write-Wins (higher lamport wins) | Absolute values |
| *.add | Sum all values | Counters, drag deltas |
| *.multiply | Product | Scale gestures |
| array.push | Append all | Adding children |
| array.remove | Remove from all | Removing children |
Journal Lifecycle
The journal is the source of truth. The graph is just a materialized view.
Message Flow
Client sends CRDTMessage
│
▼
┌─────────────────────────┐
│ 1. Deduplicate │ ← Skip if msg.id already in journal
│ 2. Append to journal │ ← Persist to MongoDB
│ 3. Apply to graph │ ← Update in-memory state
│ 4. Broadcast │ ← Send to other clients
└─────────────────────────┘Idempotency
Critical: Not all operations are idempotent on replay!
| Operation Type | Idempotent? | Why |
|---------------|-------------|-----|
| *.set (LWW) | ✅ Yes | Compares lamportTime, same result on replay |
| *.add | ❌ No | Blindly adds value, doubles on replay |
| *.multiply | ❌ No | Blindly multiplies, compounds on replay |
| node.insert | ✅ Yes | Checks if node exists |
Solution: Track applied message IDs per node to skip duplicates:
interface SceneNode {
// ... existing fields
appliedMsgIds: Set<string>; // Track which messages contributed
}
function applyOperation(node, op, meta) {
if (node.appliedMsgIds.has(meta.messageId)) {
return; // Already applied, skip
}
node.appliedMsgIds.add(meta.messageId);
// ... apply operation
}Recovery Flow
When server restarts:
1. Load snapshot from DB (if exists)
└─ snapshot = { graph, journalIndex }
2. Load journal entries after snapshot
└─ journal.slice(snapshot.journalIndex)
3. Replay journal onto snapshot.graph
└─ for (msg of journal) graph = applyMessage(graph, msg)
4. Server readyClient Sync Flow
When a new client connects:
┌────────────┐ ┌────────────┐
│ New Client │ │ Server │
└─────┬──────┘ └─────┬──────┘
│ │
│ 1. Connect │
│──────────────────────────────────▶│
│ │
│ 2. Send current graph │
│◀──────────────────────────────────│
│ (or snapshot + journal tail) │
│ │
│ 3. Client applies, catches up │
│ │
│ 4. Subscribe to live updates │
│◀─────────────────────────────────▶│
│ │Option A: Send full graph (simple, but large)
Option B: Send snapshot + journalTail (smaller, incremental)
Compaction
The journal grows unbounded. Periodically compact:
Before compaction:
journal: [msg1, msg2, msg3, ..., msg1000]
snapshot: null
After compaction:
journal: [msg901, msg902, ..., msg1000] ← Keep recent
snapshot: { graph: <state at msg900>, journalIndex: 900 }When to compact:
- Journal exceeds N messages (e.g., 1000)
- Periodic timer (e.g., every hour)
- On graceful shutdown
Usage
import { createEmptyGraph, applyMessage } from '@vuer-rtc/server/operations';
import type { CRDTMessage } from '@vuer-rtc/server/operations';
// Create empty scene
let graph = createEmptyGraph();
// Apply a message with operations
const msg: CRDTMessage = {
id: 'msg-001',
sessionId: 'alice',
clock: { alice: 1 },
lamportTime: 1,
timestamp: Date.now(),
ops: [
// Create scene root
{
key: 'scene',
otype: 'node.insert',
path: 'scene',
value: { key: 'uuid-scene', tag: 'Scene', name: 'My Scene' },
},
// Create cube with parent (automatically adds to parent's children)
{
key: 'cube-1',
otype: 'node.insert',
path: 'cube-1',
parent: 'scene', // Automatically adds to scene's children
value: {
key: 'uuid-001',
tag: 'Mesh',
name: 'Red Cube',
color: '#ff0000',
'transform.position': [0, 0, 0],
},
},
],
};
graph = applyMessage(graph, msg);CRDTMessage Structure
interface CRDTMessage {
id: string; // Message ID
sessionId: string; // Who sent this
clock: VectorClock; // For causal ordering
lamportTime: number; // For total ordering (LWW)
timestamp: number; // Wall-clock time
ops: Operation[]; // Batch of operations
}Operation Types
Node Operations
| otype | Description |
|-------|-------------|
| node.insert | Create new node (idempotent). Use parent field to auto-add to parent's children. |
| node.remove | Delete node (tombstone) |
Number Operations
| otype | Merge | Example |
|-------|-------|---------|
| number.set | LWW | opacity = 0.5 |
| number.add | Sum | score += 10 |
| number.multiply | Product | scale *= 2 |
| number.min | Minimum | min(current, new) |
| number.max | Maximum | max(current, new) |
Vector3 Operations
| otype | Merge | Example |
|-------|-------|---------|
| vector3.set | LWW | position = [0, 5, 0] |
| vector3.add | Component sum | position += [5, 0, 0] |
| vector3.multiply | Component product | scale *= [2, 2, 2] |
Array Operations
| otype | Merge | Example |
|-------|-------|---------|
| array.set | LWW | children = ['a', 'b'] |
| array.push | Append | children.push('c') |
| array.remove | Remove | children.remove('a') |
| array.union | Union | Merge sets |
Other Operations
| otype | Merge | Example |
|-------|-------|---------|
| color.set | LWW | color = '#ff0000' |
| string.set | LWW | name = 'Cube' |
| boolean.set | LWW | visible = true |
| quaternion.set | LWW | rotation = [0, 0, 0, 1] |
Additive vs LWW
Additive Operations (*.add)
Order doesn't matter, values accumulate:
Alice: position += [5, 0, 0]
Bob: position += [0, 3, 0]
Result: position += [5, 3, 0] ✅LWW Operations (*.set)
Higher lamportTime wins:
Alice: color = red (lamport: 10)
Bob: color = blue (lamport: 11)
Result: color = blue ✅ (Bob's lamport is higher)Client Reconciliation
When a client receives a message from the server, it applies it using the same applyMessage function:
1. Client makes local edit → applies locally (optimistic)
2. Client sends to server → server applies & broadcasts
3. Client receives broadcast → applies with deduplicationImportant: Clients must track applied message IDs to avoid double-applying their own messages:
// Client-side state
const appliedMsgIds = new Set<string>();
function onServerMessage(msg: CRDTMessage) {
if (appliedMsgIds.has(msg.id)) {
return; // Already applied locally, skip
}
appliedMsgIds.add(msg.id);
graph = applyMessage(graph, msg);
}For LWW operations: If the server's lamport time is higher, the server value wins:
// Client has: color = '#ff0000' (lamport: 5)
// Server sends: color = '#0000ff' (lamport: 8)
graph = applyMessage(graph, serverMsg); // color becomes '#0000ff'For additive operations: Each unique message applies once:
// Client applied locally: position += [5, 0, 0] (msg-alice-1)
// Server broadcasts same message back
// Client skips (already in appliedMsgIds)
// Server sends Bob's edit: position += [0, 3, 0] (msg-bob-1)
// Client applies (new message ID)
graph = applyMessage(graph, serverMsg); // position += [0, 3, 0]Key insight: Convergence requires both CRDT merge rules AND message deduplication. Without deduplication, additive operations would double-apply.
Conflict Resolution Example
import { createEmptyGraph, applyMessage } from '@vuer-rtc/server/operations';
// Setup: cube at position [0, 0, 0]
let graph = applyMessage(createEmptyGraph(), {
id: 'setup', sessionId: 'server', clock: { server: 1 }, lamportTime: 0, timestamp: Date.now(),
ops: [{ key: 'cube', otype: 'node.insert', path: 'cube', value: { key: 'uuid', tag: 'Mesh', name: 'Cube', 'position': [0, 0, 0], color: '#fff' }}],
});
// Alice drags right, Bob drags up (concurrent - both use additive)
graph = applyMessage(graph, {
id: 'alice', sessionId: 'alice', clock: { alice: 1 }, lamportTime: 1, timestamp: Date.now(),
ops: [{ key: 'cube', otype: 'vector3.add', path: 'position', value: [5, 0, 0] }],
});
graph = applyMessage(graph, {
id: 'bob', sessionId: 'bob', clock: { bob: 1 }, lamportTime: 2, timestamp: Date.now(),
ops: [{ key: 'cube', otype: 'vector3.add', path: 'position', value: [0, 3, 0] }],
});
console.log(graph.nodes['cube'].position); // [5, 3, 0] - both movements applied!
// Bob sets blue (lamport 11), then Alice's earlier edit arrives (lamport 10)
graph = applyMessage(graph, {
id: 'bob-color', sessionId: 'bob', clock: { bob: 2 }, lamportTime: 11, timestamp: Date.now(),
ops: [{ key: 'cube', otype: 'color.set', path: 'color', value: '#0000ff' }],
});
graph = applyMessage(graph, {
id: 'alice-color', sessionId: 'alice', clock: { alice: 2 }, lamportTime: 10, timestamp: Date.now(),
ops: [{ key: 'cube', otype: 'color.set', path: 'color', value: '#ff0000' }],
});
console.log(graph.nodes['cube'].color); // '#0000ff' - Bob still wins (lamport 11 > 10)Examples
See the examples/ folder for runnable examples:
npx tsx examples/01-basic-usage.ts
npx tsx examples/02-concurrent-edits.ts
npx tsx examples/03-scene-building.ts
npx tsx examples/04-conflict-resolution.tsProject Structure
src/operations/
├── OperationTypes.ts # Type definitions
├── dispatcher.ts # applyMessage(), applyMessages()
├── apply/
│ ├── index.ts # Registry exports
│ ├── types.ts # OpMeta, ApplyFn
│ ├── number.ts # NumberSet, NumberAdd, ...
│ ├── vector3.ts # Vector3Set, Vector3Add, ...
│ ├── array.ts # ArraySet, ArrayPush, ...
│ ├── node.ts # NodeInsert, NodeRemove
│ └── ...