npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

streamfence-js

v1.1.0

Published

Production-ready delivery control for Node.js Socket.IO servers - backpressure, retries, queue protection.

Downloads

58

Readme

StreamFenceJs - Embeddable JS Socket.IO Server Library

Production-ready delivery control for Node.js Socket.IO servers — backpressure, retries, queue protection, and configurable per-namespace delivery modes.

TypeScript-first port of the Java StreamFence library.


Table of contents


What it is

StreamFenceJs wraps your Socket.IO server with a delivery control layer that prevents clients from being overwhelmed, ensures critical messages arrive even over unreliable connections, and gives you fine-grained observability into what happens when things go wrong.

Each Socket.IO namespace gets its own delivery policy: choose between fire-and-forget BEST_EFFORT or acknowledged AT_LEAST_ONCE delivery, configure per-client queue limits, and select an overflow strategy. The library handles per-client queuing, backpressure, retry scheduling, and Prometheus metrics — so your application code just calls server.publish().


When to use one server vs two

For most production workloads, run two separate servers:

| Server | Port | Namespaces | Delivery | |---|---|---|---| | Feed server | 3000 | /feed, /prices, /updates | BEST_EFFORT — high-frequency, loss-tolerant | | Control server | 3001 | /commands, /alerts | AT_LEAST_ONCE — low-frequency, reliable |

Why separate ports? AT_LEAST_ONCE retries and acknowledgment tracking add per-message overhead. Mixing reliable and best-effort traffic on one server causes the reliable path's queue pressure to affect broadcast latency. Separating them keeps each server tuned to its workload.

Both servers can run in the same Node.js process.


Install

npm install streamfence-js

Requires Node.js >= 20. If you want Prometheus metrics, install prom-client (peer dependency, version >=14):

npm install prom-client

Quick start

import {
  StreamFenceServerBuilder,
  NamespaceSpec,
  DeliveryMode,
  OverflowAction,
} from 'streamfence-js';

const feedSpec = NamespaceSpec.builder('/feed')
  .topic('snapshot')
  .deliveryMode(DeliveryMode.BEST_EFFORT)
  .overflowAction(OverflowAction.DROP_OLDEST)
  .maxQueuedMessagesPerClient(128)
  .build();

const server = new StreamFenceServerBuilder()
  .port(3000)
  .namespace(feedSpec)
  .buildServer();

await server.start();
console.log('Listening on port', server.port);

// Publish to all subscribers of /feed > snapshot
server.publish('/feed', 'snapshot', { price: 42.5, ts: Date.now() });

// Publish to a specific client only
server.publishTo('/feed', clientId, 'snapshot', { price: 42.5, ts: Date.now() });

// Graceful shutdown
process.on('SIGINT', async () => {
  await server.stop();
  process.exit(0);
});

Client-side protocol

StreamFenceJs uses a simple event-based protocol over Socket.IO. Messages arrive on an event named after the topic name (not a generic topic-message event).

Subscribing

import { io } from 'socket.io-client';

const socket = io('http://localhost:3000/feed', {
  transports: ['websocket'],
});

socket.on('connect', () => {
  // Subscribe to a topic. Pass a token if auth is enabled on the namespace.
  socket.emit('subscribe', { topic: 'snapshot', token: null });
});

Receiving messages

Messages are emitted on the topic name as the Socket.IO event:

// Listen for messages on the 'snapshot' topic
socket.on('snapshot', (payload) => {
  console.log('Received snapshot:', payload);
  // payload is whatever object was passed to server.publish()
});

Acknowledging messages (AT_LEAST_ONCE only)

For AT_LEAST_ONCE namespaces, the server wraps each message with metadata. You must acknowledge receipt so the server does not retry:

socket.on('alerts', (payload, metadata) => {
  console.log('Alert:', payload);

  // Acknowledge the message to stop retries
  if (metadata?.ackRequired) {
    socket.emit('ack', { topic: metadata.topic, messageId: metadata.messageId });
  }
});

If the server does not receive an ack within ackTimeoutMs, it will re-send the message up to maxRetries times.

Unsubscribing

socket.emit('unsubscribe', { topic: 'snapshot' });

Config file loading

Instead of building servers programmatically you can load them from a YAML or JSON config file. A single file can define multiple named server entries.

import { StreamFenceServerBuilder } from 'streamfence-js';

const feedServer = StreamFenceServerBuilder
  .fromYaml('./streamfence.yaml', { server: 'feed' })
  .buildServer();

