npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@presenc3/link-core

v0.5.0

Published

Shared WebSocket link protocol, client, and hub for inter-service communication. Also bundles companion helpers (logger, env coercion, observability, RPC patterns, lifecycle, secrets loader)

Readme

@presenc3/link-core

Shared WebSocket link protocol, client, and hub for inter-service communication.

link-core gives you a small, opinionated way to tie multiple Node.js services together over WebSockets. It ships:

  • LinkClient - a client that handles connection, reconnection, HMAC-signed messages, periodic status pushes, peer discovery, and request/response RPC.
  • createHub - an embeddable hub that routes messages between peers, tracks status, enforces one connection per peer kind, and lets you expose your own server-side RPCs. Transport-agnostic - you bring the HTTP/WebSocket server.
  • createHubServer - a batteries-included hub server that wraps createHub with an HTTP server, WebSocket server, a default /health route, an opt-in /state route, signal handling, and graceful shutdown. Use this when the hub is your service.

The library is unopinionated about roles - every service uses LinkClient, and the meaning of "coordinator", "worker", "publisher", etc. is up to you.

Table of contents

Install

npm install @presenc3/link-core

Requires Node.js ≥ 20.

link-core ships dual CommonJS + ESM entry points, with hand-written TypeScript declarations. All three styles work without configuration:

// CommonJS
const { LinkClient, createHubServer, RpcTimeoutError } = require('@presenc3/link-core');

// ESM
import { LinkClient, createHubServer, RpcTimeoutError } from '@presenc3/link-core';

// TypeScript - types resolve automatically via the package's `exports` map
import { LinkClient, type LinkClientOptions } from '@presenc3/link-core';

The package is marked "sideEffects": false so bundlers (webpack, Rollup, esbuild, Vite) can tree-shake unused exports.

Quick start

There are two ways to run a hub:

  • createHubServer - batteries-included. Spins up the HTTP server, WebSocket server, a default /health route, an opt-in /state route, signal handling, and graceful shutdown for you. Use this when the hub is the whole job.
  • createHub - transport-agnostic. You bring the HTTP/WebSocket server. Use this when you're embedding the hub inside a larger app (Express, Fastify, an existing service).

Run a hub (batteries-included)

const { createHubServer } = require('@presenc3/link-core');

const server = createHubServer({
  secret: process.env.LINK_SECRET,
  port:   8080,
});

server.start();

That's the whole thing. You get:

  • An HTTP server on :8080 with GET /health already wired. (GET /state is opt-in: pass enableStateRoute: true.)
  • A WebSocket server attached to it, routing connections into the hub.
  • SIGINT/SIGTERM handlers that run a graceful shutdown (close WSS → close client sockets → terminate stragglers → close HTTP → stop hub).

See createHubServer below for the full options list, including server-side RPCs, custom routes, an extraState hook for /state, an onShutdown hook for cleanup like flushing files, and BYO HTTP server support.

Run a hub (transport-agnostic)

If you want full control - e.g. you already have an Express app, a custom upgrade handler, HTTPS certs, or you don't want the default routes - use createHub and bring your own server:

const http = require('http');
const { WebSocketServer } = require('ws');
const { createHub } = require('@presenc3/link-core');

const hub = createHub({
  secret: process.env.LINK_SECRET,
});

const server = http.createServer();
const wss = new WebSocketServer({ server });
wss.on('connection', (ws, req) => hub.attach(ws, req));

server.listen(8080, () => console.log('link hub on :8080'));

Connect a client

const { LinkClient } = require('@presenc3/link-core');

const link = new LinkClient({
  url:    'ws://localhost:8080',
  secret: process.env.LINK_SECRET,
  kind:   'worker',
  name:   'worker-1',
});

link.start();

That's enough to connect, authenticate with the shared secret, and stay connected (it'll reconnect with exponential backoff if the hub goes away).

kind is a singleton service identity, not a worker-pool name. The hub enforces one connection per kind: if a second client connects with the same kind as a connected one, the old socket is closed. This is intentional - it prevents zombie connections after a hard crash + restart. If you want to run multiple instances of the same service in parallel (worker-1, worker-2, ...), give each a unique kind.

Connection lifecycle

A LinkClient connection moves through three states. Knowing the difference matters when the hub is configured with per-peer keys (where the hub may reject a hello) or when you want a "wait until I'm fully connected before doing X" hook.

| State | Event | What it means | Safe to publish/send/rpc? | | ------------ | ----------- | -------------------------------------------------------------- | ------------------------- | | connected | 'connect' | TCP/WebSocket open. hello has been sent. | No | | verified | 'verified'| First signed message arrived. Crypto checks pass. | No | | ready | 'ready' | Hub accepted the hello (hello.ack.ok !== false). | Yes |

