npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2025 – Pkg Stats / Ryan Hefner

nestjs-event-outbox

v1.0.1

Published

Production-ready event outbox pattern implementation for NestJS with PostgreSQL logical decoding support

Readme

nestjs-event-outbox

A production-ready event outbox pattern implementation for NestJS applications with PostgreSQL logical decoding support. This package ensures reliable event delivery and maintains transactional consistency between your application state and event publishing.

🚀 Features

  • Event Outbox Pattern: Transactionally safe event publishing
  • PostgreSQL Logical Decoding: Real-time event processing without polling
  • Database Agnostic: Support for Sequelize and TypeORM
  • NestJS Integration: First-class NestJS module with decorators
  • Distributed Processing: Built-in locking for multi-instance deployments
  • Retry Logic: Automatic retry with exponential backoff
  • Monitoring: Built-in metrics and health checks
  • Setup Scripts: Automated PostgreSQL logical replication setup

📦 Installation

npm install nestjs-event-outbox

Peer Dependencies

# Choose your ORM
npm install sequelize sequelize-typescript pg
# OR
npm install typeorm pg

# Required for NestJS
npm install @nestjs/common @nestjs/core @nestjs/schedule

🔧 Quick Start

1. Setup PostgreSQL Logical Replication

# Run the setup script
npx nestjs-event-outbox setup

# Check status
npx nestjs-event-outbox status

# Cleanup (if needed)
npx nestjs-event-outbox cleanup

2. Configure Your Module

Using Sequelize

import { Module } from '@nestjs/common';
import { EventsOutboxModule, createSequelizeAdapter } from 'nestjs-event-outbox';

@Module({
  imports: [
    EventsOutboxModule.forRoot({
      database: {
        adapter: 'sequelize',
        config: {
          sequelize: yourSequelizeInstance,
        },
      },
      dispatcher: {
        enabled: true,
        intervalMs: 5000,
      },
      eventBus: {
        logicalDecoding: {
          enabled: true,
          connection: {
            host: 'localhost',
            port: 5432,
            database: 'your_db',
            user: 'your_user',
            password: 'your_password',
          },
          slotName: 'outbox_events_slot',
          publicationName: 'outbox_events_pub',
        },
      },
    }),
  ],
})
export class AppModule {}

Using TypeORM

import { Module } from '@nestjs/common';
import { EventsOutboxModule, createTypeORMAdapter } from 'nestjs-event-outbox';

@Module({
  imports: [
    EventsOutboxModule.forRoot({
      database: {
        adapter: 'typeorm',
        config: {
          dataSource: yourDataSource,
        },
      },
      dispatcher: {
        enabled: true,
        intervalMs: 5000,
      },
      eventBus: {
        logicalDecoding: {
          enabled: true,
          connection: {
            host: 'localhost',
            port: 5432,
            database: 'your_db',
            user: 'your_user',
            password: 'your_password',
          },
        },
      },
    }),
  ],
})
export class AppModule {}

3. Publish Events

import { Injectable } from '@nestjs/common';
import { EventOutboxService } from 'nestjs-event-outbox';

@Injectable()
export class UserService {
  constructor(private readonly eventService: EventOutboxService) {}

  async createUser(userData: CreateUserDto) {
    // Your business logic
    const user = await this.userRepository.save(userData);

    // Publish event transactionally
    await this.eventService.publishEvent('user.created', {
      userId: user.id,
      email: user.email,
      createdAt: user.createdAt,
    });

    return user;
  }
}

4. Handle Events

import { Injectable } from '@nestjs/common';
import { OnOutboxEvent } from 'nestjs-event-outbox';

@Injectable()
export class NotificationService {
  @OnOutboxEvent('user.created')
  async handleUserCreated(payload: any) {
    console.log('New user created:', payload);
    // Send welcome email, update analytics, etc.
  }

  @OnOutboxEvent('user.updated')
  async handleUserUpdated(payload: any) {
    console.log('User updated:', payload);
    // Handle user update logic
  }
}

🛠 Configuration

Environment Variables

# Database Configuration
DB_HOST=localhost
DB_PORT=5432
DB_NAME=your_database
DB_USERNAME=your_user
DB_PASSWORD=your_password

# Logical Replication
OUTBOX_LOGICAL_DECODING_ENABLED=true
OUTBOX_REPLICATION_SLOT_NAME=outbox_events_slot
OUTBOX_PUBLICATION_NAME=outbox_events_pub
OUTBOX_TABLE_NAME=events_outbox

# Dispatcher Configuration
OUTBOX_DISPATCHER_ENABLED=true
OUTBOX_DISPATCHER_INTERVAL_MS=5000
OUTBOX_DISPATCHER_BATCH_SIZE=100
OUTBOX_DISPATCHER_LOCK_TIMEOUT_MS=30000

Async Configuration

import { ConfigModule, ConfigService } from '@nestjs/config';

