stream-engine-event-bus

v0.0.2

Stream Engine Event Bus

A reusable, standalone Node.js library that abstracts event consumption from AWS. The library acts as a high-level wrapper around @aws-sdk/client-sqs and @aws-sdk/client-sns, supporting both production and development modes with automatic resource management.

Features

  • Production Mode: Connect to an existing SQS queue with minimal configuration (region, queue URL, and an ownership check)
  • Development Mode: Automatic ephemeral queue creation and cleanup for local development
  • Ownership Filtering: Built-in "noisy neighbor" protection for shared development environments
  • Event Emitter Pattern: Familiar Node.js event handling API
  • Automatic Cleanup: Dev queues are automatically removed on shutdown

Installation

pnpm add stream-engine-event-bus

Prerequisites

Production Mode

  • An existing SQS queue URL
  • AWS credentials configured (via IAM role, environment variables, or AWS credentials file)

Development Mode

  • AWS credentials with permissions to:
    • Create/delete SQS queues
    • Get/set queue attributes
    • Subscribe/unsubscribe from SNS topics
  • The ARN of the central SNS topic to subscribe to

Quick Start

Production Mode

const EventBusConsumer = require('stream-engine-event-bus');

const consumer = new EventBusConsumer({
  region: 'us-east-1',
  mode: 'prod',
  staticQueueUrl: 'https://sqs.us-east-1.amazonaws.com/123456789012/my-queue',
  checkOwnership: async (payload) => {
    // In production, you might check if the event belongs to your service
    // For example, check a tenant ID, service name, etc.
    return payload.service === 'my-service';
  }
});

// Register event handlers
// Note: Events are received in batches (up to 10), but each event is emitted individually
// All payloads are automatically parsed as JSON objects
consumer.on('user.created', (event) => {
  // event is a JSON object: { userId: '123', name: 'John Doe', email: '[email protected]' }
  console.log('User created:', event);
});

consumer.on('order.completed', (event) => {
  // event is a JSON object: { orderId: '456', total: 99.99, status: 'completed' }
  console.log('Order completed:', event);
});

// Start consuming. In prod mode, automatic signal handling is off by default.
// start() returns a promise; use `await` only inside an async function or ES module.
consumer.start().catch(console.error);

// For graceful shutdown in production, set enableGracefulShutdown: true
// or call consumer.stop() from your own signal handlers.

Development Mode

const EventBusConsumer = require('stream-engine-event-bus');

const consumer = new EventBusConsumer({
  region: 'us-east-1',
  mode: 'dev',
  snsTopicArn: 'arn:aws:sns:us-east-1:123456789012:central-event-topic',
  checkOwnership: async (payload) => {
    // Filter events based on your development needs
    // For example, only process events for your user ID or test tenant
    return payload.userId === process.env.DEV_USER_ID || 
           payload.tenantId === 'dev-tenant-123';
  }
});

// Register event handlers
// Note: Events are received in batches (up to 10), but each event is emitted individually
// All payloads are automatically parsed as JSON objects
consumer.on('user.created', (event) => {
  // event is a JSON object: { userId: '123', name: 'John Doe', email: '[email protected]' }
  console.log('User created:', event);
});

consumer.on('order.completed', (event) => {
  // event is a JSON object: { orderId: '456', total: 99.99, status: 'completed' }
  console.log('Order completed:', event);
});

// Start consuming (creates the ephemeral queue automatically).
// Signal handlers are set up automatically in dev mode for graceful shutdown,
// so there is no need to handle signals manually.
// start() returns a promise; use `await` only inside an async function or ES module.
consumer.start().catch(console.error);

API Reference

Constructor

new EventBusConsumer(config)

Configuration Options

| Option | Type | Required | Description |
|--------|------|----------|-------------|
| region | string | Yes | AWS region (e.g., 'us-east-1') |
| mode | 'prod' \| 'dev' | Yes | Runtime mode |
| staticQueueUrl | string | Yes (prod) | SQS queue URL for production mode |
| snsTopicArn | string | Yes (dev) | SNS topic ARN for development mode |
| checkOwnership | function | Yes | Async function that returns true if the event should be processed |
| enableGracefulShutdown | boolean | No | Automatically handle SIGINT/SIGTERM for graceful shutdown (default: true in dev, false in prod) |
| accessKeyId | string | No | AWS access key ID (falls back to AWS_ACCESS_KEY_ID env var) |
| secretAccessKey | string | No | AWS secret access key (falls back to AWS_SECRET_ACCESS_KEY env var) |
| sessionToken | string | No | AWS session token for temporary credentials (falls back to AWS_SESSION_TOKEN env var) |

checkOwnership Function

The ownership check function is critical for filtering events, especially in development mode where multiple developers share the same SNS topic.

Signature:

async (eventPayload: any) => Promise<boolean>

Parameters:

  • eventPayload: The parsed event payload (after SNS envelope unwrapping)

Returns:

  • true: Process this event
  • false: Ignore this event (message will be deleted immediately)

Example:

checkOwnership: async (payload) => {
  // Example: Only process events for your development user
  if (payload.userId && payload.userId === process.env.DEV_USER_ID) {
    return true;
  }
  
  // Example: Only process events for your test tenant
  if (payload.tenantId && payload.tenantId === 'my-test-tenant') {
    return true;
  }
  
  // Example: Process all events of a specific type
  if (payload['detail-type'] === 'TestEvent') {
    return true;
  }
  
  return false;
}
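
The three rules in the example above are independent, so they can also be written as small predicates and combined. The following is a minimal sketch, not part of the library: `anyOf`, `isMyUser`, `isTestTenant`, and `isTestEvent` are hypothetical names, and the `'dev-42'` argument stands in for `process.env.DEV_USER_ID`.

```javascript
// Hypothetical helper: builds an async checkOwnership function that returns
// true as soon as any of the given predicates matches the payload.
function anyOf(...predicates) {
  return async (payload) => {
    for (const predicate of predicates) {
      if (await predicate(payload)) return true;
    }
    return false;
  };
}

// Illustrative predicates mirroring the rules in the example above.
const isMyUser = (devUserId) => (payload) =>
  Boolean(payload.userId) && payload.userId === devUserId;
const isTestTenant = (payload) => payload.tenantId === 'my-test-tenant';
const isTestEvent = (payload) => payload['detail-type'] === 'TestEvent';

// The result has the documented signature: async (payload) => Promise<boolean>
const checkOwnership = anyOf(isMyUser('dev-42'), isTestTenant, isTestEvent);
```

Keeping each rule as its own function makes it possible to unit-test the filter without touching AWS.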

Methods

start()

Initializes the consumer and starts the polling loop. In development mode, this will:

  1. Create an ephemeral SQS queue
  2. Set up permissions for SNS to write to the queue
  3. Subscribe the queue to the SNS topic

await consumer.start();

stop()

Stops the polling loop and cleans up resources. In development mode, this will:

  1. Unsubscribe from the SNS topic
  2. Delete the ephemeral queue

await consumer.stop();

Event Handling

The consumer extends Node.js EventEmitter, so you can use standard event handling methods. All event payloads are automatically parsed as JSON objects from the SNS message body.

// Register a handler
consumer.on('event.type', (payload) => {
  // payload is a JSON object
  console.log('Received event:', payload);
});

// Register a one-time handler
consumer.once('event.type', (payload) => {
  // payload is a JSON object
  console.log('Received event once:', payload);
});

// Remove a handler (`handler` must be the same function reference passed to on())
consumer.off('event.type', handler);

// Remove all handlers for an event
consumer.removeAllListeners('event.type');
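
Because `off` matches listeners by function reference, a handler that will be removed later needs to be kept in a variable. A minimal sketch using a plain Node.js `EventEmitter` as a stand-in for the consumer (the names `emitter`, `received`, and `onUserCreated` are illustrative):

```javascript
const { EventEmitter } = require('node:events');

// Stand-in for the consumer, which extends EventEmitter.
const emitter = new EventEmitter();
const received = [];

// Keep a reference to the handler so it can be removed later.
const onUserCreated = (payload) => received.push(payload.userId);

emitter.on('user.created', onUserCreated);
emitter.emit('user.created', { userId: '123' }); // handler runs

emitter.off('user.created', onUserCreated);
emitter.emit('user.created', { userId: '456' }); // no handler left

console.log(received); // only the first event was handled
```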

Event Type Detection

The library determines the event type from the payload in this order:

  1. payload['detail-type'] (EventBridge standard)
  2. payload.type
  3. 'default' (fallback)
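
The lookup order above can be sketched as a one-line function. `detectEventType` is a hypothetical name, not the library's internal function, and this version assumes that an empty string counts as "missing".

```javascript
// Hypothetical helper mirroring the documented lookup order:
// 'detail-type' first, then 'type', then the 'default' fallback.
function detectEventType(payload) {
  return payload['detail-type'] || payload.type || 'default';
}

console.log(detectEventType({ 'detail-type': 'user.created', type: 'ignored' }));
console.log(detectEventType({ type: 'order.completed' }));
console.log(detectEventType({ orderId: '456' }));
```

A handler registered with `consumer.on('default', ...)` would therefore receive every event that carries neither field.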

Batch Processing

The library receives messages in batches (up to 10 messages per SQS receive call), but each message is processed and emitted individually. This means:

  • Multiple events of the same type can arrive in a single batch
  • Each event triggers its handler separately
  • Events are processed in parallel within a batch
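
One way to picture parallel processing within a batch is `Promise.allSettled`, which lets one failing message leave the others unaffected. This is a sketch of the idea only, not the library's implementation. `processBatch` and `handleOne` are hypothetical names, while `MessageId` and `Body` are the usual SQS message fields.

```javascript
// Sketch: process every message of a batch concurrently and report which
// ones succeeded. `handleOne` is a hypothetical per-message function.
async function processBatch(messages, handleOne) {
  const results = await Promise.allSettled(
    messages.map(async (message) => handleOne(message))
  );
  const succeeded = [];
  const failed = [];
  results.forEach((result, i) => {
    const target = result.status === 'fulfilled' ? succeeded : failed;
    target.push(messages[i].MessageId);
  });
  return { succeeded, failed };
}

// Usage with fake messages: the second body is not valid JSON, so that
// message would stay on the queue while the other two could be deleted.
const batch = [
  { MessageId: 'a', Body: '{"type":"user.created"}' },
  { MessageId: 'b', Body: 'not json' },
  { MessageId: 'c', Body: '{"type":"order.completed"}' },
];
const batchResult = processBatch(batch, (message) => JSON.parse(message.Body));
batchResult.then((summary) => console.log(summary));
```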

Example: Handling Multiple Events in a Batch

const EventBusConsumer = require('stream-engine-event-bus');

const consumer = new EventBusConsumer({
  region: 'us-east-1',
  mode: 'dev',
  snsTopicArn: 'arn:aws:sns:us-east-1:123456789012:central-event-topic',
  checkOwnership: async (payload) => {
    return true; // Process all events
  }
});

// Counter to track how many events we've processed
let userCreatedCount = 0;
let orderCompletedCount = 0;

// Handler for user.created events
// This handler may be called multiple times if multiple user.created events arrive in a batch
consumer.on('user.created', async (event) => {
  // event is a JSON object: { userId: '123', name: 'John', email: '[email protected]' }
  userCreatedCount++;
  console.log(`[${userCreatedCount}] User created:`, event);
  
  // Process the event (e.g., save to database, send notification, etc.)
  await processUserCreated(event);
});

// Handler for order.completed events
consumer.on('order.completed', async (event) => {
  // event is a JSON object: { orderId: '456', total: 99.99, status: 'completed' }
  orderCompletedCount++;
  console.log(`[${orderCompletedCount}] Order completed:`, event);
  
  // Process the event
  await processOrderCompleted(event);
});

// Helper functions
async function processUserCreated(event) {
  // Example: Save to database
  // await db.users.create(event);
  console.log('Processing user creation:', event.userId);
}

async function processOrderCompleted(event) {
  // Example: Update inventory, send email, etc.
  // await db.orders.update(event.orderId, { status: 'completed' });
  console.log('Processing order completion:', event.orderId);
}

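// start() returns a promise; use `await` only inside an async function or ES module
consumer.start().catch(console.error);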

Example: Collecting Events from a Batch

If you need to process multiple events together (e.g., batch database inserts), you can collect them:

const EventBusConsumer = require('stream-engine-event-bus');

// Buffer to collect events
const eventBuffer = {
  'user.created': [],
  'order.completed': []
};

// Flush interval (process collected events every 5 seconds).
// The callback is async so the commented-out awaits below work when enabled.
setInterval(async () => {
  if (eventBuffer['user.created'].length > 0) {
    const events = eventBuffer['user.created'].splice(0);
    console.log(`Processing ${events.length} user.created events in batch`);
    // Batch insert to database
    // await db.users.bulkCreate(events);
  }
  
  if (eventBuffer['order.completed'].length > 0) {
    const events = eventBuffer['order.completed'].splice(0);
    console.log(`Processing ${events.length} order.completed events in batch`);
    // Batch update orders
    // await db.orders.bulkUpdate(events);
  }
}, 5000);

const consumer = new EventBusConsumer({
  region: 'us-east-1',
  mode: 'dev',
  snsTopicArn: 'arn:aws:sns:us-east-1:123456789012:central-event-topic',
  checkOwnership: async (payload) => {
    return true;
  }
});

// Collect events instead of processing immediately
consumer.on('user.created', (event) => {
  // event is a JSON object
  eventBuffer['user.created'].push(event);
});

consumer.on('order.completed', (event) => {
  // event is a JSON object
  eventBuffer['order.completed'].push(event);
});

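// start() returns a promise; use `await` only inside an async function or ES module
consumer.start().catch(console.error);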

Example: Processing Events with Error Handling

const EventBusConsumer = require('stream-engine-event-bus');

const consumer = new EventBusConsumer({
  region: 'us-east-1',
  mode: 'dev',
  snsTopicArn: 'arn:aws:sns:us-east-1:123456789012:central-event-topic',
  checkOwnership: async (payload) => {
    return true;
  }
});

consumer.on('user.created', async (event) => {
  // event is a JSON object
  try {
    // Validate the event
    if (!event.userId || !event.email) {
      console.error('Invalid user.created event:', event);
      return;
    }
    
    // Process the event
    await saveUserToDatabase(event);
    await sendWelcomeEmail(event);
    
    console.log('Successfully processed user.created:', event.userId);
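  } catch (error) {
    // The error is caught here, so it does not propagate out of the handler.
    console.error('Error processing user.created event:', error, event);
    // Optionally, re-emit it as an 'error' event (requires the listener below)
    consumer.emit('error', { type: 'user.created', event, error });
  }
});

// Node.js throws when 'error' is emitted with no listener, so register one
consumer.on('error', ({ type, error }) => {
  console.error(`Handler for ${type} failed:`, error);
});

consumer.start().catch(console.error);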

How It Works

Production Mode

  1. Connects directly to the provided SQS queue URL
  2. Starts long-polling (20 second wait time)
  3. Receives messages in batches (up to 10 messages)
  4. Parses SNS envelope and extracts the event payload
  5. Filters events using checkOwnership
  6. Emits events to registered handlers
  7. Deletes messages after successful processing
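
Steps 4 to 7 can be sketched for a single message as follows. This is an illustration under stated assumptions, not the library's code: `unwrapSnsEnvelope` and `handleMessage` are hypothetical names, and the envelope shape assumes SNS delivers to SQS with raw message delivery disabled, so the SQS body is JSON whose `Message` field holds the payload as a string.

```javascript
const { EventEmitter } = require('node:events');

// Unwrap the SNS envelope: the SQS body is JSON whose `Message` field holds
// the original payload as a JSON string. Bodies without it are returned as-is.
function unwrapSnsEnvelope(sqsBody) {
  const envelope = JSON.parse(sqsBody);
  return typeof envelope.Message === 'string'
    ? JSON.parse(envelope.Message)
    : envelope;
}

// Hypothetical per-message pipeline. Resolves to true when the message may be
// deleted; a thrown error means it should stay on the queue for a retry.
async function handleMessage(emitter, checkOwnership, sqsMessage) {
  const payload = unwrapSnsEnvelope(sqsMessage.Body);
  if (!(await checkOwnership(payload))) return true; // not ours: delete
  const type = payload['detail-type'] || payload.type || 'default';
  emitter.emit(type, payload);
  return true; // emitted to handlers: delete
}

// Usage with a fake SQS message and a plain EventEmitter.
const bus = new EventEmitter();
const seen = [];
bus.on('user.created', (payload) => seen.push(payload.userId));

const fakeMessage = {
  Body: JSON.stringify({
    Type: 'Notification',
    Message: JSON.stringify({ 'detail-type': 'user.created', userId: '123' }),
  }),
};
const done = handleMessage(bus, async (p) => p.userId === '123', fakeMessage);
done.then((deletable) => console.log(deletable, seen));
```

This sketch does not wait for asynchronous handlers to finish. How the library itself tracks handler completion is not covered here.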

Development Mode

  1. Setup Phase (start()):

    • Generates a unique queue name: dev-{hostname}-{uuid}
    • Creates the SQS queue
    • Retrieves the queue ARN
    • Generates and attaches an SQS policy allowing the SNS topic to send messages
    • Subscribes the queue to the SNS topic
  2. Runtime Phase:

    • Same polling and processing logic as production mode
  3. Cleanup Phase (stop()):

    • Unsubscribes from the SNS topic
    • Deletes the ephemeral queue
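
The queue name and the queue policy from the setup phase can be sketched without any AWS calls. `buildDevQueueName` and `buildQueuePolicy` are hypothetical helpers. The hostname sanitizing and truncation are assumptions added here because SQS queue names accept only alphanumerics, hyphens, and underscores, up to 80 characters. The policy follows the usual pattern for letting one SNS topic send to one queue, and the exact document the library generates may differ.

```javascript
const os = require('node:os');
const { randomUUID } = require('node:crypto');

// Build a name of the form dev-{hostname}-{uuid}. The hostname is sanitized
// and truncated so the result stays a valid SQS queue name (assumption).
function buildDevQueueName(hostname = os.hostname()) {
  const safeHost = hostname.replace(/[^A-Za-z0-9_-]/g, '-').slice(0, 39);
  return `dev-${safeHost}-${randomUUID()}`;
}

// Build a queue policy that lets one SNS topic send messages to one queue.
function buildQueuePolicy(queueArn, topicArn) {
  return {
    Version: '2012-10-17',
    Statement: [
      {
        Effect: 'Allow',
        Principal: { Service: 'sns.amazonaws.com' },
        Action: 'sqs:SendMessage',
        Resource: queueArn,
        Condition: { ArnEquals: { 'aws:SourceArn': topicArn } },
      },
    ],
  };
}

const queueName = buildDevQueueName('my-laptop.local');
const policy = buildQueuePolicy(
  `arn:aws:sqs:us-east-1:123456789012:${queueName}`,
  'arn:aws:sns:us-east-1:123456789012:central-event-topic'
);
console.log(queueName);
console.log(JSON.stringify(policy, null, 2));
```

When attached to a queue, a policy like this is passed as a JSON string in the queue's `Policy` attribute.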

Message Flow

AWS EventBridge → SNS Topic → SQS Queue → EventBusConsumer → Your Handlers

  1. Events are published to EventBridge
  2. EventBridge rules route events to the central SNS topic
  3. SNS forwards events to subscribed SQS queues
  4. EventBusConsumer polls the queue and processes messages
  5. Ownership filter determines if the event should be processed
  6. Events are emitted to registered handlers

Error Handling

  • Polling Errors: If the polling loop encounters an error, it logs the error and waits 5 seconds before retrying
  • Processing Errors: If message processing fails, the message is NOT deleted, allowing it to be retried after the visibility timeout expires
  • Teardown Errors: Errors during cleanup are logged but don't throw, ensuring the process can exit cleanly
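
The polling behavior described above (log the error, wait, then retry) can be sketched as a loop with injected dependencies. `pollLoop`, `receiveBatch`, `handleBatch`, and `isRunning` are hypothetical names for illustration. The 5-second default comes from the description above and is shortened in the usage so the demo finishes quickly.

```javascript
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Hypothetical polling loop: `receiveBatch` and `handleBatch` are injected
// async functions, and `isRunning` lets the caller stop the loop.
async function pollLoop({ receiveBatch, handleBatch, isRunning, retryDelayMs = 5000 }) {
  while (isRunning()) {
    try {
      const messages = await receiveBatch();
      await handleBatch(messages);
    } catch (error) {
      // Log and back off instead of crashing the loop.
      console.error('Polling error, retrying:', error.message);
      await sleep(retryDelayMs);
    }
  }
}

// Usage with fakes: the first receive fails, the second succeeds, then stop.
let calls = 0;
let running = true;
const handled = [];
const loopDone = pollLoop({
  receiveBatch: async () => {
    calls += 1;
    if (calls === 1) throw new Error('network glitch');
    return ['m1', 'm2'];
  },
  handleBatch: async (messages) => {
    handled.push(...messages);
    running = false; // stop after the first successful batch
  },
  isRunning: () => running,
  retryDelayMs: 10, // short delay so the demo finishes quickly
});
```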

Signal Handling

The library can automatically handle process termination signals for graceful shutdown. This is enabled by default in dev mode and disabled by default in prod mode.

Automatic Signal Handling (Dev Mode)

In development mode, the library automatically sets up handlers for:

  • SIGINT (Ctrl+C)
  • SIGTERM (termination signal)
  • SIGUSR2 (nodemon restart)

When these signals are received, the library will:

  1. Stop the polling loop
  2. Clean up ephemeral queues (in dev mode)
  3. Exit the process gracefully

You can disable this behavior by setting enableGracefulShutdown: false:

const consumer = new EventBusConsumer({
  // ... other config
  mode: 'dev',
  enableGracefulShutdown: false, // Disable automatic signal handling
});

Manual Signal Handling (Production)

In production mode, signal handling is disabled by default. If you need graceful shutdown in production, you can enable it:

const consumer = new EventBusConsumer({
  // ... other config
  mode: 'prod',
  enableGracefulShutdown: true, // Enable automatic signal handling
});

Or handle signals manually:

process.on('SIGTERM', async () => {
  await consumer.stop();
  process.exit(0);
});
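
When handling signals manually, it can help to guard against running the cleanup twice, for example when Ctrl+C is pressed repeatedly. A minimal sketch: `installShutdownHandlers` is a hypothetical helper, and its `exit` option exists only so the function can be exercised without ending the process.

```javascript
// Hypothetical helper: run `stop` once when any of the signals arrives, then
// call `exit`. Returns a function that removes the handlers again.
function installShutdownHandlers(
  stop,
  { signals = ['SIGINT', 'SIGTERM'], exit = process.exit } = {}
) {
  let shuttingDown = false;
  const onSignal = async (signal) => {
    if (shuttingDown) return; // ignore repeated signals
    shuttingDown = true;
    try {
      await stop();
      exit(0);
    } catch (error) {
      console.error(`Shutdown after ${signal} failed:`, error);
      exit(1);
    }
  };
  for (const signal of signals) process.on(signal, onSignal);
  return () => signals.forEach((signal) => process.off(signal, onSignal));
}

// Usage (assuming `consumer` was created with enableGracefulShutdown: false):
// installShutdownHandlers(() => consumer.stop());
```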

Best Practices

  1. Use automatic signal handling in dev: The default behavior (enabled in dev mode) ensures proper cleanup
  2. Disable in production if not needed: Signal handling is disabled by default in prod mode
  3. Implement robust ownership checks: In development, be specific about which events to process to avoid conflicts
  4. Handle errors in event handlers: Wrap your handler logic in try-catch blocks
  5. Use environment variables: Store queue URLs and topic ARNs in environment variables
  6. Monitor queue depth: In production, monitor your SQS queue metrics to ensure messages are being processed
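
For practice 5, one option is to build the configuration from environment variables and fail fast when the value required by the selected mode is missing. `buildConfigFromEnv` is a hypothetical helper. The variable names `AWS_REGION`, `NODE_ENV`, `SQS_QUEUE_URL`, and `SNS_TOPIC_ARN` match the complete example in this README.

```javascript
// Hypothetical helper: build the consumer config from environment variables
// and throw early when the value required by the selected mode is missing.
function buildConfigFromEnv(env = process.env) {
  const mode = env.NODE_ENV === 'production' ? 'prod' : 'dev';
  const config = { region: env.AWS_REGION || 'us-east-1', mode };
  if (mode === 'prod') {
    if (!env.SQS_QUEUE_URL) {
      throw new Error('SQS_QUEUE_URL is required in prod mode');
    }
    config.staticQueueUrl = env.SQS_QUEUE_URL;
  } else {
    if (!env.SNS_TOPIC_ARN) {
      throw new Error('SNS_TOPIC_ARN is required in dev mode');
    }
    config.snsTopicArn = env.SNS_TOPIC_ARN;
  }
  return config;
}
```

The returned object still needs a `checkOwnership` function before it is passed to the `EventBusConsumer` constructor.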

Example: Complete Application

const EventBusConsumer = require('stream-engine-event-bus');

async function main() {
  const consumer = new EventBusConsumer({
    region: process.env.AWS_REGION || 'us-east-1',
    mode: process.env.NODE_ENV === 'production' ? 'prod' : 'dev',
    staticQueueUrl: process.env.SQS_QUEUE_URL,
    snsTopicArn: process.env.SNS_TOPIC_ARN,
    checkOwnership: async (payload) => {
      // Production: process all events
      if (process.env.NODE_ENV === 'production') {
        return true;
      }
      
      // Development: only process events for this developer
      return payload.devUserId === process.env.DEV_USER_ID;
    }
  });

  // Register handlers
  consumer.on('user.created', handleUserCreated);
  consumer.on('user.updated', handleUserUpdated);
  consumer.on('order.completed', handleOrderCompleted);

  // Start consuming
  await consumer.start();
  console.log('Event bus consumer started');

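  // Graceful shutdown (manual handlers). In dev mode the library already
  // handles these signals unless enableGracefulShutdown is set to false.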
  process.on('SIGINT', async () => {
    console.log('Shutting down...');
    await consumer.stop();
    process.exit(0);
  });

  process.on('SIGTERM', async () => {
    console.log('Shutting down...');
    await consumer.stop();
    process.exit(0);
  });
}

async function handleUserCreated(event) {
  // event is a JSON object: { userId: '123', name: 'John', email: '[email protected]' }
  console.log('User created:', event);
  // Your business logic here
}

async function handleUserUpdated(event) {
  // event is a JSON object: { userId: '123', name: 'John Updated', email: '[email protected]' }
  console.log('User updated:', event);
  // Your business logic here
}

async function handleOrderCompleted(event) {
  // event is a JSON object: { orderId: '456', total: 99.99, status: 'completed' }
  console.log('Order completed:', event);
  // Your business logic here
}

main().catch(console.error);

License

MIT