@scope3/eventbus
v1.0.0
Type-safe event bus library with Google Cloud Pub/Sub integration
A flexible, type-safe event bus library for TypeScript applications with built-in Google Cloud Pub/Sub integration. Perfect for microservices, distributed systems, and event-driven architectures.
Features
- Pre-instantiated Singletons: Import and use directly - no instantiation needed
- Flexible Architecture: Generic event emitter + specialized database event emitter
- Type-Safe: Full TypeScript support with optional runtime validation
- Google Cloud Pub/Sub: Built-in integration with message ordering for both publishing and listening
- Modular Exports: Import just what you need via /database, /emitter, /publisher, or /listener
- Validation Options: Zod schemas or custom validators for database events
- Non-Blocking: Uses process.nextTick to avoid blocking the event loop
- Configurable: Custom ordering keys, logging, and behavior
- Flexible Message Routing: Attribute-based routing, catch-all handlers, and command/action pattern support
Installation
npm install @scope3/eventbus
Optional Dependencies
# If using Zod schemas for database events (recommended)
npm install zod
Zod is an optional peer dependency. The library ships with Google Cloud Pub/Sub built-in.
Quick Start
Generic Event Emitter
For simple event emission without schema validation:
import { EventEmitter } from '@scope3/eventbus'
// Configure the emitter (typically done once at app startup)
EventEmitter.setConfig({
publisherOptions: {
projectId: 'my-gcp-project',
topicName: 'events',
},
debug: false,
})
// Emit events with any data
EventEmitter.emitEvent('user-service', 'user-created', {
id: 1,
email: 'john.doe@example.com',
name: 'John Doe',
})
EventEmitter.emitEvent('payment-service', 'payment-processed', {
id: 'txn-123',
amount: 99.99,
currency: 'USD',
})
Database Event Emitter
For database events with schema validation:
import { DatabaseEventEmitter, DatabaseEventActions } from '@scope3/eventbus/database'
import { z } from 'zod'
// Configure the database event emitter
DatabaseEventEmitter.setConfig({
publisherOptions: {
projectId: 'my-gcp-project',
topicName: 'database-events',
},
debug: false,
})
// Add schemas for your database tables
DatabaseEventEmitter.addSchema('users', {
schema: z.object({
id: z.number(),
name: z.string(),
email: z.string().email(),
createdAt: z.date(),
updatedAt: z.date(),
}),
})
DatabaseEventEmitter.addSchema('posts', {
schema: z.object({
id: z.number(),
title: z.string(),
content: z.string(),
authorId: z.number(),
publishedAt: z.date().nullable(),
}),
})
// Emit database events with full type safety
DatabaseEventEmitter.event('users', DatabaseEventActions.Insert, {
id: 1,
name: 'John Doe',
email: 'john.doe@example.com',
createdAt: new Date(),
updatedAt: new Date(),
})
DatabaseEventEmitter.event('posts', DatabaseEventActions.Update, {
id: 1,
title: 'Updated Title',
content: 'Updated content',
authorId: 1,
publishedAt: new Date(),
})
Architecture
The library provides two emitter classes:
EventEmitter - Generic event emitter for simple use cases
- No schema validation
- Flexible event emission with any data
- Perfect for application events, service notifications, etc.
DatabaseEventEmitter - Extends EventEmitter with database-specific features
- Schema validation (Zod or custom validators)
- Standardized database actions (Insert, Update, Delete)
- Type-safe event emission with schema enforcement
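As a rough sketch of how one validation step could accept either kind of schema: the interfaces below mirror the result shape of Zod's safeParse and of the custom validator shown under Advanced Usage. The names ZodLike, CustomValidatorLike, and validatePayload are hypothetical and are not part of this library's API; how the library actually tells the two apart is an assumption.

```typescript
// Hypothetical sketch, not the library's implementation.
type ValidationResult<T> =
  | { success: true; data: T }
  | { success: false; error: unknown }

// Structural stand-in for a Zod schema (Zod schemas expose safeParse).
interface ZodLike<T> {
  safeParse(data: unknown): ValidationResult<T>
}

// Shape used by the custom validator example in this README.
interface CustomValidatorLike<T> {
  validate(data: unknown): ValidationResult<T>
}

function validatePayload<T>(
  schema: ZodLike<T> | CustomValidatorLike<T>,
  data: unknown,
): ValidationResult<T> {
  // Assumption: the two kinds are distinguished by which method is present.
  return 'safeParse' in schema ? schema.safeParse(data) : schema.validate(data)
}

// A hand-written validator that only checks for a numeric id.
const idValidator: CustomValidatorLike<{ id: number }> = {
  validate: (data) => {
    const id = (data as { id?: unknown } | null)?.id
    return typeof id === 'number'
      ? { success: true, data: { id } }
      : { success: false, error: new Error('id must be a number') }
  },
}

const result = validatePayload(idValidator, { id: 1 })
console.log(result.success) // true
```

Rejecting the payload before it reaches the publisher keeps malformed rows out of the topic, which is the main reason to validate on the emitting side.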
Both emitters:
- Share the same Pub/Sub publisher integration
- Support custom ordering keys for message delivery guarantees
- Use non-blocking event publishing with process.nextTick
- Provide configurable logging and debug modes
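The non-blocking behaviour can be pictured with a minimal sketch. It assumes the emitter hands the publish call to process.nextTick and returns immediately; emitDeferred and fakePublish are hypothetical names used only for illustration, not the library's internals.

```typescript
// Hypothetical sketch of deferring a publish with process.nextTick.
type Publish = (payload: unknown) => Promise<void>

function emitDeferred(
  publish: Publish,
  payload: unknown,
  onError: (err: unknown) => void,
): void {
  // Return to the caller right away; the publish starts after the
  // current synchronous operation has finished.
  process.nextTick(() => {
    publish(payload).catch(onError)
  })
}

// Stand-in for a real Pub/Sub publish call.
const published: unknown[] = []
const fakePublish: Publish = async (payload) => {
  published.push(payload)
}

const before = published.length
emitDeferred(fakePublish, { id: 1 }, console.error)
console.log(published.length === before) // true: nothing has been published yet
```

Because the caller never awaits the publish, failures have to be reported through a callback or logger rather than a thrown error.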
Modular Exports
Import just what you need:
// Generic event emitter (main export)
import { EventEmitter } from '@scope3/eventbus'
// Database-specific emitter and types
import { DatabaseEventEmitter, DatabaseEventActions } from '@scope3/eventbus/database'
// Base emitter class (for extending)
import { BaseEventEmitter } from '@scope3/eventbus/emitter'
// Publisher for advanced use cases
import { PubSubPublisher } from '@scope3/eventbus/publisher'
// Listeners for advanced use cases
import { PubSubListener, CommandListener } from '@scope3/eventbus/listener'
API Reference
EventEmitter (Generic)
Pre-instantiated singleton for simple event emission.
Methods
- setConfig(config) - Configure the emitter
EventEmitter.setConfig({
  publisherOptions?: {
    projectId?: string
    topicName?: string
    messageOrdering?: boolean
    enabled?: boolean
  },
  logger?: Logger,
  orderingKeyGenerator?: (source: string, timestamp: number) => string,
  debug?: boolean
})
- emitEvent(source, action, data) - Emit an event
EventEmitter.emitEvent('user-service', 'user-created', { id: 1, name: 'John Doe' })
- async validate() - Validate publisher configuration
await EventEmitter.validate()
- getConfig() - Get current configuration
- reset() - Reset the emitter (useful for testing)
DatabaseEventEmitter
Pre-instantiated singleton extending EventEmitter with schema validation.
Configuration Methods
- setConfig(config) - Same as EventEmitter.setConfig()
- addSchema<T>(table, schemaConfig) - Add a single table schema
DatabaseEventEmitter.addSchema('users', { schema: z.object({ id: z.number() }) })
- setSchemas(schemas) - Set multiple schemas at once
DatabaseEventEmitter.setSchemas({
  users: { schema: userSchema },
  posts: { schema: postSchema }
})
Event Methods
- event(table, action, data) - Emit a database event
DatabaseEventEmitter.event('users', DatabaseEventActions.Insert, { id: 1, name: 'John' })
- async validate() - Validate publisher configuration
await DatabaseEventEmitter.validate()
Utility Methods
- getSchemas() - Get all registered schemas
- hasSchema(table) - Check if a schema exists for a table
- getConfig() - Get current configuration
- reset() - Reset the emitter (useful for testing)
DatabaseEventActions
Enum with standard database actions:
- Insert - 'insert'
- Update - 'update'
- Delete - 'delete'
PubSubListener
Generic listener for receiving events from Google Cloud Pub/Sub with flexible routing options.
The PubSubListener supports two routing mechanisms:
- Attribute-based routing: Route messages based on Pub/Sub message attributes (eventType or event_type)
- Catch-all handler: Process all messages with a wildcard '*' handler
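The lookup implied by these two mechanisms might look like the sketch below. It assumes a handler registered for the specific event type wins and '*' is used only as a fallback, and that eventType is checked before event_type; both orderings are assumptions, and resolveHandler is a hypothetical name rather than a library function.

```typescript
// Hypothetical sketch of attribute-based routing with a catch-all fallback.
type Handler = (data: unknown) => Promise<void> | void
type Attributes = Record<string, string | undefined>

function resolveHandler(
  handlers: Map<string, Handler>,
  attributes: Attributes,
): Handler | undefined {
  // Assumption: eventType takes precedence over event_type.
  const eventType = attributes.eventType ?? attributes.event_type
  const specific = eventType === undefined ? undefined : handlers.get(eventType)
  // Assumption: '*' only receives messages without a specific handler.
  return specific ?? handlers.get('*')
}

const seen: string[] = []
const handlers = new Map<string, Handler>()
handlers.set('user-created', () => { seen.push('specific') })
handlers.set('*', () => { seen.push('catch-all') })

resolveHandler(handlers, { eventType: 'user-created' })?.(null)
resolveHandler(handlers, { event_type: 'order-placed' })?.(null)
// seen is now ['specific', 'catch-all']
```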
Basic Usage - Catch-All Handler
Perfect for processing all messages from a subscription:
import { PubSubListener } from '@scope3/eventbus/listener'
const listener = new PubSubListener({
projectId: 'my-gcp-project',
topicName: 'events',
subscriptionName: 'my-service-subscription',
})
await listener.validate()
// Process all messages
listener.addHandler('*', async (data) => {
console.log('Received message:', data)
// Your business logic here
})
// Graceful shutdown
process.on('SIGTERM', async () => {
await listener.close()
})
Attribute-Based Routing
Route messages based on message attributes:
import { PubSubListener } from '@scope3/eventbus/listener'
const listener = new PubSubListener({
projectId: 'my-gcp-project',
topicName: 'events',
subscriptionName: 'my-service-subscription',
})
await listener.validate()
// Handle specific event types via message attributes
listener.addHandler('user-created', async (data) => {
console.log('User created:', data)
})
listener.addHandler('order-placed', async (data) => {
console.log('Order placed:', data)
})
// Catch-all for unhandled event types
listener.addHandler('*', async (data) => {
console.log('Unhandled event:', data)
})
Messages should have an eventType or event_type attribute:
// Publisher side
await publisher.publish({
data: { userId: 123, name: 'John' },
attributes: {
eventType: 'user-created', // or event_type: 'user-created'
source: 'auth-service',
timestamp: Date.now().toString()
},
orderingKey: 'user-123'
})
Configuration Methods
- new PubSubListener(config) - Create a new listener instance
const listener = new PubSubListener({
  projectId?: string, // Defaults to process.env.GOOGLE_PROJECT_ID
  topicName?: string, // Defaults to process.env.EVENT_EMITTER_TOPIC or 'events'
  subscriptionName?: string, // Defaults to process.env.EVENT_EMITTER_SUBSCRIPTION or 'events-subscription'
  enabled?: boolean, // Defaults to true
  subscriptionOptions?: {
    autoCreate?: boolean, // Defaults to true - creates subscription if it doesn't exist
    deleteOnClose?: boolean // Defaults to false - deletes subscription when listener closes
  },
  logger?: Logger
})
- static async init(config, handlers?) - Factory method with optional handler registration
// Basic usage - automatically validates
const listener = await PubSubListener.init({
  topicName: 'events',
  subscriptionName: 'my-service'
})
// Returns undefined if disabled
// With handlers array - registers handlers during initialization
const listener = await PubSubListener.init(
  { topicName: 'events', subscriptionName: 'my-service' },
  [
    {
      eventType: 'user-created',
      enabled: true,
      handler: async (data) => {
        console.log('User created:', data)
      }
    },
    {
      eventType: '*', // Catch-all
      enabled: true,
      handler: async (data) => {
        console.log('All events:', data)
      }
    }
  ]
)
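To illustrate what the enabled flag in the handlers array is for, here is a small sketch. It assumes that entries with enabled set to false are simply skipped at registration time; HandlerSpec and collectEnabled are hypothetical names, and the library's real behaviour may differ.

```typescript
// Hypothetical sketch: filter a handlers array by its enabled flag.
interface HandlerSpec {
  eventType: string
  enabled: boolean
  handler: (data: unknown) => Promise<void>
}

function collectEnabled(
  specs: HandlerSpec[],
): Map<string, HandlerSpec['handler']> {
  const registry = new Map<string, HandlerSpec['handler']>()
  for (const spec of specs) {
    // Assumption: disabled entries are never registered.
    if (spec.enabled) registry.set(spec.eventType, spec.handler)
  }
  return registry
}

const registry = collectEnabled([
  { eventType: 'user-created', enabled: true, handler: async () => {} },
  { eventType: 'order-placed', enabled: false, handler: async () => {} },
])
console.log(Array.from(registry.keys())) // [ 'user-created' ]
```

Driving enabled from an environment check lets one handler list serve several deployment environments.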
Handler Methods
- addHandler(eventType, handler, enabled?) - Register a message handler
// Specific event type
listener.addHandler('user-created', async (data) => {
  // Handle the message
}, true) // enabled defaults to true
// Catch-all
listener.addHandler('*', async (data) => {
  // Handle all messages
})
- removeHandler(eventType) - Remove a handler
listener.removeHandler('user-created')
- getHandlerEventTypes() - Get all registered handler event types
const eventTypes = listener.getHandlerEventTypes()
// ['user-created', 'order-placed', '*', ...]
Validation & Lifecycle
- async validate() - Validate that the topic exists and create or check the subscription
await listener.validate()
- async close() - Close the listener and clean up resources
await listener.close()
- setupGracefulShutdown() - Set up automatic shutdown handlers for SIGTERM/SIGINT
listener.setupGracefulShutdown()
// Listener will automatically close on process termination
// Does NOT call process.exit() - safe to use with other shutdown handlers like Terminus
// Uses process.once() to allow handler chaining
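The shutdown behaviour described for setupGracefulShutdown() can be sketched as follows. This is an illustration of the pattern (once-only signal handlers, no process.exit()), not the library's code; registerShutdown and its parameters are hypothetical.

```typescript
import { EventEmitter as NodeEventEmitter } from 'node:events'

// Hypothetical helper; `close` stands in for listener.close().
function registerShutdown(
  close: () => Promise<void>,
  signals: NodeEventEmitter = process,
): void {
  let closing = false
  for (const signal of ['SIGTERM', 'SIGINT']) {
    // once(): the handler fires at most one time per signal and leaves
    // other registered shutdown handlers free to run as well.
    signals.once(signal, () => {
      if (closing) return // a second signal should not close twice
      closing = true
      // No process.exit() here, so other handlers can finish their work.
      close().catch((err) => console.error('Error while closing listener:', err))
    })
  }
}

// Demonstration with a plain emitter instead of the real process object.
const fakeSignals = new NodeEventEmitter()
let closeCalls = 0
registerShutdown(async () => { closeCalls++ }, fakeSignals)
fakeSignals.emit('SIGTERM')
fakeSignals.emit('SIGINT')
console.log(closeCalls) // 1
```

In an application, the call would typically be registerShutdown(() => listener.close()) with the default process argument.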
Getters
- getConfig() - Get current configuration
- getTopicName() - Get the Pub/Sub topic name
- getSubscriptionName() - Get the subscription name
- isEnabled() - Check if listener is enabled
- isListening() - Check if actively listening for messages
CommandListener
Specialized listener for messages following the command/action pattern. Extends PubSubListener to provide routing based on command or action fields in the message payload.
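A minimal sketch of payload-based routing, assuming the message body is JSON with a command or action field plus a data field as described under Message Format. The precedence of command over action is an assumption, and parseCommand is a hypothetical helper, not a library export.

```typescript
// Hypothetical sketch: extract the command name and data from a message body.
interface CommandMessage<T> {
  command?: unknown
  action?: unknown
  data?: T
}

function parseCommand<T>(
  raw: string,
): { name: string; data: T | undefined } | undefined {
  const parsed: unknown = JSON.parse(raw)
  if (typeof parsed !== 'object' || parsed === null) return undefined
  const message = parsed as CommandMessage<T>
  // Assumption: `command` wins when both fields are present.
  const name = message.command ?? message.action
  return typeof name === 'string' ? { name, data: message.data } : undefined
}

const parsedCommand = parseCommand<{ id: number }>(
  '{"action":"user-created","data":{"id":1}}',
)
console.log(parsedCommand?.name) // user-created
```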
Basic Usage
import { CommandListener } from '@scope3/eventbus/listener'
const listener = new CommandListener({
projectId: 'my-gcp-project',
topicName: 'events',
subscriptionName: 'my-service-subscription',
})
await listener.validate()
// Add command handlers
listener.addCommandHandler('user-created', async (data) => {
console.log('User created:', data)
// data contains the 'data' field from the message
})
listener.addCommandHandler('order-placed', async (data) => {
console.log('Order placed:', data)
})
Message Format
The CommandListener expects messages in this format:
{
"command": "user-created",
"data": {
"id": 1,
"name": "John Doe",
"email": "john.doe@example.com"
}
}
Or using action instead of command:
{
"action": "user-created",
"data": { ... }
}
Factory Method
const listener = await CommandListener.init(
{
topicName: 'events',
subscriptionName: 'my-service'
},
[
{
command: 'user-created',
enabled: true,
handler: async (data) => {
console.log('User created:', data)
}
},
{
command: 'order-placed',
enabled: process.env.NODE_ENV === 'production',
handler: async (data) => {
console.log('Order placed:', data)
}
}
]
)
Handler Methods
- addCommandHandler(command, handler, enabled?) - Register a command handler
listener.addCommandHandler('user-created', async (data) => {
  // Handle the command
})
- removeCommandHandler(command) - Remove a command handler
listener.removeCommandHandler('user-created')
- getCommandHandlers() - Get all registered command names
const commands = listener.getCommandHandlers()
// ['user-created', 'order-placed', ...]
Error Handling (Both Listeners)
Handlers should handle errors internally. If a handler throws an error, the message will still be acknowledged to prevent infinite redelivery:
// PubSubListener
listener.addHandler('user-created', async (data) => {
try {
await processUser(data)
} catch (error) {
console.error('Error processing user:', error)
}
})
// CommandListener
listener.addCommandHandler('user-created', async (data) => {
try {
await processUser(data)
} catch (error) {
console.error('Error processing user:', error)
}
})
Type-Safe Handlers (Both Listeners)
Use TypeScript generics for type-safe message handling:
interface UserCreatedData {
id: number
email: string
name: string
}
// PubSubListener
listener.addHandler<UserCreatedData>('user-created', async (data) => {
if (!data) return
console.log(`User ${data.name} created with email ${data.email}`)
})
// CommandListener
listener.addCommandHandler<UserCreatedData>('user-created', async (data) => {
if (!data) return
console.log(`User ${data.name} created with email ${data.email}`)
})
Integration with Publisher
Use listeners alongside the publisher for full event-driven workflows:
import { PubSubPublisher } from '@scope3/eventbus/publisher'
import { PubSubListener, CommandListener } from '@scope3/eventbus/listener'
// Publisher in auth service
const publisher = new PubSubPublisher({
projectId: 'my-project',
topicName: 'events'
})
// Option 1: Publish with eventType attribute (for PubSubListener)
await publisher.publish({
data: { userId: 123, name: 'John' },
attributes: {
eventType: 'user-created',
source: 'auth-service',
timestamp: Date.now().toString()
},
orderingKey: 'user-123'
})
// Option 2: Publish with command/action in payload (for CommandListener)
await publisher.publish({
data: {
command: 'user-created',
data: { userId: 123, name: 'John' }
},
attributes: {
source: 'auth-service',
timestamp: Date.now().toString()
},
orderingKey: 'user-123'
})
// PubSubListener in email service (attribute-based routing)
const listener = new PubSubListener({
projectId: 'my-project',
topicName: 'events',
subscriptionName: 'email-service-sub'
})
listener.addHandler('user-created', async (data) => {
await sendWelcomeEmail(data)
})
// CommandListener in notification service (command/action routing)
const commandListener = new CommandListener({
projectId: 'my-project',
topicName: 'events',
subscriptionName: 'notification-service-sub'
})
commandListener.addCommandHandler('user-created', async (data) => {
await sendPushNotification(data)
})
Ephemeral Subscriptions
For temporary subscriptions that should only exist while your service is running, set subscriptionOptions.deleteOnClose to true:
import { PubSubListener } from '@scope3/eventbus/listener'
// Create a listener with an ephemeral subscription
const listener = new PubSubListener({
projectId: 'my-project',
topicName: 'events',
subscriptionName: `temp-sub-${process.pid}`, // Unique per process
subscriptionOptions: {
autoCreate: true, // Create on startup
deleteOnClose: true // Delete on shutdown
}
})
await listener.validate()
// Register handlers
listener.addHandler('event', async (data) => {
console.log('Processing event:', data)
})
// Setup graceful shutdown to ensure cleanup
listener.setupGracefulShutdown()
// Subscription will be automatically deleted when:
// - Process receives SIGTERM or SIGINT
// - listener.close() is called
This pattern is useful for:
- Development and testing environments
- Horizontal scaling where each instance needs its own subscription
- Temporary workers or jobs that shouldn't leave orphaned subscriptions
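For the horizontal-scaling case, process.pid alone may not be unique, since containerised replicas often run the application as PID 1. The sketch below builds a per-instance name from the hostname and a random suffix instead; ephemeralSubscriptionName is a hypothetical helper, and the character filtering reflects Pub/Sub's resource naming rules as understood here rather than anything the library enforces.

```typescript
import { randomUUID } from 'node:crypto'
import { hostname } from 'node:os'

// Hypothetical helper: a subscription name that is unique per instance.
// `prefix` should start with a letter and must not start with "goog".
function ephemeralSubscriptionName(prefix: string): string {
  // Replace anything outside the characters Pub/Sub allows in resource names.
  const host = hostname().replace(/[^A-Za-z0-9._~+%-]/g, '-')
  // A short random suffix separates instances that share a hostname.
  const suffix = randomUUID().slice(0, 8)
  // Pub/Sub resource names are limited to 255 characters.
  return `${prefix}-${host}-${suffix}`.slice(0, 255)
}

const subscriptionName = ephemeralSubscriptionName('temp-sub')
console.log(subscriptionName.startsWith('temp-sub-')) // true
```

The result can be passed as subscriptionName together with deleteOnClose: true so that each instance cleans up its own subscription.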
Subscription Management
Control subscription creation behavior:
// Require subscription to exist (fail if not found)
const listener = new PubSubListener({
topicName: 'events',
subscriptionName: 'existing-subscription',
subscriptionOptions: {
autoCreate: false // Don't create, just error if missing
}
})
try {
await listener.validate()
} catch (error) {
console.error('Subscription does not exist:', error)
}
Advanced Usage
Custom Validator
Use custom validation logic instead of Zod:
import { DatabaseEventEmitter } from '@scope3/eventbus/database'
import type { CustomValidator } from '@scope3/eventbus/database'
interface UserData {
id: number
username: string
email: string
age: number
}
const userValidator: CustomValidator<UserData> = {
validate: (data: unknown) => {
const obj = data as Record<string, unknown>
if (typeof obj.id !== 'number' || obj.id <= 0) {
return {
success: false,
error: new Error('Invalid user ID: must be a positive number'),
}
}
if (typeof obj.username !== 'string' || obj.username.length < 3) {
return {
success: false,
error: new Error('Invalid username: must be at least 3 characters'),
}
}
if (
typeof obj.email !== 'string' ||
!obj.email.match(/^[^\s@]+@[^\s@]+\.[^\s@]+$/)
) {
return {
success: false,
error: new Error('Invalid email format'),
}
}
if (typeof obj.age !== 'number' || obj.age < 0 || obj.age > 150) {
return {
success: false,
error: new Error('Invalid age: must be between 0 and 150'),
}
}
return {
success: true,
data: obj as UserData,
}
},
}
DatabaseEventEmitter.setConfig({ debug: true })
DatabaseEventEmitter.addSchema('users', { schema: userValidator })
Custom Ordering Keys
Customize how ordering keys are generated for Pub/Sub message ordering:
import { EventEmitter } from '@scope3/eventbus'
EventEmitter.setConfig({
orderingKeyGenerator: (source, timestamp) => {
// Custom logic for ordering keys (e.g., sharding)
const shard = timestamp % 10
return `${source}:shard-${shard}:${timestamp}`
},
})
Extending the Base Emitter
Create your own custom emitter by extending BaseEventEmitter:
import { BaseEventEmitter } from '@scope3/eventbus/emitter'
class MyCustomEmitter extends BaseEventEmitter {
// Add your custom logic here
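// data is typed as an object so that it can be spread below (spreading `unknown` does not compile)
emitWithMetadata(source: string, action: string, data: Record<string, unknown>, metadata: Record<string, string>) {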
// Custom implementation
const enrichedData = { ...data, metadata }
return this.emitEvent(source, action, enrichedData)
}
}
export const CustomEmitter = new MyCustomEmitter()
Google Cloud Pub/Sub Integration
The library includes built-in support for Google Cloud Pub/Sub with message ordering.
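Pub/Sub only orders messages relative to other messages that carry the same ordering key, so a generator meant to preserve order should return a stable key for related events. The sketch below uses the orderingKeyGenerator signature documented in this README; perSourceKey is a hypothetical example, and ordering per source is an assumed goal.

```typescript
// Same signature as the orderingKeyGenerator option in this README.
type OrderingKeyGenerator = (source: string, timestamp: number) => string

// Assumption: events should be ordered per source, so the timestamp is
// deliberately left out of the key.
const perSourceKey: OrderingKeyGenerator = (source) => `source:${source}`

const first = perSourceKey('user-service', 1700000000000)
const second = perSourceKey('user-service', 1700000000001)
console.log(first === second) // true
```

A key that embeds the timestamp would differ for almost every message, in which case Pub/Sub has nothing to order against.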
Configuration
import { EventEmitter } from '@scope3/eventbus'
// Configure with Pub/Sub
EventEmitter.setConfig({
publisherOptions: {
projectId: 'my-gcp-project', // Defaults to process.env.GOOGLE_PROJECT_ID
topicName: 'events', // Defaults to process.env.EVENT_EMITTER_TOPIC
messageOrdering: true, // Defaults to true
enabled: true, // Defaults to true
},
})
// Validate topic exists before use
await EventEmitter.validate()
// Emit events - they will be published to Pub/Sub
EventEmitter.emitEvent('user-service', 'user-created', { id: 1, name: 'John' })
Publisher Configuration Options
- projectId - Google Cloud project ID (defaults to process.env.GOOGLE_PROJECT_ID)
- topicName - Pub/Sub topic name (defaults to process.env.EVENT_EMITTER_TOPIC or 'events')
- messageOrdering - Enable message ordering (defaults to true)
- enabled - Enable/disable publisher (defaults to true)
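The defaulting rules in this list can be written out as a small resolver. This is a sketch under the assumption that explicit options take priority over environment variables; resolvePublisherOptions is a hypothetical function, not something the library exports.

```typescript
// Hypothetical sketch of how the documented defaults could be resolved.
interface PublisherOptions {
  projectId?: string
  topicName?: string
  messageOrdering?: boolean
  enabled?: boolean
}

type Env = Record<string, string | undefined>

function resolvePublisherOptions(
  options: PublisherOptions = {},
  env: Env = process.env,
) {
  return {
    // Assumption: an explicit option always wins over the environment.
    projectId: options.projectId ?? env.GOOGLE_PROJECT_ID,
    topicName: options.topicName ?? env.EVENT_EMITTER_TOPIC ?? 'events',
    messageOrdering: options.messageOrdering ?? true,
    enabled: options.enabled ?? true,
  }
}

const resolved = resolvePublisherOptions(
  { enabled: false },
  { GOOGLE_PROJECT_ID: 'my-project' },
)
// resolved: { projectId: 'my-project', topicName: 'events', messageOrdering: true, enabled: false }
```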
Using Environment Variables
// Set environment variables
process.env.GOOGLE_PROJECT_ID = 'my-project'
process.env.EVENT_EMITTER_TOPIC = 'events'
// Use defaults from environment
EventEmitter.setConfig({
publisherOptions: {}, // Uses environment variables
})
Disabling for Local Development
import { EventEmitter } from '@scope3/eventbus'
// Disable publishing for local development
EventEmitter.setConfig({
publisherOptions: {
enabled: false // Events will not be published
},
})
// Or omit publisherOptions entirely - events will only emit locally
EventEmitter.setConfig({})
Google Cloud Authentication
The Pub/Sub publisher uses Application Default Credentials (ADC). The Google Cloud client library searches for credentials in the following order:
- GOOGLE_APPLICATION_CREDENTIALS environment variable - Points to a service account key JSON file
- Cloud SDK credentials - From gcloud auth application-default login
- Workload Identity - For applications running on GKE
- Service Account - For applications running on GCE, Cloud Run, Cloud Functions, etc.
Local Development
# Method 1: Service Account Key File
export GOOGLE_APPLICATION_CREDENTIALS="/path/to/service-account-key.json"
export GOOGLE_PROJECT_ID="my-gcp-project"
export EVENT_EMITTER_TOPIC="events"
# Method 2: Use gcloud CLI
gcloud auth application-default login
gcloud config set project my-gcp-project
Service Account Permissions
Ensure your service account has the following IAM permissions:
- pubsub.topics.get - To validate that the topic exists
- pubsub.topics.publish - To publish messages
Grant the Pub/Sub Publisher role:
gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:SERVICE_ACCOUNT_EMAIL" \
--role="roles/pubsub.publisher"
For more information, see the Google Cloud Authentication documentation.
Development
# Install dependencies
npm install
# Run tests
npm test
# Run tests in watch mode
npm run test:watch
# Run tests with coverage
npm run test:coverage
# Build the library
npm run build
# Type check
npm run type:check
# Lint
npm run lint
License
MIT
