insite-subscriptions-server

v2.4.0

Server-side subscriptions/publications for inSite

Server-side pub/sub layer for inSite. Defines publications (named data sources), subscription handles, and WebSocket integration for real-time updates.

Part of inSite — wires to insite-ws and insite-users-server-ws; uses insite-db.

Installation

npm install insite-subscriptions-server

Or:

bun add insite-subscriptions-server

Overview

The package provides two data modes:

  • object — arbitrary data via custom fetch; you call changed() to push updates
  • map — MongoDB collection with query, projection, sort; reacts to Change Streams and sends diff updates

Two entry points:

  • Main (insite-subscriptions-server) — low-level API: Publication, SubscriptionHandle, CollectionMapPublication, CollectionMapSubscriptionHandle
  • WS (insite-subscriptions-server/ws) — wires subscription protocol to insite-ws and insite-users-server-ws; adds SubscriptionHandler and wss.publish()

Core Concepts

Publication — Named data source. Holds subscribers, invokes fetch or queries MongoDB, notifies on changes.

SubscriptionHandle — Subscription to a publication. Calls handler(fetched, reason) when data changes.

CollectionMapPublication — Publication backed by a WatchedCollection. Supports query, projection, sort. Listens to Change Streams and emits incremental updates.

CollectionMapSubscriptionHandle — Subscription to a map publication. Receives diff updates: ["i", items, sort?], ["c", doc], ["u", doc, fields?], ["d", _id].

Public API

Main Entry (insite-subscriptions-server)

| Export | Description |
|--------|-------------|
| publications | Map<string, Publication> (global registry) |
| Publication | Class |
| SubscriptionHandle | Class |
| CollectionMapPublication | Class |
| CollectionMapSubscriptionHandle | Class |
| skippedChangeStreamDocuments | WeakSet<ChangeStreamDocument> (documents to skip in Change Stream handling) |
| Projection | type |


Publication

class Publication<SA extends SubscriptionArgs = SubscriptionArgs>

| Property | Type | Description |
|----------|------|-------------|
| name | string | Publication name (readonly) |
| type | string | "object" (default) |
| subscriptions | Set<SubscriptionHandle<SA>> | Active subscribers |
| onSubscribe | (subscription) => void | Optional callback on subscribe |
| onUnsubscribe | (subscription) => void | Optional callback on unsubscribe |
| fetch | (...args: SA) => unknown | Optional; called to fetch data |

| Method | Description |
|--------|-------------|
| constructor(name, props?) | Registers in publications |
| subscribe(subscription) | Adds subscriber, calls onSubscribe |
| unsubscribe(subscription) | Removes subscriber, calls onUnsubscribe |
| changed(reason?) | Notifies all subscribers |
| fetchSubscription(subscription, reason?) | Fetches for one subscriber; defaults to fetch(...subscription.args) |
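
The method table above can be read as a simple flow: a publication registers itself by name, keeps a set of subscribers, and on changed() re-fetches for each of them. The following is a minimal, self-contained toy model of that flow. All names here (ToyPublication, ToySubscription, toyPublications) are hypothetical and exist only for illustration; this is not the package's source code, and the real classes may behave differently in detail.

```typescript
// Toy model for illustration only; not the package's implementation.
type ToyHandler = (fetched: unknown, reason?: unknown) => void;

class ToySubscription {
	args: unknown[];
	handler: ToyHandler;

	constructor(args: unknown[], handler: ToyHandler) {
		this.args = args;
		this.handler = handler;
	}
}

// Hypothetical stand-in for the global `publications` registry.
const toyPublications = new Map<string, ToyPublication>();

class ToyPublication {
	name: string;
	fetch: (...args: unknown[]) => unknown;
	subscriptions = new Set<ToySubscription>();

	constructor(name: string, fetch: (...args: unknown[]) => unknown) {
		this.name = name;
		this.fetch = fetch;
		toyPublications.set(name, this); // mirrors "Registers in publications"
	}

	subscribe(subscription: ToySubscription): void {
		this.subscriptions.add(subscription);
	}

	unsubscribe(subscription: ToySubscription): void {
		this.subscriptions.delete(subscription);
	}

	// Fetches for one subscriber using that subscriber's own args.
	fetchSubscription(subscription: ToySubscription, reason?: unknown): void {
		subscription.handler(this.fetch(...subscription.args), reason);
	}

	// Notifies every current subscriber.
	changed(reason?: unknown): void {
		for (const subscription of this.subscriptions)
			this.fetchSubscription(subscription, reason);
	}
}

const counter = { value: 0 };
const received: Array<[unknown, unknown]> = [];

const toyPub = new ToyPublication("counter", () => ({ ...counter }));
const toySub = new ToySubscription([], (fetched, reason) => {
	received.push([fetched, reason]);
});

toyPub.subscribe(toySub);
counter.value = 1;
toyPub.changed("increment"); // handler is called with the fresh fetch result
toyPub.unsubscribe(toySub);
toyPub.changed("ignored"); // no subscribers left, so the handler is not called
```

In this model the publication never stores data itself; it only knows how to fetch it, which is why object mode relies on an explicit changed() call to push updates.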


SubscriptionHandle

class SubscriptionHandle<SA extends SubscriptionArgs>

| Property | Type | Description |
|----------|------|-------------|
| publication | Publication<SA> | The publication |
| args | SA | Arguments passed to fetch |
| handler | SubscriptionHandler | (fetched, reason?) => void |

| Method | Description |
|--------|-------------|
| constructor(publicationName, args, handler, immediately?, prevent?) | Subscribes if publication exists; immediately triggers initial fetch; prevent skips auto-subscribe |
| changed(reason?) | Fetches and calls handler |
| cancel() | Unsubscribes |
| renew() | Unsubscribes, resubscribes, fetches |


CollectionMapPublication

class CollectionMapPublication<D extends Document, SA extends SubscriptionArgs> extends Publication<SA>

| Property | Type | Description |
|----------|------|-------------|
| collection | WatchedCollection<D> | MongoDB collection with Change Stream |
| queryProps | ((...args: SA) => QueryProps<D> \| false \| null \| void) \| QueryProps<D> \| false \| null | Query config per subscription |
| transform | (doc: TransformableDoc<D>, args: SA) => void | Optional doc transform before send |
| type | string | "map" |

| Method | Description |
|--------|-------------|
| constructor(collection, name, queryProps?, transform?) | |
| skip(next) | Adds next to skippedChangeStreamDocuments |
| makeQueryProps(args) | Returns { query, projection, isProjectionInclusive, fields, sort, args } |
| flushInitial() | Fetches for all subscribers, sends as single batch |
| changed(reason) | Promise; notifies all subscribers |

onSubscribe / onUnsubscribe are set internally to manage Change Stream listeners.

QueryProps:

| Field | Type | Description |
|-------|------|-------------|
| query | Filter<D> | MongoDB filter |
| projection | Projection | Field selection |
| sort | Sort | Sort spec |
| triggers | string[] | Fields that affect projection (for update filtering) |
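
Because makeQueryProps returns an isProjectionInclusive flag alongside the projection, it may help to see how a MongoDB-style projection decides which changed fields a subscriber would actually see. The sketch below follows standard MongoDB projection rules for flat projections only (nested projections are left out). The helper names isInclusive and visibleFields are hypothetical, and the README does not say whether the package filters updates this way internally.

```typescript
// Flat projections only; nested Projection objects are out of scope for this sketch.
type FlatProjection = Record<string, boolean | number>;

// A projection is inclusive when any field other than _id is switched on.
function isInclusive(projection: FlatProjection): boolean {
	return Object.entries(projection).some(
		([key, value]) => key !== "_id" && Boolean(value)
	);
}

// Returns the subset of changed field names that the projection lets through.
function visibleFields(
	projection: FlatProjection | null,
	changed: string[]
): string[] {
	if (!projection) return changed; // no projection: everything is visible

	const inclusive = isInclusive(projection);

	return changed.filter((field) => {
		// _id is included by default unless explicitly excluded.
		if (field === "_id")
			return projection._id === undefined || Boolean(projection._id);

		return inclusive
			? Boolean(projection[field]) // inclusive: only listed fields
			: projection[field] === undefined; // exclusive: all but listed fields
	});
}

const inclusiveResult = visibleFields({ name: 1 }, ["name", "secret"]);
const exclusiveResult = visibleFields({ secret: 0 }, ["name", "secret"]);
const unprojectedResult = visibleFields(null, ["name", "secret"]);
```

Under these rules an update that touches only hidden fields yields an empty list, which is the kind of case where a subscriber would not need to be notified at all.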


CollectionMapSubscriptionHandle

class CollectionMapSubscriptionHandle<D extends Document, SA extends SubscriptionArgs> extends SubscriptionHandle<SA>

| Property | Type | Description |
|----------|------|-------------|
| ids | Set<string> | Document ids in current result set |
| query | Filter<D> \| null | Set by onSubscribe |
| projection | Projection \| null | Set by onSubscribe |
| isProjectionInclusive | boolean | Set by onSubscribe |
| fields | Set<string> \| null | Set by onSubscribe |
| sort | Sort \| null | Set by onSubscribe |
| match | (doc: D) => boolean | Sift matcher for query |
| updates | unknown[] | Pending updates (batched) |
| flushUpdates | () => void | Sends updates to handler and clears |
| collectionChangeListener | (next: ChangeStreamDocument<D>) => Promise<void> | Change Stream handler |

| Method | Description |
|--------|-------------|
| constructor(publicationName, args, handler, immediately?) | Subscribes and optionally fetches initial data |
| changed(next?) | Fetches and calls handler([result]) |
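
The ids and match properties suggest how a changed document could be turned into a create, update, or delete for one subscriber. The sketch below shows one plausible reading, assuming the decision depends only on whether the document was in the result set before and whether it matches the query now. The function classifyChange and the ToyDoc type are hypothetical, a plain predicate stands in for the Sift matcher, and the real collectionChangeListener may work differently.

```typescript
type ToyDoc = { _id: string; [key: string]: unknown };
type ToyChange = ["c", ToyDoc] | ["u", ToyDoc] | ["d", string] | null;

// Decides which tuple (if any) a subscriber should receive for a changed doc,
// and keeps the subscriber's id set in sync.
function classifyChange(
	ids: Set<string>,
	match: (doc: ToyDoc) => boolean,
	doc: ToyDoc
): ToyChange {
	const wasIncluded = ids.has(doc._id);
	const isIncluded = match(doc);

	if (!wasIncluded && isIncluded) {
		ids.add(doc._id);
		return ["c", doc]; // entered the result set
	}

	if (wasIncluded && isIncluded)
		return ["u", doc]; // still in the result set

	if (wasIncluded && !isIncluded) {
		ids.delete(doc._id);
		return ["d", doc._id]; // left the result set
	}

	return null; // never matched: nothing to send
}

const trackedIds = new Set<string>(["a"]);
const isActive = (doc: ToyDoc) => doc.active === true;

const entered = classifyChange(trackedIds, isActive, { _id: "b", active: true });
const updated = classifyChange(trackedIds, isActive, { _id: "a", active: true });
const left = classifyChange(trackedIds, isActive, { _id: "a", active: false });
const unrelated = classifyChange(trackedIds, isActive, { _id: "z", active: false });
```

Tracking ids per subscriber is what makes the "left the result set" case detectable: an update that stops matching the query has to be reported as a delete rather than dropped.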

Map update format (passed to handler as array of tuples):

| Tuple | Description |
|-------|-------------|
| ["i", items[], sort?] | Initial load |
| ["c", doc] | Create |
| ["u", doc, fields?] | Update; fields is string[], true, or undefined |
| ["d", _id] | Delete |
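
On the receiving side, these tuples can be folded into a local copy of the result set. The following is a minimal sketch of such a reducer, keyed by _id. The names ToyItem, ToyMapUpdate, and applyUpdates are hypothetical. It assumes that "u" carries a partial document to be shallow-merged; the README does not spell out the role of fields or sort, so the sketch ignores both.

```typescript
type ToyItem = { _id: string; [key: string]: unknown };

type ToyMapUpdate =
	| ["i", ToyItem[], unknown?]
	| ["c", ToyItem]
	| ["u", ToyItem, (string[] | true)?]
	| ["d", string];

// Applies a batch of update tuples to a local Map, in order.
function applyUpdates(
	state: Map<string, ToyItem>,
	updates: ToyMapUpdate[]
): Map<string, ToyItem> {
	for (const update of updates) {
		switch (update[0]) {
			case "i": // initial load replaces whatever was there
				state.clear();
				for (const item of update[1]) state.set(item._id, item);
				break;
			case "c": // create
				state.set(update[1]._id, update[1]);
				break;
			case "u": { // update: shallow merge (assumption, see lead-in)
				const incoming = update[1];
				const previous = state.get(incoming._id);
				state.set(incoming._id, { ...previous, ...incoming });
				break;
			}
			case "d": // delete by _id
				state.delete(update[1]);
				break;
		}
	}

	return state;
}

const localItems = applyUpdates(new Map(), [
	["i", [{ _id: "a", name: "Ann", age: 30 }, { _id: "b", name: "Bob" }]],
	["c", { _id: "c", name: "Cy" }],
	["u", { _id: "a", name: "Anna" }],
	["d", "b"]
]);
```

After this batch the local map holds "a" (with the merged name) and "c". Sorting is left to the caller, since a Map preserves only insertion order.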


Types

| Type | Description |
|------|-------------|
| PublicationProps<SA> | { type?, fetch?, fetchSubscription?, onSubscribe?, onUnsubscribe? } |
| Projection | { [key: string]: Projection \| boolean \| number } (MongoDB-style projection) |
| PartialWithId<D> | Partial<D> & { _id: string } |
| TransformableDoc<D> | PartialWithId<D> & { [key: string]: any } |
| SubscriptionArgs | unknown[] |
| SubscriptionHandler | (fetched: unknown, reason?: unknown) => void |


WS Entry (insite-subscriptions-server/ws)

| Export | Description |
|--------|-------------|
| SubscriptionHandler | Wires subscription protocol to WSServer |
| Publication | Publication with WSSubscriptionArgs (first arg is WSSCWithUser) |
| SubscriptionHandle | Subscription handle for WS context |
| CollectionMapPublication | Map publication for WS context |
| CollectionMapSubscriptionHandle | Map subscription handle for WS context |
| Subscriptions | Map<number \| string, SubscriptionHandle> (per-client subscriptions) |
| WithPublish<T, AS> | Type: T & { publish(...) } |
| WithPublishCollection<T, AS> | Type: T & WithPublish & { publish(...) } for collections |
| isPublicationCollectionMap(publication) | Type guard |
| isCollectionMapPublicationArgs(args) | Type guard |


SubscriptionHandler

class SubscriptionHandler<AS extends AbilitiesSchema>

| Method | Description |
|--------|-------------|
| constructor(wss: WSServer, withCollections?) | Listens to client-connect, client-session, client-message:s-s, client-message:s-u, client-close, should-renew-subscriptions. When withCollections is true, adds wss.publish() for both object and map publications |
| renewSubscriptionsFor(webSockets) | Renews subscriptions for given clients |

Protocol messages:

| Message | Direction | Description |
|---------|-----------|-------------|
| s-s | client → server | Subscribe: (type, publicationName, id, ...restArgs, immediately?) |
| s-u | client → server | Unsubscribe: (id) |
| s-c | server → client | Changed: (id, data) |
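
To make the message shapes concrete, here is a small sketch of server-side bookkeeping for the three messages, keeping a per-client map keyed by subscription id in the spirit of the Subscriptions export. Everything in it is an assumption for illustration: the README does not describe the wire encoding, so messages are modelled as plain tuples of [kind, ...args], and the names ToyClientMessage, ToyEntry, handleClientMessage, and changedMessage are hypothetical.

```typescript
type ToySubscriptionId = number | string;

// Assumed in-memory shape of the messages; the real wire format may differ.
type ToyClientMessage =
	| ["s-s", string, string, ToySubscriptionId, ...unknown[]]
	| ["s-u", ToySubscriptionId];

type ToyServerMessage = ["s-c", ToySubscriptionId, unknown];

type ToyEntry = { type: string; publicationName: string; rest: unknown[] };

// Records a subscribe, or forgets the subscription on unsubscribe.
function handleClientMessage(
	subscriptions: Map<ToySubscriptionId, ToyEntry>,
	message: ToyClientMessage
): void {
	if (message[0] === "s-s") {
		const [, type, publicationName, id, ...rest] = message;
		subscriptions.set(id, { type, publicationName, rest });
	} else {
		subscriptions.delete(message[1]);
	}
}

// Builds the "changed" message sent back for one subscription id.
function changedMessage(id: ToySubscriptionId, data: unknown): ToyServerMessage {
	return ["s-c", id, data];
}

const clientSubscriptions = new Map<ToySubscriptionId, ToyEntry>();

handleClientMessage(clientSubscriptions, ["s-s", "map", "items", 1, "user-42"]);
handleClientMessage(clientSubscriptions, ["s-s", "object", "settings", 2]);
handleClientMessage(clientSubscriptions, ["s-u", 1]);

const outgoing = changedMessage(2, { theme: "dark" });
```

The client-chosen id is what ties an s-c message back to the subscription that requested it, so one connection can hold several subscriptions to the same publication with different arguments.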


Usage

Object publication (standalone):

import {
	publications,
	Publication,
	SubscriptionHandle
} from "insite-subscriptions-server";

const pub = new Publication("items", {
	type: "object",
	fetch: () => ({ items: [1, 2, 3] })
});

const handle = new SubscriptionHandle("items", [], (data) => {
	console.log(data);
}, true);

pub.changed(); // pushes update to all subscribers
handle.cancel();

Map publication (MongoDB):

import { CollectionMapPublication, CollectionMapSubscriptionHandle } from "insite-subscriptions-server";

// `collection` (a WatchedCollection) and `userId` are assumed to be defined elsewhere.
const pub = new CollectionMapPublication(
	collection,
	"items",
	(userId) => ({ query: { userId }, projection: { name: 1 } }),
	(doc, args) => { doc.extra = "computed"; }
);

const handle = new CollectionMapSubscriptionHandle("items", [userId], (updates) => {
	// updates: [["i", items, sort], ["c", doc], ["u", doc, fields], ["d", id]]
}, true);

WebSocket integration:

import { SubscriptionHandler } from "insite-subscriptions-server/ws";
import type { WSServer } from "insite-ws/server";

const wss: WSServer = /* ... */;
new SubscriptionHandler(wss, true);

// With withCollections: true, wss gets publish()
const pub = wss.publish(collection, "items", (wssc, userId) => ({
	query: { userId },
	projection: { name: 1 }
}));

License

MIT