npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@scope3/eventbus

v1.0.0

Published

Type-safe event bus library with Google Cloud Pub/Sub integration

Readme

@scope3/eventbus

A flexible, type-safe event bus library for TypeScript applications with built-in Google Cloud Pub/Sub integration. Perfect for microservices, distributed systems, and event-driven architectures.

Features

  • Pre-instantiated Singletons: Import and use directly - no instantiation needed
  • Flexible Architecture: Generic event emitter + specialized database event emitter
  • Type-Safe: Full TypeScript support with optional runtime validation
  • Google Cloud Pub/Sub: Built-in integration with message ordering for both publishing and listening
  • Modular Exports: Import just what you need via /database, /emitter, /publisher, or /listener
  • Validation Options: Zod schemas or custom validators for database events
  • Non-Blocking: Uses process.nextTick to avoid blocking the event loop
  • Configurable: Custom ordering keys, logging, and behavior
  • Flexible Message Routing: Attribute-based routing, catch-all handlers, and command/action pattern support

Installation

npm install @scope3/eventbus

Optional Dependencies

# If using Zod schemas for database events (recommended)
npm install zod

Zod is an optional peer dependency. The library ships with Google Cloud Pub/Sub built-in.

Quick Start

Generic Event Emitter

For simple event emission without schema validation:

import { EventEmitter } from '@scope3/eventbus'

// Configure the emitter (typically done once at app startup)
EventEmitter.setConfig({
  publisherOptions: {
    projectId: 'my-gcp-project',
    topicName: 'events',
  },
  debug: false,
})

// Emit events with any data
EventEmitter.emitEvent('user-service', 'user-created', {
  id: 1,
  email: '[email protected]',
  name: 'John Doe',
})

EventEmitter.emitEvent('payment-service', 'payment-processed', {
  id: 'txn-123',
  amount: 99.99,
  currency: 'USD',
})

Database Event Emitter

For database events with schema validation:

import { DatabaseEventEmitter, DatabaseEventActions } from '@scope3/eventbus/database'
import { z } from 'zod'

// Configure the database event emitter
DatabaseEventEmitter.setConfig({
  publisherOptions: {
    projectId: 'my-gcp-project',
    topicName: 'database-events',
  },
  debug: false,
})

// Add schemas for your database tables
DatabaseEventEmitter.addSchema('users', {
  schema: z.object({
    id: z.number(),
    name: z.string(),
    email: z.string().email(),
    createdAt: z.date(),
    updatedAt: z.date(),
  }),
})

DatabaseEventEmitter.addSchema('posts', {
  schema: z.object({
    id: z.number(),
    title: z.string(),
    content: z.string(),
    authorId: z.number(),
    publishedAt: z.date().nullable(),
  }),
})

// Emit database events with full type safety
DatabaseEventEmitter.event('users', DatabaseEventActions.Insert, {
  id: 1,
  name: 'John Doe',
  email: '[email protected]',
  createdAt: new Date(),
  updatedAt: new Date(),
})

DatabaseEventEmitter.event('posts', DatabaseEventActions.Update, {
  id: 1,
  title: 'Updated Title',
  content: 'Updated content',
  authorId: 1,
  publishedAt: new Date(),
})

Architecture

The library provides two emitter classes:

  1. EventEmitter - Generic event emitter for simple use cases

    • No schema validation
    • Flexible event emission with any data
    • Perfect for application events, service notifications, etc.
  2. DatabaseEventEmitter - Extends EventEmitter with database-specific features

    • Schema validation (Zod or custom validators)
    • Standardized database actions (Insert, Update, Delete)
    • Type-safe event emission with schema enforcement

Both emitters:

  • Share the same Pub/Sub publisher integration
  • Support custom ordering keys for message delivery guarantees
  • Use non-blocking event publishing with process.nextTick
  • Provide configurable logging and debug modes

Modular Exports

Import just what you need:

// Generic event emitter (main export)
import { EventEmitter } from '@scope3/eventbus'

// Database-specific emitter and types
import { DatabaseEventEmitter, DatabaseEventActions } from '@scope3/eventbus/database'

// Base emitter class (for extending)
import { BaseEventEmitter } from '@scope3/eventbus/emitter'

// Publisher for advanced use cases
import { PubSubPublisher } from '@scope3/eventbus/publisher'

// Listeners for advanced use cases
import { PubSubListener, CommandListener } from '@scope3/eventbus/listener'

API Reference

