stream-engine-event-bus
v0.0.2
Stream Engine Event Bus
A reusable, standalone Node.js library that abstracts event consumption from AWS. The library acts as a high-level wrapper around @aws-sdk/client-sqs and @aws-sdk/client-sns, supporting both production and development modes with automatic resource management.
Features
- Production Mode: Connect to existing SQS queues with zero configuration
- Development Mode: Automatic ephemeral queue creation and cleanup for local development
- Ownership Filtering: Built-in "noisy neighbor" protection for shared development environments
- Event Emitter Pattern: Familiar Node.js event handling API
- Automatic Cleanup: Dev queues are automatically removed on shutdown
Installation
pnpm add stream-engine-event-bus
Prerequisites
Production Mode
- An existing SQS queue URL
- AWS credentials configured (via IAM role, environment variables, or AWS credentials file)
Development Mode
- AWS credentials with permissions to:
- Create/delete SQS queues
- Get/set queue attributes
- Subscribe/unsubscribe from SNS topics
- The ARN of the central SNS topic to subscribe to
Quick Start
Production Mode
const EventBusConsumer = require('stream-engine-event-bus');
const consumer = new EventBusConsumer({
region: 'us-east-1',
mode: 'prod',
staticQueueUrl: 'https://sqs.us-east-1.amazonaws.com/123456789012/my-queue',
checkOwnership: async (payload) => {
// In production, you might check if the event belongs to your service
// For example, check a tenant ID, service name, etc.
return payload.service === 'my-service';
}
});
// Register event handlers
// Note: Events are received in batches (up to 10), but each event is emitted individually
// All payloads are automatically parsed as JSON objects
consumer.on('user.created', (event) => {
// event is a JSON object: { userId: '123', name: 'John Doe', email: '[email protected]' }
console.log('User created:', event);
});
consumer.on('order.completed', (event) => {
// event is a JSON object: { orderId: '456', total: 99.99, status: 'completed' }
console.log('Order completed:', event);
});
// Start consuming. Top-level await requires an ES module; in CommonJS, call this inside an async function.
await consumer.start();
// Graceful shutdown is disabled by default in prod mode. Set enableGracefulShutdown: true
// or register your own signal handlers if you need it (see Signal Handling).
Development Mode
const EventBusConsumer = require('stream-engine-event-bus');
const consumer = new EventBusConsumer({
region: 'us-east-1',
mode: 'dev',
snsTopicArn: 'arn:aws:sns:us-east-1:123456789012:central-event-topic',
checkOwnership: async (payload) => {
// Filter events based on your development needs
// For example, only process events for your user ID or test tenant
return payload.userId === process.env.DEV_USER_ID ||
payload.tenantId === 'dev-tenant-123';
}
});
// Register event handlers
// Note: Events are received in batches (up to 10), but each event is emitted individually
// All payloads are automatically parsed as JSON objects
consumer.on('user.created', (event) => {
// event is a JSON object: { userId: '123', name: 'John Doe', email: '[email protected]' }
console.log('User created:', event);
});
consumer.on('order.completed', (event) => {
// event is a JSON object: { orderId: '456', total: 99.99, status: 'completed' }
console.log('Order completed:', event);
});
// Start consuming (creates ephemeral queue automatically)
// Signal handlers are automatically set up in dev mode for graceful shutdown
await consumer.start();
// No need to manually handle signals - the library does it for you!
API Reference
Constructor
new EventBusConsumer(config)
Configuration Options
| Option | Type | Required | Description |
|--------|------|----------|-------------|
| region | string | Yes | AWS region (e.g., 'us-east-1') |
| mode | 'prod' \| 'dev' | Yes | Runtime mode |
| staticQueueUrl | string | Yes (prod) | SQS queue URL for production mode |
| snsTopicArn | string | Yes (dev) | SNS topic ARN for development mode |
| checkOwnership | function | Yes | Async function that returns true if the event should be processed |
| enableGracefulShutdown | boolean | No | Automatically handle SIGINT/SIGTERM for graceful shutdown (default: true in dev, false in prod) |
| accessKeyId | string | No | AWS access key ID (falls back to AWS_ACCESS_KEY_ID env var) |
| secretAccessKey | string | No | AWS secret access key (falls back to AWS_SECRET_ACCESS_KEY env var) |
| sessionToken | string | No | AWS session token for temporary credentials (falls back to AWS_SESSION_TOKEN env var) |
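The table can be read as a small set of rules about which options belong together. The following is a minimal sketch of those rules as a standalone check. `validateConfig` is a hypothetical helper written for illustration; it is not part of the library, and the library's own validation (if any) may behave differently.

```javascript
// Hypothetical helper: encodes the "Required" column of the options table.
// It is not part of stream-engine-event-bus.
function validateConfig(config) {
  const errors = [];

  if (typeof config.region !== 'string' || config.region === '') {
    errors.push('region is required');
  }
  if (config.mode !== 'prod' && config.mode !== 'dev') {
    errors.push("mode must be 'prod' or 'dev'");
  }
  if (config.mode === 'prod' && !config.staticQueueUrl) {
    errors.push('staticQueueUrl is required in prod mode');
  }
  if (config.mode === 'dev' && !config.snsTopicArn) {
    errors.push('snsTopicArn is required in dev mode');
  }
  if (typeof config.checkOwnership !== 'function') {
    errors.push('checkOwnership must be a function');
  }

  // Documented default: graceful shutdown is on in dev and off in prod.
  const enableGracefulShutdown =
    typeof config.enableGracefulShutdown === 'boolean'
      ? config.enableGracefulShutdown
      : config.mode === 'dev';

  return { valid: errors.length === 0, errors, enableGracefulShutdown };
}

const devResult = validateConfig({
  region: 'us-east-1',
  mode: 'dev',
  snsTopicArn: 'arn:aws:sns:us-east-1:123456789012:central-event-topic',
  checkOwnership: async () => true,
});
console.log(devResult.valid, devResult.enableGracefulShutdown); // true true
```

Running a check like this before constructing the consumer can surface a missing queue URL or topic ARN early, for example when the values come from environment variables.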
checkOwnership Function
The ownership check function is critical for filtering events, especially in development mode where multiple developers share the same SNS topic.
Signature:
async (eventPayload: any) => Promise<boolean>
Parameters:
- eventPayload: The parsed event payload (after SNS envelope unwrapping)
Returns:
- true: Process this event
- false: Ignore this event (message will be deleted immediately)
Example:
checkOwnership: async (payload) => {
// Example: Only process events for your development user
if (payload.userId && payload.userId === process.env.DEV_USER_ID) {
return true;
}
// Example: Only process events for your test tenant
if (payload.tenantId && payload.tenantId === 'my-test-tenant') {
return true;
}
// Example: Process all events of a specific type
if (payload['detail-type'] === 'TestEvent') {
return true;
}
return false;
}
Methods
start()
Initializes the consumer and starts the polling loop. In development mode, this will:
- Create an ephemeral SQS queue
- Set up permissions for SNS to write to the queue
- Subscribe the queue to the SNS topic
await consumer.start();
stop()
Stops the polling loop and cleans up resources. In development mode, this will:
- Unsubscribe from the SNS topic
- Delete the ephemeral queue
await consumer.stop();
Event Handling
The consumer extends Node.js EventEmitter, so you can use standard event handling methods. All event payloads are automatically parsed as JSON objects from the SNS message body.
// Register a handler
consumer.on('event.type', (payload) => {
// payload is a JSON object
console.log('Received event:', payload);
});
// Register a one-time handler
consumer.once('event.type', (payload) => {
// payload is a JSON object
console.log('Received event once:', payload);
});
// Remove a handler
consumer.off('event.type', handler);
// Remove all handlers for an event
consumer.removeAllListeners('event.type');
Event Type Detection
The library determines the event type from the payload in this order:
- payload['detail-type'] (EventBridge standard)
- payload.type
- 'default' (fallback)
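The lookup order can be sketched as a small function. `resolveEventType` is a hypothetical name used for illustration; this mirrors the documented order rather than the library's actual source, which may handle edge cases differently.

```javascript
// Sketch of the documented lookup order; the library's internals may differ.
function resolveEventType(payload) {
  if (payload && typeof payload['detail-type'] === 'string') {
    return payload['detail-type']; // EventBridge standard field wins
  }
  if (payload && typeof payload.type === 'string') {
    return payload.type;
  }
  return 'default'; // fallback when neither field is present
}

console.log(resolveEventType({ 'detail-type': 'user.created', type: 'ignored' })); // user.created
console.log(resolveEventType({ type: 'order.completed' })); // order.completed
console.log(resolveEventType({ orderId: '456' })); // default
```

Given this order, a handler registered with `consumer.on('default', ...)` would presumably receive payloads that carry neither field.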
Batch Processing
The library receives messages in batches (up to 10 messages per SQS receive call), but each message is processed and emitted individually. This means:
- Multiple events of the same type can arrive in a single batch
- Each event triggers its handler separately
- Events are processed in parallel within a batch
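To make the "individually, but in parallel" behavior concrete, here is a minimal sketch using only Node's built-in `EventEmitter`. `dispatchBatch` and the `{ type, payload }` batch shape are assumptions made for this illustration; the library's own dispatch code is not shown in this README and may differ.

```javascript
const { EventEmitter } = require('node:events');

// Sketch only: dispatches every message of one batch concurrently and
// reports, per message, whether its handlers succeeded.
async function dispatchBatch(emitter, batch) {
  const results = await Promise.allSettled(
    batch.map(async ({ type, payload }) => {
      // Call listeners directly so async handlers can be awaited;
      // emitter.emit() would not wait for their promises.
      await Promise.all(emitter.listeners(type).map((fn) => fn(payload)));
    })
  );
  return results.map((r) => r.status);
}

const emitter = new EventEmitter();
const seen = [];
emitter.on('user.created', async (event) => {
  seen.push(event.userId);
});
emitter.on('order.completed', async () => {
  throw new Error('handler failed');
});

const batchDone = dispatchBatch(emitter, [
  { type: 'user.created', payload: { userId: '1' } },
  { type: 'user.created', payload: { userId: '2' } },
  { type: 'order.completed', payload: { orderId: '456' } },
]).then((statuses) => {
  console.log(seen, statuses);
  return statuses;
});
```

`Promise.allSettled` is used here so that one failing handler does not hide the outcome of the other messages in the same batch.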
Example: Handling Multiple Events in a Batch
const EventBusConsumer = require('stream-engine-event-bus');
const consumer = new EventBusConsumer({
region: 'us-east-1',
mode: 'dev',
snsTopicArn: 'arn:aws:sns:us-east-1:123456789012:central-event-topic',
checkOwnership: async (payload) => {
return true; // Process all events
}
});
// Counter to track how many events we've processed
let userCreatedCount = 0;
let orderCompletedCount = 0;
// Handler for user.created events
// This handler may be called multiple times if multiple user.created events arrive in a batch
consumer.on('user.created', async (event) => {
// event is a JSON object: { userId: '123', name: 'John', email: '[email protected]' }
userCreatedCount++;
console.log(`[${userCreatedCount}] User created:`, event);
// Process the event (e.g., save to database, send notification, etc.)
await processUserCreated(event);
});
// Handler for order.completed events
consumer.on('order.completed', async (event) => {
// event is a JSON object: { orderId: '456', total: 99.99, status: 'completed' }
orderCompletedCount++;
console.log(`[${orderCompletedCount}] Order completed:`, event);
// Process the event
await processOrderCompleted(event);
});
// Helper functions
async function processUserCreated(event) {
// Example: Save to database
// await db.users.create(event);
console.log('Processing user creation:', event.userId);
}
async function processOrderCompleted(event) {
// Example: Update inventory, send email, etc.
// await db.orders.update(event.orderId, { status: 'completed' });
console.log('Processing order completion:', event.orderId);
}
await consumer.start();
Example: Collecting Events from a Batch
If you need to process multiple events together (e.g., batch database inserts), you can collect them:
const EventBusConsumer = require('stream-engine-event-bus');
// Buffer to collect events
const eventBuffer = {
'user.created': [],
'order.completed': []
};
// Flush interval (process collected events every 5 seconds)
setInterval(() => {
if (eventBuffer['user.created'].length > 0) {
const events = eventBuffer['user.created'].splice(0);
console.log(`Processing ${events.length} user.created events in batch`);
// Batch insert to database
// await db.users.bulkCreate(events);
}
if (eventBuffer['order.completed'].length > 0) {
const events = eventBuffer['order.completed'].splice(0);
console.log(`Processing ${events.length} order.completed events in batch`);
// Batch update orders
// await db.orders.bulkUpdate(events);
}
}, 5000);
const consumer = new EventBusConsumer({
region: 'us-east-1',
mode: 'dev',
snsTopicArn: 'arn:aws:sns:us-east-1:123456789012:central-event-topic',
checkOwnership: async (payload) => {
return true;
}
});
// Collect events instead of processing immediately
consumer.on('user.created', (event) => {
// event is a JSON object
eventBuffer['user.created'].push(event);
});
consumer.on('order.completed', (event) => {
// event is a JSON object
eventBuffer['order.completed'].push(event);
});
await consumer.start();
Example: Processing Events with Error Handling
const EventBusConsumer = require('stream-engine-event-bus');
const consumer = new EventBusConsumer({
region: 'us-east-1',
mode: 'dev',
snsTopicArn: 'arn:aws:sns:us-east-1:123456789012:central-event-topic',
checkOwnership: async (payload) => {
return true;
}
});
consumer.on('user.created', async (event) => {
// event is a JSON object
try {
// Validate the event
if (!event.userId || !event.email) {
console.error('Invalid user.created event:', event);
return;
}
// Process the event
await saveUserToDatabase(event);
await sendWelcomeEmail(event);
console.log('Successfully processed user.created:', event.userId);
} catch (error) {
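// The error is caught here, so it does not propagate to the library.
// Rethrow it if you rely on the retry behavior described under Error Handling.
console.error('Error processing user.created event:', error, event);
// Optionally emit an 'error' event. Register a consumer.on('error', ...) listener first:
// EventEmitter throws when 'error' is emitted with no listener.
consumer.emit('error', { type: 'user.created', event, error });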
}
});
await consumer.start();
How It Works
Production Mode
- Connects directly to the provided SQS queue URL
- Starts long-polling (20 second wait time)
- Receives messages in batches (up to 10 messages)
- Parses SNS envelope and extracts the event payload
- Filters events using checkOwnership
- Emits events to registered handlers
- Deletes messages after successful processing
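The envelope-parsing step can be sketched as follows. When SNS delivers to SQS without raw message delivery, the SQS message body is a JSON envelope whose `Message` field holds the published message as a string. `unwrapSnsEnvelope` is a hypothetical helper and the sample payload is made up for illustration; this is not the library's actual parsing code.

```javascript
// Sketch: extract the event payload from an SQS message body written by SNS.
function unwrapSnsEnvelope(sqsBody) {
  const outer = JSON.parse(sqsBody);
  // Without raw message delivery, SNS wraps the published message in a
  // JSON envelope whose "Message" field holds the original string.
  if (outer && outer.Type === 'Notification' && typeof outer.Message === 'string') {
    return JSON.parse(outer.Message);
  }
  // With raw message delivery enabled, the body is already the payload.
  return outer;
}

// Made-up sample body in the shape SNS writes to a subscribed queue.
const sampleBody = JSON.stringify({
  Type: 'Notification',
  TopicArn: 'arn:aws:sns:us-east-1:123456789012:central-event-topic',
  Message: JSON.stringify({ 'detail-type': 'user.created', userId: '123' }),
});

const unwrapped = unwrapSnsEnvelope(sampleBody);
console.log(unwrapped.userId); // 123
```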
Development Mode
Setup Phase (start()):
- Generates a unique queue name: dev-{hostname}-{uuid}
- Creates the SQS queue
- Retrieves the queue ARN
- Generates and attaches an SQS policy allowing the SNS topic to send messages
- Subscribes the queue to the SNS topic
Runtime Phase:
- Same polling and processing logic as production mode
Cleanup Phase (stop()):
- Unsubscribes from the SNS topic
- Deletes the ephemeral queue
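Two of the setup steps, naming the queue and building its access policy, can be sketched without calling AWS. `buildDevQueueName` and `buildQueuePolicy` are hypothetical helpers written for this illustration. The hostname sanitizing and truncation are assumptions (SQS queue names allow only letters, digits, hyphens and underscores, up to 80 characters); how the library itself treats unusual hostnames is not documented here.

```javascript
const os = require('node:os');
const { randomUUID } = require('node:crypto');

// Sketch of the dev-{hostname}-{uuid} naming scheme. The hostname is
// sanitized and truncated so the result stays a valid SQS queue name
// (an assumption of this sketch, not library code).
function buildDevQueueName(hostname = os.hostname(), uuid = randomUUID()) {
  const safeHost = hostname.replace(/[^A-Za-z0-9_-]/g, '-').slice(0, 38);
  return `dev-${safeHost}-${uuid}`;
}

// Queue policy that lets exactly one SNS topic send messages to one queue.
function buildQueuePolicy(queueArn, topicArn) {
  return {
    Version: '2012-10-17',
    Statement: [
      {
        Effect: 'Allow',
        Principal: { Service: 'sns.amazonaws.com' },
        Action: 'sqs:SendMessage',
        Resource: queueArn,
        Condition: { ArnEquals: { 'aws:SourceArn': topicArn } },
      },
    ],
  };
}

const queueName = buildDevQueueName('my-laptop.local', '00000000-0000-4000-8000-000000000000');
console.log(queueName); // dev-my-laptop-local-00000000-0000-4000-8000-000000000000

const policy = buildQueuePolicy(
  'arn:aws:sqs:us-east-1:123456789012:' + queueName,
  'arn:aws:sns:us-east-1:123456789012:central-event-topic'
);
console.log(JSON.stringify(policy, null, 2));
```

A policy like this is attached to the queue as the JSON-stringified `Policy` queue attribute. The `aws:SourceArn` condition restricts delivery to the one topic named in the configuration.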
Message Flow
AWS EventBridge → SNS Topic → SQS Queue → EventBusConsumer → Your Handlers
- Events are published to EventBridge
- EventBridge rules route events to the central SNS topic
- SNS forwards events to subscribed SQS queues
- EventBusConsumer polls the queue and processes messages
- Ownership filter determines if the event should be processed
- Events are emitted to registered handlers
Error Handling
- Polling Errors: If the polling loop encounters an error, it logs the error and waits 5 seconds before retrying
- Processing Errors: If message processing fails, the message is NOT deleted, allowing it to be retried after the visibility timeout expires
- Teardown Errors: Errors during cleanup are logged but don't throw, ensuring the process can exit cleanly
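The per-message outcome described in this README (filtered-out messages are deleted immediately, failed messages are kept for retry, successful ones are deleted) can be sketched as a single decision function. `settleMessage` and its injected `handle` and `deleteMessage` callbacks are stand-ins invented for this sketch; in the library these steps would involve real SQS calls.

```javascript
// Sketch of the per-message decision. deleteMessage and handle are
// injected stand-ins; the real library would call SQS here.
async function settleMessage(payload, { checkOwnership, handle, deleteMessage }) {
  if (!(await checkOwnership(payload))) {
    await deleteMessage(); // not ours: drop it immediately
    return 'skipped';
  }
  try {
    await handle(payload);
  } catch (err) {
    // Leave the message in the queue; SQS makes it visible again once
    // the visibility timeout expires, which results in a retry.
    console.error('Processing failed, message kept for retry:', err.message);
    return 'retry';
  }
  await deleteMessage(); // processed successfully
  return 'processed';
}

let deletedCount = 0;
const deps = {
  checkOwnership: async (p) => p.tenantId === 'dev-tenant-123',
  handle: async (p) => {
    if (p.fail) throw new Error('boom');
  },
  deleteMessage: async () => {
    deletedCount += 1;
  },
};

const outcomes = Promise.all([
  settleMessage({ tenantId: 'someone-else' }, deps),
  settleMessage({ tenantId: 'dev-tenant-123' }, deps),
  settleMessage({ tenantId: 'dev-tenant-123', fail: true }, deps),
]);
outcomes.then((o) => console.log(o, 'deleted:', deletedCount));
```

Because a failed message is delivered again later, handlers are safest when they are idempotent, that is, when processing the same event twice has no additional effect.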
Signal Handling
The library can automatically handle process termination signals for graceful shutdown. This is enabled by default in dev mode and disabled by default in prod mode.
Automatic Signal Handling (Dev Mode)
In development mode, the library automatically sets up handlers for:
- SIGINT (Ctrl+C)
- SIGTERM (termination signal)
- SIGUSR2 (nodemon restart)
When these signals are received, the library will:
- Stop the polling loop
- Clean up ephemeral queues (in dev mode)
- Exit the process gracefully
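For readers who want to see what such handling involves, here is a minimal sketch of the steps listed above. `installGracefulShutdown` is a hypothetical helper, not the library's implementation. It only assumes the consumer has a `stop()` method, and it takes an injectable `exit` function so it can be exercised without ending the process.

```javascript
// Sketch: wire termination signals to consumer.stop(). Not library code.
function installGracefulShutdown(consumer, options = {}) {
  const signals = options.signals || ['SIGINT', 'SIGTERM', 'SIGUSR2'];
  const exit = options.exit || ((code) => process.exit(code));
  let shuttingDown = false;

  const onSignal = async (signal) => {
    if (shuttingDown) return; // ignore repeated signals during cleanup
    shuttingDown = true;
    console.log(`Received ${signal}, shutting down...`);
    try {
      await consumer.stop(); // stops polling and cleans up dev resources
      exit(0);
    } catch (err) {
      console.error('Cleanup failed:', err);
      exit(1);
    }
  };

  for (const signal of signals) process.on(signal, onSignal);

  // Return an uninstall function so the handlers can be removed again.
  return () => {
    for (const signal of signals) process.off(signal, onSignal);
  };
}

// Usage with a stand-in consumer:
const fakeConsumer = { stop: async () => console.log('stopped') };
const uninstall = installGracefulShutdown(fakeConsumer);
uninstall(); // remove the handlers again in this demo
```

The `shuttingDown` flag guards against running cleanup twice, for example when Ctrl+C is pressed repeatedly while the queue is still being deleted.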
You can disable automatic signal handling by setting enableGracefulShutdown: false:
const consumer = new EventBusConsumer({
// ... other config
mode: 'dev',
enableGracefulShutdown: false, // Disable automatic signal handling
});
Manual Signal Handling (Production)
In production mode, signal handling is disabled by default. If you need graceful shutdown in production, you can enable it:
const consumer = new EventBusConsumer({
// ... other config
mode: 'prod',
enableGracefulShutdown: true, // Enable automatic signal handling
});
Or handle signals manually:
process.on('SIGTERM', async () => {
await consumer.stop();
process.exit(0);
});
Best Practices
- Use automatic signal handling in dev: The default behavior (enabled in dev mode) ensures proper cleanup
- Disable in production if not needed: Signal handling is disabled by default in prod mode
- Implement robust ownership checks: In development, be specific about which events to process to avoid conflicts
- Handle errors in event handlers: Wrap your handler logic in try-catch blocks
- Use environment variables: Store queue URLs and topic ARNs in environment variables
- Monitor queue depth: In production, monitor your SQS queue metrics to ensure messages are being processed
Example: Complete Application
const EventBusConsumer = require('stream-engine-event-bus');
async function main() {
const consumer = new EventBusConsumer({
region: process.env.AWS_REGION || 'us-east-1',
mode: process.env.NODE_ENV === 'production' ? 'prod' : 'dev',
staticQueueUrl: process.env.SQS_QUEUE_URL,
snsTopicArn: process.env.SNS_TOPIC_ARN,
checkOwnership: async (payload) => {
// Production: process all events
if (process.env.NODE_ENV === 'production') {
return true;
}
// Development: only process events for this developer
return payload.devUserId === process.env.DEV_USER_ID;
}
});
// Register handlers
consumer.on('user.created', handleUserCreated);
consumer.on('user.updated', handleUserUpdated);
consumer.on('order.completed', handleOrderCompleted);
// Start consuming
await consumer.start();
console.log('Event bus consumer started');
// Graceful shutdown
process.on('SIGINT', async () => {
console.log('Shutting down...');
await consumer.stop();
process.exit(0);
});
process.on('SIGTERM', async () => {
console.log('Shutting down...');
await consumer.stop();
process.exit(0);
});
}
async function handleUserCreated(event) {
// event is a JSON object: { userId: '123', name: 'John', email: '[email protected]' }
console.log('User created:', event);
// Your business logic here
}
async function handleUserUpdated(event) {
// event is a JSON object: { userId: '123', name: 'John Updated', email: '[email protected]' }
console.log('User updated:', event);
// Your business logic here
}
async function handleOrderCompleted(event) {
// event is a JSON object: { orderId: '456', total: 99.99, status: 'completed' }
console.log('Order completed:', event);
// Your business logic here
}
main().catch(console.error);
License
MIT
