@savanapoint/zero-pub-sub
v0.4.0
Realtime Engine: multi-provider event streaming with WebSocket gateway, replay, ack and ordering

Realtime Engine for application events: publish once, deliver over WebSocket, replay missed events, track acknowledgements, and keep provider details out of frontend apps.
It is domain-agnostic. The library does not know what an order, chat, driver, vendor or payment is. Your application owns the room names, event types, payloads and authorization rules.
Contents
- Install
- Quick Start
- Mental Model
- Backend API
- WebSocket Gateway
- Frontend Client
- Providers
- Hybrid Mode
- Advanced Features
- Security
- Production Checklist
- Troubleshooting
- Validation
Install
Install the library:
npm install @savanapoint/zero-pub-sub

Database drivers for MongoDB, PostgreSQL and Redis are bundled with the server entrypoint, so URI setup works out of the box. Install ws only when running the WebSocket Gateway in Node/Bun:

npm install ws

The /client entrypoint remains browser-safe and does not import the server factory or database transports.
Database URLs and database clients must stay on the backend. Frontend and mobile apps should connect only to your WebSocket Gateway.
Entry Points
Use explicit entrypoints in application code:
// Backend only: database connections, transports and WebSocket Gateway.
import { connectRealtime, createWebSocketGateway } from '@savanapoint/zero-pub-sub/server';
// Frontend/mobile only: WebSocket client, room helpers and shared event types.
import { createRealtimeClient, room } from '@savanapoint/zero-pub-sub/client';

The package root remains available for backwards compatibility, but new code should prefer /server and /client. This keeps frontend bundles away from backend-only APIs and makes accidental database usage in client apps obvious.
Quick Start
MongoDB
import { connectRealtime } from '@savanapoint/zero-pub-sub/server';
import { room } from '@savanapoint/zero-pub-sub/client';
const realtime = await connectRealtime({
provider: 'mongo',
connection: process.env.MONGO_URL!,
});
await realtime.publish({
room: room('chat', '123'),
type: 'message.created',
payload: {
id: 'msg_1',
text: 'Hello',
senderId: 'user_1',
},
});

PostgreSQL
import { connectRealtime } from '@savanapoint/zero-pub-sub/server';
const realtime = await connectRealtime({
provider: 'postgres',
connection: process.env.DATABASE_URL!,
});

Redis
import { connectRealtime } from '@savanapoint/zero-pub-sub/server';
const realtime = await connectRealtime({
provider: 'redis',
connection: process.env.REDIS_URL!,
});

Advanced Features
Snapshots
Use snapshots when a room has a large event history and clients should start
from consolidated state instead of replaying everything from sequence 0.
await realtime.snapshot('chat:123', {
lastSequence: 100000,
state: {
messages: recentMessages,
participants,
},
});

On the backend:
const snapshot = await realtime.getSnapshot('chat:123');
for await (const event of realtime.streamReplay({
room: 'chat:123',
fromSequence: snapshot?.lastSequence ?? 0,
batchSize: 1000,
})) {
await applyIncrementalEvent(event);
}

On the frontend:
client.subscribe(
'chat:123',
(event) => applyIncrementalEvent(event),
{
sync: 'snapshot',
onSnapshot(snapshot) {
hydrateRoom(snapshot?.state);
},
},
);

Flow:

snapshot -> replay after snapshot.lastSequence -> live stream

Mongo URI setup creates durable realtime_snapshots and realtime_dlq collections automatically. Other providers expose the same API through the shared transport layer.
Retry and DLQ
Retries are configured per subscription:
realtime.subscribe(
{
room: 'chat:123',
retry: {
attempts: 5,
strategy: 'exponential',
baseDelayMs: 100,
},
},
async (event) => {
await handleEvent(event);
},
);

If the handler keeps failing after all attempts, the event is moved to the DLQ:
const failed = await realtime.dlq('chat:123').list({ limit: 100 });
for await (const event of realtime.dlq('chat:123').replay({
limit: 100,
})) {
await repair(event);
}

Supported retry strategies:

'fixed' | 'linear' | 'exponential'

Flow Control
Use flow control when consumers can be slower than producers:
realtime.subscribe(
{
room: 'chat:123',
flowControl: {
maxInFlight: 100,
strategy: 'buffer',
maxBufferSize: 1000,
},
},
async (event) => {
await handleEvent(event);
},
);

Strategies:

- pause: rejects delivery when the handler backlog is full.
- drop: drops events when the listener is saturated.
- buffer: buffers up to maxBufferSize.
Streaming Replay
streamReplay() is an alias for paginated catch-up and is the API to use for
large operational drains:
for await (const event of realtime.streamReplay({
room: 'chat:123',
fromSequence: 0,
batchSize: 1000,
})) {
await processEvent(event);
}

It does not load the full backlog into memory.
Ephemeral Events
Ephemeral events are fanout-only. They are not persisted and do not participate in replay or ack.
client.publishEphemeral({
room: 'chat:123',
type: 'typing',
payload: {
userId: 'u1',
},
});
client.subscribeEphemeral('chat:123', (event) => {
showTyping(event.payload);
}, {
eventTypes: ['typing'],
);

Use ephemeral events for typing indicators, cursor movement and transient UI state.
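Since typing indicators fire on every keystroke, applications usually throttle them before calling publishEphemeral. The sketch below shows one way to do that; `publish` stands in for client.publishEphemeral, and the injectable `now` parameter exists only so the logic can be tested. Both are assumptions, not library APIs.

```typescript
// Throttle typing notifications so the ephemeral publish fires at most once
// per interval. `publish` is a stand-in for client.publishEphemeral.
function createTypingNotifier(
  publish: (event: { room: string; type: string; payload: { userId: string } }) => void,
  intervalMs = 2000,
  now: () => number = Date.now,
) {
  let lastSentAt = -Infinity;
  return (room: string, userId: string): boolean => {
    const t = now();
    if (t - lastSentAt < intervalMs) return false; // suppressed: too soon
    lastSentAt = t;
    publish({ room, type: 'typing', payload: { userId } });
    return true;
  };
}
```

Wire the returned function to your input's change handler; suppressed calls are simply dropped, which is fine for fanout-only events.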
Presence
Presence is built on ephemeral events:
const presence = client.presence('chat:123');
presence.enter({
userId: 'u1',
metadata: {
name: 'Francisco',
},
});
presence.subscribe((users) => {
console.log(users);
});
presence.leave();

Presence users have this shape:
type PresenceUser = {
userId: string;
metadata?: Record<string, unknown>;
lastSeenAt: string;
};

Multi-Room Subscribe
Frontend clients can subscribe to multiple rooms with one call:
client.subscribe(['chat:1', 'chat:2'], (event) => {
console.log(event.room, event.payload);
);

Wildcard room subscriptions support scoped patterns:
client.subscribe('chat:*', (event) => {
console.log(event.room, event.payload);
);

Rules:

- Only scope:* patterns are supported.
- Global * is not supported.
- Auth still runs against the wildcard room string.
- Backend code should only allow wildcard scopes that are safe for that user or tenant.
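An authorize() rule that enforces the last point might look like this sketch. The claims shape ({ tenantId }) and the tenant-prefixed scope naming are illustrative assumptions, not part of the library contract.

```typescript
// Illustrative per-room authorization for wildcard subscriptions: a wildcard
// room arrives as the literal "scope:*" string, so only allow wildcard
// scopes that are namespaced to the caller's own tenant.
type Claims = { tenantId: string };

function authorizeRoom(claims: Claims, room: string): boolean {
  const tenantScope = `tenant-${claims.tenantId}`;
  if (room.endsWith(':*')) {
    return room.slice(0, -2) === tenantScope;
  }
  // Non-wildcard rooms: ordinary per-room check against the tenant prefix.
  return room.startsWith(`${tenantScope}:`);
}
```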
E2EE Payloads
The library supports encrypted payload markers. Encryption itself should happen in your client or application layer before publishing:
client.publish({
room: 'chat:123',
type: 'message.created',
encrypted: true,
payload: encryptedPayload,
});

The server stores and routes the payload without needing to understand it.
Middleware
Middleware lets backend code intercept, modify, validate, block or observe realtime lifecycle operations. It runs server-side only.
Configure middleware at construction time:
const realtime = await connectRealtime({
provider: 'postgres',
connection: process.env.DATABASE_URL!,
middleware: [
async (ctx, next) => {
console.log('before', ctx.action);
await next();
console.log('after', ctx.action);
},
],
});

Or register later:
realtime.use(async (ctx, next) => {
if (ctx.action === 'publish' && ctx.event) {
ctx.event.metadata = {
...ctx.event.metadata,
traceId: crypto.randomUUID(),
};
}
await next();
});

Execution order:

operation -> middleware 1 -> middleware 2 -> core -> middleware 2 -> middleware 1

Supported actions include:
'publish'
'subscribe'
'deliver'
'replay'
'catchUp'
'ack'
'snapshot'
'getSnapshot'
'ephemeralPublish'
'ephemeralDeliver'
'presenceEnter'
'presenceLeave'
'dlqWrite'
'dlqReplay'
'close'

Example: block large payloads:
realtime.use(async (ctx, next) => {
if (ctx.action === 'publish') {
const size = Buffer.byteLength(JSON.stringify(ctx.event?.payload));
if (size > 64 * 1024) {
throw new Error('Payload too large');
}
}
await next();
});

Official helpers are available from /server:
import {
loggerMiddleware,
metricsMiddleware,
payloadSizeMiddleware,
tenantMiddleware,
} from '@savanapoint/zero-pub-sub/server';

Metrics
snapshotMetrics() includes advanced counters:
{
retryCount: 3,
dlqSize: 1,
snapshotUsage: 4,
replayLatencyMs: 12,
}

Recipes
Chat
Use durable messages plus ephemeral typing and presence:
await realtime.publish({
room: 'chat:123',
type: 'message.created',
payload: { text: 'hello', senderId: 'u1' },
});
client.publishEphemeral({
room: 'chat:123',
type: 'typing',
payload: { userId: 'u1' },
});
client.presence('chat:123').enter({
userId: 'u1',
metadata: { name: 'Francisco' },
});

Notifications
Use one room per user:
await realtime.publish({
room: 'notifications:user_123',
type: 'notification.created',
payload: { title: 'Payment received' },
});

Banking Events
Use idempotent event IDs, ack after processing and DLQ for failed consumers:
await realtime.publish({
id: 'txn_9843A9XK21:created',
room: 'bank:transactions',
type: 'transaction.created',
payload: transaction,
});

Workers
Use streamReplay() for large recovery jobs:
for await (const event of realtime.streamReplay({
room: 'bank:transactions',
fromSequence: 0,
batchSize: 1000,
})) {
await processEvent(event);
await realtime.ack(event.room, event.sequence, 'settlement-worker-1');
}

Multi-Tenant
Put the tenant in metadata and enforce it with middleware:
realtime.use(async (ctx, next) => {
if (ctx.action === 'publish' && !ctx.event?.metadata?.tenantId) {
throw new Error('Missing tenantId');
}
await next();
});

Load Testing
Run the default in-memory load test:
npm run load:test

Run a 1M-event profile:
REALTIME_LOAD_ROOMS=100 \
REALTIME_LOAD_SUBSCRIBERS_PER_ROOM=1 \
REALTIME_LOAD_TARGET_EVENTS=1000000 \
npm run load:test

Subscribe on the Backend
const unsubscribe = realtime.subscribe(
{
room: 'chat:123',
subscriberId: 'worker-1',
},
async (event) => {
console.log(event.sequence, event.type, event.payload);
await realtime.ack(event.room, event.sequence, 'worker-1');
},
);

When subscriberId is provided, the transport reads the last acked sequence for that subscriber and replays pending events in pages before continuing with live events. Use fromSequence only when you want to override that cursor manually.
For large backlogs, tune the page size instead of loading everything at once:
realtime.subscribe(
{
room: 'chat:123',
subscriberId: 'worker-1',
catchUp: {
batchSize: 1000,
},
},
async (event) => {
await processEvent(event);
await realtime.ack(event.room, event.sequence, 'worker-1');
},
);

Close Connections
await unsubscribe();
await realtime.close();

When you use connectRealtime, the library owns the clients it creates and closes them from realtime.close() by default.
Mental Model
Rooms
A room is the delivery key. It always uses:
scope:resourceId

Examples:
import { buildRoom, room } from '@savanapoint/zero-pub-sub/client';
room('chat', '123'); // chat:123
room('order', '456'); // order:456
buildRoom('payment', '789'); // payment:789

The scope must match:

[A-Za-z0-9][A-Za-z0-9._-]{0,63}

The resource id can contain additional : characters, but it cannot be empty.
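The two rules above can be sketched as a small validator. This is an illustration of the documented format, not the library's internal check; the function name is an assumption.

```typescript
// Validate "scope:resourceId": scope must match the documented pattern,
// and the resource id must be non-empty (it may contain further colons).
const SCOPE_RE = /^[A-Za-z0-9][A-Za-z0-9._-]{0,63}$/;

function isValidRoom(room: string): boolean {
  const sep = room.indexOf(':'); // split on the FIRST colon only
  if (sep === -1) return false;
  const scope = room.slice(0, sep);
  const resourceId = room.slice(sep + 1);
  return SCOPE_RE.test(scope) && resourceId.length > 0;
}
```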
Events
An event is an append-only realtime message.
await realtime.publish({
room: 'order:order_123',
type: 'order.status.changed',
payload: {
status: 'ready',
},
});

The stored envelope has this shape:
type RealtimeEnvelope<TPayload = unknown> = {
id: string;
type: string;
room: string;
sequence: number;
emittedAt: string;
expiresAt: string;
payload: TPayload;
metadata?: {
provider?: 'postgres' | 'mongo' | 'redis';
producer?: string;
correlationId?: string;
traceId?: string;
tenantId?: string;
[key: string]: unknown;
};
};

Sequences
Sequences are per room. A client receiving chat:123 expects:
1, 2, 3, 4...

If it sees a gap, the frontend client asks the gateway for replay.
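The per-room check behind that behavior can be sketched as follows. This mirrors the described gap detection; it is not the client's internal code, and the return labels are illustrative.

```typescript
// Track per-room sequences: deliver in-order events, flag duplicates, and
// report a missing range (so replay can be requested) when a sequence skips.
function createSequenceTracker(onGap: (from: number, to: number) => void) {
  let last = 0;
  return (sequence: number): 'deliver' | 'duplicate' | 'buffered' => {
    if (sequence <= last) return 'duplicate';
    if (sequence === last + 1) {
      last = sequence;
      return 'deliver';
    }
    onGap(last + 1, sequence - 1); // the range replay should fill
    return 'buffered'; // hold the out-of-order event until the gap closes
  };
}
```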
Replay
Replay fetches old events for a room:
const { events } = await realtime.replay({
room: 'chat:123',
fromSequence: 25,
limit: 100,
);

Only events with sequence > fromSequence are returned.
Replay is a single page. Use catchUp() when you want to drain a large backlog
page by page.
for await (const event of realtime.catchUp({
room: 'chat:123',
subscriberId: 'worker-1',
batchSize: 1000,
})) {
await processEvent(event);
await realtime.ack(event.room, event.sequence, 'worker-1');
}

Ack
Ack records the last processed sequence per subscriber and room:
await realtime.ack('chat:123', 26, 'device-1');

Backend API
connectRealtime(options)
Use this for the best developer experience. It accepts connection strings, creates provider clients internally, applies sensible defaults, and owns client lifecycle.
const realtime = await connectRealtime({
provider: 'mongo',
connection: process.env.MONGO_URL!,
});

Supported providers:

type Provider = 'mongo' | 'postgres' | 'redis' | 'hybrid';

createRealtime(options)
Use this when your app already owns connected database clients.
const realtime = createRealtime({
provider: 'postgres',
connection: {
client: pgPool,
listenerClient: pgListenerClient,
},
});

publish(event)
const envelope = await realtime.publish({
id: 'evt_optional_idempotency_key',
room: 'chat:123',
type: 'message.created',
ttlMs: 24 * 60 * 60 * 1000,
payload: {
text: 'Hello',
},
metadata: {
producer: 'api',
correlationId: 'req_123',
},
});

If id is provided, transports use it as an idempotency key where supported.
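The idempotency behavior described here can be sketched with an in-memory stand-in: a transport that has already seen an event id returns the stored envelope instead of appending a new one. Real transports keep this state in the database; the function and envelope shape below are illustrative only.

```typescript
// In-memory sketch of idempotent append: publishing the same id twice
// returns the original envelope with its original sequence.
type Envelope = { id: string; room: string; sequence: number };

function createIdempotentAppend() {
  const byId = new Map<string, Envelope>();
  let sequence = 0;
  return (id: string, room: string): Envelope => {
    const existing = byId.get(id);
    if (existing) return existing; // duplicate publish: no new sequence
    const envelope = { id, room, sequence: ++sequence };
    byId.set(id, envelope);
    return envelope;
  };
}
```

This is why retrying a publish with the same id is safe on providers that support it.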
subscribe(options, handler)
const unsubscribe = realtime.subscribe(
{
room: 'chat:123',
subscriberId: 'worker-1',
eventTypes: ['message.created'],
limit: 100,
},
async (event) => {
console.log(event.payload);
},
);

Subscribe catch-up behavior:

- If fromSequence is provided, replay starts after that sequence.
- If fromSequence is omitted and subscriberId is provided, replay starts after the subscriber's last acked sequence.
- If neither is available, replay starts after sequence 0.
- Catch-up is paginated. limit is treated as the default page size for backwards compatibility.
- Use catchUp: { batchSize: 1000 } to control page size.
- Use catchUp: { maxBatches: 10 } to cap one subscription's startup work.
- Use catchUp: false for live-only subscriptions.
- After catch-up, the same subscription continues receiving live events.
For worker-style consumers that must process a very large backlog before going live, prefer the explicit iterator:
for await (const event of realtime.catchUp({
room: 'bank:transactions',
subscriberId: 'settlement-worker-1',
eventTypes: ['transaction.created'],
batchSize: 1000,
})) {
await settleTransaction(event.payload);
await realtime.ack(event.room, event.sequence, 'settlement-worker-1');
}

catchUp() never stores the full backlog in memory. It calls replay() with a bounded limit, advances the cursor to the last delivered sequence, yields to the event loop between pages, and continues until there are no more events.
replay(options)
const result = await realtime.replay({
room: 'chat:123',
fromSequence: 100,
toSequence: 150,
eventTypes: ['message.created'],
limit: 100,
});

catchUp(options)
for await (const event of realtime.catchUp({
room: 'chat:123',
subscriberId: 'worker-1',
fromSequence: 100,
toSequence: 10000,
eventTypes: ['message.created'],
batchSize: 1000,
maxBatches: 100,
})) {
await handle(event);
}

Options:

- subscriberId: uses the stored ack cursor when fromSequence is omitted.
- fromSequence: overrides the stored cursor.
- toSequence: stops after this sequence.
- eventTypes: filters by event type.
- batchSize: page size; defaults to 500 and is capped at 10000.
- maxBatches: optional circuit breaker for a single drain run.
This is the recommended API for backfills, worker recovery and operational jobs where millions of messages may be pending.
ack(room, sequence, subscriberId)
await realtime.ack('chat:123', 101, 'device-1');

health()
const health = await realtime.health?.();
console.log(health);

Example response:
{
provider: 'postgres',
status: 'healthy',
details: {
listenerReady: true,
activeRooms: 2,
activeListeners: 5,
},
}

snapshotMetrics()
const metrics = realtime.snapshotMetrics?.();

Metrics include:
{
published: 10,
received: 10,
acked: 8,
gapsDetected: 0,
errors: 0,
replayed: 2,
duplicatesDropped: 1,
activeRooms: 1,
activeListeners: 3,
averageDeliveryLagMs: 12,
}

close()
await realtime.close();

Always call this during graceful shutdown.
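A shutdown hook for that advice might look like this sketch. The injectable `exit` parameter exists only so the logic can be tested; it and the function name are assumptions, not library APIs.

```typescript
// Register a graceful-shutdown hook that closes the realtime transport once,
// even if multiple signals arrive, then exits the process.
function registerShutdown(
  realtime: { close(): Promise<void> },
  exit: (code: number) => void = (code) => process.exit(code),
) {
  let closing = false;
  const handler = async () => {
    if (closing) return; // repeated signals are ignored
    closing = true;
    await realtime.close();
    exit(0);
  };
  process.once('SIGTERM', handler);
  process.once('SIGINT', handler);
}
```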
WebSocket Gateway
The gateway is the recommended frontend entrypoint. It keeps provider clients, database credentials and direct database listeners out of frontend apps.
import {
connectRealtime,
createWebSocketGateway,
} from '@savanapoint/zero-pub-sub/server';
const transport = await connectRealtime({
provider: 'postgres',
connection: process.env.DATABASE_URL!,
});
const { gateway, server } = createWebSocketGateway({
port: 8080,
transport,
limits: {
maxConnections: 10000,
maxSubscriptions: 128,
maxPayloadBytes: 64 * 1024,
maxMessagesPerWindow: 100,
rateLimitWindowMs: 1000,
initTimeoutMs: 10000,
heartbeatIntervalMs: 30000,
heartbeatTimeoutMs: 10000,
maxSocketBufferedBytes: 1024 * 1024,
allowClientPublish: false,
},
auth: {
scopes: ['chat', 'order'],
async jwt(token) {
return verifyJwt(token);
},
async authorize(ctx) {
return canAccessRoom(ctx.claims, ctx.room, ctx.action);
},
},
});

server is null when the optional ws package is not installed and no custom WebSocketServer constructor is passed.
Attach to an Existing HTTP Server
import http from 'node:http';
import { createWebSocketGateway } from '@savanapoint/zero-pub-sub/server';
const httpServer = http.createServer(app);
createWebSocketGateway({
server: httpServer,
transport,
});
httpServer.listen(8080);

Gateway Health
const health = await gateway.health();

Example:
{
status: 'healthy',
connections: 42,
transport: {
provider: 'redis',
status: 'healthy',
},
metrics: {
published: 120,
received: 118,
},
}

Gateway Shutdown
await gateway.close();

This closes active WebSocket connections and then closes the transport.
One-Call Gateway
Use serveRealtime() when you want the library to own provider connection and
WebSocket gateway setup in one call:
import { serveRealtime } from '@savanapoint/zero-pub-sub/server';
const app = await serveRealtime({
provider: 'redis',
connection: process.env.REDIS_URL!,
websocket: {
port: 8080,
},
});
process.on('SIGTERM', () => {
void app.close();
});

The application still runs the process, but it does not implement the WebSocket protocol, fanout, replay, presence or ack handling.
CLI Gateway
Run a ready-made gateway without writing a server file:
npx zero-pub-sub gateway \
--provider mongo \
--connection "$MONGO_URL" \
--port 8080

Environment-variable form:
ZERO_PUBSUB_PROVIDER=redis \
ZERO_PUBSUB_CONNECTION="$REDIS_URL" \
PORT=8080 \
npx zero-pub-sub gateway

Hosted Cloud Mode
For hosted/SaaS deployments, use the cloud client contract:
import { createRealtimeCloudClient } from '@savanapoint/zero-pub-sub/client';
const client = createRealtimeCloudClient({
appId: 'app_123',
apiKey: 'public_key',
subscriberId: 'device-1',
});

Backend publishing through a compatible cloud endpoint:
import { connectRealtimeCloud } from '@savanapoint/zero-pub-sub/server';
const realtime = connectRealtimeCloud({
endpoint: 'https://realtime.example.com',
secretKey: process.env.ZERO_SECRET_KEY!,
});

Wire Protocol
Client to server:
{ "type": "init", "subscriberId": "device-1", "token": "jwt" }
{ "type": "subscribe", "room": "chat:123", "eventTypes": ["message.created"], "fromSequence": 0 }
{ "type": "unsubscribe", "room": "chat:123" }
{ "type": "publish", "event": { "type": "message.created", "room": "chat:123", "payload": { "text": "Hello" } } }
{ "type": "ack", "room": "chat:123", "sequence": 10 }
{ "type": "replay_request", "room": "chat:123", "fromSequence": 10, "limit": 100 }

Server to client:
{ "type": "ready", "subscriberId": "device-1" }
{ "type": "subscribed", "room": "chat:123" }
{ "type": "event", "data": { "id": "evt_1", "room": "chat:123", "sequence": 1, "type": "message.created", "payload": {} } }
{ "type": "replay", "room": "chat:123", "events": [] }
{ "type": "ack", "room": "chat:123", "sequence": 1 }
{ "type": "unsubscribed", "room": "chat:123" }
{ "type": "error", "message": "Access denied for room: chat:123", "code": "4408" }

Frontend Client
Use createRealtimeClient from frontend or mobile code. Do not pass database
URLs to frontend apps.
import {
createRealtimeClient,
room,
} from '@savanapoint/zero-pub-sub/client';
const client = createRealtimeClient({
url: 'wss://realtime.example.com',
subscriberId: 'device-1',
authToken: jwt,
reconnect: true,
reconnectDelayMs: 1000,
onError(error) {
console.error(error);
},
});
client.connect();
const unsubscribe = client.subscribe(
room('chat', '123'),
(event) => {
console.log(event.sequence, event.payload);
},
{
eventTypes: ['message.created'],
fromSequence: 0,
},
);

Publish from the Frontend
Frontend publishing is supported by the protocol, but production apps usually publish through authenticated HTTP mutations and let the backend emit realtime events after committing business state.
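The recommended pattern might look like this sketch: commit business state first, then publish. The `saveMessage` helper and handler shape are illustrative assumptions; only `realtime.publish` comes from this README.

```typescript
// Backend mutation handler sketch: persist first so the realtime event never
// outruns the database, then fan out to subscribers of the room.
async function createMessageHandler(
  realtime: { publish(e: { room: string; type: string; payload: unknown }): Promise<unknown> },
  saveMessage: (input: { roomId: string; text: string; senderId: string }) => Promise<{ id: string }>,
  input: { roomId: string; text: string; senderId: string },
) {
  const message = await saveMessage(input); // 1. commit business state
  await realtime.publish({
    // 2. then emit the realtime event
    room: `chat:${input.roomId}`,
    type: 'message.created',
    payload: { id: message.id, text: input.text, senderId: input.senderId },
  });
  return message;
}
```

For comparison, the direct client publish shown below remains available when the gateway enables allowClientPublish.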
client.publish({
room: 'chat:123',
type: 'message.created',
payload: {
text: 'Hello',
},
});

Reconnect and Replay
The client:
- queues outbound messages until the socket opens
- reconnects by default
- tracks the last sequence per room
- buffers out-of-order events
- requests replay when it detects a sequence gap
- sends ack after flushing an event
Useful client options:
createRealtimeClient({
url: 'wss://realtime.example.com',
subscriberId: 'device-1',
reconnect: true,
reconnectDelayMs: 1000,
maxBufferedEventsPerRoom: 1000,
replayThrottleMs: 500,
maxQueuedMessages: 1000,
onError(error) {
reportError(error);
},
});

Providers
MongoDB
Best for durable replay when your app already uses MongoDB.
URI setup:
const realtime = await connectRealtime({
provider: 'mongo',
connection: process.env.MONGO_URL!,
});

Advanced URI setup:
const realtime = await connectRealtime({
provider: 'mongo',
connection: {
uri: process.env.MONGO_URL!,
database: 'zero_realtime',
collectionPrefix: 'realtime',
eventsCollection: 'realtime_events',
countersCollection: 'realtime_counters',
subscribersCollection: 'realtime_subscribers',
autoCreateIndexes: true,
closeClient: true,
},
});

Default collections:
realtime_events
realtime_counters
realtime_subscribers

Default indexes:
events: { room: 1, sequence: 1 } unique
events: { id: 1 } unique
events: { expiresAt: 1 }
subscribers: { room: 1, subscriberId: 1 } unique

Inject existing collections:
import { MongoClient } from 'mongodb';
import { createRealtime } from '@savanapoint/zero-pub-sub/server';
const mongo = new MongoClient(process.env.MONGO_URL!);
await mongo.connect();
const db = mongo.db('zero_realtime');
const realtime = createRealtime({
provider: 'mongo',
connection: {
events: db.collection('realtime_events'),
counters: db.collection('realtime_counters'),
subscribers: db.collection('realtime_subscribers'),
},
});

MongoDB change streams are used when available. Change streams require a replica set or a managed MongoDB deployment that supports them. Local fanout still works inside the current process without change streams.
PostgreSQL
Best for durable replay and strict per-room ordering.
URI setup:
const realtime = await connectRealtime({
provider: 'postgres',
connection: process.env.DATABASE_URL!,
});

Advanced URI setup:
const realtime = await connectRealtime({
provider: 'postgres',
connection: {
uri: process.env.DATABASE_URL!,
ssl: process.env.NODE_ENV === 'production'
? { rejectUnauthorized: false }
: false,
autoMigrate: true,
eventsTable: 'realtime_events',
subscribersTable: 'realtime_subscribers',
notifyChannel: 'realtime_events',
closeClients: true,
},
});

connectRealtime creates:

- one pg.Pool for queries and writes
- one dedicated pg.Client for LISTEN/NOTIFY
- default tables and indexes when autoMigrate is true
autoMigrate defaults to true outside production and false when
NODE_ENV=production. Set it explicitly in production so schema changes are
part of your deploy process.
Inject existing clients:
import { Pool, Client } from 'pg';
import { createRealtime } from '@savanapoint/zero-pub-sub/server';
const client = new Pool({ connectionString: process.env.DATABASE_URL });
const listenerClient = new Client({ connectionString: process.env.DATABASE_URL });
await listenerClient.connect();
const realtime = createRealtime({
provider: 'postgres',
connection: {
client,
listenerClient,
},
});

Manual schema:
psql "$DATABASE_URL" -f migrations/001_postgres_initial.sql
psql "$DATABASE_URL" -f migrations/002_postgres_prune_function.sql

Retention cleanup:

SELECT realtime_prune_expired_events();

Run that periodically from a cron job, scheduled worker or database scheduler.
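If you run the prune from the app process, a minimal in-process scheduler might look like this sketch. The pool shape matches pg.Pool's query method; the interval choice and function name are assumptions.

```typescript
// Run the Postgres prune function on an interval; returns a stop function.
// Failures are logged rather than thrown, since pruning is best-effort.
function schedulePrune(
  pool: { query(sql: string): Promise<unknown> },
  intervalMs = 60 * 60 * 1000, // hourly is a reasonable default for most apps
) {
  const timer = setInterval(() => {
    pool.query('SELECT realtime_prune_expired_events();').catch((error) => {
      console.error('realtime prune failed', error);
    });
  }, intervalMs);
  return () => clearInterval(timer);
}
```

Call the returned stop function during graceful shutdown, before closing the pool.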
Redis
Best for low-latency fanout and high-volume realtime delivery.
URI setup:
const realtime = await connectRealtime({
provider: 'redis',
connection: process.env.REDIS_URL!,
});

Advanced URI setup:
const realtime = await connectRealtime({
provider: 'redis',
connection: {
uri: process.env.REDIS_URL!,
streamPrefix: 'realtime:stream:',
channelPrefix: 'realtime:room:',
subscriberPrefix: 'realtime:subscriber:',
sequencePrefix: 'realtime:sequence:',
idempotencyPrefix: 'realtime:idempotency:',
sequenceIndexPrefix: 'realtime:sequence-index:',
persistStreams: true,
closeClients: true,
},
});

connectRealtime creates:
- one primary Redis client
- one duplicated subscriber client
Inject existing clients:
import { createClient } from 'redis';
import { createRealtime } from '@savanapoint/zero-pub-sub/server';
const client = createClient({ url: process.env.REDIS_URL });
const subscriberClient = client.duplicate();
await client.connect();
await subscriberClient.connect();
const realtime = createRealtime({
provider: 'redis',
connection: {
client,
subscriberClient,
},
});

Redis uses:
INCR per-room sequence
XADD replay stream
XRANGE replay reads
PUBLISH realtime fanout
SUBSCRIBE realtime listeners
HSET subscriber ack cursor
SET idempotency cache and sequence-to-stream index

Set persistStreams: false when Redis is used only for fanout and another provider owns durable replay. Hybrid mode does this automatically for Redis URI connections used as the realtime leg.
Hybrid Mode
Hybrid mode writes to a durable storage provider and delivers realtime events through another provider.
Recommended production shape:
const realtime = await connectRealtime({
provider: 'hybrid',
storage: {
provider: 'postgres',
connection: process.env.DATABASE_URL!,
},
realtime: {
provider: 'redis',
connection: process.env.REDIS_URL!,
},
});Publish goes to storage first, then realtime fanout:
publish -> PostgreSQL -> Redis -> WebSocket subscribers

Replay reads from storage:

replay -> PostgreSQL

Catch-up also drains from storage, while live events continue through the realtime leg. This prevents Redis fanout streams from becoming the long-term source of truth when storage is PostgreSQL or MongoDB.
This is the strongest default for many production apps: PostgreSQL or MongoDB for durable replay, Redis for low-latency fanout.
Security
Keep Database Credentials Server-Side
Never expose DATABASE_URL, MONGO_URL or REDIS_URL to frontend or mobile
apps. The browser should only know your WebSocket endpoint.
Authenticate Connections
createWebSocketGateway({
transport,
auth: {
async jwt(token) {
return verifyJwt(token);
},
},
});

If auth.jwt is configured, the client must send:

{ "type": "init", "subscriberId": "device-1", "token": "jwt" }

Restrict Room Scopes
createWebSocketGateway({
transport,
auth: {
scopes: ['chat', 'order'],
},
});

This rejects rooms such as admin:settings unless admin is in the scope list.
Authorize Per Room
createWebSocketGateway({
transport,
auth: {
async authorize(ctx) {
const { subscriberId, claims, room, action } = ctx;
return canAccessRoom({ subscriberId, claims, room, action });
},
},
});

Actions are:

'subscribe' | 'publish' | 'replay' | 'ack'

Production Checklist
- Use the WebSocket Gateway as the only frontend realtime entrypoint.
- Keep database connection strings in backend environment variables.
- Prefer connectRealtime for app-owned realtime infrastructure.
- Use createRealtime only when your app already owns provider clients.
- Use MongoDB or PostgreSQL for durable replay.
- Use Redis for high-volume fanout.
- Use Hybrid mode when you need both durable replay and low latency.
- Tune catchUp.batchSize for workers that may recover large backlogs.
- Ack only after successful processing so offline subscribers can resume safely.
- Configure gateway auth.jwt, auth.scopes and auth.authorize.
- Set gateway limits for payload size, connection count, subscription count and rate limits.
- Call health() in readiness checks.
- Export snapshotMetrics() to your monitoring system.
- Call close() during graceful shutdown.
- Prune expired events for durable providers.
- Keep Redis subscriber clients separate from write clients.
- Use a dedicated PostgreSQL listener client for LISTEN/NOTIFY.
Troubleshooting
Missing bundled dependency "pg"
Reinstall the package so its bundled database drivers are restored:
npm install @savanapoint/zero-pub-sub

The same applies to mongodb and redis. The ws package is still optional and only needed when you run the WebSocket Gateway in Node/Bun.
WebSocket implementation is required
In Node.js clients, pass a WebSocket implementation:
import WebSocket from 'ws';
const client = createRealtimeClient({
url: 'ws://localhost:8080',
subscriberId: 'device-1',
WebSocket,
});

Browsers already provide globalThis.WebSocket.
Realtime connection must send init first
The first protocol message must be:
{ "type": "init", "subscriberId": "device-1", "token": "jwt" }

createRealtimeClient sends this automatically on open.
No Events Across Multiple Backend Instances
Check provider-specific fanout:
- PostgreSQL needs a dedicated connected listenerClient.
- Redis needs a connected subscriberClient.
- MongoDB cross-process fanout needs change streams.
- Hybrid mode with Redis is recommended for multi-instance fanout.
MongoDB Change Streams Do Not Fire Locally
MongoDB change streams require replica set support. Use a local replica set, MongoDB Atlas, or Hybrid mode with Redis for fanout.
PostgreSQL Tables Are Missing
Use URI setup with autoMigrate: true, or run:
psql "$DATABASE_URL" -f migrations/001_postgres_initial.sql
psql "$DATABASE_URL" -f migrations/002_postgres_prune_function.sql

Redis Replay Returns No Events
Replay depends on Redis Streams. Make sure your client supports xAdd and
xRange, and that events were published through the Redis transport.
Frontend Receives Sequence Gaps
The client automatically requests replay. If gaps persist, check:
- retention windows
- transport limit
- provider health
- event pruning
- whether multiple publishers are using the same room sequence provider
Validation
Build:
npm run build

Test:

npm test

Package check:

npm run pack:check

If your local npm cache has permission issues, use a temporary cache:

npm --cache /private/tmp/zero-npm-cache pack --dry-run

Synthetic load test:

npm run load:test

Tune load test values:
REALTIME_LOAD_ROOMS=50 \
REALTIME_LOAD_SUBSCRIBERS_PER_ROOM=10 \
REALTIME_LOAD_EVENTS_PER_ROOM=100 \
npm run load:test

License
MIT