EventEmitter (Generic)

Pre-instantiated singleton for simple event emission.

Methods

  • setConfig(config) - Configure the emitter

    EventEmitter.setConfig({
      publisherOptions?: {
        projectId?: string
        topicName?: string
        messageOrdering?: boolean
        enabled?: boolean
      },
      logger?: Logger,
      orderingKeyGenerator?: (source: string, timestamp: number) => string,
      debug?: boolean
    })
  • emitEvent(source, action, data) - Emit an event

    EventEmitter.emitEvent('user-service', 'user-created', {
      id: 1,
      name: 'John Doe'
    })
  • async validate() - Validate publisher configuration

    await EventEmitter.validate()
  • getConfig() - Get current configuration

  • reset() - Reset the emitter (useful for testing)

DatabaseEventEmitter

Pre-instantiated singleton extending EventEmitter with schema validation.

Configuration Methods

  • setConfig(config) - Same as EventEmitter.setConfig()

  • addSchema<T>(table, schemaConfig) - Add a single table schema

    DatabaseEventEmitter.addSchema('users', {
      schema: z.object({ id: z.number() })
    })
  • setSchemas(schemas) - Set multiple schemas at once

    DatabaseEventEmitter.setSchemas({
      users: { schema: userSchema },
      posts: { schema: postSchema }
    })

Event Methods

  • event(table, action, data) - Emit a database event

    DatabaseEventEmitter.event('users', DatabaseEventActions.Insert, {
      id: 1,
      name: 'John'
    })
  • async validate() - Validate publisher configuration

    await DatabaseEventEmitter.validate()

Utility Methods

  • getSchemas() - Get all registered schemas
  • hasSchema(table) - Check if a schema exists for a table
  • getConfig() - Get current configuration
  • reset() - Reset the emitter (useful for testing)

DatabaseEventActions

Enum with standard database actions:

  • Insert - 'insert'
  • Update - 'update'
  • Delete - 'delete'

PubSubListener

Generic listener for receiving events from Google Cloud Pub/Sub with flexible routing options.

The PubSubListener supports two routing mechanisms:

  1. Attribute-based routing: Route messages based on Pub/Sub message attributes (eventType or event_type)
  2. Catch-all handler: Process all messages with a wildcard '*' handler

Basic Usage - Catch-All Handler

Perfect for processing all messages from a subscription:

import { PubSubListener } from '@scope3/eventbus/listener'

const listener = new PubSubListener({
  projectId: 'my-gcp-project',
  topicName: 'events',
  subscriptionName: 'my-service-subscription',
})

await listener.validate()

// Process all messages
listener.addHandler('*', async (data) => {
  console.log('Received message:', data)
  // Your business logic here
})

// Graceful shutdown
process.on('SIGTERM', async () => {
  await listener.close()
})

Attribute-Based Routing

Route messages based on message attributes:

import { PubSubListener } from '@scope3/eventbus/listener'

const listener = new PubSubListener({
  projectId: 'my-gcp-project',
  topicName: 'events',
  subscriptionName: 'my-service-subscription',
})

await listener.validate()

// Handle specific event types via message attributes
listener.addHandler('user-created', async (data) => {
  console.log('User created:', data)
})

listener.addHandler('order-placed', async (data) => {
  console.log('Order placed:', data)
})

// Catch-all for unhandled event types
listener.addHandler('*', async (data) => {
  console.log('Unhandled event:', data)
})

Messages should have an eventType or event_type attribute:

// Publisher side
await publisher.publish({
  data: { userId: 123, name: 'John' },
  attributes: {
    eventType: 'user-created',  // or event_type: 'user-created'
    source: 'auth-service',
    timestamp: Date.now().toString()
  },
  orderingKey: 'user-123'
})