const controlServer = StreamFenceServerBuilder
  .fromYaml('./streamfence.yaml', { server: 'control' })
  .buildServer();

You can continue customising the builder after loading:

const server = StreamFenceServerBuilder
  .fromYaml('./streamfence.yaml', { server: 'feed' })
  .listener(myEventListener)
  .metrics(new PromServerMetrics(register))
  .buildServer();

Config file schema

servers:
  feed:                                    # server name — used in fromYaml/fromJson
    host: "0.0.0.0"                        # optional, default "0.0.0.0"
    port: 3000                             # required
    transport: WS                          # optional — WS | WSS, default WS
    engineIoTransport: WEBSOCKET_OR_POLLING  # optional — WEBSOCKET_ONLY | WEBSOCKET_OR_POLLING
    auth: NONE                             # optional — NONE | TOKEN, default NONE
    spillRootPath: ".streamfence-spill"    # optional, default ".streamfence-spill"
    tls:                                   # optional — required when transport: WSS
      certChainPemPath: "/etc/ssl/cert.pem"
      privateKeyPemPath: "/etc/ssl/key.pem"
      protocol: "TLSv1.3"                  # optional, default TLSv1.3
    namespaces:
      - path: /feed                        # required — must start with /
        topics: [snapshot, delta]          # required — at least one
        deliveryMode: BEST_EFFORT          # optional — BEST_EFFORT | AT_LEAST_ONCE
        overflowAction: DROP_OLDEST        # optional — see Overflow policies below
        maxQueuedMessagesPerClient: 128    # optional, default 64
        maxQueuedBytesPerClient: 1048576   # optional, default 524288 (512 KiB)
        ackTimeoutMs: 1000                 # optional, default 1000
        maxRetries: 0                      # optional, default 0
        coalesce: false                    # optional, default false
        allowPolling: true                 # optional, default true
        maxInFlight: 1                     # optional, default 1
        authRequired: false                # optional, default false

JSON format is also supported — same structure, .json extension. Use fromJson() instead of fromYaml().


Delivery modes

| Mode | Guarantee | Acks | Use case | |---|---|---|---| | BEST_EFFORT | At most once | None | Live feeds, price tickers, position updates | | AT_LEAST_ONCE | At least once | Required | Commands, alerts, critical state changes |

AT_LEAST_ONCE constraints

AT_LEAST_ONCE namespaces enforce the following at build time:

| Constraint | Reason | |---|---| | overflowAction must be REJECT_NEW or SPILL_TO_DISK | Other overflow actions silently discard messages, breaking at-least-once semantics. SPILL_TO_DISK is allowed because it preserves every message on disk. | | coalesce must be false | Coalescing would replace messages that need individual acknowledgement | | maxRetries must be >= 1 | At-least-once semantics require at least one retry attempt | | maxInFlight must not exceed maxQueuedMessagesPerClient | In-flight limit cannot be larger than the queue itself |


Overflow policies

Applied when a client's per-topic queue is full and a new message arrives.

| Action | Behaviour | Best for | |---|---|---| | REJECT_NEW | Incoming message rejected; publisher receives QueueOverflowEvent | AT_LEAST_ONCE; reliable back-pressure | | DROP_OLDEST | Oldest queued message dropped; new message accepted | Live feeds where stale data is harmless | | COALESCE | Most recent same-key entry replaced with new one; if no matching key found, message is rejected | Ticker data — only latest value per key matters | | SNAPSHOT_ONLY | All queued messages discarded; only new message kept | Single-value snapshot feeds | | SPILL_TO_DISK | Excess messages persist to disk; transparently recovered during drain | High-throughput feeds that cannot tolerate drops |


Spill to disk

When a namespace uses OverflowAction.SPILL_TO_DISK, messages that exceed the in-memory queue limit are written to disk files under a configurable root directory. During drain, the queue transparently refills from disk in FIFO order, so all messages are delivered in the order they were published.

How it works

  1. Messages are enqueued in-memory up to maxQueuedMessagesPerClient.
  2. When the in-memory queue is full, new messages are serialized to JSON and written to individual files on disk using atomic writes (write to .tmp, then rename).
  3. When the in-memory queue drains, entries are recovered from disk and re-enqueued.
  4. When a client disconnects, all spill files for that client are cleaned up.

Configuration

