@degenerate/realtime
v0.7.1
Published
WebSocket transport for the ShareData Adaptive Framework Ecosystem (SAFE) — broadcasts ShareData lifecycle and transaction events to subscribed clients and accepts client-originated proposals.
Readme
@degenerate/realtime
WebSocket transport for the ShareData Adaptive Framework Ecosystem (SAFE). Broadcasts ShareData lifecycle and transaction events to subscribed clients and accepts client-originated proposals.
The server and client live in the same package because they share a wire-format contract (Zod schemas in messages/). Tree-shaking keeps the parts you don't import out of your bundle.
Install
npm install @degenerate/realtimeRequires Node ≥ 22. The browser-side client uses the global WebSocket constructor.
Quick start
A minimal loopback that exercises the full surface is in src/examples/loopback-demo.ts. After npm run build:
node packages/realtime/dist/examples/loopback-demo.jsServer
import { createServer } from 'http';
import { ShareDataManager } from '@degenerate/sharedata';
import {
attachRealtimeServer,
createIdBasedFunctionResolver,
} from '@degenerate/realtime';
const httpServer = createServer(/* your express app */);
const manager = new ShareDataManager();
const realtime = attachRealtimeServer({
server: httpServer,
manager,
authenticate: (req) => ({ subject: extractUserId(req) }),
resolveFunction: createIdBasedFunctionResolver(manager),
});
httpServer.listen(3000);
// On shutdown:
await realtime.close();Client
import { RealtimeClient } from '@degenerate/realtime';
const client = new RealtimeClient({ url: 'ws://localhost:3000/realtime' });
await client.connect();
const sub = client.subscribe('share-data-id');
sub.on('state', (f) => console.log('initial', f.currentState));
sub.on('transaction', (f) => console.log('tx', f.transactionId));
sub.on('lifecycle', (f) => console.log('lifecycle', f.event));
const result = await client.propose({
shareDataId: 'share-data-id',
functionId: 'my-function',
proposedData: { foo: 'bar' },
});Wire protocol
All messages are JSON with a { v: 1, type, ... } envelope. Schemas live in src/messages/.
Client → Server
| Type | Purpose | Response |
|---|---|---|
| subscribe | Begin receiving frames for a shareDataId. Optional since cursor replays history. | state + optional replayed transactions |
| unsubscribe | Stop receiving frames for a shareDataId. | (no response) |
| propose | Submit a proposedData mutation through a registered function. | proposal-result correlated by correlationId |
| ping | Round-trip check. | pong correlated by correlationId |
Server → Client
| Type | When emitted |
|---|---|
| state | Once per subscribe — initial snapshot with lastTransactionId cursor |
| transaction | Each manager.applyTransaction outcome (live or replayed). Live frames include currentState; replayed frames omit it and set replayed: true. |
| lifecycle | updated or removed — wholesale mutations that bypass the transaction flow. removed is terminal. |
| proposal-result | Direct ack to the propose issuer (matched by correlationId) |
| pong | Direct ack to a ping |
| error | Non-fatal protocol or authorization error |
Close codes
Defined in messages/close-codes.ts:
| Code | Constant | Meaning |
|---|---|---|
| 4001 | CLOSE_FELL_BEHIND | Send-buffer overflow — client too slow |
| 4002 | CLOSE_UNAUTHORIZED | Upgrade rejected / context absent |
| 4003 | CLOSE_UNKNOWN_SHARE_DATA | Subscribe targeted a non-existent shareDataId |
| 4004 | CLOSE_INVALID_MESSAGE | Could not parse the incoming frame |
Authentication & authorization
Three pluggable layers, all optional:
authenticate(req)— upgrade-time. Returns aTContextfor the rest of the connection's life, ornullto reject the upgrade. The context flows into every downstream authorizer.authorizeSubscribe(ctx, shareDataId)— gates eachsubscribe. Defaults to allow-all when omitted.authorizePropose(ctx, shareDataId, functionId)— gates eachpropose. Defaults to allow-all when omitted.
For JWT-backed auth that mirrors the HTTP validateJWTShareData flow, use the built-in bridge:
import {
createShareDataAuthenticate,
shareDataAudienceAuthorizer,
} from '@degenerate/realtime';
attachRealtimeServer({
// ...
authenticate: createShareDataAuthenticate({ authorities }),
authorizeSubscribe: shareDataAudienceAuthorizer,
});The bridge reads Authorization: Bearer <token> first, then falls back to ?token=... for browser clients (which cannot set custom WS headers).
Replay on reconnect
Subscriptions track lastTransactionId automatically. On reconnect the client resubscribes with since: lastTransactionId, and the server replays anything newer as transaction frames marked replayed: true.
Replayed frames do not advance the cursor — only live frames do. If the cursor predates the server's retained replay window, the server sends an error frame (code: 'replay-cursor-not-found') and skips replay; the client still has the current snapshot from the preceding state frame, so the worst-case outcome is "missed intermediate transactions" — never lost data.
The replay window is a per-shareDataId bounded buffer maintained inside the realtime package — deliberately separate from ShareData._transactionHistory, which remains unbounded and load-bearing for SAFE's cryptographic audit chain. Tune the window via:
attachRealtimeServer({
// ...
replayBufferSize: 500, // default 100
});For multi-instance deployments, swap in a custom ReplayStore (e.g. Redis-backed) and pass it via replayStore.
Backpressure & heartbeats
- Send-buffer cap:
maxBufferedBytes(default 1 MiB) onattachRealtimeServer. Exceeding it closes the connection with4001 (CLOSE_FELL_BEHIND)so a slow consumer cannot stall the broadcast loop. - Heartbeats: a 30s ping/pong (
heartbeat: { intervalMs }) detects half-open sockets and terminates them. - Client outbound queue:
propose/pingissued while the socket isn't open are queued (FIFO, bounded bymaxQueueSize, default 100) and flushed on the nextopen. Per-request timeouts honor call time, not send time.
Function resolution
propose carries a functionId string. The server resolves it to a runtime IShareFunction via the resolveFunction hook:
createIdBasedFunctionResolver(manager)— the conventional path: each function registered on a ShareData carries anidmatching the wire value. Linear search per ShareData.- Custom — pass any
FunctionResolverfor hash-based lookup, ACL-aware filtering, per-tenant function sets, etc. - Omit — disables
proposeentirely. Clients receiveproposal-result accepted:false error:"propose is not enabled...".
Functions without an id are skipped — keep them for purely-internal use that should not be reachable over the wire.
API reference
| Export | Purpose |
|---|---|
| attachRealtimeServer | Mount the WS transport on an existing http.Server |
| RealtimeClient | Browser+Node WebSocket client with auto-reconnect + resubscribe |
| Subscription | Per-shareDataId handle returned by client.subscribe() |
| createShareDataAuthenticate | JWT bridge to AuthorizedAuthorities |
| shareDataAudienceAuthorizer | Built-in subscribe authorizer for the SHAREDATA audience |
| createIdBasedFunctionResolver | Built-in FunctionResolver for the id-field convention |
| InMemoryChannelRegistry | Default channel registry; swap out for a Redis/etc. fan-out |
| InMemoryReplayStore | Default bounded per-shareDataId replay buffer; swap for a shared backend |
| AuthenticateUpgrade, AuthorizeSubscribe, AuthorizePropose, FunctionResolver, ReplayStore | Hook contracts |
| ClientMessage, ServerMessage types + schemas | Wire-format contracts |
| CLOSE_* | Close-code constants |
Tests
npm test --workspace=@degenerate/realtimeCoverage spans message schemas, the connection handler (subscribe/propose/replay), the channel registry, the authorities bridge, and the client (reconnect, queueing, correlation, subscription cursor). See src/**/__tests__/.
