@cisstech/nestjs-pg-pubsub
v1.15.0
A NestJS module to provide PostgreSQL PubSub
A NestJS module for real-time PostgreSQL change data capture using triggers and a persistent queue.
What It Does
PostgreSQL triggers detect INSERTs, UPDATEs and DELETEs on your tables. Changes are persisted in a queue table, then pulled and dispatched to typed NestJS listeners. The design does not rely on polling alone, loses no messages, and needs no external broker.

Why This Exists
PostgreSQL's LISTEN/NOTIFY delivers changes in real time but is unreliable on its own: notifications are fire-and-forget, are lost while the listening client is disconnected, and carry a limited payload (under 8000 bytes by default). This library wraps it with a durable queue so you get both reactivity and guaranteed delivery.
Key Properties
- Durable: changes are persisted in a SQL table before notification, surviving crashes and restarts
- Reactive: `LISTEN/NOTIFY` triggers immediate processing, with a fallback poller as safety net
- Ordered: messages are processed by ID, in batches, with `SELECT FOR UPDATE SKIP LOCKED`
- Retry with backoff: failed messages are retried with exponential backoff ($2^{n}$ minutes)
- Backpressure: concurrent notifications are coalesced so that at most one pull cycle runs at a time
- Isolated pool: uses its own `pg.Pool`, independent of TypeORM, to avoid pool contention
- Zero-downtime DDL: triggers are fingerprinted (MD5) and only recreated when config changes
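To make the retry schedule concrete, here is a minimal sketch of the $2^{n}$-minute backoff. The helper name `retryDelayMs` is hypothetical, and the sketch assumes that n is the number of failed attempts so far; how the library counts attempts, and whether it caps the delay, is not stated here.

```typescript
// Hypothetical helper for illustration; not part of the library's API.
const MINUTE_MS = 60000

// Delay before the next retry, ASSUMING n = number of failed attempts so far.
function retryDelayMs(failedAttempts: number): number {
  if (!Number.isInteger(failedAttempts) || failedAttempts < 0) {
    throw new RangeError('failedAttempts must be a non-negative integer')
  }
  return Math.pow(2, failedAttempts) * MINUTE_MS
}

// Delays in minutes after the first four failures.
const delaysInMinutes = [1, 2, 3, 4].map((n) => retryDelayMs(n) / MINUTE_MS)
console.log(delaysInMinutes) // [ 2, 4, 8, 16 ]
```

Under that assumption, the delay doubles with every failure, so a message that keeps failing is retried less and less often.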
Installation

    yarn add @cisstech/nestjs-pg-pubsub pg

Supports NestJS v10+ and v11+.
Quick Start
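
    // app.module.ts
    import { Module } from '@nestjs/common'
    import { TypeOrmModule } from '@nestjs/typeorm'
    import { PgPubSubModule } from '@cisstech/nestjs-pg-pubsub'
    import { UserChangeListener } from './user-change.listener'

    @Module({
      imports: [
        TypeOrmModule.forRoot({
          /* ... */
        }),
        PgPubSubModule.forRoot({
          databaseUrl: process.env.DATABASE_URL,
        }),
      ],
      providers: [UserChangeListener],
    })
    export class AppModule {}

    // user-change.listener.ts
    import { Injectable } from '@nestjs/common'
    import {
      PgTableChangeContext,
      PgTableChangeListener,
      PgTableChanges,
      RegisterPgTableChangeListener,
    } from '@cisstech/nestjs-pg-pubsub'
    import { User } from './user.entity' // assumed location of your User entity

    @Injectable()
    @RegisterPgTableChangeListener(User)
    export class UserChangeListener implements PgTableChangeListener<User> {
      // Called with the batch of changes captured for the User table.
      async process(changes: PgTableChanges<User>, ctx: PgTableChangeContext): Promise<void> {
        for (const insert of changes.INSERT) {
          console.log(`New user: ${insert.data.email}`)
        }
        for (const update of changes.UPDATE) {
          console.log(`Updated fields: ${update.data.updatedFields.join(', ')}`)
        }
      }
    }

That's it. The library auto-creates the triggers and the queue table, then starts listening.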
How It Works
- A PostgreSQL trigger fires on table change and inserts a row into `pg_pubsub_queue`
- The trigger sends a `NOTIFY` with the channel name
- The library receives the notification and pulls pending messages from the queue
- Messages are dispatched to the matching `@RegisterPgTableChangeListener` classes
- Processed messages are marked as such; failed ones are retried with exponential backoff
- A background poller runs every 60s as a safety net for missed notifications
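The flow above can be sketched with an in-memory stand-in for the queue table. This is only an illustration of the ordering and retry behaviour, not the library's implementation: every name below (`InMemoryChangeQueue`, `recordChange`, `pull`, and so on) is hypothetical, the notification is simulated by calling `pull` directly, and the backoff delay is left out.

```typescript
// In-memory stand-in for the queue table and the pull cycle.
// Every name here is hypothetical; the real library does this in PostgreSQL.
type Operation = 'INSERT' | 'UPDATE' | 'DELETE'

interface QueueRow {
  id: number
  table: string
  operation: Operation
  processed: boolean
  attempts: number
}

type BatchListener = (batch: QueueRow[]) => void

class InMemoryChangeQueue {
  private readonly rows: QueueRow[] = []
  private readonly listeners = new Map<string, BatchListener>()
  private nextId = 1

  register(table: string, listener: BatchListener): void {
    this.listeners.set(table, listener)
  }

  // Trigger: persist the change first, then notify (here, by pulling directly).
  recordChange(table: string, operation: Operation, notify = true): void {
    this.rows.push({ id: this.nextId++, table, operation, processed: false, attempts: 0 })
    if (notify) this.pull()
  }

  // Pull: take unprocessed rows in ID order, dispatch per table, record the outcome.
  pull(batchSize = 100): void {
    const batch = this.rows.filter((row) => !row.processed).slice(0, batchSize)
    this.listeners.forEach((listener, table) => {
      const group = batch.filter((row) => row.table === table)
      if (group.length === 0) return
      try {
        listener(group)
        group.forEach((row) => (row.processed = true))
      } catch {
        // Rows stay unprocessed so that a later pull retries them.
        group.forEach((row) => (row.attempts += 1))
      }
    })
  }

  pendingCount(): number {
    return this.rows.filter((row) => !row.processed).length
  }
}

const changeQueue = new InMemoryChangeQueue()
const seenIds: number[] = []
let failNextCall = true
changeQueue.register('users', (batch) => {
  if (failNextCall) {
    failNextCall = false
    throw new Error('simulated listener failure')
  }
  batch.forEach((row) => seenIds.push(row.id))
})

changeQueue.recordChange('users', 'INSERT') // notified, but the listener fails once
const pendingAfterFailure = changeQueue.pendingCount()
changeQueue.recordChange('users', 'UPDATE', false) // notification "missed"
changeQueue.pull() // the safety-net poller picks up both rows, in ID order
```

Because the change is written to the queue before any notification is sent, neither the failed listener call nor the missed notification loses a message: the next pull still finds both rows.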
Important Constraints
- Listeners must be fast. A slow listener delays the entire batch for that table. Offload heavy work to a queue (Bull, etc.) and just enqueue from the listener.
- Use `TransactionAdapter` for transactional writes. If a listener needs to write to the DB inside a transaction, configure a `transactionAdapter` and mark the listener with `@RegisterPgTableChangeListener(Entity, { transactional: true })`. The library wraps the listener call in the adapter, passing an opaque transaction token via `ctx.transaction`. Without the adapter, use `ctx.onError` to signal failures.
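As an illustration of the first constraint, a listener can do nothing but enqueue jobs and leave the slow work to a separate worker. The sketch below is self-contained and uses local stand-in types: `ChangesLike`, `JobQueue`, `InMemoryJobQueue` and `WelcomeEmailListener` are all hypothetical names that only mimic the shapes used in the Quick Start, and a real setup would use the library's own types and a job queue such as Bull.

```typescript
// Local stand-ins so the sketch compiles on its own; they only mimic the shapes
// used in the Quick Start and are NOT the library's real types.
interface ChangesLike<T> {
  INSERT: { data: T }[]
}

// Hypothetical job queue interface; in practice this could be Bull or similar.
interface JobQueue {
  add(name: string, payload: unknown): Promise<void>
}

class InMemoryJobQueue implements JobQueue {
  readonly jobs: { name: string; payload: unknown }[] = []

  add(name: string, payload: unknown): Promise<void> {
    this.jobs.push({ name, payload })
    return Promise.resolve()
  }
}

interface NewUser {
  id: number
  email: string
}

class WelcomeEmailListener {
  constructor(private readonly jobs: JobQueue) {}

  // Only enqueue here; the slow work (sending the email) runs in a separate worker.
  async process(changes: ChangesLike<NewUser>): Promise<void> {
    await Promise.all(
      changes.INSERT.map((insert) => this.jobs.add('send-welcome-email', { userId: insert.data.id }))
    )
  }
}

const jobQueue = new InMemoryJobQueue()
const welcomeListener = new WelcomeEmailListener(jobQueue)
const enqueued = welcomeListener.process({
  INSERT: [
    { data: { id: 1, email: 'a@example.com' } },
    { data: { id: 2, email: 'b@example.com' } },
  ],
})
enqueued.then(() => console.log(`${jobQueue.jobs.length} jobs enqueued`))
```

The listener returns as soon as the jobs are queued, so the batch for that table is not held up by the work itself.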
Configuration
All options are optional except databaseUrl. See the full configuration reference for details.
Key tuning knobs:
| Option | Default | What it controls |
| ------------------------- | ------- | --------------------------------------------------------------------- |
| queue.batchSize | 100 | Max messages fetched per pull cycle |
| queue.drainInterval | 50ms | Pause between drain loop iterations (DB breathing room) |
| queue.processingTimeout | 5min | After this, a processing message is considered orphaned and retried |
| queue.concurrency | 5 | Max listeners executing in parallel per batch |
| transactionAdapter | - | ORM-agnostic adapter for wrapping listeners in transactions |
| pool.max | 5 | Connections in the dedicated pg-pubsub pool |
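
A possible options object using some of these knobs is sketched below. The option names come from the table, but the nested shape (`queue.*`, `pool.*`) is inferred from the dotted names, the connection URL is a placeholder, and the chosen values are arbitrary. `queue.drainInterval` and `queue.processingTimeout` are left out because their value format (number or string, and the unit) is not specified here; check the configuration reference before relying on any of this.

```typescript
// Sketch only: option names come from the table above; the nested shape is
// inferred from the dotted names and should be checked against the reference.
const pgPubSubOptions = {
  databaseUrl: 'postgres://user:password@localhost:5432/app', // placeholder URL
  queue: {
    batchSize: 50, // default 100: fetch at most 50 messages per pull cycle
    concurrency: 2, // default 5: at most 2 listeners in parallel per batch
  },
  pool: {
    max: 3, // default 5: connections in the dedicated pg-pubsub pool
  },
}

// Would be passed as: PgPubSubModule.forRoot(pgPubSubOptions)
```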
Documentation
Full documentation: https://cisstech.github.io/nestkit/docs/nestjs-pg-pubsub/getting-started
License
MIT © Mamadou Cisse