const server = new StreamFenceServerBuilder()
  .port(3000)
  .spillRootPath('/var/data/streamfence-spill')  // default: '.streamfence-spill'
  .namespace(
    NamespaceSpec.builder('/feed')
      .topic('data')
      .deliveryMode(DeliveryMode.BEST_EFFORT)
      .overflowAction(OverflowAction.SPILL_TO_DISK)
      .maxQueuedMessagesPerClient(64)
      .build(),
  )
  .buildServer();

Spill files are organized as:

{spillRootPath}/{namespace}/{clientId}/{topic}/00000001.json

Metrics

Each message spilled to disk increments the streamfence_messages_spilled_total counter (labels: namespace, topic).


Authentication

Set auth: TOKEN (config) or .authMode(AuthMode.TOKEN) (builder) and provide a TokenValidator:

import { AuthMode, AuthDecision, type TokenValidator } from 'streamfence-js';

const validator: TokenValidator = {
  validate(token, namespace, topic) {
    if (token === 'secret-token') {
      return AuthDecision.accept('user-alice');
    }
    return AuthDecision.reject('invalid token');
  },
};

const server = new StreamFenceServerBuilder()
  .port(3000)
  .authMode(AuthMode.TOKEN)
  .tokenValidator(validator)
  .namespace(spec)
  .buildServer();

TokenValidator.validate() may return a plain AuthDecision or a Promise<AuthDecision> for async validation (database lookups, JWT verification, etc.).

When auth is enabled, clients must include a token in the subscribe payload:

socket.emit('subscribe', { topic: 'snapshot', token: 'secret-token' });

TLS

import { TransportMode, TlsConfig } from 'streamfence-js';

const server = new StreamFenceServerBuilder()
  .port(3000)
  .transportMode(TransportMode.WSS)
  .tlsConfig(TlsConfig.create({
    certChainPemPath: '/etc/ssl/cert.pem',
    privateKeyPemPath: '/etc/ssl/key.pem',
    // protocol: 'TLSv1.3',       // default
    // privateKeyPassword: '...',  // optional
  }))
  .namespace(spec)
  .buildServer();

Metrics & management

Use PromServerMetrics for Prometheus metrics. Pass your own prom-client registry — streamfence-js registers its counters into it. Mount the scrape route on your existing HTTP server; no extra port needed.

import { register } from 'prom-client';
import { PromServerMetrics } from 'streamfence-js';

const server = new StreamFenceServerBuilder()
  .port(3000)
  .metrics(new PromServerMetrics(register))   // your existing registry
  .namespace(spec)
  .buildServer();

// Mount on your existing Express app
app.get('/metrics', async (_req, res) => {
  res.set('Content-Type', register.contentType);
  res.end(await register.metrics());
});

If you don't have a custom registry, omit the argument — it defaults to prom-client's global register:

.metrics(new PromServerMetrics())

Using a dedicated isolated registry? Pass new Registry():

import { Registry } from 'prom-client';
const registry = new Registry();
.metrics(new PromServerMetrics(registry))

Available metrics

| Metric | Labels | Description | |---|---|---| | streamfence_connections_total | namespace | Total successful client connections | | streamfence_disconnections_total | namespace | Total client disconnections | | streamfence_messages_published_total | namespace, topic | Total outbound messages published | | streamfence_messages_published_bytes_total | namespace, topic | Total outbound message bytes published | | streamfence_messages_received_total | namespace, topic | Total inbound messages received | | streamfence_messages_received_bytes_total | namespace, topic | Total inbound message bytes received | | streamfence_queue_overflow_total | namespace, topic, reason | Total queue overflow events | | streamfence_retries_total | namespace, topic | Total retry attempts | | streamfence_retries_exhausted_total | namespace, topic | Total exhausted retry outcomes | | streamfence_messages_dropped_total | namespace, topic | Total dropped messages (DROP_OLDEST) | | streamfence_messages_coalesced_total | namespace, topic | Total coalesced messages | | streamfence_messages_spilled_total | namespace, topic | Total messages spilled to disk | | streamfence_auth_rejected_total | namespace | Total auth rejections | | streamfence_auth_rate_limited_total | namespace | Total auth rate-limited rejections |


Event listeners

Register a listener via the builder's .listener() method or at runtime via server.addListener(). All callbacks are optional — implement only what you need. Exceptions thrown from callbacks are caught and logged; they never crash the server.

import type { ServerEventListener } from 'streamfence-js';

