@mathislair/mtbreactive
v0.3.0
WebSocket reactivity layer on top of @mathislair/mtbdb: register beans, broadcast row changes scoped by foreign key. Optional per-row authorization, batched frames, and pg LISTEN/NOTIFY ingest.
mtbReactive
WebSocket reactivity layer on top of @mathislair/mtbdb.
Register bean classes as reactive, bind a Connection, and clients receive a
broadcast every time a row they're subscribed to changes — scoped by the
columns you choose (typically a foreign key).
import { ReactiveServer } from '@mathislair/mtbreactive';
import { Connection } from '@mathislair/mtbdb';
import { MessageDao, Message } from './generated';
const conn = await Connection.open({ driver: 'postgres', /* ... */ });
const reactive = new ReactiveServer({ port: 8080, path: '/ws' });
reactive.register(Message, { scopes: ['conversation_id'] });
reactive.bind(conn);
// Any conn.dao(MessageDao).save(...) / .delete(...) now broadcasts to
// every client subscribed to { channel: 'message', scope: { col: 'conversation_id', value: 42 } }.
Install
npm install @mathislair/mtbreactive @mathislair/mtbdb ws
Requires @mathislair/mtbdb ≥ 0.4.0 (typed events emitter, transaction
buffering, and Connection.listen for the optional LISTEN/NOTIFY mode)
and Node 18+.
How it works
mtbReactive sits between the ORM and your WebSocket clients:
- Registry: declares which beans are reactive and on which scope columns.
- Bridge: listens to conn.events (afterInsert / afterUpdate / afterDelete) emitted by AbstractDao writes.
- SubscriptionStore: tracks (channel, scope) → clients so each change only fans out to the clients that asked for it.
- WsServer: handles the wire protocol (subscribe / unsubscribe / change messages) and ties the three together.
A change in the DB → an event from the ORM → routed by registry + scope → sent only to the WebSocket clients that subscribed to that scope.
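To make the (channel, scope) → clients routing concrete, here is a minimal sketch of such a store. It is not the library's actual SubscriptionStore: the class name, method names, and the string key format are assumptions chosen for illustration.

```typescript
// Hypothetical sketch of a (channel, scope) -> clients index.
type ScopeKey = { col: string; value: string | number };

class SubscriptionStoreSketch<Client> {
  // One entry per distinct (channel, col, value) triple.
  private readonly byKey = new Map<string, Set<Client>>();

  // JSON keeps 42 and "42" distinct, which may or may not match the real server.
  private static keyOf(channel: string, scope: ScopeKey): string {
    return JSON.stringify([channel, scope.col, scope.value]);
  }

  subscribe(client: Client, channel: string, scope: ScopeKey): void {
    const key = SubscriptionStoreSketch.keyOf(channel, scope);
    let clients = this.byKey.get(key);
    if (!clients) {
      clients = new Set<Client>();
      this.byKey.set(key, clients);
    }
    clients.add(client);
  }

  unsubscribe(client: Client, channel: string, scope: ScopeKey): void {
    const key = SubscriptionStoreSketch.keyOf(channel, scope);
    const clients = this.byKey.get(key);
    if (!clients) return;
    clients.delete(client);
    // Drop empty sets so the map does not grow without bound.
    if (clients.size === 0) this.byKey.delete(key);
  }

  // Clients that should receive a change for this exact scope.
  clientsFor(channel: string, scope: ScopeKey): Client[] {
    const key = SubscriptionStoreSketch.keyOf(channel, scope);
    return Array.from(this.byKey.get(key) ?? []);
  }
}
```

A change for conversation 42 is then delivered only to the clients returned by clientsFor('message', { col: 'conversation_id', value: 42 }).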
Wire protocol
Clients send JSON frames:
{ "type": "subscribe", "channel": "message", "scope": { "col": "conversation_id", "value": 42 } }
{ "type": "unsubscribe", "channel": "message", "scope": { "col": "conversation_id", "value": 42 } }
Server replies:
{ "type": "subscribed", "channel": "message", "scope": { "col": "conversation_id", "value": 42 } }
{ "type": "error", "code": "unknown_channel" | "invalid_scope" | "forbidden" | "invalid_json" | "unknown_message_type" }
Server broadcasts:
{
"type": "change",
"channel": "message",
"scope": { "col": "conversation_id", "value": 42 },
"event": {
"type": "afterInsert" | "afterUpdate" | "afterDelete",
"schemaName": "public",
"tableName": "message",
"primaryKey": { "id": 17 },
"row": { "id": 17, "conversation_id": 42, "body": "..." }
}
}
afterUpdate events also carry a changed field listing the columns that
actually changed (the diff).
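On the client side, the server frames above map naturally onto a discriminated union. The sketch below is an illustration only: the type names and describeFrame are hypothetical, the scope value type is assumed to be string or number, and the shape of changed is left open because this README does not specify it.

```typescript
// Hypothetical client-side types for the server frames shown above.
type ScopeKey = { col: string; value: string | number };

type ChangeEvent = {
  type: "afterInsert" | "afterUpdate" | "afterDelete";
  schemaName: string;
  tableName: string;
  primaryKey: Record<string, unknown>;
  row: Record<string, unknown>;
  changed?: unknown; // afterUpdate only; exact shape not specified here
};

type ServerFrame =
  | { type: "subscribed"; channel: string; scope: ScopeKey }
  | { type: "error"; code: string }
  | { type: "change"; channel: string; scope: ScopeKey; event: ChangeEvent };

// Turns a raw JSON frame into a short log line, switching on the "type" tag.
function describeFrame(raw: string): string {
  const frame = JSON.parse(raw) as ServerFrame;
  switch (frame.type) {
    case "subscribed":
      return `subscribed to ${frame.channel} (${frame.scope.col}=${frame.scope.value})`;
    case "error":
      return `server error: ${frame.code}`;
    case "change":
      return `${frame.event.type} on ${frame.event.tableName} ${JSON.stringify(frame.event.primaryKey)}`;
    default:
      return "unrecognised frame";
  }
}
```

The cast after JSON.parse trusts the server; a production client would probably validate the frame before switching on it.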
Authentication & authorization
Both are optional. authenticate runs once on the WebSocket handshake; authorize runs on each subscribe request.
const reactive = new ReactiveServer({
port: 8080,
authenticate: async (req) => {
const user = await verifyJwt(req.headers.authorization);
return { user }; // becomes ctx in authorize
},
authorize: async ({ ctx, channel, scope }) => {
if (channel === 'message') {
return userIsInConversation(ctx.user, scope.value);
}
return false;
},
});
authenticate failures close the socket with 4401 unauthorized. authorize
failures reply with { "type": "error", "code": "forbidden" } and skip the
subscribe.
Snapshot on subscribe
Without a snapshot, a chat-style client has to fetch history and then
subscribe to live changes as two separate calls, and a row inserted between
the two is lost. Set snapshot on register() (true, or an options object as
in the example below) and the server sends the rows currently matching the
scope before any live change events:
reactive.register(Message, {
scopes: ['conversation_id'],
snapshot: { limit: 100, orderBy: 'created_at DESC' },
});
The wire ordering is guaranteed: subscribed → snapshot → change …. While
the snapshot query runs, any incoming change events for the new
subscription are buffered server-side and flushed in order after the
snapshot, so the client never sees a change before the snapshot it
relates to.
{ "type": "snapshot", "channel": "message",
"scope": { "col": "conversation_id", "value": 42 },
"rows": [ { "id": 1, "conversation_id": 42, "body": "..." }, ... ] }
snapshot requires bind(conn) to have been called before clients
subscribe — the server queries through the same Connection. If snapshot
is requested but the server isn't bound, the client receives an
{ "type": "error", "code": "snapshot_unavailable" } frame.
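The buffering behaviour described above (hold live changes until the snapshot has been sent, then flush them in order) can be sketched as a small per-subscription gate. This is an assumption about one way to implement it, not the server's actual code; SnapshotGate and its method names are hypothetical.

```typescript
// Hypothetical per-subscription gate: buffers changes until the snapshot is out.
type Frame = { type: string; [key: string]: unknown };

class SnapshotGate {
  private readonly send: (frame: Frame) => void;
  // Non-null while the snapshot query is still running.
  private pending: Frame[] | null = [];

  constructor(send: (frame: Frame) => void) {
    this.send = send;
  }

  // Called for every live change routed to this subscription.
  onChange(frame: Frame): void {
    if (this.pending) this.pending.push(frame);
    else this.send(frame);
  }

  // Called once when the snapshot query resolves.
  onSnapshot(rows: unknown[]): void {
    const buffered = this.pending ?? [];
    this.pending = null; // later changes go straight through
    this.send({ type: "snapshot", rows });
    for (const frame of buffered) this.send(frame); // flush in arrival order
  }
}
```

Because the snapshot frame is always sent before the buffer is flushed, a client never observes a change ahead of the snapshot it belongs to.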
Scoping
scopes lists the columns clients are allowed to filter on. By default, one
scope key is produced per registered scope column, skipping null/undefined
values. Override with resolveScopes(row) => ScopeKey[] to derive scopes
from computed values, multiple FKs, or a join table.
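As a rough illustration of the default behaviour described above (one scope key per registered column, skipping null/undefined values), the logic could look like the sketch below. defaultScopes is a hypothetical helper written for this README, not an export of the library.

```typescript
// Hypothetical sketch of the default scope resolution.
type ScopeKey = { col: string; value: unknown };

function defaultScopes(row: Record<string, unknown>, scopeCols: string[]): ScopeKey[] {
  const keys: ScopeKey[] = [];
  for (const col of scopeCols) {
    const value = row[col];
    // A missing or null foreign key produces no scope key at all.
    if (value === null || value === undefined) continue;
    keys.push({ col, value });
  }
  return keys;
}
```

A resolveScopes override replaces this logic with whatever list of scope keys the row should fan out to.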
reactive.register(OrderItem, {
scopes: ['order_id', 'product_id'],
channel: 'order-items', // optional; defaults to tableName
resolveScopes: (row) => [
{ col: 'order_id', value: row.order_id },
{ col: 'product_id', value: row.product_id },
],
});
Per-row authorization (filterRow)
authorize runs once at subscribe time. When "can this user see this row?"
also depends on the row's contents (private flag, soft-deletion, owner-only
fields), wire filterRow — it runs once per (subscribed-client, event)
pair before the change frame is sent to that specific client.
const reactive = new ReactiveServer({
port: 8080,
filterRow: ({ ctx, row }) => {
if (row.private && row.author_id !== ctx.user) return false;
return true;
},
});
Returning false (or a Promise resolving to false) skips that client
without affecting any other subscriber. A thrown filterRow is treated
as false (fail-closed).
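The per-client, fail-closed semantics can be sketched as follows. This is an illustration under assumptions: recipientsFor is a hypothetical helper, and the real server may evaluate filters concurrently rather than one at a time.

```typescript
// Hypothetical sketch: decide which subscribed clients receive a change.
type FilterRow<Ctx, Row> = (args: { ctx: Ctx; row: Row }) => boolean | Promise<boolean>;

async function recipientsFor<Ctx, Row>(
  clients: Ctx[],
  row: Row,
  filterRow: FilterRow<Ctx, Row>,
): Promise<Ctx[]> {
  const allowed: Ctx[] = [];
  for (const ctx of clients) {
    let ok = false;
    try {
      ok = await filterRow({ ctx, row }); // sync or async result
    } catch {
      ok = false; // fail-closed: a throwing filter hides the row from this client
    }
    if (ok) allowed.push(ctx); // other clients are unaffected either way
  }
  return allowed;
}
```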
Batched frames (batchMs)
Bulk inserts produce one frame per row — annoying for browsers that have
to render-thrash through 100 messages. Set batchMs on a registration to
coalesce events within an N-millisecond window into a single frame:
reactive.register(Message, {
scopes: ['conversation_id'],
batchMs: 50, // up to 50ms latency, 1 frame per scope per window
});
When batched, the frame uses a plural events field instead of the
singular event:
{ "type": "change", "channel": "message", "scope": {...},
"events": [ {...}, {...}, {...} ] }
The client packages (@mathislair/mtbreactive-client and the framework
wrappers) accept either shape transparently.
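If you write your own client instead of using those packages, handling both shapes takes one small normalizer. The sketch below is hypothetical (eventsOf and the type names are not part of any published API) and only shows the idea.

```typescript
// Hypothetical normalizer: always work with an array of events.
type ChangeEvent = { type: string; [key: string]: unknown };

type ChangeFrame =
  | { type: "change"; channel: string; scope: unknown; event: ChangeEvent }
  | { type: "change"; channel: string; scope: unknown; events: ChangeEvent[] };

function eventsOf(frame: ChangeFrame): ChangeEvent[] {
  // Batched frames carry "events"; unbatched frames carry a single "event".
  return "events" in frame ? frame.events : [frame.event];
}
```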
LISTEN / NOTIFY mode
DAO writes are captured automatically by bind(conn). To capture writes
that bypass the DAO (raw SQL, triggers, separate processes), wire
listen() to a Postgres NOTIFY channel and have a row-level trigger
emit pg_notify(channel, json_build_object('type', ..., ...)):
reactive.bind(conn);
await reactive.listen({ channel: 'mtbreactive' });
The default decoder expects JSON of the shape:
{ "type": "afterInsert"|"afterUpdate"|"afterDelete",
"schemaName": "public", "tableName": "message",
"primaryKey": { "id": 1 },
"row": { ... },
"changed": { ... } // optional, afterUpdate only
}
Pass a custom parse(payload) if your trigger emits a different shape.
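As an illustration, suppose an existing trigger emits a compact payload such as { "op": "INSERT", "schema": "public", "table": "message", "pk": {...}, "data": {...} }. That payload shape is invented for this example, and the sketch assumes parse receives the NOTIFY payload as a string and returns an event in the default shape above; check the actual signature before relying on it.

```typescript
// Hypothetical custom decoder mapping an invented trigger payload to the default shape.
type ChangeEvent = {
  type: "afterInsert" | "afterUpdate" | "afterDelete";
  schemaName: string;
  tableName: string;
  primaryKey: Record<string, unknown>;
  row: Record<string, unknown>;
};

function typeForOp(op: string): ChangeEvent["type"] {
  switch (op) {
    case "INSERT": return "afterInsert";
    case "UPDATE": return "afterUpdate";
    case "DELETE": return "afterDelete";
    default: throw new Error(`unsupported op: ${op}`);
  }
}

function parseCompactPayload(payload: string): ChangeEvent {
  const raw = JSON.parse(payload) as {
    op: string;
    schema: string;
    table: string;
    pk: Record<string, unknown>;
    data: Record<string, unknown>;
  };
  return {
    type: typeForOp(raw.op),
    schemaName: raw.schema,
    tableName: raw.table,
    primaryKey: raw.pk,
    row: raw.data,
  };
}
```

Where exactly the function is passed (presumably alongside channel in the listen() options) is not stated in this README.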
Sample SQL for a generic broadcast trigger (run once per reactive table):
CREATE OR REPLACE FUNCTION mtbreactive_notify() RETURNS trigger AS $$
DECLARE
payload jsonb;
BEGIN
payload := jsonb_build_object(
'type', 'after' || initcap(lower(TG_OP)),
'schemaName', TG_TABLE_SCHEMA,
'tableName', TG_TABLE_NAME,
'primaryKey', (CASE WHEN TG_OP = 'DELETE'
THEN to_jsonb(OLD)
ELSE to_jsonb(NEW) END),
'row', (CASE WHEN TG_OP = 'DELETE'
THEN to_jsonb(OLD)
ELSE to_jsonb(NEW) END)
);
PERFORM pg_notify('mtbreactive', payload::text);
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER message_notify
AFTER INSERT OR UPDATE OR DELETE ON message
FOR EACH ROW EXECUTE FUNCTION mtbreactive_notify();
The trigger is your responsibility; mtbDB does not generate it (yet). Pick this mode when fidelity matters more than schema simplicity.
When changes are NOT broadcast
With bind(conn) alone, the bridge only sees writes that go through
AbstractDao.save() / AbstractDao.delete(), which is where mtbdb emits the
lifecycle events mtbreactive subscribes to. Anything that bypasses the DAO
is invisible to subscribers:
- Raw SQL through conn.driver.execute(...) or conn.driver.query(...).
- Postgres triggers that update other tables in response to a write.
- A separate process (cron job, worker, psql session) writing to the same database.
- Failed transactions: events are buffered for the duration of conn.transaction(fn) and only emitted if the transaction commits, so rollbacks correctly do not reach subscribers.
If you need full-fidelity capture of every write regardless of source,
use the Postgres LISTEN/NOTIFY bridge (DB triggers → pg_notify) described
under LISTEN / NOTIFY mode above; see also Mtbreactive issue #2.
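The commit-only delivery mentioned in the list above (events buffered during conn.transaction(fn), dropped on rollback) can be pictured with the sketch below. It is a simplified model, not mtbdb's implementation: BufferedEmitter is hypothetical, and nested or concurrent transactions are ignored.

```typescript
// Hypothetical model of transaction-scoped event buffering.
type Listener<E> = (event: E) => void;

class BufferedEmitter<E> {
  private readonly listeners: Listener<E>[] = [];
  // Non-null while a transaction is open.
  private buffer: E[] | null = null;

  on(listener: Listener<E>): void {
    this.listeners.push(listener);
  }

  emit(event: E): void {
    if (this.buffer) this.buffer.push(event); // hold until commit
    else this.deliver(event);
  }

  async transaction<T>(fn: () => Promise<T>): Promise<T> {
    const pending: E[] = [];
    this.buffer = pending;
    try {
      const result = await fn();
      this.buffer = null;
      for (const event of pending) this.deliver(event); // commit: flush in order
      return result;
    } catch (err) {
      this.buffer = null; // rollback: buffered events are discarded
      throw err;
    }
  }

  private deliver(event: E): void {
    for (const listener of this.listeners) listener(event);
  }
}
```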
Attaching to an existing HTTP server
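import { createServer } from 'http';
import { ReactiveServer } from '@mathislair/mtbreactive';
// `app` is your existing request handler (for example an Express app).
const http = createServer(app);
const reactive = new ReactiveServer({ server: http, path: '/ws' });
http.listen(3000);
License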
MIT