Configuration Methods

  • new PubSubListener(config) - Create a new listener instance

    const listener = new PubSubListener({
      projectId?: string,           // Defaults to process.env.GOOGLE_PROJECT_ID
      topicName?: string,            // Defaults to process.env.EVENT_EMITTER_TOPIC or 'events'
      subscriptionName?: string,     // Defaults to process.env.EVENT_EMITTER_SUBSCRIPTION or 'events-subscription'
      enabled?: boolean,             // Defaults to true
      subscriptionOptions?: {
        autoCreate?: boolean,        // Defaults to true - creates subscription if it doesn't exist
        deleteOnClose?: boolean      // Defaults to false - deletes subscription when listener closes
      },
      logger?: Logger
    })
  • static async init(config, handlers?) - Factory method with optional handler registration

    // Basic usage - automatically validates
    const listener = await PubSubListener.init({
      topicName: 'events',
      subscriptionName: 'my-service'
    })
    // Returns undefined if disabled
    
    // With handlers array - registers handlers during initialization
    const listener = await PubSubListener.init(
      {
        topicName: 'events',
        subscriptionName: 'my-service'
      },
      [
        {
          eventType: 'user-created',
          enabled: true,
          handler: async (data) => {
            console.log('User created:', data)
          }
        },
        {
          eventType: '*',  // Catch-all
          enabled: true,
          handler: async (data) => {
            console.log('All events:', data)
          }
        }
      ]
    )

Handler Methods

  • addHandler(eventType, handler, enabled?) - Register a message handler

    // Specific event type
    listener.addHandler('user-created', async (data) => {
      // Handle the message
    }, true)  // enabled defaults to true
    
    // Catch-all
    listener.addHandler('*', async (data) => {
      // Handle all messages
    })
  • removeHandler(eventType) - Remove a handler

    listener.removeHandler('user-created')
  • getHandlerEventTypes() - Get all registered handler event types

    const eventTypes = listener.getHandlerEventTypes()
    // ['user-created', 'order-placed', '*', ...]

Validation & Lifecycle

  • async validate() - Validate topic exists and create/check subscription

    await listener.validate()
  • async close() - Close the listener and clean up resources

    await listener.close()
  • setupGracefulShutdown() - Setup automatic shutdown handlers for SIGTERM/SIGINT

    listener.setupGracefulShutdown()
    // Listener will automatically close on process termination
    // Does NOT call process.exit() - safe to use with other shutdown handlers like Terminus
    // Uses process.once() to allow handler chaining

Getters

  • getConfig() - Get current configuration
  • getTopicName() - Get the Pub/Sub topic name
  • getSubscriptionName() - Get the subscription name
  • isEnabled() - Check if listener is enabled
  • isListening() - Check if actively listening for messages

CommandListener

Specialized listener for messages following the command/action pattern. Extends PubSubListener to provide routing based on command or action fields in the message payload.

Basic Usage

import { CommandListener } from '@scope3/eventbus/listener'

const listener = new CommandListener({
  projectId: 'my-gcp-project',
  topicName: 'events',
  subscriptionName: 'my-service-subscription',
})

await listener.validate()

// Add command handlers
listener.addCommandHandler('user-created', async (data) => {
  console.log('User created:', data)
  // data contains the 'data' field from the message
})

listener.addCommandHandler('order-placed', async (data) => {
  console.log('Order placed:', data)
})

Message Format

The CommandListener expects messages in this format:

{
  "command": "user-created",
  "data": {
    "id": 1,
    "name": "John Doe",
    "email": "[email protected]"
  }
}

Or using action instead of command:

{
  "action": "user-created",
  "data": { ... }
}

Factory Method

const listener = await CommandListener.init(
  {
    topicName: 'events',
    subscriptionName: 'my-service'
  },
  [
    {
      command: 'user-created',
      enabled: true,
      handler: async (data) => {
        console.log('User created:', data)
      }
    },
    {
      command: 'order-placed',
      enabled: process.env.NODE_ENV === 'production',
      handler: async (data) => {
        console.log('Order placed:', data)
      }
    }
  ]
)

Handler Methods

  • addCommandHandler(command, handler, enabled?) - Register a command handler

    listener.addCommandHandler('user-created', async (data) => {
      // Handle the command
    })
  • removeCommandHandler(command) - Remove a command handler

    listener.removeCommandHandler('user-created')
  • getCommandHandlers() - Get all registered command names

    const commands = listener.getCommandHandlers()
    // ['user-created', 'order-placed', ...]

Error Handling (Both Listeners)

Handlers should handle errors internally. If a handler throws an error, the message will still be acknowledged to prevent infinite redelivery:

// PubSubListener
listener.addHandler('user-created', async (data) => {
  try {
    await processUser(data)
  } catch (error) {
    console.error('Error processing user:', error)
  }
})

// CommandListener
listener.addCommandHandler('user-created', async (data) => {
  try {
    await processUser(data)
  } catch (error) {
    console.error('Error processing user:', error)
  }
})

Type-Safe Handlers (Both Listeners)

Use TypeScript generics for type-safe message handling:

interface UserCreatedData {
  id: number
  email: string
  name: string
}

