@c11/pgmq-nestjs
v1.0.2
Published
NestJS microservice transport using [pgmq](https://github.com/tembo-io/pgmq) (PostgreSQL message queues). Exposes a custom transport strategy so you can run a hybrid app with `connectMicroservice({ strategy, options: {} })`.
Readme
@c11/pgmq-nestjs
NestJS microservice transport using pgmq (PostgreSQL message queues). Exposes a custom transport strategy so you can run a hybrid app with connectMicroservice({ strategy, options: {} }).
Install
npm install @c11/pgmq-nestjs pg
# or
yarn add @c11/pgmq-nestjs pg
# or
pnpm add @c11/pgmq-nestjs pgPeer dependency (optional): Install typeorm if you use PgMqTransportStrategy.fromDataSource() in forRootAsync:
npm install typeormRequirements:
- Node.js 18+
- PostgreSQL with the pgmq extension installed (create extension in your database if needed).
Usage
1. Register the module
Synchronous config (forRoot):
// app.module.ts
import { Module } from "@nestjs/common";
import { PgmqModule } from "@c11/pgmq-nestjs";
@Module({
imports: [
PgmqModule.forRoot({
connection: {
host: "localhost",
port: 5432,
user: "postgres",
password: "postgres",
database: "mydb",
},
queuePrefix: "nestjs_",
}),
],
})
export class AppModule {}Async config (forRootAsync):
import { Module } from "@nestjs/common";
import { ConfigModule, ConfigService } from "@nestjs/config";
import { PgmqModule } from "@c11/pgmq-nestjs";
import { PgMqTransportStrategy } from "@c11/pgmq-nestjs";
@Module({
imports: [
PgmqModule.forRootAsync({
imports: [ConfigModule],
inject: [ConfigService],
useFactory: (config: ConfigService) => {
return new PgMqTransportStrategy({
connection: {
host: config.get("DB_HOST", "localhost"),
port: config.get("DB_PORT", 5432),
user: config.get("DB_USER", "postgres"),
password: config.get("DB_PASSWORD", ""),
database: config.get("DB_NAME", "postgres"),
},
});
},
}),
],
})
export class AppModule {}With TypeORM DataSource (requires typeorm installed):
import { PgmqModule } from "@c11/pgmq-nestjs";
import { PgMqTransportStrategy } from "@c11/pgmq-nestjs";
PgmqModule.forRootAsync({
inject: [DataSource],
useFactory: (ds: DataSource) => PgMqTransportStrategy.fromDataSource(ds, { queuePrefix: "app_" }),
})2. Connect the microservice in bootstrap
// main.ts
import { NestFactory } from "@nestjs/core";
import { AppModule } from "./app.module.js";
import { PGMQ_STRATEGY, CustomStrategy } from "@c11/pgmq-nestjs";
const app = await NestFactory.create(AppModule);
const strategy = app.get(PGMQ_STRATEGY);
app.connectMicroservice<CustomStrategy>(
{ strategy, options: {} },
{ inheritAppConfig: true },
);
await app.startAllMicroservices();
await app.listen(3000);3. Use event handlers
Use @EventPattern() (and optional MessagePattern()) as with any NestJS microservice. The transport uses pgmq queues; event handler groups map to queue names (see Configuration).
Configuration
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| connection | object | required | Postgres connection: host, port, user, password, database, optional ssl. |
| queuePrefix | string | "nestjs_" | Prefix for all queue names. |
| visibilityTimeoutSeconds | number | 30 | Visibility timeout when reading messages. |
| replyQueueName | string | "nestjs_reply" | Queue used for request-response. |
| pollIntervalMs | number | 500 | Poll interval in milliseconds. |
Queue names are derived from event handler patterns (or extras.group) and sanitized to alphanumeric + underscore; length is capped at 47 characters (pgmq limit). Longer names are truncated and suffixed with a short hash.
Caveats
- pgmq extension: PostgreSQL must have the pgmq extension available. Create it with
CREATE EXTENSION IF NOT EXISTS pgmq;(or your provider’s equivalent) if the extension is installed but not yet enabled in the database. - Queue name length: pgmq limits queue names to 47 characters. Patterns or group names are sanitized and truncated with a hash suffix when longer.
- ESM: This package is published as ESM (
"type": "module"). Useimportand, if needed,.jsextensions in your own ESM resolution. - One strategy per app: The module provides a single strategy instance. For multiple distinct connections or configs, use multiple Nest apps or a single strategy with one pool.
- Request-response: Reply queue and correlation are supported; ensure
replyQueueNamedoes not clash with other queues.
API
PgmqModule.forRoot(options)/PgmqModule.forRootAsync(options)– register the transport.PGMQ_STRATEGY– injection token for the strategy (use withapp.get(PGMQ_STRATEGY)or@Inject(PGMQ_STRATEGY)).CustomStrategy– type forconnectMicroservice<CustomStrategy>({ strategy, options: {} }).PgMqTransportStrategy,PgMqTransportStrategyOptions,PgmqHandlerExtras,PgmqContext– exported for typing and advanced use.PgMqTransportStrategy.fromDataSource(ds, overrides?)– build options from a TypeORMDataSource(requirestypeorm).
License
PRIVATE
