nestjs-event-outbox
v1.0.1
Published
Production-ready event outbox pattern implementation for NestJS with PostgreSQL logical decoding support
Maintainers
Readme
nestjs-event-outbox
A production-ready event outbox pattern implementation for NestJS applications with PostgreSQL logical decoding support. This package ensures reliable event delivery and maintains transactional consistency between your application state and event publishing.
🚀 Features
- Event Outbox Pattern: Transactionally safe event publishing
- PostgreSQL Logical Decoding: Real-time event processing without polling
- Database Agnostic: Support for Sequelize and TypeORM
- NestJS Integration: First-class NestJS module with decorators
- Distributed Processing: Built-in locking for multi-instance deployments
- Retry Logic: Automatic retry with exponential backoff
- Monitoring: Built-in metrics and health checks
- Setup Scripts: Automated PostgreSQL logical replication setup
📦 Installation
npm install nestjs-event-outbox
Peer Dependencies
# Choose your ORM
npm install sequelize sequelize-typescript pg
# OR
npm install typeorm pg
# Required for NestJS
npm install @nestjs/common @nestjs/core @nestjs/schedule
🔧 Quick Start
1. Setup PostgreSQL Logical Replication
# Run the setup script
npx nestjs-event-outbox setup
# Check status
npx nestjs-event-outbox status
# Cleanup (if needed)
npx nestjs-event-outbox cleanup
2. Configure Your Module
Using Sequelize
import { Module } from '@nestjs/common';
import { EventsOutboxModule, createSequelizeAdapter } from 'nestjs-event-outbox';
@Module({
imports: [
EventsOutboxModule.forRoot({
database: {
adapter: 'sequelize',
config: {
sequelize: yourSequelizeInstance,
},
},
dispatcher: {
enabled: true,
intervalMs: 5000,
},
eventBus: {
logicalDecoding: {
enabled: true,
connection: {
host: 'localhost',
port: 5432,
database: 'your_db',
user: 'your_user',
password: 'your_password',
},
slotName: 'outbox_events_slot',
publicationName: 'outbox_events_pub',
},
},
}),
],
})
export class AppModule {}
Using TypeORM
import { Module } from '@nestjs/common';
import { EventsOutboxModule, createTypeORMAdapter } from 'nestjs-event-outbox';
@Module({
imports: [
EventsOutboxModule.forRoot({
database: {
adapter: 'typeorm',
config: {
dataSource: yourDataSource,
},
},
dispatcher: {
enabled: true,
intervalMs: 5000,
},
eventBus: {
logicalDecoding: {
enabled: true,
connection: {
host: 'localhost',
port: 5432,
database: 'your_db',
user: 'your_user',
password: 'your_password',
},
},
},
}),
],
})
export class AppModule {}
3. Publish Events
import { Injectable } from '@nestjs/common';
import { EventOutboxService } from 'nestjs-event-outbox';
@Injectable()
export class UserService {
constructor(private readonly eventService: EventOutboxService) {}
async createUser(userData: CreateUserDto) {
// Your business logic
const user = await this.userRepository.save(userData);
// Publish event transactionally
await this.eventService.publishEvent('user.created', {
userId: user.id,
email: user.email,
createdAt: user.createdAt,
});
return user;
}
}
4. Handle Events
import { Injectable } from '@nestjs/common';
import { OnOutboxEvent } from 'nestjs-event-outbox';
@Injectable()
export class NotificationService {
@OnOutboxEvent('user.created')
async handleUserCreated(payload: any) {
console.log('New user created:', payload);
// Send welcome email, update analytics, etc.
}
@OnOutboxEvent('user.updated')
async handleUserUpdated(payload: any) {
console.log('User updated:', payload);
// Handle user update logic
}
}
🛠 Configuration
Environment Variables
# Database Configuration
DB_HOST=localhost
DB_PORT=5432
DB_NAME=your_database
DB_USERNAME=your_user
DB_PASSWORD=your_password
# Logical Replication
OUTBOX_LOGICAL_DECODING_ENABLED=true
OUTBOX_REPLICATION_SLOT_NAME=outbox_events_slot
OUTBOX_PUBLICATION_NAME=outbox_events_pub
OUTBOX_TABLE_NAME=events_outbox
# Dispatcher Configuration
OUTBOX_DISPATCHER_ENABLED=true
OUTBOX_DISPATCHER_INTERVAL_MS=5000
OUTBOX_DISPATCHER_BATCH_SIZE=100
OUTBOX_DISPATCHER_LOCK_TIMEOUT_MS=30000
Async Configuration
import { ConfigModule, ConfigService } from '@nestjs/config';
@Module({
imports: [
EventsOutboxModule.forRootAsync({
imports: [ConfigModule],
useFactory: async (configService: ConfigService) => ({
database: {
adapter: 'sequelize',
config: {
sequelize: createSequelizeInstance(configService),
},
},
dispatcher: {
enabled: configService.get('OUTBOX_DISPATCHER_ENABLED', true),
intervalMs: configService.get('OUTBOX_DISPATCHER_INTERVAL_MS', 5000),
},
eventBus: {
logicalDecoding: {
enabled: configService.get('OUTBOX_LOGICAL_DECODING_ENABLED', false),
connection: {
host: configService.get('DB_HOST'),
port: configService.get('DB_PORT'),
database: configService.get('DB_NAME'),
user: configService.get('DB_USERNAME'),
password: configService.get('DB_PASSWORD'),
},
},
},
}),
inject: [ConfigService],
}),
],
})
export class AppModule {}
📊 Monitoring
Health Checks
import { Injectable } from '@nestjs/common';
import { EventOutboxService } from 'nestjs-event-outbox';
@Injectable()
export class HealthService {
constructor(private readonly eventService: EventOutboxService) {}
async checkEventOutbox() {
const metrics = await this.eventService.getEventMetrics();
return {
status: metrics.failed > 100 ? 'unhealthy' : 'healthy',
metrics,
};
}
}
Metrics
const metrics = await eventService.getEventMetrics();
console.log({
pending: metrics.pending, // Events waiting to be processed
processing: metrics.processing, // Events currently being processed
completed: metrics.completed, // Successfully processed events
failed: metrics.failed, // Events that failed processing
});
🔄 Event Processing Modes
1. Logical Decoding (Recommended)
Real-time event processing using PostgreSQL's logical replication:
eventBus: {
logicalDecoding: {
enabled: true,
connection: { /* PostgreSQL config */ },
slotName: 'outbox_events_slot',
publicationName: 'outbox_events_pub',
autoAcknowledge: true,
acknowledgeTimeoutSeconds: 30,
},
}
2. Polling Mode
Traditional polling-based event processing:
dispatcher: {
enabled: true,
intervalMs: 5000, // Poll every 5 seconds
batchSize: 100, // Process 100 events per batch
lockTimeoutMs: 30000, // Lock timeout for distributed processing
}
📚 API Reference
EventOutboxService
publishEvent(topic, payload, options?)
Publishes an event to the outbox for reliable delivery.
await eventService.publishEvent('order.created', {
orderId: '123',
customerId: '456',
total: 99.99,
}, {
maxRetries: 5,
priority: 'high',
});
getEventMetrics()
Returns current event processing metrics.
const metrics = await eventService.getEventMetrics();
retryFailedEvents(maxRetries?)
Retries failed events by resetting them to pending status.
const retriedCount = await eventService.retryFailedEvents(3);
cleanupCompletedEvents(olderThanDays?)
Cleans up old completed events to prevent database bloat.
const deletedCount = await eventService.cleanupCompletedEvents(30);
Database Adapters
Sequelize Adapter
import { createSequelizeAdapter } from 'nestjs-event-outbox';
const adapter = createSequelizeAdapter({
sequelize: yourSequelizeInstance,
tableName: 'events_outbox', // optional
});
TypeORM Adapter
import { createTypeORMAdapter } from 'nestjs-event-outbox';
const adapter = createTypeORMAdapter({
dataSource: yourDataSource,
// OR for legacy support
connection: yourConnection,
});
🚨 PostgreSQL Setup
Prerequisites
- PostgreSQL 10+ with logical replication support
- Configure postgresql.conf:
wal_level = logical
max_replication_slots = 4
max_wal_senders = 4
- Restart PostgreSQL service
Manual Setup
If you prefer manual setup over the automated script:
-- Create publication
CREATE PUBLICATION outbox_events_pub FOR TABLE events_outbox;
-- Create replication slot
SELECT pg_create_logical_replication_slot('outbox_events_slot', 'pgoutput');
-- Grant permissions
ALTER ROLE your_user WITH REPLICATION;
GRANT SELECT ON events_outbox TO your_user;
🔧 Troubleshooting
Common Issues
"logical decoding requires wal_level >= logical"
- Update postgresql.conf with wal_level = logical
- Restart PostgreSQL
"replication slot already exists"
- Check existing slots: SELECT * FROM pg_replication_slots;
- Use cleanup script: npx nestjs-event-outbox cleanup
High memory usage
- Monitor replication lag: Use provided monitoring queries
- Ensure your application is consuming events regularly
Events not being processed
- Check if dispatcher is enabled
- Verify logical decoding configuration
- Check database locks and connection issues
Monitoring Queries
-- Check replication slot status
SELECT slot_name, active, restart_lsn, confirmed_flush_lsn,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) as lag
FROM pg_replication_slots
WHERE slot_name = 'outbox_events_slot';
-- Check WAL disk usage
SELECT pg_size_pretty(sum(size)) as wal_size FROM pg_ls_waldir();
-- Monitor event processing
SELECT status, COUNT(*) FROM events_outbox GROUP BY status;
📄 License
MIT License - see LICENSE file for details.
🤝 Contributing
Contributions are welcome! Please read our contributing guidelines and submit pull requests to our repository.
