npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@cisstech/nestjs-pg-pubsub

v1.14.0

Published

A NestJS module to provide PostgreSQL PubSub

Readme

@cisstech/nestjs-pg-pubsub

A NestJS module for real-time PostgreSQL notifications using PubSub

CI codecov codefactor GitHub Tag npm package NPM downloads licence code style: prettier

Overview

The NestJS PG-PubSub library is a powerful tool that facilitates real-time communication between your NestJS application and PostgreSQL database using the native PostgreSQL Pub/Sub mechanism. It allows your application to listen for changes on specific database tables and respond to those changes in real-time, making it ideal for building reactive applications with immediate data synchronization and event-driven workflows.

Diagram

Features

  • Real-Time Table Change Detection: Automatically listen for INSERT, UPDATE, and DELETE events on PostgreSQL tables
  • Decorator-Based Configuration: Use intuitive decorators to register table change listeners
  • Automatic Trigger Management: Dynamically creates and manages PostgreSQL triggers
  • Event Buffering and Batching: Optimizes performance by buffering and batching events
  • Entity Mapping: Maps database column names to entity property names automatically
  • Persistent Message Queue: Messages are stored in a PostgreSQL table to prevent data loss
  • Reactive Processing: Immediately pulls and processes messages when notifications are received
  • TTL and Retry System: Implements time-to-live and automatic retries for failed message processing
  • Queue Metadata: Exposes retry count and creation timestamp to prevent stale data operations
  • Message Ordering: Preserves message processing order using row IDs
  • Error Handling: Provides mechanisms to handle and retry failed messages
  • Auto Cleanup: Automatically removes old processed messages to keep the queue size manageable
  • Multiple Subscribers: Leverages PostgreSQL's native Pub/Sub to allow multiple instances of your application to subscribe to the same database events
  • Fallback Reliability: Includes low-frequency background polling to ensure no messages are missed
  • Dedicated Connection Pool: Uses its own pg.Pool, independent of TypeORM, to avoid pool contention
  • Backpressure: Concurrent notifications are coalesced so at most one pull cycle runs at a time
  • Smart Trigger Management: Triggers are only recreated when their configuration actually changes

Installation

yarn add @cisstech/nestjs-pg-pubsub

pg (node-postgres) is a peer dependency - install it if you don't already have it:

yarn add pg

NestJS Version Compatibility

This library supports both NestJS v10 and NestJS v11:

  • ✅ NestJS v10.x
  • ✅ NestJS v11.x

Usage

1. Register the Module

// app.module.ts
import { Module } from '@nestjs/common'
import { TypeOrmModule } from '@nestjs/typeorm'
import { PgPubSubModule } from '@cisstech/nestjs-pg-pubsub'
import { UserTableChangeListener } from './user-change.listener'

@Module({
  imports: [
    TypeOrmModule.forRoot({
      /* your TypeORM config */
    }),
    PgPubSubModule.forRoot({
      databaseUrl: 'postgresql://user:password@localhost:5432/dbname',
      // Optional: SSL configuration for secure connections
      ssl: {
        rejectUnauthorized: false,
      },
      // Optional: dedicated pool config (independent of TypeORM)
      pool: {
        max: 2, // default
      },
      // Optional queue configuration
      queue: {
        maxRetries: 5,
        messageTTL: 24 * 60 * 60 * 1000, // 24 hours
        cleanupInterval: 60 * 60 * 1000, // 1 hour
        batchSize: 100, // max messages fetched per pull cycle
        table: 'pg_pubsub_queue',
      },
    }),
  ],
  providers: [UserTableChangeListener],
})
export class AppModule {}

2. Create Table Change Listeners

Create a class that implements the PgTableChangeListener<T> interface and decorate it with @RegisterPgTableChangeListener:

import { Injectable } from '@nestjs/common'
import {
  RegisterPgTableChangeListener,
  PgTableChangeListener,
  PgTableChanges,
  PgTableChangeErrorHandler,
} from '@cisstech/nestjs-pg-pubsub'
import { User } from './entities/user.entity'

