npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

nestjs-pgmq

v0.0.3

Published

Postgres Message Queue (PGMQ) module for NestJS

Readme

Postgres Message Queue for NestJS.

npm licence

📖 Introduction

nestjs-pgmq is a robust module that integrates PGMQ (Postgres Message Queue) into the NestJS ecosystem. It allows you to build distributed, background processing systems using your existing PostgreSQL database, eliminating the need for additional infrastructure like Redis.

It provides a Developer Experience (DX) heavily inspired by @nestjs/bull, making migration easy.

Key Features

  • 🏗 Zero Infrastructure Overhead: Uses your existing Postgres instance.

  • 🦄 Bull-like API: Familiar decorators @Processor, @Process, and @InjectQueue.

  • 🛡 Transactional Safety: Support for Transactional Outbox pattern (WIP).

  • ⚡ High Performance: Uses SKIP LOCKED for concurrent job processing.

  • 🔍 Observability: Automatic injection of correlationId, producerId, and timestamps.

  • 💀 Dead Letter Queues (DLQ): Automatic handling of failed jobs with full stack traces.

  • 🛑 Graceful Shutdown: Ensures no jobs are lost during deployment/restart.


🔌 Prerequisites

This library requires a PostgreSQL database with the pgmq extension installed.

Using Docker (Recommended):

docker run -d --name pgmq -p 5432:5432 -e POSTGRES_PASSWORD=postgres ghcr.io/pgmq/pg18-pgmq:v1.7.0

Manual Installation:

CREATE EXTENSION IF NOT EXISTS pgmq CASCADE;

📦 Installation

pnpm add nestjs-pgmq pg

# or

npm install nestjs-pgmq pg

🚀 Quick Start

1. Register the Module

Import PgmqModule in your root AppModule. You can configure the connection asynchronously.


// src/app.module.ts
import {Module} from '@nestjs/common';
import {PgmqModule} from 'nestjs-pgmq';

@Module({
  imports: [
    // 1. Configure the global connection
    PgmqModule.forRootAsync({
      useFactory: () => ({
        connectionString: 'postgres://postgres:postgres@localhost:5432/db',
      }),
    }),

    // 2. Register a queue
    PgmqModule.registerQueue({
      name: 'notifications',
    }),
  ],
})
export class AppModule {
}

2. Create a Processor (Consumer)

Define a class to handle jobs. Use @Processor for the queue name and @Process for specific job names.

// src/notifications.processor.ts
import {Processor, Process} from 'nestjs-pgmq';
import {Job} from 'nestjs-pgmq';

@Processor('notifications')
export class NotificationsProcessor {

  @Process('send-email')
  async handleEmail(job: Job<{ email: string; body: string }>) {
    console.log(`Sending email to ${job.data.email}...`);

    // Perform your logic here.
    // If it throws an error, the job will be retried.
    // If it succeeds, the job is archived.

  }
}

3. Inject Queue & Add Jobs (Producer)

Inject the queue into your service to dispatch jobs.

// src/users.service.ts
import {Injectable} from '@nestjs/common';
import {InjectQueue, PgmqQueue} from 'nestjs-pgmq';

@Injectable()
export class UsersService {
  constructor(@InjectQueue('notifications') private readonly queue: PgmqQueue) {
  }

  async registerUser(email: string) {
    // Add a job to the queue
    await this.queue.add('send-email', {
      email,
      body: 'Welcome to our platform!',
    });
  }
}

⚙️ Configuration & Features

Metadata & Observability (Automatic Headers)

Every message sent via nestjs-pgmq is automatically enriched with metadata headers to help with debugging and tracing in distributed systems.

You don't need to do anything; the library automatically adds:

  • correlationId: Unique Trace ID (UUID).

  • messageId: Unique Message ID.

  • producerId: Hostname and PID of the sender.

  • appVersion: Application version from package.json.

  • createdAt: Timestamp.

You can also pass custom headers:

    await this.queue.add(
      'process-order',
      {orderId: 123},
      {correlationId: 'req-abc-123'} // Override correlationId to match HTTP Request
    );

Error Handling & Dead Letter Queue (DLQ)

The module implements a robust "Envelope Pattern" for error handling.

  1. Retries: If your processor throws an error, the job remains in the queue and becomes visible again after the Visibility Timeout (default: 30s).

  2. Max Retries: If a job fails more than 5 times (configurable), it is moved to a DLQ.

  3. DLQ Format: The failed job is moved to a queue named <queue_name>_dlq. The payload is wrapped in an envelope containing the Exception Message and Stack Trace.

Example DLQ Message:

{
  "headers": {
    "errorType": "Error",
    "errorMessage": "Connection timeout",
    "stackTrace": "Error: Connection timeout\n at EmailService.send...",
    "retryCount": 5,
    "failedAt": "2023-10-25T12:00:00Z",
    "originalQueue": "notifications"
  },
  "body": {
    "jobName": "send-email",
    "data": {
      "email": "[email protected]"
    }
  }
}

Concurrency

The worker uses intelligent polling with SKIP LOCKED. By default, it processes jobs in batches to maximize throughput while ensuring safety.

🤝 Contributing

This project is a Monorepo managed with pnpm.

  1. Clone the repo.

  2. Install dependencies: pnpm install.

  3. Run the example app:

cd examples/basic-app
pnpm start:dev
  1. Watch library changes:
cd packages/nestjs-pgmq
pnpm build --watch

✨ Contributors

Thanks goes to these incredible people:

📝 License

This project is MIT licensed.