npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@mathislair/mtbreactive

v0.3.0

Published

WebSocket reactivity layer on top of @mathislair/mtbdb: register beans, broadcast row changes scoped by foreign key. Optional per-row authorization, batched frames, and pg LISTEN/NOTIFY ingest.

Readme

mtbReactive

npm

WebSocket reactivity layer on top of @mathislair/mtbdb. Register bean classes as reactive, bind a Connection, and clients receive a broadcast every time a row they're subscribed to changes — scoped by the columns you choose (typically a foreign key).

import { ReactiveServer } from '@mathislair/mtbreactive';
import { Connection } from '@mathislair/mtbdb';
import { MessageDao, Message } from './generated';

const conn = await Connection.open({ driver: 'postgres', /* ... */ });

const reactive = new ReactiveServer({ port: 8080, path: '/ws' });
reactive.register(Message, { scopes: ['conversation_id'] });
reactive.bind(conn);

// Any conn.dao(MessageDao).save(...) / .delete(...) now broadcasts to
// every client subscribed to { channel: 'message', scope: { col: 'conversation_id', value: 42 } }.

Install

npm install @mathislair/mtbreactive @mathislair/mtbdb ws

Requires @mathislair/mtbdb ≥ 0.4.0 (typed events emitter, transaction buffering, and Connection.listen for the optional LISTEN/NOTIFY mode) and Node 18+.

How it works

mtbReactive sits between the ORM and your WebSocket clients:

  1. Registry — declares which beans are reactive and on which scope columns.
  2. Bridge — listens to conn.events (afterInsert / afterUpdate / afterDelete) emitted by AbstractDao writes.
  3. SubscriptionStore — tracks (channel, scope) → clients so each change only fans out to the clients that asked for it.
  4. WsServer — handles the wire protocol (subscribe / unsubscribe / change messages) and ties the three together.

A change in the DB → an event from the ORM → routed by registry + scope → sent only to the WebSocket clients that subscribed to that scope.

Wire protocol

Clients send JSON frames:

{ "type": "subscribe",   "channel": "message", "scope": { "col": "conversation_id", "value": 42 } }
{ "type": "unsubscribe", "channel": "message", "scope": { "col": "conversation_id", "value": 42 } }

Server replies:

{ "type": "subscribed", "channel": "message", "scope": { "col": "conversation_id", "value": 42 } }
{ "type": "error",      "code": "unknown_channel" | "invalid_scope" | "forbidden" | "invalid_json" | "unknown_message_type" }

Server broadcasts:

{
  "type": "change",
  "channel": "message",
  "scope":   { "col": "conversation_id", "value": 42 },
  "event":   {
    "type":       "afterInsert" | "afterUpdate" | "afterDelete",
    "schemaName": "public",
    "tableName":  "message",
    "primaryKey": { "id": 17 },
    "row":        { "id": 17, "conversation_id": 42, "body": "..." }
  }
}

afterUpdate events also carry a changed field listing the columns that actually changed (the diff).

Authentication & authorization

Both are optional and run on the WS handshake / per-subscription respectively.

const reactive = new ReactiveServer({
  port: 8080,
  authenticate: async (req) => {
    const user = await verifyJwt(req.headers.authorization);
    return { user }; // becomes ctx in authorize
  },
  authorize: async ({ ctx, channel, scope }) => {
    if (channel === 'message') {
      return userIsInConversation(ctx.user, scope.value);
    }
    return false;
  },
});

authenticate failures close the socket with 4401 unauthorized. authorize failures reply with { "type": "error", "code": "forbidden" } and skip the subscribe.

Snapshot on subscribe

Without a snapshot, a chat-style client races between fetch history and subscribe to live: a row inserted between the two calls is lost. Set snapshot: true on register() and the server sends the rows currently matching the scope before any live change events:

reactive.register(Message, {
  scopes: ['conversation_id'],
  snapshot: { limit: 100, orderBy: 'created_at DESC' },
});

The wire ordering is guaranteed: subscribed → snapshot → change …. While the snapshot query runs, any incoming change events for the new subscription are buffered server-side and flushed in order after the snapshot, so the client never sees a change before the snapshot it relates to.

{ "type": "snapshot", "channel": "message",
  "scope": { "col": "conversation_id", "value": 42 },
  "rows": [ { "id": 1, "conversation_id": 42, "body": "..." }, ... ] }

snapshot requires bind(conn) to have been called before clients subscribe — the server queries through the same Connection. If snapshot is requested but the server isn't bound, the client receives an { "type": "error", "code": "snapshot_unavailable" } frame.

Scoping

