@mikrostack/chbus

v0.2.5

Typed, channel-based event bus with middleware, loop prevention, storm control, and full observability

A typed, channel-based event bus for TypeScript. Organise messaging into named, strongly-typed channels with middleware pipelines, loop detection, storm control, and dual sync/async delivery tracks. A built-in debug wiretap and console logger give full observability with zero coupling to your application code.

Installation

npm install @mikrostack/chbus

Quick start

import { createBus } from '@mikrostack/chbus'

type PlaybackContract = {
  'playback:start': { trackId: string }
  'playback:stop':  { trackId: string }
}

const bus = createBus()
const playback = bus.channel<PlaybackContract>('playback')

playback.on('playback:start', (payload) => {
  console.log('starting', payload.trackId)
})

playback.emit('playback:start', { trackId: 'track-1' }, { from: 'playerService' })

Core concepts

Bus

The Bus is the single entry point. It owns all channels and the internal debug wiretap. Create one with the createBus() factory:

const bus = createBus()

// Alternatively, with global storm config (named differently here so both lines can coexist):
const stormBus = createBus({ storm: { maxMessages: 50, windowMs: 500 } })

Channel contract

A contract is a plain TypeScript type that maps action names to their payload types. You define contracts — the library imposes no schema.

type UIContract = {
  'ui:status-update': { label: string }
  'ui:modal-open':    { id: string }
  'ui:modal-close':   void
}
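
To make the contract idea concrete, here is a minimal sketch of how a type that maps action names to payloads can drive handler and emit signatures. TinyChannel and Handler are hypothetical names invented for this illustration; this is not the chbus implementation, only one way such typing could work.

```typescript
// Illustrative only: a tiny typed emitter, not the chbus implementation.
type UIContract = {
  'ui:status-update': { label: string }
  'ui:modal-open': { id: string }
  'ui:modal-close': void
}

// A handler for action K receives exactly the payload the contract declares for K.
type Handler<C, K extends keyof C> = (payload: C[K]) => void
type AnyHandler = (payload: any) => void

class TinyChannel<C> {
  private handlers = new Map<keyof C, Set<AnyHandler>>()

  on<K extends keyof C>(action: K, handler: Handler<C, K>): () => void {
    const set = this.handlers.get(action) ?? new Set<AnyHandler>()
    set.add(handler)
    this.handlers.set(action, set)
    return () => { set.delete(handler) }
  }

  emit<K extends keyof C>(action: K, payload: C[K]): void {
    this.handlers.get(action)?.forEach((handler) => handler(payload))
  }
}

const ui = new TinyChannel<UIContract>()
const seen: string[] = []
ui.on('ui:status-update', (payload) => { seen.push(payload.label) })
ui.emit('ui:status-update', { label: 'Ready' })
ui.emit('ui:modal-close', undefined)
// ui.emit('ui:modal-open', { label: 'x' }) // would not compile: payload must be { id: string }
```

The indexed access type C[K] is what ties each action name to its payload, so a mismatched payload is rejected at compile time rather than at runtime.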

Channel

A channel is a strongly-typed message conduit scoped to one contract. Retrieve (or create) a channel by name:

const ui = bus.channel<UIContract>('ui')

Calling bus.channel('ui') more than once returns the same instance — channels are singletons within a bus. The name 'debug' is reserved and will throw.
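
The get-or-create behaviour described above can be sketched with a plain Map. TinyRegistry is a hypothetical helper written for this example, and the error message is invented; the real library may store channels differently.

```typescript
// Sketch only: get-or-create semantics with a reserved name (not chbus source).
class TinyRegistry<T> {
  private items = new Map<string, T>()
  private reserved: Set<string>
  private create: (name: string) => T

  constructor(create: (name: string) => T, reserved: string[] = ['debug']) {
    this.create = create
    this.reserved = new Set(reserved)
  }

  get(name: string): T {
    if (this.reserved.has(name)) {
      throw new Error(`Channel name "${name}" is reserved`)
    }
    let item = this.items.get(name)
    if (item === undefined) {
      // First request for this name: create and remember the instance.
      item = this.create(name)
      this.items.set(name, item)
    }
    return item
  }
}

const registry = new TinyRegistry((name) => ({ name }))
const first = registry.get('ui')
const second = registry.get('ui')
console.log(first === second) // true
```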


Namespaced bus

When a third-party library integrates with chbus, it should receive a NamespacedBus rather than the root Bus. A NamespacedBus scopes all channel creation to a single namespace and deliberately does not expose onDebug() or namespace().

import { createBus } from '@mikrostack/chbus'
import type { NamespacedBus } from '@mikrostack/chbus'

// Library declares what it needs:
export function createVideoPlayer(bus: NamespacedBus) {
  const playback = bus.channel<PlaybackContract>('playback')
  // Channels are registered as 'player:playback' on the root bus.
}

// Application wires them together:
const bus = createBus()
createVideoPlayer(bus.namespace('player'))

Channels from different namespaces are fully isolated even when they share an action name.

const ns1 = bus.namespace('player')
const ns2 = bus.namespace('analytics')

ns1.channel<UIContract>('ui')        // registered as 'player:ui'
ns2.channel<UIContract>('ui')        // registered as 'analytics:ui' — distinct instance

Multiple calls to bus.namespace('player') return independent proxy objects, but they all write to the same underlying channel registry. Calling channel('playback') on two proxies for the same namespace therefore returns the same Channel instance.
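
A rough sketch of how independent proxies can share one registry, assuming channels are keyed by a qualified name of the form namespace:name as the comments above suggest. TinyBus and its members are hypothetical and exist only for this illustration.

```typescript
// Sketch only: namespaced proxies over one shared registry (hypothetical names).
type TinyChannel = { qualifiedName: string }

class TinyBus {
  private channels = new Map<string, TinyChannel>()

  channel(name: string, ns = ''): TinyChannel {
    const qualifiedName = ns ? `${ns}:${name}` : name
    let found = this.channels.get(qualifiedName)
    if (!found) {
      found = { qualifiedName }
      this.channels.set(qualifiedName, found)
    }
    return found
  }

  // Each call returns a fresh proxy object that closes over the same bus.
  namespace(ns: string) {
    return {
      namespace: ns,
      channel: (name: string) => this.channel(name, ns),
    }
  }
}

const bus = new TinyBus()
const playerA = bus.namespace('player')
const playerB = bus.namespace('player')
const analytics = bus.namespace('analytics')

console.log(playerA === playerB)                               // false
console.log(playerA.channel('ui') === playerB.channel('ui'))   // true
console.log(playerA.channel('ui') === analytics.channel('ui')) // false
console.log(analytics.channel('ui').qualifiedName)             // analytics:ui
```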


Subscribing

Sync subscribers — on()

Registered with on() and called by emit(). Must be synchronous (any returned value is ignored).

const unsubscribe = playback.on('playback:start', (payload, { message }) => {
  console.log(message.from, payload.trackId)
})

// Later:
unsubscribe()

Async subscribers — onAsync()

Registered with onAsync() and called by emitAsync(). Must return a Promise<void>.

const unsubscribe = buffer.onAsync('buffer:flush', async (payload) => {
  await writeToDisk(payload.data)
})

The two tracks are completely separate — on() subscribers are never called by emitAsync(), and onAsync() subscribers are never called by emit().

The message passed in the handler's second argument (meta) gives you full context: id, namespace, channel, action, from, coordinationChain, timestamp.
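
The separation between the two tracks can be pictured as two independent subscriber sets. The sketch below uses a hypothetical TwoTrack class and ignores actions, middleware and meta; it only illustrates the isolation and the unsubscribe functions, not how chbus is built.

```typescript
// Sketch only: two independent subscriber tracks (hypothetical class, not chbus source).
type SyncSub<P> = (payload: P) => void
type AsyncSub<P> = (payload: P) => Promise<void>

class TwoTrack<P> {
  private syncSubs = new Set<SyncSub<P>>()
  private asyncSubs = new Set<AsyncSub<P>>()

  on(sub: SyncSub<P>): () => void {
    this.syncSubs.add(sub)
    return () => { this.syncSubs.delete(sub) }
  }

  onAsync(sub: AsyncSub<P>): () => void {
    this.asyncSubs.add(sub)
    return () => { this.asyncSubs.delete(sub) }
  }

  // Only the sync track is visited.
  emit(payload: P): void {
    for (const sub of this.syncSubs) sub(payload)
  }

  // Only the async track is visited.
  emitAsync(payload: P): Promise<PromiseSettledResult<void>[]> {
    return Promise.allSettled([...this.asyncSubs].map((sub) => sub(payload)))
  }
}

const track = new TwoTrack<{ trackId: string }>()
const calls: string[] = []
const off = track.on((p) => { calls.push(`sync:${p.trackId}`) })
track.onAsync(async (p) => { calls.push(`async:${p.trackId}`) })

track.emit({ trackId: 'track-1' }) // calls is now ['sync:track-1']
off()
track.emit({ trackId: 'track-2' }) // no sync subscribers left, nothing is pushed
```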


Emitting

Sync — emit()

Fire-and-forget. Delivers only to subscribers registered with on(). Returns immediately; no async work is awaited.

playback.emit('playback:start', { trackId: 'track-1' })

// With sender identity:
playback.emit('playback:start', { trackId: 'track-1' }, { from: 'playerService' })

Async — emitAsync()

Delivers only to subscribers registered with onAsync(). Awaits all of them in parallel using Promise.allSettled and returns their outcomes as SettledResult[].

buffer.onAsync('buffer:flush', async (payload) => {
  await writeToDisk(payload.data)
})

const results = await buffer.emitAsync('buffer:flush', { data: pendingFrames })

results.forEach((r) => {
  if (r.status === 'rejected') console.error('subscriber failed:', r.reason)
})

Because allSettled is used, a failing subscriber never prevents others from running. emitAsync returns [] if no async subscribers are registered or if the message is dropped.
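
The isolation guarantee comes from the standard Promise.allSettled behaviour, which can be seen without the library at all. In this sketch fanOut and AsyncSubscriber are hypothetical names, and PromiseSettledResult is the built-in TypeScript type; the library's own SettledResult may be shaped differently.

```typescript
// Sketch only: parallel fan-out with Promise.allSettled (standard JavaScript, not chbus source).
type AsyncSubscriber<P> = (payload: P) => Promise<void>

async function fanOut<P>(
  subscribers: AsyncSubscriber<P>[],
  payload: P,
): Promise<PromiseSettledResult<void>[]> {
  if (subscribers.length === 0) return []
  // Every subscriber is started before any result is awaited.
  return Promise.allSettled(subscribers.map((sub) => sub(payload)))
}

const order: string[] = []
const subscribers: AsyncSubscriber<string>[] = [
  async () => { throw new Error('disk full') },
  async (data) => { order.push(`wrote ${data}`) },
]

const pending = fanOut(subscribers, 'frame-1')
pending.then((results) => {
  const failed = results.filter((r) => r.status === 'rejected')
  console.log(results.length, failed.length) // 2 1
})
```

The first subscriber rejects, yet the second still runs and its outcome is reported alongside the failure.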

Choosing an emit method

| | emit | emitAsync |
|---|---|---|
| Subscriber track | on() | onAsync() |
| Awaitable | No | Yes: Promise<SettledResult[]> |
| Isolation | Sync only | Async only |
| Use when | Notifications, broadcasts | Critical side-effects must complete before proceeding |


Middleware

Middleware runs in insertion order before subscribers are notified. It applies to both emit() and emitAsync(). Each middleware receives the full typed Message and a next function. If next() is not called the message is silently dropped.

playback.use((message, next) => {
  console.log(`[${message.channel}] ${String(message.action)}`)
  next()
})

// Conditional gating:
playback.use((message, next) => {
  if (isAuthorised(message.from)) next()
  // else: message is dropped silently
})

Middleware is chainable:

channel
  .use(loggingMiddleware)
  .use(authMiddleware)
  .use(metricsMiddleware)
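
For readers unfamiliar with next()-style pipelines, the following sketch shows one common way to run middleware in insertion order and treat a missing next() call as a drop. runPipeline and the simplified Message type are assumptions made for this example, not the library's internals.

```typescript
// Sketch only: a next()-style pipeline (hypothetical helper, not chbus source).
type Message = { channel: string; action: string; from: string }
type Middleware = (message: Message, next: () => void) => void

// Returns true when every middleware called next(), false when the message was dropped.
function runPipeline(middlewares: Middleware[], message: Message): boolean {
  let index = 0
  let delivered = false

  const next = (): void => {
    const middleware = middlewares[index++]
    if (middleware === undefined) {
      delivered = true // end of the chain: subscribers would be notified here
      return
    }
    middleware(message, next)
  }

  next()
  return delivered
}

const trace: string[] = []
const logging: Middleware = (message, next) => {
  trace.push(`log ${message.action}`)
  next()
}
const auth: Middleware = (message, next) => {
  if (message.from === 'playerService') next()
  // otherwise next() is never called and the message stops here
}

const base = { channel: 'playback', action: 'playback:start' }
console.log(runPipeline([logging, auth], { ...base, from: 'playerService' })) // true
console.log(runPipeline([logging, auth], { ...base, from: 'intruder' }))      // false
```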

Loop detection

When multiple channels are wired together it is easy to create event cycles (A emits → B reacts → A emits → …). chbus automatically detects and drops looping messages using a coordination chain appended to every emitted message.

Pass the incoming chain through EmitOptions when reacting to a message:

playback.on('playback:start', (payload, { message }) => {
  ui.emit(
    'ui:status-update',
    { label: `Playing ${payload.trackId}` },
    {
      from: 'playbackService',
      coordinationChain: [...message.coordinationChain],
      // chbus appends its own coordination ID before forwarding.
      // If it recognises any existing ID as its own, the message is dropped.
    },
  )
})

When a loop is detected a warning is logged to the console and the message is dropped:

[chbus] Loop detected on channel "player:ui" action "ui:status-update" from "playbackService"

Each channel retains up to 10,000 coordination IDs, evicting the oldest when the limit is reached.
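
One plausible way to picture the mechanism: each channel remembers the coordination IDs it has issued and refuses any message whose chain already contains one of them. The LoopGuard class, its ID format and its eviction strategy are assumptions for illustration; only the 10,000 limit comes from the text above.

```typescript
// Sketch only: loop detection via a coordination chain (assumed mechanics, not chbus source).
class LoopGuard {
  private ownIds = new Set<string>()
  private counter = 0
  private label: string
  private maxIds: number

  constructor(label: string, maxIds = 10_000) {
    this.label = label
    this.maxIds = maxIds
  }

  // Returns the extended chain, or null when the message already passed through this guard.
  admit(incomingChain: readonly string[]): string[] | null {
    if (incomingChain.some((id) => this.ownIds.has(id))) return null

    const id = `${this.label}-${++this.counter}`
    this.ownIds.add(id)
    if (this.ownIds.size > this.maxIds) {
      // A Set iterates in insertion order, so the first value is the oldest ID.
      const oldest = this.ownIds.values().next().value
      if (oldest !== undefined) this.ownIds.delete(oldest)
    }
    return [...incomingChain, id]
  }
}

const playbackGuard = new LoopGuard('playback')
const uiGuard = new LoopGuard('ui')

const hop1 = playbackGuard.admit([])         // ['playback-1']
const hop2 = uiGuard.admit(hop1 ?? [])       // ['playback-1', 'ui-1']
const hop3 = playbackGuard.admit(hop2 ?? []) // null: 'playback-1' is already in the chain
```

This also shows why the chain must be forwarded by the reacting subscriber: without hop2 carrying 'playback-1', the playback guard would have no way to recognise the cycle.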


Storm control

Each channel tracks message rates per sender within a sliding window. If a sender exceeds the threshold, subsequent messages are dropped with a warning:

[chbus] Storm detected on channel "player:playback" from sender "trackService" — 101 messages in 1000ms

Global config (applies to all channels):

const bus = createBus({
  storm: { maxMessages: 100, windowMs: 1000 }, // these are the defaults
})

Per-channel override:

// High-frequency channel — raise the limit
const telemetry = bus.channel<TelemetryContract>('telemetry', {
  storm: { maxMessages: 1000, windowMs: 1000 },
})
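
The sliding window and the merge of per-channel overrides with global config can be sketched as follows. StormGuard is a hypothetical class, and the choice not to count dropped messages toward the window is an assumption of this sketch that the library may not share.

```typescript
// Sketch only: per-sender sliding-window limiter (assumed mechanics, not chbus source).
type StormConfig = { maxMessages: number; windowMs: number }

const DEFAULT_STORM: StormConfig = { maxMessages: 100, windowMs: 1000 }

class StormGuard {
  private config: StormConfig
  private timestamps = new Map<string, number[]>()

  // A per-channel override is merged over the global defaults.
  constructor(override: Partial<StormConfig> = {}, globalConfig: StormConfig = DEFAULT_STORM) {
    this.config = { ...globalConfig, ...override }
  }

  // Returns true when the message may pass, false when the sender is over the limit.
  allow(sender: string, now: number = Date.now()): boolean {
    const cutoff = now - this.config.windowMs
    const recent = (this.timestamps.get(sender) ?? []).filter((t) => t > cutoff)
    const allowed = recent.length < this.config.maxMessages
    if (allowed) recent.push(now) // assumption: dropped messages are not counted
    this.timestamps.set(sender, recent)
    return allowed
  }
}

const guard = new StormGuard({ maxMessages: 2 }) // windowMs stays at the default 1000
console.log(guard.allow('trackService', 0))    // true
console.log(guard.allow('trackService', 10))   // true
console.log(guard.allow('trackService', 20))   // false: third message inside the window
console.log(guard.allow('otherService', 20))   // true: counters are per sender
console.log(guard.allow('trackService', 1500)) // true: the old timestamps have expired
```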

Debug wiretap

Every message that completes the emit flow is automatically forwarded to the debug wiretap — emitters and subscribers are completely unaware of it. It is only accessible on the root Bus.

const unsubDebug = bus.onDebug((msg) => {
  console.debug(
    `[${msg.qualifiedChannel}] ${msg.action}`,
    { from: msg.from, payload: msg.payload },
  )
})

// Stop listening:
unsubDebug()

DebugMessage shape:

{
  namespace:         'player',
  channel:           'playback',
  qualifiedChannel:  'player:playback',   // 'playback' if no namespace
  action:            'playback:start',
  payload:           { trackId: 'track-1' },
  from:              'playerService',
  coordinationChain: ['abc123'],
  timestamp:         1711234567890,
  messageId:         'xyz789',
}

Console logger

createLogger is a zero-config devtool built on the debug wiretap. It pretty-prints every message using console.groupCollapsed and returns a stop function.

import { createLogger } from '@mikrostack/chbus'

const stop = createLogger(bus)

// Later, in cleanup:
stop()

Options:

const stop = createLogger(bus, {
  collapsed: false,            // use console.group instead of console.groupCollapsed
  filter: {
    namespaces: ['player'],       // only log messages from the 'player' namespace
    channels:   ['playback'],  // only log messages from the 'playback' channel
    actions:    ['playback:start'],
  },
})

All filter arrays are optional and independent — combine them to narrow output to exactly the traffic you care about.
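
Combining filters to narrow output suggests that a message must satisfy every array that is provided. The predicate below sketches that reading; matchesFilter and the trimmed-down DebugMessage type are hypothetical, and the AND semantics are an assumption drawn from the sentence above.

```typescript
// Sketch only: combining optional filters (assumed AND semantics, not chbus source).
type DebugMessage = { namespace: string; channel: string; action: string }
type LoggerFilter = { namespaces?: string[]; channels?: string[]; actions?: string[] }

// An omitted array places no restriction; a provided array must contain the value.
function matchesFilter(message: DebugMessage, filter: LoggerFilter = {}): boolean {
  const passes = (allowed: string[] | undefined, value: string): boolean =>
    allowed === undefined || allowed.includes(value)

  return (
    passes(filter.namespaces, message.namespace) &&
    passes(filter.channels, message.channel) &&
    passes(filter.actions, message.action)
  )
}

const message = { namespace: 'player', channel: 'playback', action: 'playback:start' }
console.log(matchesFilter(message))                             // true
console.log(matchesFilter(message, { namespaces: ['player'] })) // true
console.log(
  matchesFilter(message, { namespaces: ['player'], actions: ['playback:stop'] }),
) // false
```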


Lifecycle

Channels and the bus expose a destroy() method that clears all subscribers, cancels pending timers, and releases internal state. Calling emit() or emitAsync() on a destroyed channel is a no-op and logs a warning.

// Destroy a single channel:
playback.destroy()

// Destroy the bus and all its channels:
bus.destroy()
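
The no-op behaviour after destroy() can be modelled with a simple flag. DisposableChannel and its warning text are invented for this sketch, which leaves out timers and the async track.

```typescript
// Sketch only: a destroyed flag that turns emit into a warned no-op (not chbus source).
class DisposableChannel<P> {
  private subscribers = new Set<(payload: P) => void>()
  private destroyed = false

  on(subscriber: (payload: P) => void): () => void {
    this.subscribers.add(subscriber)
    return () => { this.subscribers.delete(subscriber) }
  }

  // Returns the number of subscribers that were notified.
  emit(payload: P): number {
    if (this.destroyed) {
      console.warn('emit() called on a destroyed channel; message ignored')
      return 0
    }
    this.subscribers.forEach((subscriber) => subscriber(payload))
    return this.subscribers.size
  }

  destroy(): void {
    this.subscribers.clear()
    this.destroyed = true
  }
}

const channel = new DisposableChannel<string>()
const received: string[] = []
channel.on((payload) => { received.push(payload) })

channel.emit('before') // delivered
channel.destroy()
channel.emit('after')  // ignored, with a warning
```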

API reference

createBus(config?: BusConfig): Bus

| Option | Type | Default | Description |
|---|---|---|---|
| storm.maxMessages | number | 100 | Max messages per sender per window |
| storm.windowMs | number | 1000 | Window duration in milliseconds |

Bus

| Method | Returns | Description |
|---|---|---|
| channel<C>(name, options?) | Channel<C> | Get or create a channel. Throws for 'debug'. |
| namespace(name) | NamespacedBus | Create a namespaced proxy for library integration. |
| onDebug(handler) | () => void | Subscribe to the debug wiretap. Returns unsubscribe. |
| destroy() | void | Destroy all channels and clear internal state. |

NamespacedBus

| Property / Method | Returns | Description |
|---|---|---|
| namespace | string | The namespace this proxy is scoped to. |
| channel<C>(name, options?) | Channel<C> | Get or create a namespaced channel. |

Channel<C>

| Property / Method | Returns | Description |
|---|---|---|
| name | string | Unqualified channel name. |
| namespace | string | Namespace, or '' if none. |
| use(middleware) | this | Append middleware to the pipeline. |
| on(action, subscriber) | () => void | Register a sync subscriber. Returns unsubscribe. |
| onAsync(action, subscriber) | () => void | Register an async subscriber. Returns unsubscribe. |
| emit(action, payload, options?) | void | Sync fire-and-forget. Delivers to on() subscribers only. |
| emitAsync(action, payload, options?) | Promise<SettledResult[]> | Async fan-out. Delivers to onAsync() subscribers only. |
| destroy() | void | Clear all subscribers, timers, and state. |

ChannelOptions

| Field | Type | Description |
|---|---|---|
| storm | Partial<StormConfig> | Per-channel storm config override. Merged with global config. |

EmitOptions

| Field | Type | Default | Description |
|---|---|---|---|
| from | string | 'anonymous' | Sender identity |
| coordinationChain | string[] | [] | Upstream chain for loop detection |

SettledResult

| Field | Type | Description |
|---|---|---|
| status | 'fulfilled' \| 'rejected' | Outcome of the async subscriber |
| reason | unknown | Rejection reason, present only when status is 'rejected' |

LoggerOptions

| Field | Type | Default | Description |
|---|---|---|---|
| collapsed | boolean | true | Use groupCollapsed instead of group |
| filter.namespaces | string[] | none | Only log messages from these namespaces |
| filter.channels | string[] | none | Only log messages from these channel names |
| filter.actions | string[] | none | Only log messages matching these action names |


TypeScript

The library is written in strict TypeScript and ships with full .d.ts declarations. Payload types are inferred directly from your contract — no casting needed anywhere in the public API.

// Payload is inferred as { trackId: string }
playback.on('playback:start', (payload) => {
  payload.trackId  // ✓ string
  payload.unknown   // ✗ TypeScript error
})

Use NamespacedBus in library type signatures to communicate intent — it signals that the library will only create channels, not subscribe to the wiretap or spawn new namespaces:

import type { NamespacedBus } from '@mikrostack/chbus'

export interface VideoPlayerOptions {
  bus: NamespacedBus
}