@Injectable()
@RegisterPgTableChangeListener(User, {
  events: ['INSERT', 'UPDATE'], // Optional: specify which events to listen for
  payloadFields: ['id', 'email'], // Optional: specify which fields to include in the payload
})
export class UserTableChangeListener implements PgTableChangeListener<User> {
  async process(changes: PgTableChanges<User>, onError?: PgTableChangeErrorHandler): Promise<void> {
    try {
      // Handle table changes here

      // Process all changes
      changes.all.forEach((change) => {
        console.log(`Change type: ${change.event} for user with id: ${change.data.id}`)

        // Access metadata for retry information
        if (change._metadata) {
          console.log(`Retry count: ${change._metadata.retry_count}`)
          console.log(`Created at: ${change._metadata.created_at}`)

          // Perform different operations on retry events
          if (change._metadata.retry_count >= 1) {
            console.log('This is a retry - consider fetching latest data from DB')
          }
        }
      })

      // Process inserts
      changes.INSERT.forEach((insert) => {
        console.log(`New user created: ${insert.data.email}`)
      })

      // Process updates
      changes.UPDATE.forEach((update) => {
        console.log(`User updated: ${update.data.new.email} (was: ${update.data.old.email})`)
        console.log(`Updated fields: ${update.data.updatedFields.join(', ')}`)
      })

      // Process deletes
      changes.DELETE.forEach((deletion) => {
        console.log(`User deleted: ${deletion.data.email}`)
      })
    } catch {
      // If processing fails, mark all messages as failed for retry
      if (onError) {
        onError(changes.all.map((change) => change.id))
      }
    }
  }
}

3. Subscribe to Custom Events

You can also subscribe to custom PostgreSQL notification events:

import { Injectable, OnModuleInit } from '@nestjs/common'
import { PgPubSubService } from '@cisstech/nestjs-pg-pubsub'

@Injectable()
export class CustomEventService implements OnModuleInit {
  constructor(private readonly pgPubSubService: PgPubSubService) {}

  async onModuleInit(): Promise<void> {
    await this.pgPubSubService.susbcribe<number>('custom-event', (payload) => {
      console.log(`Received notification:`, payload)
    })
  }
}

4. Publishing Custom Events from PostgreSQL

You can publish custom events from PostgreSQL by inserting into the queue table:

-- Example trigger function that publishes a custom event
CREATE OR REPLACE FUNCTION notify_custom_event() RETURNS TRIGGER AS $$
BEGIN
  PERFORM pg_notify('custom-event', 'Hello world');
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER custom_event_trigger
  AFTER INSERT ON users
  FOR EACH ROW
  EXECUTE PROCEDURE notify_custom_event();

How It Works

Queue-Based Architecture with Reactive Processing

This library implements a hybrid queue-based approach to ensure reliable and efficient message processing:

  1. Message Storage: When a database change occurs, a trigger function:

    • Generates a payload with change details
    • Inserts the payload into a queue table
    • Sends a notification with just the message ID
  2. Message Processing:

    • The application listens for notifications and immediately pulls messages when notified
    • Messages are processed in order using SELECT FOR UPDATE SKIP LOCKED
    • A low-frequency fallback polling mechanism ensures no messages are missed
    • Successful processing marks messages as processed
  3. Reliability Features:

    • Message persistence: All changes are stored in the database
    • Retry mechanism: Failed messages are retried with exponential backoff
    • TTL management: Old messages are automatically cleaned up
    • Ordering: Messages are processed in the order they were created

This approach combines the best of both worlds: reactive performance and reliable delivery.

Connection Pool

All SQL operations (queue reads/writes, advisory locks, trigger DDL) go through a dedicated pg.Pool that is completely independent of TypeORM's connection pool. This means pg-pubsub keeps working even if your TypeORM pool is fully occupied by application queries.

The pool defaults to 2 connections, which is enough for normal operation. Adjust via pool.max if needed.

Backpressure

When PostgreSQL sends notifications faster than the application can process them, concurrent pull requests are coalesced: only one fetch cycle runs at a time, and any notifications received during processing trigger a single re-fetch once the current cycle completes. This prevents unbounded concurrent processing without dropping messages.

Trigger Change Detection

On startup, the library computes an MD5 hash of each trigger's configuration (events, fields, table, schema) and compares it to the hash stored in the pg_pubsub_trigger_meta metadata table. Triggers are only recreated when their configuration actually changes, avoiding unnecessary DDL on every restart.

Disabling Triggers for Bulk Operations

When performing bulk operations like cascade deletes, use withTriggersDisabled to prevent notifications:

// With TypeORM EntityManager
await pgPubSubService.withTriggersDisabled(async (em) => {
  await em.getRepository(Customer).delete(customerId)
})

// With raw SQL (uses dedicated pg pool)
await pgPubSubService.withTriggersDisabledRaw(async (query) => {
  await query('DELETE FROM customers WHERE id = $1', [customerId])
})

Documentation

For detailed documentation, examples, and advanced usage, please refer to the official documentation at https://cisstech.github.io/nestkit/docs/nestjs-pg-pubsub/getting-started

License

MIT © Mamadou Cisse