const listener: ServerEventListener = {
  onServerStarted(event) {
    console.log(`Server started on ${event.host}:${event.port}`);
  },
  onClientConnected(event) {
    console.log('Client connected:', event.clientId, 'on', event.namespace);
  },
  onClientDisconnected(event) {
    console.log('Client disconnected:', event.clientId);
  },
  onSubscribed(event) {
    console.log('Subscribed:', event.clientId, '->', event.topic);
  },
  onPublishRejected(event) {
    console.warn('Publish rejected:', event.reasonCode, event.reason);
  },
  onQueueOverflow(event) {
    console.warn('Queue overflow:', event.namespace, event.topic, event.reason);
  },
  onRetryExhausted(event) {
    console.error('Retry exhausted:', event.messageId, 'after', event.retryCount, 'attempts');
  },
};

const server = new StreamFenceServerBuilder()
  .port(3000)
  .listener(listener)
  .namespace(spec)
  .buildServer();

All callbacks and their event types

Server lifecycle

| Callback | Event type | Fields | |---|---|---| | onServerStarting | ServerStartingEvent | host, port | | onServerStarted | ServerStartedEvent | host, port | | onServerStopping | ServerStoppingEvent | host, port | | onServerStopped | ServerStoppedEvent | host, port |

Client connection

| Callback | Event type | Fields | |---|---|---| | onClientConnected | ClientConnectedEvent | namespace, clientId, transport ('websocket' | 'polling'), principal (string \| null) | | onClientDisconnected | ClientDisconnectedEvent | namespace, clientId |

Subscription

| Callback | Event type | Fields | |---|---|---| | onSubscribed | SubscribedEvent | namespace, clientId, topic | | onUnsubscribed | UnsubscribedEvent | namespace, clientId, topic |

Publishing

| Callback | Event type | Fields | |---|---|---| | onPublishAccepted | PublishAcceptedEvent | namespace, clientId, topic | | onPublishRejected | PublishRejectedEvent | namespace, clientId, topic, reasonCode, reason | | onQueueOverflow | QueueOverflowEvent | namespace, clientId, topic, reason |

Authentication

| Callback | Event type | Fields | |---|---|---| | onAuthRejected | AuthRejectedEvent | namespace, clientId, remoteAddress, reason |

Retry (AT_LEAST_ONCE only)

| Callback | Event type | Fields | |---|---|---| | onRetry | RetryEvent | namespace, clientId, topic, messageId, retryCount (1-based) | | onRetryExhausted | RetryExhaustedEvent | namespace, clientId, topic, messageId, retryCount |


Server API reference

StreamFenceServerBuilder

Fluent builder for server configuration.

| Method | Description | |---|---| | host(value: string) | Bind address (default '0.0.0.0') | | port(value: number) | Socket.IO listen port (use 0 for OS-assigned) | | transportMode(value: TransportModeValue) | WS or WSS | | engineIoTransportMode(value) | WEBSOCKET_ONLY or WEBSOCKET_OR_POLLING | | authMode(value: AuthModeValue) | NONE or TOKEN | | tokenValidator(value: TokenValidator \| null) | Custom token validation function | | tlsConfig(value: TlsConfig \| null) | TLS certificate/key config (required for WSS) | | listener(value: ServerEventListener) | Add an event listener (can be called multiple times) | | metrics(value: ServerMetrics) | Metrics implementation (default NoopServerMetrics) | | spillRootPath(value: string) | Root directory for disk spill files (default '.streamfence-spill') | | namespace(value: NamespaceSpec) | Add a namespace (at least one required) | | buildServer() | Build and return a StreamFenceServer | | buildSpec() | Build and return the immutable StreamFenceServerSpec | | static fromYaml(path, { server }) | Load config from YAML file | | static fromJson(path, { server }) | Load config from JSON file |

StreamFenceServer

| Method / Property | Description | |---|---| | start(): Promise<void> | Start the Socket.IO server | | stop(): Promise<void> | Graceful shutdown — disconnects clients, stops retry loop, closes port | | publish(namespace, topic, payload) | Broadcast a message to all subscribers of a topic in a namespace | | publishTo(namespace, clientId, topic, payload) | Send a message to a specific client only | | onMessage(namespace, topic, handler) | Register a handler for inbound client messages on a topic | | addListener(listener: ServerEventListener) | Register an event listener at runtime (after construction) | | metrics(): ServerMetrics | Access the metrics instance | | port: number \| null | Actual bound port after start() (useful when constructed with port 0) |


NamespaceSpec builder

Create namespace policies via NamespaceSpec.builder('/path'):

