@connectum/events

v1.0.0-rc.9

Universal event adapter layer for Connectum: proto-first pub/sub with pluggable broker adapters

@connectum/events provides a transport-agnostic EventBus for publishing and subscribing to events using protobuf schemas. Swap between NATS, Kafka, Redis Streams, or the in-memory adapter without changing application code.

Layer: 1 (Extension) | Node.js: >=20.0.0 | License: Apache-2.0

Features

  • createEventBus() -- factory with explicit lifecycle (start() / stop())
  • Proto-first -- publish and subscribe using @bufbuild/protobuf message schemas
  • Pluggable Adapters -- swap NATS, Kafka, Redis Streams, or in-memory without code changes
  • EventRouter -- type-safe handler registration mirroring ConnectRouter pattern
  • Middleware Pipeline -- composable middleware with built-in retry and DLQ
  • Wildcard Subscriptions -- NATS-style patterns (* single segment, > greedy)
  • MemoryAdapter -- zero-dependency in-memory adapter for testing
  • Graceful Shutdown -- integrates with @connectum/core via EventBusLike interface
  • Auto-ack -- a handler that completes successfully is acknowledged automatically if it called neither ack() nor nack()

Installation

pnpm add @connectum/events

Peer dependencies (installed automatically; to add them explicitly):

pnpm add @bufbuild/protobuf

Outside of tests, which can use the built-in MemoryAdapter, you also need a broker adapter:

pnpm add @connectum/events-nats    # NATS JetStream
pnpm add @connectum/events-kafka   # Kafka / Redpanda
pnpm add @connectum/events-redis   # Redis Streams

Quick Start

Minimal Example (in-memory)

import { createEventBus, MemoryAdapter } from '@connectum/events';
import { UserCreatedSchema } from '#gen/events_pb.js';

const bus = createEventBus({
  adapter: MemoryAdapter(),
});

await bus.start();

// Publish an event
await bus.publish(UserCreatedSchema, { id: '1', name: 'Alice' });

await bus.stop();

With Routes and Middleware

import { createEventBus } from '@connectum/events';
import { NatsAdapter } from '@connectum/events-nats';
// Hypothetical path: a module that default-exports an EventRouter callback
// like the one in "EventRouter (type-safe handlers)".
import eventRoutes from './eventRoutes.js';

const bus = createEventBus({
  adapter: NatsAdapter({ servers: 'nats://localhost:4222' }),
  routes: [eventRoutes],
  middleware: {
    retry: { maxRetries: 3, backoff: 'exponential' },
    dlq: { topic: 'service.dlq' },
  },
});

await bus.start();

Integration with @connectum/core

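import { createServer } from '@connectum/core';
import { createEventBus } from '@connectum/events';
import { NatsAdapter } from '@connectum/events-nats';
// Hypothetical local modules: eventRoutes is an EventRouter callback (see
// "EventRouter (type-safe handlers)"); routes registers the RPC services.
import eventRoutes from './eventRoutes.js';
import routes from './routes.js';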

const eventBus = createEventBus({
  adapter: NatsAdapter({ servers: 'nats://localhost:4222' }),
  routes: [eventRoutes],
});

const server = createServer({
  services: [routes],
  port: 5000,
  eventBus, // Lifecycle managed by server
});

await server.start(); // Also starts eventBus

EventRouter (type-safe handlers)

import type { EventRouter } from '@connectum/events';
import { UserService } from '#gen/user_pb.js';
import { UserCreatedSchema } from '#gen/events_pb.js';

export default (router: EventRouter) => {
  router.service(UserService, {
    async userCreated(ctx) {
      const event = ctx.event; // Typed from proto schema
      console.log(`User created: ${event.name}`);
      // Auto-ack on successful return
    },
  });
};
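
The auto-ack rule mentioned in the handler comment can be pictured with a small self-contained sketch. It does not use @connectum/events: AckContext, createTrackedContext, and runWithAutoAck are hypothetical names, and treating nack() as idempotent is an assumption (the API reference only states this for ack()).

```typescript
// Illustrative only: hypothetical names, not the @connectum/events internals.
interface AckContext {
  ack(): Promise<void>;
  nack(requeue?: boolean): Promise<void>;
}

// Builds a context that records the first ack/nack and ignores later calls.
function createTrackedContext(log: string[]) {
  let settled = false;
  const ctx: AckContext = {
    async ack() {
      if (settled) return;
      settled = true;
      log.push("ack");
    },
    async nack(requeue = false) {
      if (settled) return;
      settled = true;
      log.push(`nack(requeue=${requeue})`);
    },
  };
  return { ctx, isSettled: () => settled };
}

// Runs the handler, then acknowledges only if the handler did not settle
// the event itself. A thrown error propagates and nothing is auto-acked here.
async function runWithAutoAck(
  handler: (ctx: AckContext) => Promise<void>,
  log: string[],
): Promise<void> {
  const { ctx, isSettled } = createTrackedContext(log);
  await handler(ctx);
  if (!isSettled()) await ctx.ack();
}
```

In this sketch a handler that returns without calling anything is acknowledged once, while a handler that calls nack(true) is left as it settled itself.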

Middleware

The middleware pipeline wraps each event handler in the following order (outermost first):

Custom[0] → Custom[1] → ... → DLQ → Retry → Handler
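
One common way to get this kind of ordering is a right fold over the middleware list. The sketch below is self-contained and is not the library's composeMiddleware; Ctx, Handler, Middleware, compose, tracer, and runDemo are hypothetical names that only mirror the (next) => (ctx) => ... shape used for custom middleware in this README.

```typescript
// Illustrative only: hypothetical types and helpers, not the library's code.
type Ctx = { eventType: string };
type Handler = (ctx: Ctx) => Promise<void>;
type Middleware = (next: Handler) => Handler;

// Folds from the right so the first middleware in the list ends up outermost.
function compose(middlewares: Middleware[], handler: Handler): Handler {
  return middlewares.reduceRight<Handler>((next, mw) => mw(next), handler);
}

// Builds a middleware that records when it is entered and exited.
function tracer(name: string, trace: string[]): Middleware {
  return (next) => async (ctx) => {
    trace.push(`${name}:in`);
    await next(ctx);
    trace.push(`${name}:out`);
  };
}

// Runs one event through custom0, dlq, retry, then the handler.
async function runDemo(): Promise<string[]> {
  const trace: string[] = [];
  const pipeline = compose(
    [tracer("custom0", trace), tracer("dlq", trace), tracer("retry", trace)],
    async () => {
      trace.push("handler");
    },
  );
  await pipeline({ eventType: "user.created" });
  return trace;
}

runDemo().then((trace) => console.log(trace.join(" ")));
// custom0:in dlq:in retry:in handler retry:out dlq:out custom0:out
```

With this arrangement the retry layer sits closest to the handler, so a DLQ layer wrapped around it would only see an error after retries are exhausted.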

Retry Middleware

Retries failed handlers with a configurable backoff strategy.

const bus = createEventBus({
  adapter: NatsAdapter({ servers: 'nats://localhost:4222' }),
  middleware: {
    retry: {
      maxRetries: 3,           // Max retry attempts (default: 3)
      backoff: 'exponential',  // 'exponential' | 'linear' | 'fixed'
      initialDelay: 200,       // Initial delay in ms (default: 200)
      maxDelay: 30000,         // Max delay in ms (default: 30000)
      retryableErrors: (err) => !(err instanceof ValidationError),
    },
  },
});
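
To get a feel for how the three backoff strategies could differ, here is a self-contained sketch. The formulas are assumptions (doubling for exponential, a fixed step for linear, no jitter), not the package's actual implementation, and backoffDelay is a hypothetical helper.

```typescript
// Illustrative only: the formulas are assumptions, not the library's code.
type Backoff = "exponential" | "linear" | "fixed";

// attempt is zero-based: 0 is the delay before the first retry.
function backoffDelay(
  attempt: number,
  strategy: Backoff,
  initialDelay = 200,
  maxDelay = 30_000,
): number {
  const raw =
    strategy === "exponential"
      ? initialDelay * 2 ** attempt
      : strategy === "linear"
        ? initialDelay * (attempt + 1)
        : initialDelay;
  // Never wait longer than maxDelay, whatever the strategy.
  return Math.min(raw, maxDelay);
}

const schedule = [0, 1, 2].map((attempt) => backoffDelay(attempt, "exponential"));
console.log(schedule); // [ 200, 400, 800 ]
```

Under these assumptions, maxRetries: 3 with the default delays would wait 200 ms, 400 ms, and 800 ms before the three retries.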

DLQ Middleware

Routes permanently failed events to a dead-letter queue topic.

const bus = createEventBus({
  adapter: NatsAdapter({ servers: 'nats://localhost:4222' }),
  middleware: {
    dlq: {
      topic: 'service.dlq',        // DLQ topic name (required)
      errorSerializer: (err) => ({  // Custom error serialization
        message: err instanceof Error ? err.message : String(err),
        stack: err instanceof Error ? err.stack : undefined,
      }),
    },
  },
});

Custom Middleware

import type { EventMiddleware } from '@connectum/events';

const loggingMiddleware: EventMiddleware = (next) => async (ctx) => {
  console.log(`Processing event: ${ctx.eventType}`);
  const start = Date.now();
  await next(ctx);
  console.log(`Processed in ${Date.now() - start}ms`);
};

const bus = createEventBus({
  adapter: NatsAdapter({ servers: 'nats://localhost:4222' }),
  middleware: {
    custom: [loggingMiddleware],
    retry: { maxRetries: 3 },
    dlq: { topic: 'service.dlq' },
  },
});

Typed Errors

Control retry behavior declaratively by throwing typed error classes:

import { NonRetryableError, RetryableError } from '@connectum/events';

// Skip retry entirely (e.g., validation errors)
throw new NonRetryableError('Invalid payload schema');

// Force retry regardless of retryableErrors predicate
throw new RetryableError('Temporary connection lost', { cause: originalError });

Priority: NonRetryableError > RetryableError > retryableErrors predicate > retry all (default).

Both classes use Symbol.for() branding for cross-realm compatibility.
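
The priority order and the branding idea can be sketched together without the package. Everything named here is hypothetical (the symbol keys, brand, hasBrand, shouldRetry); the real classes may brand themselves differently.

```typescript
// Illustrative only: the symbol keys and helper names are hypothetical.
const NON_RETRYABLE = Symbol.for("example.events.nonRetryable");
const RETRYABLE = Symbol.for("example.events.retryable");

// Symbol.for() returns the same symbol for the same key from the global
// registry, so a brand survives where an instanceof check against a
// different copy of a class would fail.
function brand<E extends Error>(err: E, key: symbol): E {
  return Object.assign(err, { [key]: true });
}

function hasBrand(err: unknown, key: symbol): boolean {
  return (
    typeof err === "object" &&
    err !== null &&
    (err as Record<symbol, unknown>)[key] === true
  );
}

// Applies the documented priority:
// non-retryable > retryable > retryableErrors predicate > retry all.
function shouldRetry(
  err: unknown,
  retryableErrors?: (err: unknown) => boolean,
): boolean {
  if (hasBrand(err, NON_RETRYABLE)) return false;
  if (hasBrand(err, RETRYABLE)) return true;
  if (retryableErrors) return retryableErrors(err);
  return true;
}

const never = () => false;
console.log(shouldRetry(brand(new Error("bad payload"), NON_RETRYABLE))); // false
console.log(shouldRetry(brand(new Error("timeout"), RETRYABLE), never)); // true
console.log(shouldRetry(new Error("other"), never)); // false
console.log(shouldRetry(new Error("other"))); // true
```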

API Reference

createEventBus()

import { createEventBus } from '@connectum/events';

function createEventBus(options: EventBusOptions): EventBus

Parameters (EventBusOptions):

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| adapter | EventAdapter | required | Broker adapter (NATS, Kafka, Redis, Memory) |
| routes | EventRoute[] | [] | Event route handlers |
| middleware | MiddlewareConfig | {} | Middleware configuration |
| group | string | undefined | Consumer group name |
| signal | AbortSignal | undefined | External abort signal |
| handlerTimeout | number | 30000 | Per-handler timeout in ms |
| drainTimeout | number | 30000 | Max ms to wait for in-flight handlers during shutdown |

EventBus Interface

interface EventBus {
  start(): Promise<void>;
  stop(): Promise<void>;
  publish<T>(schema: DescMessage, data: T, options?: PublishOptions): Promise<void>;
}

PublishOptions

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| metadata | Record<string, string> | undefined | Event metadata / headers |
| topic | string | undefined | Override the schema-derived event type / topic name |
| key | string | undefined | Partition key (Kafka) or routing key |

EventContext

| Property | Type | Description |
|----------|------|-------------|
| event | T | Deserialized event payload |
| eventType | string | Event type / topic name |
| eventId | string | Unique event identifier |
| metadata | Map<string, string> | Event metadata |
| signal | AbortSignal | Abort signal (shutdown + timeout) |
| ack() | () => Promise<void> | Acknowledge event (idempotent) |
| nack(requeue?) | (requeue?: boolean) => Promise<void> | Negative-acknowledge event |

DlqOptions

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| topic | string | required | DLQ topic name |
| errorSerializer | (error: unknown) => Record<string, unknown> | Default serializer | Custom error serialization |

RetryOptions

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| maxRetries | number | 3 | Maximum retry attempts |
| backoff | 'exponential' \| 'linear' \| 'fixed' | 'exponential' | Backoff strategy |
| initialDelay | number | 200 | Initial delay in ms |
| maxDelay | number | 30000 | Maximum delay in ms |
| retryableErrors | (err: unknown) => boolean | All errors | Filter for retryable errors |

MemoryAdapter

Zero-dependency in-memory adapter for unit and integration tests.

import { createEventBus, MemoryAdapter } from '@connectum/events';

const bus = createEventBus({
  adapter: MemoryAdapter(),
  routes: [myRoutes],
});

await bus.start();

// Publish and consume synchronously in-process
await bus.publish(MyEventSchema, { value: 42 });

await bus.stop();

Supports wildcard subscriptions (* and > patterns).
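
For readers unfamiliar with NATS-style patterns, the sketch below shows the usual semantics on dot-separated topics: * matches exactly one segment, and > matches one or more trailing segments and must come last. The package exports matchPattern, but its signature is not shown in this README, so the example uses its own hypothetical matchesPattern function instead.

```typescript
// Illustrative only: matchesPattern is a hypothetical stand-in, not the
// library's matchPattern export.
function matchesPattern(pattern: string, topic: string): boolean {
  const p = pattern.split(".");
  const t = topic.split(".");
  for (let i = 0; i < p.length; i++) {
    if (p[i] === ">") {
      // Greedy wildcard: must be the last pattern segment and must cover
      // at least one remaining topic segment.
      return i === p.length - 1 && t.length > i;
    }
    if (i >= t.length) return false; // topic ran out of segments
    if (p[i] !== "*" && p[i] !== t[i]) return false; // literal mismatch
  }
  // No greedy wildcard was hit, so the segment counts must match exactly.
  return p.length === t.length;
}

console.log(matchesPattern("user.*", "user.created")); // true
console.log(matchesPattern("user.*", "user.created.v1")); // false
console.log(matchesPattern("user.>", "user.created.v1")); // true
console.log(matchesPattern("user.>", "user")); // false
```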

Graceful Shutdown

EventBus tracks in-flight message handlers and waits for them to complete during stop():

const bus = createEventBus({
  adapter: NatsAdapter({ servers: 'nats://localhost:4222' }),
  routes: [eventRoutes],
  drainTimeout: 15_000, // Wait up to 15s for handlers (default: 30s)
});

// During stop():
// 1. Stop accepting new messages (nack with requeue)
// 2. Wait for in-flight handlers up to drainTimeout
// 3. Force-abort remaining via AbortSignal
// 4. Disconnect adapter
await bus.stop();

Set drainTimeout: 0 for immediate abort (skip drain).
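
Steps 2 and 3 of the shutdown sequence (wait, then force-abort) can be approximated with a small tracker built on AbortController. This is a sketch under assumptions, not the EventBus implementation: InFlightTracker and its methods are hypothetical names.

```typescript
// Illustrative only: InFlightTracker is a hypothetical name, not a library class.
class InFlightTracker {
  private readonly pending = new Set<Promise<void>>();
  private readonly controller = new AbortController();

  // Handlers would observe this signal to stop early once the drain times out.
  get signal(): AbortSignal {
    return this.controller.signal;
  }

  // Registers a running handler and forgets it once it settles.
  track(task: Promise<void>): void {
    const settled: Promise<void> = task
      .catch(() => undefined)
      .finally(() => {
        this.pending.delete(settled);
      });
    this.pending.add(settled);
  }

  // Waits for in-flight work up to timeoutMs, then aborts whatever is left.
  // A timeout of 0 skips the wait and aborts immediately.
  async drain(timeoutMs: number): Promise<"drained" | "aborted"> {
    if (this.pending.size === 0) return "drained";
    if (timeoutMs > 0) {
      let timer: ReturnType<typeof setTimeout> | undefined;
      const timeout = new Promise<"timeout">((resolve) => {
        timer = setTimeout(() => resolve("timeout"), timeoutMs);
      });
      const done = Promise.all(Array.from(this.pending)).then(() => "done" as const);
      const outcome = await Promise.race([done, timeout]);
      clearTimeout(timer);
      if (outcome === "done") return "drained";
    }
    this.controller.abort();
    return "aborted";
  }
}
```

A handler that checks the tracker's signal (here standing in for ctx.signal) can bail out promptly when the drain window closes rather than being cut off mid-write.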

Exports Summary

| Export | Kind | Description |
|--------|------|-------------|
| createEventBus | function | Factory for creating an EventBus |
| NonRetryableError | class | Error that skips retry middleware |
| RetryableError | class | Error that forces retry |
| EventRouterImpl | class | Event router implementation |
| MemoryAdapter | function | In-memory adapter factory |
| dlqMiddleware | function | DLQ middleware factory |
| retryMiddleware | function | Retry middleware factory |
| composeMiddleware | function | Middleware composition utility |
| createEventContext | function | EventContext factory |
| resolveTopicName | function | Topic name resolution from proto |
| matchPattern | function | NATS-style wildcard matching |
| EventAdapter | type | Adapter interface |
| EventBus | type | EventBus interface |
| EventBusOptions | type | Options for createEventBus() |
| EventContext | type | Handler context interface |
| EventRouter | type | Router interface |
| PublishOptions | type | Publish options |
| EventMiddleware | type | Middleware function type |
| RetryOptions | type | Retry middleware options |
| DlqOptions | type | DLQ middleware options |
| RawEvent | type | Raw event from adapter |
| RawEventHandler | type | Raw event handler type |
| EventSubscription | type | Subscription handle |
| MiddlewareConfig | type | Middleware configuration |

Dependencies

Internal

  • @connectum/core -- EventBusLike interface for server integration

External

  • @bufbuild/protobuf -- Protocol Buffers runtime (serialization/deserialization)

Requirements

  • Node.js: >=20.0.0
  • pnpm: >=10.0.0
  • TypeScript: >=5.7.2 (for type checking)

License

Apache-2.0


Part of @connectum -- Universal framework for production-ready gRPC/ConnectRPC microservices