// PubSubListener
listener.addHandler<UserCreatedData>('user-created', async (data) => {
  if (!data) return
  console.log(`User ${data.name} created with email ${data.email}`)
})

// CommandListener
listener.addCommandHandler<UserCreatedData>('user-created', async (data) => {
  if (!data) return
  console.log(`User ${data.name} created with email ${data.email}`)
})

Integration with Publisher

Use listeners alongside the publisher for full event-driven workflows:

import { PubSubPublisher } from '@scope3/eventbus/publisher'
import { PubSubListener, CommandListener } from '@scope3/eventbus/listener'

// Publisher in auth service
const publisher = new PubSubPublisher({
  projectId: 'my-project',
  topicName: 'events'
})

// Option 1: Publish with eventType attribute (for PubSubListener)
await publisher.publish({
  data: { userId: 123, name: 'John' },
  attributes: {
    eventType: 'user-created',
    source: 'auth-service',
    timestamp: Date.now().toString()
  },
  orderingKey: 'user-123'
})

// Option 2: Publish with command/action in payload (for CommandListener)
await publisher.publish({
  data: {
    command: 'user-created',
    data: { userId: 123, name: 'John' }
  },
  attributes: {
    source: 'auth-service',
    timestamp: Date.now().toString()
  },
  orderingKey: 'user-123'
})

// PubSubListener in email service (attribute-based routing)
const listener = new PubSubListener({
  projectId: 'my-project',
  topicName: 'events',
  subscriptionName: 'email-service-sub'
})

listener.addHandler('user-created', async (data) => {
  await sendWelcomeEmail(data)
})

// CommandListener in notification service (command/action routing)
const commandListener = new CommandListener({
  projectId: 'my-project',
  topicName: 'events',
  subscriptionName: 'notification-service-sub'
})

commandListener.addCommandHandler('user-created', async (data) => {
  await sendPushNotification(data)
})

Ephemeral Subscriptions

For temporary subscriptions that should only exist while your service is running, use deleteSubscriptionOnClose:

import { PubSubListener } from '@scope3/eventbus/listener'

// Create a listener with an ephemeral subscription
const listener = new PubSubListener({
  projectId: 'my-project',
  topicName: 'events',
  subscriptionName: `temp-sub-${process.pid}`, // Unique per process
  subscriptionOptions: {
    autoCreate: true,      // Create on startup
    deleteOnClose: true    // Delete on shutdown
  }
})

await listener.validate()

// Register handlers
listener.addHandler('event', async (data) => {
  console.log('Processing event:', data)
})

// Setup graceful shutdown to ensure cleanup
listener.setupGracefulShutdown()

// Subscription will be automatically deleted when:
// - Process receives SIGTERM or SIGINT
// - listener.close() is called

This pattern is useful for:

  • Development and testing environments
  • Horizontal scaling where each instance needs its own subscription
  • Temporary workers or jobs that shouldn't leave orphaned subscriptions

Subscription Management

Control subscription creation behavior:

// Require subscription to exist (fail if not found)
const listener = new PubSubListener({
  topicName: 'events',
  subscriptionName: 'existing-subscription',
  subscriptionOptions: {
    autoCreate: false  // Don't create, just error if missing
  }
})

try {
  await listener.validate()
} catch (error) {
  console.error('Subscription does not exist:', error)
}

Advanced Usage

Custom Validator

Use custom validation logic instead of Zod:

import { DatabaseEventEmitter } from '@scope3/eventbus/database'
import type { CustomValidator } from '@scope3/eventbus/database'

interface UserData {
  id: number
  username: string
  email: string
  age: number
}

const userValidator: CustomValidator<UserData> = {
  validate: (data: unknown) => {
    const obj = data as Record<string, unknown>

    if (typeof obj.id !== 'number' || obj.id <= 0) {
      return {
        success: false,
        error: new Error('Invalid user ID: must be a positive number'),
      }
    }

    if (typeof obj.username !== 'string' || obj.username.length < 3) {
      return {
        success: false,
        error: new Error('Invalid username: must be at least 3 characters'),
      }
    }

    if (
      typeof obj.email !== 'string' ||
      !obj.email.match(/^[^\s@]+@[^\s@]+\.[^\s@]+$/)
    ) {
      return {
        success: false,
        error: new Error('Invalid email format'),
      }
    }

    if (typeof obj.age !== 'number' || obj.age < 0 || obj.age > 150) {
      return {
        success: false,
        error: new Error('Invalid age: must be between 0 and 150'),
      }
    }

    return {
      success: true,
      data: obj as UserData,
    }
  },
}

