@cisstech/nestjs-pg-pubsub
v1.14.0
Published
A NestJS module to provide PostgreSQL PubSub
Readme
@cisstech/nestjs-pg-pubsub
A NestJS module for real-time PostgreSQL notifications using PubSub
Overview
The NestJS PG-PubSub library is a powerful tool that facilitates real-time communication between your NestJS application and PostgreSQL database using the native PostgreSQL Pub/Sub mechanism. It allows your application to listen for changes on specific database tables and respond to those changes in real-time, making it ideal for building reactive applications with immediate data synchronization and event-driven workflows.

Features
- Real-Time Table Change Detection: Automatically listen for INSERT, UPDATE, and DELETE events on PostgreSQL tables
- Decorator-Based Configuration: Use intuitive decorators to register table change listeners
- Automatic Trigger Management: Dynamically creates and manages PostgreSQL triggers
- Event Buffering and Batching: Optimizes performance by buffering and batching events
- Entity Mapping: Maps database column names to entity property names automatically
- Persistent Message Queue: Messages are stored in a PostgreSQL table to prevent data loss
- Reactive Processing: Immediately pulls and processes messages when notifications are received
- TTL and Retry System: Implements time-to-live and automatic retries for failed message processing
- Queue Metadata: Exposes retry count and creation timestamp to prevent stale data operations
- Message Ordering: Preserves message processing order using row IDs
- Error Handling: Provides mechanisms to handle and retry failed messages
- Auto Cleanup: Automatically removes old processed messages to keep the queue size manageable
- Multiple Subscribers: Leverages PostgreSQL's native Pub/Sub to allow multiple instances of your application to subscribe to the same database events
- Fallback Reliability: Includes low-frequency background polling to ensure no messages are missed
- Dedicated Connection Pool: Uses its own
pg.Pool, independent of TypeORM, to avoid pool contention - Backpressure: Concurrent notifications are coalesced so at most one pull cycle runs at a time
- Smart Trigger Management: Triggers are only recreated when their configuration actually changes
Installation
yarn add @cisstech/nestjs-pg-pubsubpg (node-postgres) is a peer dependency - install it if you don't already have it:
yarn add pgNestJS Version Compatibility
This library supports both NestJS v10 and NestJS v11:
- ✅ NestJS v10.x
- ✅ NestJS v11.x
Usage
1. Register the Module
// app.module.ts
import { Module } from '@nestjs/common'
import { TypeOrmModule } from '@nestjs/typeorm'
import { PgPubSubModule } from '@cisstech/nestjs-pg-pubsub'
import { UserTableChangeListener } from './user-change.listener'
@Module({
imports: [
TypeOrmModule.forRoot({
/* your TypeORM config */
}),
PgPubSubModule.forRoot({
databaseUrl: 'postgresql://user:password@localhost:5432/dbname',
// Optional: SSL configuration for secure connections
ssl: {
rejectUnauthorized: false,
},
// Optional: dedicated pool config (independent of TypeORM)
pool: {
max: 2, // default
},
// Optional queue configuration
queue: {
maxRetries: 5,
messageTTL: 24 * 60 * 60 * 1000, // 24 hours
cleanupInterval: 60 * 60 * 1000, // 1 hour
batchSize: 100, // max messages fetched per pull cycle
table: 'pg_pubsub_queue',
},
}),
],
providers: [UserTableChangeListener],
})
export class AppModule {}2. Create Table Change Listeners
Create a class that implements the PgTableChangeListener<T> interface and decorate it with @RegisterPgTableChangeListener:
import { Injectable } from '@nestjs/common'
import {
RegisterPgTableChangeListener,
PgTableChangeListener,
PgTableChanges,
PgTableChangeErrorHandler,
} from '@cisstech/nestjs-pg-pubsub'
import { User } from './entities/user.entity'
@Injectable()
@RegisterPgTableChangeListener(User, {
events: ['INSERT', 'UPDATE'], // Optional: specify which events to listen for
payloadFields: ['id', 'email'], // Optional: specify which fields to include in the payload
})
export class UserTableChangeListener implements PgTableChangeListener<User> {
async process(changes: PgTableChanges<User>, onError?: PgTableChangeErrorHandler): Promise<void> {
try {
// Handle table changes here
// Process all changes
changes.all.forEach((change) => {
console.log(`Change type: ${change.event} for user with id: ${change.data.id}`)
// Access metadata for retry information
if (change._metadata) {
console.log(`Retry count: ${change._metadata.retry_count}`)
console.log(`Created at: ${change._metadata.created_at}`)
// Perform different operations on retry events
if (change._metadata.retry_count >= 1) {
console.log('This is a retry - consider fetching latest data from DB')
}
}
})
// Process inserts
changes.INSERT.forEach((insert) => {
console.log(`New user created: ${insert.data.email}`)
})
// Process updates
changes.UPDATE.forEach((update) => {
console.log(`User updated: ${update.data.new.email} (was: ${update.data.old.email})`)
console.log(`Updated fields: ${update.data.updatedFields.join(', ')}`)
})
// Process deletes
changes.DELETE.forEach((deletion) => {
console.log(`User deleted: ${deletion.data.email}`)
})
} catch {
// If processing fails, mark all messages as failed for retry
if (onError) {
onError(changes.all.map((change) => change.id))
}
}
}
}3. Subscribe to Custom Events
You can also subscribe to custom PostgreSQL notification events:
import { Injectable, OnModuleInit } from '@nestjs/common'
import { PgPubSubService } from '@cisstech/nestjs-pg-pubsub'
@Injectable()
export class CustomEventService implements OnModuleInit {
constructor(private readonly pgPubSubService: PgPubSubService) {}
async onModuleInit(): Promise<void> {
await this.pgPubSubService.susbcribe<number>('custom-event', (payload) => {
console.log(`Received notification:`, payload)
})
}
}4. Publishing Custom Events from PostgreSQL
You can publish custom events from PostgreSQL by inserting into the queue table:
-- Example trigger function that publishes a custom event
CREATE OR REPLACE FUNCTION notify_custom_event() RETURNS TRIGGER AS $$
BEGIN
PERFORM pg_notify('custom-event', 'Hello world');
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER custom_event_trigger
AFTER INSERT ON users
FOR EACH ROW
EXECUTE PROCEDURE notify_custom_event();How It Works
Queue-Based Architecture with Reactive Processing
This library implements a hybrid queue-based approach to ensure reliable and efficient message processing:
Message Storage: When a database change occurs, a trigger function:
- Generates a payload with change details
- Inserts the payload into a queue table
- Sends a notification with just the message ID
Message Processing:
- The application listens for notifications and immediately pulls messages when notified
- Messages are processed in order using
SELECT FOR UPDATE SKIP LOCKED - A low-frequency fallback polling mechanism ensures no messages are missed
- Successful processing marks messages as processed
Reliability Features:
- Message persistence: All changes are stored in the database
- Retry mechanism: Failed messages are retried with exponential backoff
- TTL management: Old messages are automatically cleaned up
- Ordering: Messages are processed in the order they were created
This approach combines the best of both worlds: reactive performance and reliable delivery.
Connection Pool
All SQL operations (queue reads/writes, advisory locks, trigger DDL) go through a dedicated pg.Pool that is completely independent of TypeORM's connection pool. This means pg-pubsub keeps working even if your TypeORM pool is fully occupied by application queries.
The pool defaults to 2 connections, which is enough for normal operation. Adjust via pool.max if needed.
Backpressure
When PostgreSQL sends notifications faster than the application can process them, concurrent pull requests are coalesced: only one fetch cycle runs at a time, and any notifications received during processing trigger a single re-fetch once the current cycle completes. This prevents unbounded concurrent processing without dropping messages.
Trigger Change Detection
On startup, the library computes an MD5 hash of each trigger's configuration (events, fields, table, schema) and compares it to the hash stored in the pg_pubsub_trigger_meta metadata table. Triggers are only recreated when their configuration actually changes, avoiding unnecessary DDL on every restart.
Disabling Triggers for Bulk Operations
When performing bulk operations like cascade deletes, use withTriggersDisabled to prevent notifications:
// With TypeORM EntityManager
await pgPubSubService.withTriggersDisabled(async (em) => {
await em.getRepository(Customer).delete(customerId)
})
// With raw SQL (uses dedicated pg pool)
await pgPubSubService.withTriggersDisabledRaw(async (query) => {
await query('DELETE FROM customers WHERE id = $1', [customerId])
})Documentation
For detailed documentation, examples, and advanced usage, please refer to the official documentation at https://cisstech.github.io/nestkit/docs/nestjs-pg-pubsub/getting-started
License
MIT © Mamadou Cisse