const spec = NamespaceSpec.builder('/prices')
  .topics(['bid', 'ask', 'last'])          // register multiple topics at once
  .deliveryMode(DeliveryMode.BEST_EFFORT)
  .overflowAction(OverflowAction.COALESCE)
  .maxQueuedMessagesPerClient(128)
  .maxQueuedBytesPerClient(1_048_576)      // 1 MiB
  .coalesce(true)
  .build();

| Method | Default | Description | |---|---|---| | topic(name: string) | — | Add a single topic | | topics(names: string[]) | [] | Set multiple topics at once | | deliveryMode(mode) | BEST_EFFORT | BEST_EFFORT or AT_LEAST_ONCE | | overflowAction(action) | REJECT_NEW | Overflow strategy (see Overflow policies) | | maxQueuedMessagesPerClient(n) | 64 | Max messages per client per topic before overflow applies | | maxQueuedBytesPerClient(n) | 524288 (512 KiB) | Max total bytes queued per client; messages exceeding this are rejected | | ackTimeoutMs(n) | 1000 | Timeout before retrying an unacknowledged message (AT_LEAST_ONCE) | | maxRetries(n) | 0 | Max retry attempts per message (must be >= 1 for AT_LEAST_ONCE) | | coalesce(flag) | false | Enable coalesce key matching for COALESCE overflow | | allowPolling(flag) | true | Allow HTTP long-polling transport (set false to force WebSocket only) | | maxInFlight(n) | 1 | Max messages awaiting acknowledgement simultaneously (AT_LEAST_ONCE) | | authRequired(flag) | false | Require token auth for this namespace | | inboundAckPolicy(policy) | ACK_ON_RECEIPT | When to acknowledge inbound messages: ACK_ON_RECEIPT or ACK_AFTER_HANDLER_SUCCESS | | build() | — | Validate and return an immutable NamespaceSpec |


API reference — exports

Enums

| Export | Values | |---|---| | DeliveryMode | BEST_EFFORT, AT_LEAST_ONCE | | OverflowAction | DROP_OLDEST, REJECT_NEW, COALESCE, SNAPSHOT_ONLY, SPILL_TO_DISK | | TransportMode | WS, WSS | | AuthMode | NONE, TOKEN | | EngineIoTransportMode | WEBSOCKET_ONLY, WEBSOCKET_OR_POLLING | | InboundAckPolicy | ACK_ON_RECEIPT, ACK_AFTER_HANDLER_SUCCESS |

Classes & factories

| Export | Description | |---|---| | StreamFenceServerBuilder | Fluent builder for server configuration; fromYaml(), fromJson() static factories | | StreamFenceServer | Running server instance — start(), stop(), publish(), publishTo(), onMessage(), addListener() | | NamespaceSpec / NamespaceSpecBuilder | Namespace policy builder | | AuthDecision | accept(principal) / reject(reason) factory | | TlsConfig | create(input) factory | | PromServerMetrics | Prometheus metrics implementation | | NoopServerMetrics | No-op metrics (default) |

Interfaces

| Export | Description | |---|---| | TokenValidator | Custom token authentication | | ServerEventListener | Optional lifecycle + runtime event callbacks (14 hooks) | | ServerMetrics | Metrics recording interface | | StreamFenceServerSpec | Immutable server configuration | | InboundMessageContext | Context passed to onMessage handlers | | InboundMessageHandler | Handler type: (payload, context) => void \| Promise<void> |

Event types

All event interfaces are exported for use in typed listener implementations:

ServerStartingEvent, ServerStartedEvent, ServerStoppingEvent, ServerStoppedEvent, ClientConnectedEvent, ClientDisconnectedEvent, SubscribedEvent, UnsubscribedEvent, PublishAcceptedEvent, PublishRejectedEvent, QueueOverflowEvent, AuthRejectedEvent, RetryEvent, RetryExhaustedEvent


Examples

See examples/ for runnable code:

  • single-server — programmatic builder API, one namespace
  • multi-namespace — one server with three namespaces: DROP_OLDEST prices, SNAPSHOT_ONLY portfolio, AT_LEAST_ONCE alerts
  • mixed-workload — two servers from a single YAML config: a BEST_EFFORT feed server and an AT_LEAST_ONCE control server

Run with:

npx tsx examples/multi-namespace/server.ts

Status / roadmap

v1 is complete and published. Planned for v2:

  • TLS PEM hot reload
  • Cluster-aware delivery (multi-process / multi-node)

License

Apache-2.0