The reason verified and ready are split is architectural: verified fires when any correctly-signed frame from the hub has been processed, while ready requires the specific hello.ack with ok !== false. The 'rejected' event handles the protocol-level case where the hub explicitly returns hello.ack ok:false. (Note: the bundled hub doesn't currently emit ok:false for unknown kinds - it silently drops to avoid confirming kind existence to attackers, and the client experiences this as a 'no-ack' protocol-error after helloAckDiagnosticMs followed by a reconnect. The verified/ready split is forward-compatible with future hub policies that do return ok:false.) Treating verified as "OK to publish" would race against any of these rejection paths.

The recommended startup pattern:

const link = new LinkClient({ url, secret, kind: 'worker' });

await link.ready({ timeoutMs: 5_000 });   // start()s for you if needed

link.publish('jobs.complete', { id: 42 });

link.ready():

  • Calls start() for you if the link isn't already running.
  • Resolves with { kind, features } when the hub accepts the hello.
  • Rejects with HelloRejectedError (code: 'HELLO_REJECTED') if the hub explicitly rejects.
  • Rejects with a timeout error if timeoutMs elapses first.
  • Rejects with an AbortError if signal aborts.
  • Resolves immediately if the link is already ready.

If you don't want to use await:

link.on('ready',    ({ kind, features }) => { /* gate work here */ });
link.on('rejected', ({ reason })          => { /* config error somewhere */ });
link.start();

By default, a 'rejected' event also stop()s the client. Without this, a client with a wrong key would hammer the hub at the initial reconnect interval forever - see the v0.4.0 bugfix notes. To opt out (e.g. if you expect the hub's key registry to add your kind at runtime), set reconnectOnRejection: true.

Subscriptions are tracked locally and replay automatically on every 'ready' transition, so reconnects don't require application-level re-subscribe logic.

A two-service example

Say you have a coordinator that hands out jobs and a worker that does them. Both use the same LinkClient; the only difference is what they do with it.

Worker

const { LinkClient } = require('@presenc3/link-core');

const link = new LinkClient({
  url:    process.env.LINK_URL,
  secret: process.env.LINK_SECRET,
  kind:   'worker',
  name:   'worker-1',

  // Pushed to the hub on connect and every 10s.
  makeStatus: () => ({
    status: 'idle',
    at:     Date.now(),
  }),

  // Handlers for incoming RPC requests, keyed by rpcType.
  rpcHandlers: {
    'job.run': async ({ jobId, payload }) => {
      const result = await doTheWork(payload);
      return { jobId, result };
    },
  },
});

link.start();

Coordinator

const { LinkClient } = require('@presenc3/link-core');

const link = new LinkClient({
  url:    process.env.LINK_URL,
  secret: process.env.LINK_SECRET,
  kind:   'coordinator',
});

link.start();

async function dispatch(job) {
  // Send an RPC to any peer with kind 'worker' and await its response.
  const result = await link.rpc('worker', 'job.run', {
    jobId:   job.id,
    payload: job.payload,
  });
  console.log('Job done:', result);
}

Both sides are symmetric - the coordinator could just as easily expose its own rpcHandlers, and the worker could call link.rpc(...) back.

Custom server-side RPCs

Any rpc.request with to: 'server' is handled by the hub itself, using the rpcHandlers you pass to createHub. The shape is identical to a client's rpcHandlers:

const hub = createHub({
  secret: process.env.LINK_SECRET,
  rpcHandlers: {
    'ping': async () => ({ pong: true, now: Date.now() }),

    'time': async () => ({ now: Date.now() }),
  },
});

Clients then call them just like any other RPC:

const { pong, now } = await link.rpc('server', 'ping', {});

Example: a small KV store

The plug-in model makes it easy to layer features onto the hub without forking it. Here's a persisted JSON key-value store exposed as kv.get / kv.set / kv.del / kv.list:

const fs = require('fs');
const path = require('path');

function createKvHandlers(filePath) {
  const kv = new Map(Object.entries(
    fs.existsSync(filePath) ? JSON.parse(fs.readFileSync(filePath, 'utf8')) : {}
  ));

  let saveTimer = null;
  const scheduleSave = () => {
    if (saveTimer) return;
    saveTimer = setTimeout(() => {
      saveTimer = null;
      const tmp = `${filePath}.${process.pid}.tmp`;
      fs.mkdirSync(path.dirname(filePath), { recursive: true });
      fs.writeFileSync(tmp, JSON.stringify(Object.fromEntries(kv), null, 2));
      fs.renameSync(tmp, filePath);
    }, 250);
  };

  return {
    'kv.get':  async ({ key })        => ({ key, value: kv.get(key) ?? null }),
    'kv.set':  async ({ key, value }) => { kv.set(key, value); scheduleSave(); return { ok: true }; },
    'kv.del':  async ({ key })        => { kv.delete(key);    scheduleSave(); return { ok: true }; },
    'kv.list': async ({ prefix = '' }) => ({ keys: [...kv.keys()].filter(k => k.startsWith(prefix)) }),
  };
}

const hub = createHub({
  secret: process.env.LINK_SECRET,
  rpcHandlers: createKvHandlers('./data/kv.json'),
});

Pub/sub topics

Beyond peer-routed RPC, link-core has a generic publish/subscribe channel for fan-out events. Any client can subscribe to a topic and publish to one; the hub maintains the topic→subscriber map and fans out each publish to every subscriber. Status broadcasts are a separate, narrower mechanism - pub/sub is for everything else.

// Subscribe - handler fires for every publish on that topic.
link.subscribe('events.user.signup', (payload, msg) => {
  console.log('new user from', msg.from, '-', payload);
});

// Publish - fan-out to all current subscribers (excluding self).
link.publish('events.user.signup', { userId: 123, email: '[email protected]' });

// Multiple handlers per topic share one hub-side subscription.
link.subscribe('events.user.signup', metricsHandler);

// Remove one handler, or all handlers for a topic.
link.unsubscribe('events.user.signup', metricsHandler);
link.unsubscribe('events.user.signup');

Behavior notes:

  • Topic names must match [a-zA-Z0-9._-]+, length 1–256. Wildcards (*, **) are reserved for a future release and currently rejected. Invalid topics throw synchronously on subscribe/publish/unsubscribe.
  • No self-delivery in v0.4. A peer that publishes to a topic does not receive its own message even if subscribed. (worker-1 publishing events.x will not fire worker-1's own events.x handler.)
  • At-most-once. If a subscriber is offline at publish time, they miss the message - pub/sub does not queue or persist. If you need durability, use RPC against a hub-handled handler that writes to your own store.
  • Trusted from. The hub overwrites msg.from with the publisher's authenticated kind before forwarding, same as RPC. Subscribers can rely on msg.from for routing/authorization.
  • Reconnect-safe. Subscriptions are tracked client-side and replayed automatically on every 'ready' transition. The hub forgets a peer's subscriptions on disconnect; the client re-establishes them on reconnect with no application-code involvement.
  • Backpressure-aware. If a subscriber's send buffer is full (bufferedAmount > maxBufferedBytes), the hub drops the message for that subscriber only, not for the whole topic.
  • Hub feature flag. The hub advertises 'topics' in hello.ack.data.features; the client captures this as link.hubFeatures. publish() throws FeatureUnsupportedError if the hub doesn't advertise topic support - including against v0.3.x hubs that don't advertise any features at all (the client treats "no advertisement" as "feature absent" so a publish against a hub that won't act on it fails loud rather than silently dropping).

Introspection: link.topic.list

The hub provides a built-in server RPC for listing subscribers:

// Subscribers of a single topic.
const { subscribers } = await link.rpc('server', 'link.topic.list', {
  topic: 'events.user.signup',
});
// → { topic: 'events.user.signup', subscribers: ['analytics', 'mailer'] }

// All topics + subscribers (omit `topic`).
const { topics } = await link.rpc('server', 'link.topic.list', {});
// → { topics: [{ topic: '...', subscribers: [...] }, ...] }

The link. prefix is reserved for built-in server RPCs. You can override any built-in by registering your own handler with the same name (e.g. for permission-aware topic listing).

Directed fire-and-forget

link.send(to, type, data) is the third primitive alongside rpc() and publish():

| | one recipient | many recipients | |-------------|---------------|-----------------| | needs response | link.rpc(to, type, data) | - | | fire-and-forget | link.send(to, type, data) | link.publish(topic, payload) |

Use it for progress notifications, telemetry, signals, or anything where the sender doesn't need a reply and doesn't want the round-trip cost of an RPC.

// Sender
link.send('coordinator', 'job.progress', { jobId: 123, pct: 50 });

// Receiver
link.on('direct', ({ from, type, data, msg }) => {
  if (from === 'worker' && type === 'job.progress') {
    console.log(`[${data.jobId}] ${data.pct}%`);
  }
});

Behavior notes:

  • At-most-once. If the target peer is offline at delivery time, the message is dropped silently - there is no queueing. Use rpc() (with retries on RpcDisconnectError) when you need delivery feedback.
  • Trusted from. The hub overwrites msg.from with the sender's authenticated kind before forwarding, same as RPC and pub/sub. Receivers can rely on from for routing/authorization.
  • Backpressure-aware. Returns false if the local send buffer is over maxBufferedBytes (a 'backpressure' event was emitted). Returns true on success.
  • Hub feature flag. The hub advertises 'direct' in hello.ack.data.features. send() throws FeatureUnsupportedError if the hub doesn't advertise it (including against v0.3.x hubs that don't advertise any features), so you fail loud rather than silently dropping.
  • Validation. Throws synchronously on a missing/non-string to or type, or if the link is not ready.

Dynamic RPC handlers

The constructor's rpcHandlers is the right place for handlers that exist for the lifetime of the client. For plugins or features that come and go - or for "register on every link-up" patterns - use handle() / unhandle():

// Register
const previous = link.handle('auth.refresh', async ({ token }) => {
  return { newToken: await refreshToken(token) };
});

// previous is the previous handler for that rpcType, or undefined.
// Re-registering the same handler on every 'ready' is safe and idempotent.
link.on('verified', () => {
  link.handle('auth.refresh', refreshHandler);
});

// Remove
link.unhandle('auth.refresh');  // → true if a handler was removed, false otherwise

handle() replaces silently - no warning on collision - because the intended use case is plugins that re-register on every reconnect. If you want collision detection, check link.rpcHandlers[rpcType] before calling.

Waiting for events

link.waitFor(event, opts) replaces the verbose await new Promise(r => link.once('ready', r)) pattern with a one-liner that supports a real timeout and an AbortSignal:

// Block until ready, with a hard cap. (For ready specifically, link.ready({ timeoutMs })
// is even tidier - it also calls start() if needed.)
await link.waitFor('ready', { timeoutMs: 5000 });

// Wait for the next peer connect.
const peer = await link.waitFor('peer.connect');
console.log('joined:', peer.kind);

// Cancellable wait.
const ac = new AbortController();
setTimeout(() => ac.abort(), 1000);
try {
  await link.waitFor('peer.disconnect', { signal: ac.signal });
} catch (e) {
  if (e.name === 'AbortError') console.log('cancelled');
}

waitFor always waits for the next occurrence - it does not check current state. For "now or wait" semantics on 'ready', prefer link.ready({ timeoutMs }) which already does the check; for other events, check first:

async function ensurePeer(link, kind, timeoutMs = 5000) {
  if (link.getPeers().some((p) => p.kind === kind)) return;
  while (true) {
    const p = await link.waitFor('peer.connect', { timeoutMs });
    if (p.kind === kind) return;
  }
}

Health snapshot

link.health() returns a synchronous, allocation-light snapshot suitable for /health integrations and dashboards:

const h = link.health();
// {
//   connected:         true,
//   verified:          true,
//   ready:             true,
//   lastVerifiedAt:    1730000000000,  // ms since epoch, or null if never
//   peerCount:         3,
//   pendingRpcCount:   0,
//   subscriptionCount: 2,
//   bufferedAmount:    0,
//   reconnectAttempt:  0,
//   stopped:           false,
// }

Three fields are worth highlighting:

  • ready is the "hub has accepted us, safe to publish/send/rpc" gate. verified only tells you crypto checks pass; ready adds "and the hub said yes". With per-peer-keys hubs, the two can differ briefly.
  • lastVerifiedAt is updated on every verified message, not just the first. That makes it useful for "we connected but haven't heard from the hub in a while" alerts - a bare connected check would happily report green during a one-way ws read failure.
  • reconnectAttempt is non-zero when we're stuck in the reconnect loop. Surface it as a warning in dashboards.

A typical /health integration:

app.get('/health', (req, res) => {
  const h = link.health();
  const stale = h.lastVerifiedAt && (Date.now() - h.lastVerifiedAt) > 60_000;
  const ok = h.connected && h.ready && !stale && h.reconnectAttempt === 0;
  res.status(ok ? 200 : 503).json({ ok, ...h });
});

Errors

rpc() rejects with one of these typed errors. They all extend Error and carry a stable code string, so existing catch (e) { e.message } paths keep working - but instanceof and err.code are the recommended checks for new code:

| Class | code | Thrown when | |---------------------------|-----------------------|----------------------------------------------------------------------------------------------------------| | RpcTimeoutError | RPC_TIMEOUT | The RPC didn't get a response within timeoutMs. | | RpcDisconnectError | RPC_DISCONNECT | The link disconnected (or was stopped) while the RPC was in flight. | | RpcAbortError | RPC_ABORT | The caller-supplied AbortSignal fired before the response arrived. | | RpcRemoteError | RPC_REMOTE | The remote handler threw, or the hub returned an error response (e.g. target offline, unknown rpcType). | | BackpressureError | BACKPRESSURE | The local send buffer was over maxBufferedBytes (synchronous reject for rpc()). | | LinkNotReadyError | LINK_NOT_READY | publish() / send() / rpc() / ready() called before the link is 'ready' (no socket, hello not yet acked, or after stop()). Synchronous reject. | | FeatureUnsupportedError | FEATURE_UNSUPPORTED | publish() / send() called against a hub that doesn't advertise the required feature. | | ProtocolError | PROTOCOL_ERROR | Available for callers that need to throw rather than emit; mostly emitted via 'protocol-error' events today. | | HelloRejectedError | HELLO_REJECTED | The hub rejected the client's hello (hello.ack ok:false). What link.ready() rejects with. | | RpcError (base) | RPC_ERROR | Common base for the RPC errors above. | | LinkError (base) | LINK_ERROR | Common base for everything in this table. |

Each error carries call-site context - to, rpcType, id on the RPC ones; timeoutMs on RpcTimeoutError; bufferedAmount and maxBufferedBytes on BackpressureError; op on LinkNotReadyError; op and feature on FeatureUnsupportedError; reason on ProtocolError and HelloRejectedError. Useful when wiring up retries:

const {
  RpcTimeoutError, RpcDisconnectError, RpcAbortError, RpcRemoteError,
} = require('@presenc3/link-core');

async function rpcWithRetry(link, to, type, data, { tries = 3 } = {}) {
  let lastErr;
  for (let i = 0; i < tries; i++) {
    try {
      return await link.rpc(to, type, data);
    } catch (e) {
      lastErr = e;
      if (e instanceof RpcAbortError)      throw e;       // user wants out
      if (e instanceof RpcRemoteError)     throw e;       // remote said no - don't retry
      if (e instanceof RpcTimeoutError)    continue;      // retry
      if (e instanceof RpcDisconnectError) continue;      // retry
      throw e;                                            // unknown - don't retry
    }
  }
  throw lastErr;
}

Note on RpcRemoteError. The wire format only carries an error string, so RpcRemoteError.message is the remote-supplied string verbatim. The class exists primarily for instanceof discrimination against transport-level failures - your retry policy almost certainly should not retry on RpcRemoteError, because the failure is the remote's, not the link's.

Aborting an in-flight RPC

link.rpc() accepts an options object with { timeoutMs, signal } as the fourth argument:

const ac = new AbortController();
setTimeout(() => ac.abort(), 30_000);

try {
  const result = await link.rpc('worker', 'job.run', payload, {
    timeoutMs: 60_000,
    signal:    ac.signal,
  });
} catch (e) {
  if (e.code === 'RPC_ABORT')   console.log('user cancelled');
  if (e.code === 'RPC_TIMEOUT') console.log('took too long');
}

The legacy positional form link.rpc(to, type, data, timeoutMs) is still accepted - pass a number for the old behavior.

Note. Aborting only releases the caller side: the local pending entry is removed and the promise rejects with RpcAbortError. The wire request is not cancelled - the remote handler may still complete and the response will be logged-and-dropped on arrival. A rpc.cancel wire message is planned for a later release.

Security & threat model

link-core is designed for trusted-network, single-trust-zone deployments - services you run, talking to services you run, on infrastructure you control. Read this section before exposing a hub on a network you don't fully control.

What the protocol gives you

  • Wire integrity. Every message carries an HMAC-SHA256 signature over its envelope. Tampering with any field breaks the signature and the message is dropped. Verification is constant-time.
  • Anti-impersonation between authenticated peers. Once a socket completes the hello handshake, the hub binds it to that kind and overwrites the from field on every forwarded message. A peer cannot forge messages claiming to come from another peer.
  • Replay protection (since v0.4.0). Both client and hub drop messages whose ts is outside the configured replayWindowMs (default ±5 minutes), and remember message ids for that duration to drop duplicates. An attacker who captures a signed message on the wire and re-sends it within the window will see it dropped on the second arrival; outside the window, dropped on the first. Window and id-cache size are configurable; set replayWindowMs: 0 to disable. Caveat: this defends against passive eavesdroppers replaying captured frames. It does not defend against an active attacker who has the shared secret - they can sign their own fresh messages.
  • Bounded incoming payload size (since v0.4.0). Default cap of 1 MiB on incoming WebSocket frames, enforced both at the transport and in-handler. Configurable via maxMessageBytes.
  • Backpressure cap (since v0.4.0). Every outgoing send checks ws.bufferedAmount against maxBufferedBytes (default 4 MiB) and drops or rejects rather than buffering unboundedly. A single slow or stuck peer cannot OOM the hub by refusing to drain its socket.
  • Bounded retained hello (since v0.4.0). The hub sanitizes each peer's hello payload to a small whitelist (kind, name, pid, startedAt) before persisting it in memory, dropping unknown fields and capping string lengths. A peer that puts a 50 MB string in name no longer pins hub memory until restart.

What the protocol does not give you

  • No rate limiting. A client with a valid secret can flood the hub (within the per-message size cap).
  • No transport encryption built-in. Run behind wss:// (terminated at a reverse proxy or directly) if the network isn't trusted. The HMAC protects against tampering, not eavesdropping.
  • In shared-secret mode, no per-peer credentials. The shared secret is all-powerful: anyone with it can connect as any kind of their choosing. They cannot impersonate per-message after handshake (see above), but they can hello-bomb to displace a legitimate peer. Use per-peer keys (since v0.4.0) to remove this concern.

Deployment recommendations

  • Run the hub on localhost or a private network. If you must cross a network boundary, terminate wss:// at a reverse proxy (nginx, Caddy, Cloudflare) - don't expose ws:// over the public internet.
  • Treat secrets like database passwords. Don't commit them. With per-peer keys, you can rotate one peer's key without restarting everyone.
  • The /state route on createHubServer exposes peer kinds, hello payloads, and last-known status. In v0.5 the default is enableStateRoute: false; opt in explicitly for internal dashboards / dev (createHubServer({ enableStateRoute: true })). If you opt in and bind to 0.0.0.0, the hub also emits an informational warning at startup.
  • When embedding the hub inside an existing HTTP server (server: yourServer), always pass a path unless you're certain no other code on that server uses WebSocket upgrades. Without it, the hub intercepts every upgrade.

Per-peer keys

By default, createHub({ secret: 'sometext' }) runs in shared-secret mode - every peer signs and verifies with the same key. This is fine for tightly-scoped, fully-trusted deployments (a coordinator and three workers in the same VPC), but it has a downside: the secret is all-powerful. Anyone holding it can authenticate as any kind. Rotating it requires a coordinated restart of every peer.

Per-peer keys (since v0.4.0) lets you give each peer its own HMAC key. Pass an object or a function as secret:

// Static map. Hellos for unknown kinds are silently dropped
const hub = createHub({
  secret: {
    coordinator: process.env.LINK_KEY_COORD,
    'worker-a':  process.env.LINK_KEY_WORKER_A,
    'worker-b':  process.env.LINK_KEY_WORKER_B,
  },
});
// Dynamic resolver. Async permitted; returning null means "no key for that kind"
const hub = createHub({
  secret: async (kind) => {
    const v = await vault.get(`link/${kind}`);
    return v ? v.value : null;
  },
});

Under the hood, the hub becomes a re-signing relay: it verifies each incoming message with the sender's key, then re-signs each outgoing forward (peers.update, status broadcasts, RPC requests/responses, topic messages, direct messages) with the recipient's key. The from field is still stamped from the authenticated socket, so the trust property "a peer cannot forge messages from another peer" is preserved. The wire format does not change - only the key used for HMAC differs.

Each LinkClient only ever needs its own key:

// On worker-a
const link = new LinkClient({
  url:    'ws://hub:8080',
  secret: process.env.LINK_KEY_WORKER_A,   // its own key only
  kind:   'worker-a',
});

What this gives you, vs shared-secret:

  • Revocation per peer. Drop a kind from the map, or have the resolver return null, and that peer can no longer authenticate. Other peers keep working without restart.
  • Compromise containment. A leaked worker-a key cannot be used to authenticate as coordinator (the hub will look up coordinator's key and fail to verify the leaked-key signature).
  • Auditable identity. Operators can see exactly which peers exist by the registered kinds. (In dynamic-resolver mode, by querying the backing vault/DB.)

What it doesn't change:

  • The wire envelope (still HMAC over JSON; clients running v0.3.x can connect to v0.4 hubs in shared-secret mode without changes).
  • The hub trust model in general - the hub still sees plaintext, still controls fan-out, still trusts the bound kind after hello. End-to-end encryption is not in scope.
  • Rate limiting (still none). A peer with a valid key can still flood within the per-message size cap.

Hello rejection behavior. When a hello cannot be verified - wrong key, unknown kind, or malformed - the hub drops the message silently and lets the pre-hello timeout (default 10 s) reap the socket. The hub does not return hello.ack ok:false for these cases, on purpose: it would confirm to an attacker that a particular kind exists. The client sees the diagnostic protocol-error reason 'no-ack' after helloAckDiagnosticMs and reconnects with growing backoff.

If your application does want explicit rejection feedback (e.g. for an internal admin tool), surface it via your own RPC after 'ready' rather than embedding it in the auth handshake.

Helpers

Since v0.5.0, link-core ships a collection of helper functions covering the patterns that kept showing up in my own use in services - a leveled logger, env coercion, observability listener bundles, RPC retries and safe-publish wrappers, graceful shutdown, a secrets loader for secrets vault client, and a dashboard-friendly event recorder.

Everything is also reachable via the ./helpers subpath if you'd rather keep the helper namespace separate from the protocol/client/hub surface at the package root:

// flat from the root
const { createLogger, attachClientObservability, loadSecrets } = require('@presenc3/link-core');

// namespaced via the subpath
const helpers = require('@presenc3/link-core/helpers');
helpers.createLogger();

Both forms point at the same functions. Pick whichever fits the call site.

Logger - createLogger

A four-level logger (DEBUG/INFO/WARN/ERROR) with a structured [HH:MM:SS.mmm] [context] message prefix and an optional errorSink for mirroring errors to a webhook, Sentry, etc.

const { createLogger, LEVELS } = require('@presenc3/link-core');

const log = createLogger({
  minLevel: LEVELS.INFO,                 // or 'INFO' / 'DEBUG' / etc
  errorSink: async (ctx, msg, err) => {  // optional; receives Error instances passed to lE()
    await postToDiscord({ ctx, msg, err });
  },
});

log.l ('boot', 'starting up');            // info
log.lD('boot', 'verbose detail');         // debug (suppressed at minLevel=INFO)
log.lW('link', 'unexpected, but recoverable');
log.lE('link', 'init failed', err);       // also fires errorSink

All four methods take (context, message, ...args). The errorSink is fire-and-forget - sink failures are logged via console.error and never propagate.

The default minLevel is INFO when NODE_ENV === 'production' and DEBUG otherwise. Override via setMinLevel(...) at runtime if you need to switch on the fly.

Env - num, bool, requireEnv, linkClientOptionsFromEnv

Coercion helpers that return undefined for missing/empty inputs (so callers can ?? defaultValue without sentinel collisions on 0 / false), plus a one-shot assembler for the standard LinkClient option bag.

const { LinkClient } = require('@presenc3/link-core');
const { requireEnv, linkClientOptionsFromEnv } = require('@presenc3/link-core');

requireEnv(['LINK_URL', 'LINK_KIND', 'LINK_SECRET']);   // throws listing every missing key

const link = new LinkClient({
  /* reads
   * LINK_URL,    LINK_KIND,
   * LINK_SECRET, LINK_HASH_ALGO,
   * LINK_PERMESSAGE_DEFLATE, LINK_RECONNECT_ON_REJECTION,
   * LINK_RECONNECT_JITTER,   LINK_REPLAY_WINDOW_MS,
   * LINK_MAX_RECENT_IDS,     LINK_MAX_MESSAGE_BYTES,
   * LINK_MAX_BUFFERED_BYTES
  **/
  ...linkClientOptionsFromEnv(),
  name: 'My Service',
  makeStatus,
  rpcHandlers,
  // A LeveledLogger from createLogger() satisfies the two-method
  // Logger shape directly - no adapter required. Pre-v0.5 code used
  // `{ log: log.l, warn: log.lW }`; that still works for back-compat.
  logger: log,
});

Pass linkClientOptionsFromEnv(env, { envPrefix: 'FOO_' }) to read FOO_URL / FOO_KIND / etc. instead.

Observability - attachClientObservability, attachHubObservability

Wire the standard listener bundles onto a LinkClient or hub EventEmitter. Membership churn at info, security-relevant drops at warn, per-RPC trace at debug (or info when verbose: true); protocol-error reasons are classified into "concerning" (worth surfacing) vs "noisy" (clock drift, dedupe) and routed accordingly.

const { attachClientObservability, attachHubObservability } = require('@presenc3/link-core');

attachClientObservability(link, { logger: log, context: 'link' });
attachHubObservability(server.hub, { logger: log, context: 'hub' });

To extend the default reason classification without losing the built-ins, pass extraConcerningReasons: ['my-custom-reason']. To replace it outright, pass concerningReasons: [...]. The defaults are also exported (DEFAULT_CLIENT_CONCERNING_REASONS, DEFAULT_HUB_CONCERNING_REASONS) if you want to inspect or build atop them.

RPC + topic helpers - waitForPeer, rpcWithRetry, createSafePublisher, createSafeSend

waitForPeer(link, kind, { timeoutMs, requireConnected }) blocks until a peer of the given kind appears (and, by default, is connected). Event-driven - uses link.waitFor('peer.connect', ...) internally, no polling.

await waitForPeer(link, 'vault', { timeoutMs: 30_000 });

rpcWithRetry(link, to, type, data, { tries, timeoutMs, baseDelayMs, signal }) retries on transient failures (RpcTimeoutError, RpcDisconnectError) but never on RpcAbortError (caller cancelled) or RpcRemoteError (handler said no). Linear backoff with jitter.

const result = await rpcWithRetry(link, 'worker', 'job.run', payload, { tries: 3 });

createSafePublisher(link, { logger }) and createSafeSend(link, { logger }) wrap link.publish / link.send so they never throw on the common transient conditions (mid-reconnect, hub doesn't advertise topics/direct). The first feature-unsupported skip logs at warn, subsequent skips at debug - useful when running against a v0.3-era hub that doesn't advertise capabilities and you'd otherwise drown in identical warnings.

const publish = createSafePublisher(link, { logger: log, context: 'handlers' });
publish('user.changed', { id: 42 });   // returns boolean - false on any drop

const send = createSafeSend(link, { logger: log, context: 'fanout' });
send('worker', 'job.queued', { id: 42 });

Pass featureCheck: true to short-circuit the publish/send entirely when link.hubFeatures doesn't include the feature - avoids one wasted throw-per-call on v0.3 hubs.

Lifecycle - installProcessHandlers, createGracefulShutdown

Kept separate so the shutdown procedure can be unit-tested without touching process.on(...).

const { installProcessHandlers, createGracefulShutdown } = require('@presenc3/link-core');

const shutdown = createGracefulShutdown({
  logger: log,
  timeoutMs: 30_000,
  steps: [
    () => link.stop(),
    async () => { await closeDBs(); },
  ],
});

installProcessHandlers({ shutdown, logger: log });

installProcessHandlers wires up SIGINT, SIGTERM, uncaughtException, and unhandledRejection. Returns an uninstall() function that removes everything it added (useful in tests).

createGracefulShutdown runs its steps sequentially and is bounded by a watchdog timer that force-exits if anything hangs. A throwing step is logged via lE but does not stop subsequent steps from running. Calling the returned shutdown() while one is already in progress is a no-op.

Secrets - loadSecrets

Fetch a { envName: 'sec/<ns>/<rest>' } mapping from the link_secs vault peer at boot, optionally subscribing to secs.changed.<ns> for hot-reload on rotation.

const { loadSecrets, LOADED_SECRETS_UNWATCH } = require('@presenc3/link-core');

const cfg = await loadSecrets(link, {
  OPENAI_API_KEY: 'sec/shared/openai',
  SENTRY_DSN:     'sec/datastore/sentry-dsn',
}, {
  watch: true,                        // requires a v0.4+ hub
  logger,                             // optional LeveledLogger; uses console.warn otherwise
  onChange: ({ name, action, newValue }) => {
    // rebuild your frozen cfg snapshot, re-init clients, etc.
  },
});

// later, on shutdown / test teardown:
cfg[LOADED_SECRETS_UNWATCH]?.();

loadSecrets waits for link.ready() and for a vault peer to be present before fetching, both bounded by a single cumulative timeoutMs budget (default 30 s; every individual secs.get RPC shares the same deadline). It throws if any secret is missing on the initial load or returned with the wrong type - fail-fast at boot is much better than silently running with undefined keys.

The returned object is mutated in-place when watched secrets change, so a frozen snapshot held by the caller will go stale. Use onChange to rebuild your own snapshot.

When watch: true, the returned object also carries a non-enumerable [LOADED_SECRETS_UNWATCH] method (Symbol-keyed so it can't collide with a secret env name) that removes the rotation subscriptions. Idempotent and only removes the helper's own subscriptions; caller-installed handlers on the same topic are untouched.

Rotation event schema. When watch: true, the helper subscribes to secs.changed.<ns> for every namespace referenced in the mapping. The vault is expected to publish payloads of shape:

{
  path: string,                // e.g. 'sec/shared/openai'
  action: 'set' | 'del'        // 'set' triggers a refetch via secs.get; 'del' clears the key locally
}

Paths the helper doesn't care about are silently ignored, so it's safe for the vault to broadcast a single secs.changed.<ns> event covering all paths in that namespace.

Event recorder - createEventRecorder

A dashboard-friendly observer that wraps a LinkClient with a bounded ring buffer of recorded events plus a snapshot of current bus state. Designed for SSE consumers (or any "what's happening right now" panel) that want a single subscribe call and don't want to assemble peer / status / health / event-log themselves on every frame.

const { createEventRecorder } = require('@presenc3/link-core');

const recorder = createEventRecorder(link, {
  ringSize:            30,    // default; max events kept
  heartbeatIntervalMs: 1000,  // default; periodic snapshot emit. 0 disables
  startedAt:           Date.now(),
});

// SSE consumer: gets the current snapshot immediately, then every
// time something interesting happens on the bus (peer joins/leaves,
// hub ready/disconnect, peer status), plus a heartbeat tick
const unsub = recorder.onSnapshot((snap) => {
  res.write(`data: ${JSON.stringify(snap)}\n\n`);
});

// ... later
unsub();
recorder.close();   // detach all listeners, clear heartbeat

The snapshot shape is:

{
  connected, ready,
  self: { kind, name, features },
  peers,         // link.getPeers()
  statuses,      // { [peerKind]: link.getPeerStatus(kind) } for every peer with a status
  startedAt,     // the option you passed in
  health,        // link.health() snapshot, or null
  eventLog,      // copy of the ring buffer
  at,            // Date.now() at snapshot build time
  _reason,       // 'tick' | 'ready' | 'peer.connect' | ... - present on auto-emits
}

Recorded event shape is { kind, from, ...detail, t }. The kind taxonomy normalizes hub-side events into a dashboard-friendly vocabulary:

| kind | trigger | snapshot? | |-------------------|------------------------------------------|-----------| | hub-up | link event 'ready' | yes | | rejected | link event 'rejected' | yes | | hub-down | link event 'disconnect' | yes | | join | link event 'peer.connect' | yes | | leave | link event 'peer.disconnect' | yes | | status | link event 'peer.status' (non-self) | yes | | protocol-error | link event 'protocol-error' | no | | backpressure | link event 'backpressure' | no | | rpc-fail | link event 'rpc.complete' with ok:false | no | | direct | link event 'direct' | no |

Membership-and-lifecycle events also fire a snapshot emit, so a subscriber doesn't wait up to one heartbeat for the first frame after a peer joins. The heartbeat is the floor - it guarantees a frame even on a fully idle network so the dashboard's "last beat" age label keeps moving. Set heartbeatIntervalMs: 0 to disable if your transport doesn't need it.

The recorder is observation-only: it doesn't call publish / send / rpc and doesn't depend on hub features. It works against any v0.4+ LinkClient. Combine with attachClientObservability if you also want log lines for these events - they're independent.

For per-event subscribers (e.g. a live log panel that wants every event individually, not a full snapshot), use recorder.onEvent(fn). That one does not replay history on subscribe; call recorder.getRecent() first if you need the ring buffer too.

close() is idempotent and detaches every listener the recorder added to the underlying LinkClient. Always call it before discarding the recorder, or you'll leak listeners across reconnects.

API

new LinkClient(options)

| Option | Type | Required | Default | Description | |-------------------------|------------|----------|---------------|------------------------------------------------------------------------------------------------| | url | string | yes | | WebSocket URL of the hub (e.g. ws://localhost:8080). | | secret | string | yes | | HMAC secret. In shared-secret hub mode, must match every peer; in per-peer-keys mode, only this peer's key (the hub looks up the right key by kind). | | kind | string | yes | | Service type identifier (e.g. 'worker'). Used for routing and peer lookup. | | name | string | no | kind | Human-readable instance name. | | makeStatus | function | no | | Called on connect and every statusIntervalMs; return value is sent as a status.update. | | rpcHandlers | object | no | {} | Map of rpcType → async (rpcData, msg) => result. Thrown errors become RPC error responses. | | logger | object\|null | no | console | Custom logger with log and warn methods, or null to silence. | | defaultRpcTimeoutMs | number | no | 5000 | Default per-call RPC timeout. Per-call timeoutMs overrides. | | statusIntervalMs | number | no | 10000 | Cadence for automatic status.update pushes when makeStatus is set. | | reconnectInitialMs | number | no | 1000 | Initial reconnect delay. | | reconnectMaxMs | number | no | 10000 | Maximum reconnect delay (cap). | | reconnectGrowth | number | no | 1.5 | Multiplicative backoff factor between failed reconnect attempts. | | helloAckDiagnosticMs | number | no | 5000 | If no verified message arrives within this many ms of open, warn (likely secret mismatch). Set to 0 to disable. | | replayWindowMs | number | no | 300000 | Replay-protection window. Messages with ts outside ±this from now are dropped, and message ids are remembered for this duration. Set to 0 to disable. | | maxRecentIds | number | no | 10000 | Cap on remembered message ids (LRU). | | maxMessageBytes | number | no | 1048576 | Maximum incoming WebSocket frame size, in bytes (1 MiB). | | maxBufferedBytes | number | no | 4194304 | Cap on ws.bufferedAmount before sends are dropped (4 MiB). Status updates, publish(), and send() silently drop with a 'backpressure' event; rpc() rejects synchronously with BackpressureError (err.code === 'BACKPRESSURE'). | | hashAlgo | string | no | 'sha256' | HMAC hash algorithm. Must match the hub. | | perMessageDeflate | boolean \| object | no | false | Pass-through to the underlying ws client. false disables compression. true accepts library defaults; pass an options object for fine control. Off by default - see notes below. | | reconnectOnRejection | boolean | no | false | Behavior on hello.ack ok:false. Default false calls stop() to avoid hot-looping. Set true to keep retrying with backoff (only useful if you expect the hub's key registry to admit your kind at runtime). |

Methods:

  • start() - Connect to the hub. No-op if url, secret, or kind is missing (just logs a warning), or if a connection is already open or in flight. If a previous socket is in CLOSING/CLOSED state, its listeners are detached so its eventual close can't interfere with the new connection.
  • stop() - Close the connection, cancel timers, and reject any pending RPCs with RpcDisconnectError ("Link stopped before RPC completed"). The client will not auto-reconnect after stop().
  • isConnected() - true if the WebSocket is open. Note that "open" doesn't imply "ready" - the hub may still reject the hello. Use isReady() or the 'ready' event for the gate.
  • isReady() - true if the hub has accepted the hello (the 'ready' event has fired since the last connect). This is the safe-to-publish/send/rpc gate.
  • ready(opts?) - Returns a Promise that resolves with { kind, features } when the link is ready. Calls start() if not already running. Resolves immediately if already ready. Rejects with HelloRejectedError on hello.ack ok:false, with a timeout Error if opts.timeoutMs elapses, or with AbortError if opts.signal aborts. See Connection lifecycle.
  • rpc(to, rpcType, rpcData, optsOrTimeoutMs?) - Send an RPC to a peer of kind to (or 'server' for hub-handled RPCs) and return a Promise that resolves with the result or rejects with a typed error. The fourth argument can be a number (legacy positional timeoutMs) or an object { timeoutMs?, signal? } - see Aborting an in-flight RPC. Rejection types: LinkNotReadyError (called before the link is ready - synchronous), RpcTimeoutError, RpcDisconnectError, RpcAbortError, RpcRemoteError (remote handler threw or hub returned an error), or BackpressureError.
  • send(to, type, data) - Directed fire-and-forget. Returns true if sent, false if dropped due to local backpressure. Throws LinkNotReadyError if not ready or FeatureUnsupportedError if the hub doesn't advertise the 'direct' feature. See Directed fire-and-forget. At-most-once: not queued if the target is offline.
  • handle(rpcType, fn) - Register or replace the handler for rpcType at runtime. Returns the previous handler, or undefined. Designed for "register on every 'ready'" plugin patterns; replaces silently. See Dynamic RPC handlers.
  • unhandle(rpcType) - Remove the handler for rpcType. Returns true if a handler was removed, false if there was none.
  • waitFor(event, opts?) - Wait for the next occurrence of event and resolve with its payload. opts is { timeoutMs?, signal? }. With timeoutMs > 0, rejects on timeout; with an aborted signal, rejects with an AbortError-named error. See Waiting for events.
  • health() - Synchronous snapshot of { connected, verified, ready, lastVerifiedAt, peerCount, pendingRpcCount, subscriptionCount, bufferedAmount, reconnectAttempt, stopped }. See Health snapshot.
  • subscribe(topic, handler) - Register handler(payload, msg) for a topic. Multiple handlers per topic are allowed and share one hub-side subscription. Throws on invalid topic. Subscribing while disconnected is fine - the subscription is replayed automatically after each reconnect.
  • unsubscribe(topic, handler?) - Remove handler from topic, or all handlers if omitted. Returns true if anything was removed locally. The hub-side subscription is dropped only when the last handler for the topic is removed.
  • publish(topic, payload) - Publish to a topic. Throws LinkNotReadyError if not ready or FeatureUnsupportedError if the hub doesn't support topics. Returns true if sent, false if dropped due to local backpressure (in which case 'backpressure' was emitted). At-most-once: not queued for offline subscribers.
  • getPeers() - Returns the latest peer list from the hub: [{ kind, hello, connectedAt, connected }, ...].
  • getPeerStatus(kind) - Returns the last known status for a peer of that kind, or null.

Properties:

  • hubFeatures - The capability list announced by the hub in hello.ack, e.g. ['topics', 'direct'] for a v0.4.0+ hub. null until the first verified message arrives. An empty array means the hub didn't advertise any features (likely a v0.3.x hub). Use link.hubFeatures?.includes('topics') to pre-flight feature availability.
  • rpcHandlers - The current map of rpcType → handler. Initially populated from the constructor; mutated by handle() / unhandle(). Treat as read-only - go through handle() for runtime changes.

Events

LinkClient extends EventEmitter. All events are additive - existing code that doesn't subscribe to anything continues to work.

| Event | Payload | Fired when | |--------------------|----------------------------------------------------------------------|---------------------------------------------------------------------------| | 'connect' | { url, kind } | Underlying WebSocket has opened and hello has been sent. | | 'verified' | { kind } | First signed-and-verified message arrived. Crypto checks pass. Not yet a "safe to publish" gate - see 'ready'. | | 'ready' | { kind, features } | Hub accepted the hello (hello.ack.ok !== false). The "really connected, safe to use" event. features is the hub's capability list (['topics','direct'] for v0.4); null for hubs that don't advertise. | | 'rejected' | { reason, error } | Hub rejected the hello (hello.ack.ok === false). Default behavior: client stop()s itself - set reconnectOnRejection: true to keep retrying. | | 'disconnect' | { code?, reason, willReconnect, wasReady } | WebSocket closed. willReconnect is false after stop() or after a hello rejection (default). wasReady indicates whether 'ready' had fired during this connection. | | 'reconnecting' | { delayMs, attempt } | A reconnect attempt is scheduled. | | 'ws-error' | Error | Underlying WebSocket error. | | 'protocol-error' | { reason, type?, msg?, size?, skew?, error? } | A message was rejected. reason is one of 'parse-error', 'bad-signature', 'bad-version', 'replay-window', 'replay-id', 'missing-id', 'oversize', 'no-ack'. | | 'backpressure' | { type, to?, rpcType?, bufferedAmount } | A send was dropped (or an RPC rejected) because ws.bufferedAmount exceeded maxBufferedBytes. | | 'message' | { msg, raw } | Power-user firehose: every verified message post-checks. | | 'peer.connect' | peer (PeerInfo) | A new peer kind appeared in the latest peers.update. | | 'peer.disconnect'| peer (PeerInfo) | A peer kind disappeared from the latest peers.update. | | 'peer.replaced' | { kind, prevPeer, peer } | A peer of the same kind reconnected with a fresh socket (different connectedAt). Both prevPeer and peer are PeerInfo objects, so peer.connectedAt and prevPeer.connectedAt give you the old and new stamps. Fires after internal peers state has been updated, so link.getPeers() from inside the handler reflects the new connection. Useful for tearing down per-connection state without inferring it from the disconnect/connect pair (which doesn't fire on same-kind replacement). (since v0.5) | | 'peer.status' | { from, status, at } | A peer broadcast a status update. | | 'rpc.request' | { from, rpcType, rpcData, msg } | An incoming RPC request was received (fires before the handler runs). | | 'rpc.timeout' | { id, to, rpcType, timeoutMs } | A pending outbound RPC timed out. | | 'rpc.abort' | { id, to, rpcType } | An outbound RPC was aborted via its AbortSignal. | | 'rpc.disconnect' | { id, to, rpcType } | An outbound RPC was orphaned by a disconnect (fires before the rejection). | | 'rpc.complete' | { id, to, rpcType, ok, reason, durationMs, error } | Unified outbound-RPC lifecycle event. Fires exactly once per rpc() call. reason is null on success, or one of 'timeout' \| 'abort' \| 'disconnect' \| 'not-ready' \| 'remote-error' \| 'send-error' \| 'backpressure' on failure. id and durationMs are always populated, including for synchronous pre-send rejections. | | 'direct' | { from, type, data, msg } | A directed fire-and-forget message arrived (sent via link.send). from is the sender's authenticated kind (hub-stamped), type is the application-level message type. |

Note: socket and protocol failures are emitted as 'ws-error' and 'protocol-error', not as the bare 'error' event. This means an unhandled emit doesn't crash the process - appropriate for a long-lived background client.

link.on('ready',          ({ features }) => console.log('hub link is up; supports', features));
link.on('rejected',       ({ reason })   => console.error('hub rejected:', reason));
link.on('peer.connect',   (p)            => console.log(`${p.kind} joined`));
link.on('protocol-error', (e)            => alerting.warn(`link protocol-error: ${e.reason}`));
link.on('rpc.complete',   (i)            => metrics.rpc.observe(i.durationMs, { ok: i.ok, reason: i.reason }));

perMessageDeflate. Off by default. permessage-deflate has had memory-amplification CVEs against malicious peers; the safe default is to leave it off and only enable on trusted networks where you control both ends. When enabling, prefer the options-object form so you control the trade-offs - e.g. perMessageDeflate: { threshold: 1024, serverMaxWindowBits: 10 } to set a minimum compressible size and constrain the decompression buffer.

createHub(options)

const { createHub } = require('@presenc3/link-core');

Returns an EventEmitter with attach, getState, health, and stop methods.

| Option | Type | Required | Default | Description | |-----------------------|----------------|----------|---------------|-----------------------------------------------------------------------------------------------| | secret | string \| Record<string,string> \| (kind) => string \| Promise<string> | yes | | HMAC secret. string = shared mode (back-compat). Object = per-peer keys map. Function = dynamic resolver. See Per-peer keys. | | rpcHandlers | object | no | {} | Map of rpcType → async (rpcData, msg) => result for RPCs addressed to 'server'. | | logger | object\|null | no | console | Custom logger with log and warn methods, or null to silence. | | keepaliveIntervalMs | number | no | 15000 | Ping cadence for liveness detection (post-hello sockets only). | | helloTimeoutMs | number | no | 10000 | Time after socket open to wait for a successful hello. Closes the socket if missed. Set to 0 to disable. | | maxPendingSockets | number | no | 1024 | Cap on concurrent un-authenticated sockets. When exceeded, the oldest pending socket is force-closed (FIFO eviction) and emits peer.timeout with reason: 'pending-cap'. Defends against attackers opening many TCP connections and never speaking. (since v0.5) | | replayWindowMs | number | no | 300000 | Replay-protection window (matching the client). Set to 0 to disable. | | maxRecentIds | number | no | 10000 | Cap on remembered message ids. | | maxMessageBytes | number | no | 1048576 | Defensive in-handler size check. createHub is transport-agnostic; set maxPayload on your WebSocketServer separately. (createHubServer does this for you.) | | maxBufferedBytes | number | no | 4194304 | Per-peer cap on ws.bufferedAmount before sends to that peer are dropped (4 MiB). Drops are logged. RPC forwards to a backpressured peer return an error response to the original caller. | | hashAlgo | string | no | 'sha256' | HMAC hash algorithm. Must match every client. |

Methods:

  • attach(ws, req) - Wire up an incoming WebSocket. Call this from your WebSocketServer's 'connection' handler. The hub takes care of hello handshake, message verification, routing, keep-alive, and cleanup.
  • getState() - Returns { peers, lastStatus } for introspection (e.g. health endpoints).
  • health() - Returns { peerCount, pendingSocketCount, topicCount, totalSubscribers, recentIdsSize, statusCount }. Cheap synchronous snapshot for /health integrations.
  • stop() - Cancel timers, close all sockets, remove all event listeners. Call on shutdown.

Hub events

The returned EventEmitter emits the following events. A