@Module({
  imports: [
    EventsOutboxModule.forRootAsync({
      imports: [ConfigModule],
      useFactory: async (configService: ConfigService) => ({
        database: {
          adapter: 'sequelize',
          config: {
            sequelize: createSequelizeInstance(configService),
          },
        },
        dispatcher: {
          enabled: configService.get('OUTBOX_DISPATCHER_ENABLED', true),
          intervalMs: configService.get('OUTBOX_DISPATCHER_INTERVAL_MS', 5000),
        },
        eventBus: {
          logicalDecoding: {
            enabled: configService.get('OUTBOX_LOGICAL_DECODING_ENABLED', false),
            connection: {
              host: configService.get('DB_HOST'),
              port: configService.get('DB_PORT'),
              database: configService.get('DB_NAME'),
              user: configService.get('DB_USERNAME'),
              password: configService.get('DB_PASSWORD'),
            },
          },
        },
      }),
      inject: [ConfigService],
    }),
  ],
})
export class AppModule {}

📊 Monitoring

Health Checks

import { Injectable } from '@nestjs/common';
import { EventOutboxService } from 'nestjs-event-outbox';

@Injectable()
export class HealthService {
  constructor(private readonly eventService: EventOutboxService) {}

  async checkEventOutbox() {
    const metrics = await this.eventService.getEventMetrics();

    return {
      status: metrics.failed > 100 ? 'unhealthy' : 'healthy',
      metrics,
    };
  }
}

Metrics

const metrics = await eventService.getEventMetrics();
console.log({
  pending: metrics.pending,      // Events waiting to be processed
  processing: metrics.processing, // Events currently being processed
  completed: metrics.completed,   // Successfully processed events
  failed: metrics.failed,        // Events that failed processing
});

🔄 Event Processing Modes

1. Logical Decoding (Recommended)

Real-time event processing using PostgreSQL's logical replication:

eventBus: {
  logicalDecoding: {
    enabled: true,
    connection: { /* PostgreSQL config */ },
    slotName: 'outbox_events_slot',
    publicationName: 'outbox_events_pub',
    autoAcknowledge: true,
    acknowledgeTimeoutSeconds: 30,
  },
}

2. Polling Mode

Traditional polling-based event processing:

dispatcher: {
  enabled: true,
  intervalMs: 5000,           // Poll every 5 seconds
  batchSize: 100,             // Process 100 events per batch
  lockTimeoutMs: 30000,       // Lock timeout for distributed processing
}

📚 API Reference

EventOutboxService

publishEvent(topic, payload, options?)

Publishes an event to the outbox for reliable delivery.

await eventService.publishEvent('order.created', {
  orderId: '123',
  customerId: '456',
  total: 99.99,
}, {
  maxRetries: 5,
  priority: 'high',
});

getEventMetrics()

Returns current event processing metrics.

const metrics = await eventService.getEventMetrics();

retryFailedEvents(maxRetries?)

Retries failed events by resetting them to pending status.

const retriedCount = await eventService.retryFailedEvents(3);

cleanupCompletedEvents(olderThanDays?)

Cleans up old completed events to prevent database bloat.

const deletedCount = await eventService.cleanupCompletedEvents(30);

Database Adapters

Sequelize Adapter

import { createSequelizeAdapter } from 'nestjs-event-outbox';

const adapter = createSequelizeAdapter({
  sequelize: yourSequelizeInstance,
  tableName: 'events_outbox', // optional
});

TypeORM Adapter

import { createTypeORMAdapter } from 'nestjs-event-outbox';

const adapter = createTypeORMAdapter({
  dataSource: yourDataSource,
  // OR for legacy support
  connection: yourConnection,
});

🚨 PostgreSQL Setup

Prerequisites

  1. PostgreSQL 10+ with logical replication support
  2. Configure postgresql.conf:
    wal_level = logical
    max_replication_slots = 4
    max_wal_senders = 4
  3. Restart PostgreSQL service

Manual Setup

If you prefer manual setup over the automated script:

-- Create publication
CREATE PUBLICATION outbox_events_pub FOR TABLE events_outbox;

-- Create replication slot
SELECT pg_create_logical_replication_slot('outbox_events_slot', 'pgoutput');

-- Grant permissions
GRANT REPLICATION ON DATABASE your_db TO your_user;
GRANT SELECT ON events_outbox TO your_user;

🔧 Troubleshooting

Common Issues

  1. "logical decoding requires wal_level >= logical"

    • Update postgresql.conf with wal_level = logical
    • Restart PostgreSQL
  2. "replication slot already exists"

    • Check existing slots: SELECT * FROM pg_replication_slots;
    • Use cleanup script: npx nestjs-event-outbox cleanup
  3. High memory usage

    • Monitor replication lag: Use provided monitoring queries
    • Ensure your application is consuming events regularly
  4. Events not being processed

    • Check if dispatcher is enabled
    • Verify logical decoding configuration
    • Check database locks and connection issues

Monitoring Queries

-- Check replication slot status
SELECT slot_name, active, restart_lsn, confirmed_flush_lsn,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) as lag
FROM pg_replication_slots
WHERE slot_name = 'outbox_events_slot';

-- Check WAL disk usage
SELECT pg_size_pretty(sum(size)) as wal_size FROM pg_ls_waldir();

-- Monitor event processing
SELECT status, COUNT(*) FROM events_outbox GROUP BY status;

📄 License

MIT License - see LICENSE file for details.

🤝 Contributing

Contributions are welcome! Please read our contributing guidelines and submit pull requests to our repository.

📞 Support