scopes lists the columns clients are allowed to filter on. By default, one scope key is produced per registered scope column, skipping null/undefined values. Override with resolveScopes(row) => ScopeKey[] to derive scopes from computed values, multiple FKs, or a join table.

reactive.register(OrderItem, {
  scopes: ['order_id', 'product_id'],
  channel: 'order-items',          // optional; defaults to tableName
  resolveScopes: (row) => [
    { col: 'order_id',   value: row.order_id },
    { col: 'product_id', value: row.product_id },
  ],
});

Per-row authorization (filterRow)

authorize runs once at subscribe time. When "can this user see this row?" also depends on the row's contents (private flag, soft-deletion, owner-only fields), wire filterRow — it runs once per (subscribed-client, event) pair before the change frame is sent to that specific client.

const reactive = new ReactiveServer({
  port: 8080,
  filterRow: ({ ctx, row }) => {
    if (row.private && row.author_id !== ctx.user) return false;
    return true;
  },
});

Returning false (or a Promise resolving to false) skips that client without affecting any other subscriber. A thrown filterRow is treated as false (fail-closed).

Batched frames (batchMs)

Bulk inserts produce one frame per row — annoying for browsers that have to render-thrash through 100 messages. Set batchMs on a registration to coalesce events within an N-millisecond window into a single frame:

reactive.register(Message, {
  scopes: ['conversation_id'],
  batchMs: 50, // up to 50ms latency, 1 frame per scope per window
});

When batched, the frame uses a plural events field instead of the singular event:

{ "type": "change", "channel": "message", "scope": {...},
  "events": [ {...}, {...}, {...} ] }

The client packages (@mathislair/mtbreactive-client and the framework wrappers) accept either shape transparently.

LISTEN / NOTIFY mode

DAO writes are captured automatically by bind(conn). To capture writes that bypass the DAO (raw SQL, triggers, separate processes), wire listen() to a Postgres NOTIFY channel and have a row-level trigger emit pg_notify(channel, json_build_object('type', ..., ...)):

reactive.bind(conn);
await reactive.listen({ channel: 'mtbreactive' });

The default decoder expects JSON of the shape:

{ "type": "afterInsert"|"afterUpdate"|"afterDelete",
  "schemaName": "public", "tableName": "message",
  "primaryKey": { "id": 1 },
  "row": { ... },
  "changed": { ... }   // optional, afterUpdate only
}

Pass a custom parse(payload) if your trigger emits a different shape.

Sample SQL for a generic broadcast trigger (run once per reactive table):

CREATE OR REPLACE FUNCTION mtbreactive_notify() RETURNS trigger AS $$
DECLARE
  payload jsonb;
BEGIN
  payload := jsonb_build_object(
    'type', 'after' || initcap(lower(TG_OP)),
    'schemaName', TG_TABLE_SCHEMA,
    'tableName', TG_TABLE_NAME,
    'primaryKey', (CASE WHEN TG_OP = 'DELETE'
                        THEN to_jsonb(OLD)
                        ELSE to_jsonb(NEW) END),
    'row',        (CASE WHEN TG_OP = 'DELETE'
                        THEN to_jsonb(OLD)
                        ELSE to_jsonb(NEW) END)
  );
  PERFORM pg_notify('mtbreactive', payload::text);
  RETURN NULL;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER message_notify
AFTER INSERT OR UPDATE OR DELETE ON message
FOR EACH ROW EXECUTE FUNCTION mtbreactive_notify();

The trigger is your responsibility — mtbDB does not generate it (yet). Pick this mode when fidelity matters more than schema simplicity.

When changes are NOT broadcast

The bridge only sees writes that go through AbstractDao.save() / AbstractDao.delete() — that's where mtbdb emits the lifecycle events mtbreactive subscribes to. Anything that bypasses the DAO is invisible to subscribers:

  • Raw SQL through conn.driver.execute(...) or conn.driver.query(...).
  • Postgres triggers that update other tables in response to a write.
  • A separate process (cron job, worker, psql session) writing to the same database.
  • Failed transactions — events are buffered for the duration of conn.transaction(fn) and only emitted if the transaction commits, so rollbacks correctly do not reach subscribers.

If you need real-fidelity capture of every write regardless of source, you'll want a Postgres LISTEN/NOTIFY bridge (DB triggers → pg_notify). That's not built in here yet — see Mtbreactive issue #2.

Attaching to an existing HTTP server

import { createServer } from 'http';
const http = createServer(app);
const reactive = new ReactiveServer({ server: http, path: '/ws' });
http.listen(3000);

License

MIT