DatabaseEventEmitter.setConfig({ debug: true })
DatabaseEventEmitter.addSchema('users', { schema: userValidator })

Custom Ordering Keys

Customize how ordering keys are generated for Pub/Sub message ordering:

import { EventEmitter } from '@scope3/eventbus'

EventEmitter.setConfig({
  orderingKeyGenerator: (source, timestamp) => {
    // Custom logic for ordering keys (e.g., sharding)
    const shard = timestamp % 10
    return `${source}:shard-${shard}:${timestamp}`
  },
})

Extending the Base Emitter

Create your own custom emitter by extending BaseEventEmitter:

import { BaseEventEmitter } from '@scope3/eventbus/emitter'

class MyCustomEmitter extends BaseEventEmitter {
  // Add your custom logic here
  emitWithMetadata(source: string, action: string, data: unknown, metadata: Record<string, string>) {
    // Custom implementation
    const enrichedData = { ...data, metadata }
    return this.emitEvent(source, action, enrichedData)
  }
}

export const CustomEmitter = new MyCustomEmitter()

Google Cloud Pub/Sub Integration

The library includes built-in support for Google Cloud Pub/Sub with message ordering.

Configuration

import { EventEmitter } from '@scope3/eventbus'

// Configure with Pub/Sub
EventEmitter.setConfig({
  publisherOptions: {
    projectId: 'my-gcp-project',      // Defaults to process.env.GOOGLE_PROJECT_ID
    topicName: 'events',              // Defaults to process.env.EVENT_EMITTER_TOPIC
    messageOrdering: true,            // Defaults to true
    enabled: true,                    // Defaults to true
  },
})

// Validate topic exists before use
await EventEmitter.validate()

// Emit events - they will be published to Pub/Sub
EventEmitter.emitEvent('user-service', 'user-created', { id: 1, name: 'John' })

Publisher Configuration Options

  • projectId - Google Cloud project ID (defaults to process.env.GOOGLE_PROJECT_ID)
  • topicName - Pub/Sub topic name (defaults to process.env.EVENT_EMITTER_TOPIC or 'events')
  • messageOrdering - Enable message ordering (defaults to true)
  • enabled - Enable/disable publisher (defaults to true)

Using Environment Variables

// Set environment variables
process.env.GOOGLE_PROJECT_ID = 'my-project'
process.env.EVENT_EMITTER_TOPIC = 'events'

// Use defaults from environment
EventEmitter.setConfig({
  publisherOptions: {}, // Uses environment variables
})

Disabling for Local Development

import { EventEmitter } from '@scope3/eventbus'

// Disable publishing for local development
EventEmitter.setConfig({
  publisherOptions: {
    enabled: false  // Events will not be published
  },
})

// Or omit publisherOptions entirely - events will only emit locally
EventEmitter.setConfig({})

Google Cloud Authentication

The Pub/Sub publisher uses Application Default Credentials (ADC). The Google Cloud client library searches for credentials in the following order:

  1. GOOGLE_APPLICATION_CREDENTIALS environment variable - Points to a service account key JSON file
  2. Cloud SDK credentials - From gcloud auth application-default login
  3. Workload Identity - For applications running on GKE
  4. Service Account - For applications running on GCE, Cloud Run, Cloud Functions, etc.

Local Development

# Method 1: Service Account Key File
export GOOGLE_APPLICATION_CREDENTIALS="/path/to/service-account-key.json"
export GOOGLE_PROJECT_ID="my-gcp-project"
export EVENT_EMITTER_TOPIC="events"

# Method 2: Use gcloud CLI
gcloud auth application-default login
gcloud config set project my-gcp-project

Service Account Permissions

Ensure your service account has the following IAM permissions:

  • pubsub.topics.get - To validate topic exists
  • pubsub.topics.publish - To publish messages

Grant the Pub/Sub Publisher role:

gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="serviceAccount:SERVICE_ACCOUNT_EMAIL" \
  --role="roles/pubsub.publisher"

For more information, see the Google Cloud Authentication documentation.

Development

# Install dependencies
npm install

# Run tests
npm test

# Run tests in watch mode
npm run test:watch

# Run tests with coverage
npm run test:coverage

# Build the library
npm run build

# Type check
npm run type:check

# Lint
npm run lint

License

MIT