@softwarity/nestjs-amqp

v0.3.4

AMQP 1.0 integration for NestJS, powered by rhea. Decorator-based publishers and consumers, RxJS-friendly, with optional DLQ browser.

AMQP 1.0 integration for NestJS, powered by rhea. A thin, RxJS-friendly wrapper that exposes decorator-based publishers and consumers — designed for RabbitMQ 4.x (native AMQP 1.0), Apache ActiveMQ Artemis, and Apache Qpid.

📚 Full documentation: softwarity.github.io/nestjs-amqp


⚠ Read this before your first deploy

This library does NOT create topology at runtime. It opens senders and receivers on destinations that must already exist on the broker — queues, streams, exchanges, DLX bindings, the lot. Missing topology = silent failure (the AMQP link is rejected with amqp:not-found; the rest of the connection stays up and the app looks healthy).

Declare everything broker-side via a definitions file or an IaC script. Full examples for RabbitMQ 4.x (definitions.json + docker-compose), ActiveMQ Artemis (broker.xml), and Apache Qpid live on the doc site.


Why?

@nestjs/microservices only covers AMQP 0.9.1 (via amqplib). When you want AMQP 1.0 features — long-lived sessions, link credit, source filters, message annotations, stream consumers — rhea is the canonical Node.js client. This library wraps rhea so the rest of your codebase only sees @AmqpQueue, @Consume, and Observables.

Features

  • 🎯 Decorator-based publishers (@AmqpQueue, @AmqpTopic) and consumers (@Consume, @Subscribe)
  • 🌐 Multi-broker — speak to several brokers from one service; one connection / reply stream / DLQ per broker
  • 🔄 Request/Reply via per-process correlation prefix on a shared reply stream (opt-in)
  • 📡 Broadcast/PubSub via RabbitMQ streams (@Subscribe)
  • 🔁 Built-in retry policy (maxDelivery, dlq) on work-queue consumers (opt-in)
  • 💀 Optional DLQ browser — paginate, replay, drop dead-lettered messages
  • 🧬 Pluggable wire codec — JSON by default with Date round-trip + ObjectId auto-rehydration; bring your own per broker (msgpack, protobuf, …)
  • 🔧 forRoot / forRootAsync configuration
  • ⚛️ RxJS-native — no Promise wrapper, no axios-style imperative shapes

Installation

npm install @softwarity/nestjs-amqp rhea
# peer deps you probably already have
npm install @nestjs/common @nestjs/core rxjs reflect-metadata

Getting started — the 90% case

The simplest, most common setup: one broker, fire-and-forget publish, basic consume — no DLQ, no request/reply. Declare as many queues and topics as you need; the simplification here is the feature surface, not the quantity. Reply/DLQ are opt-in features documented further down.

1. Declare your queues and topics broker-side

The library never declares topology — only opens senders/receivers on destinations that already exist. Declare whatever your service needs (one queue, ten queues, mixed work-queues and broadcast streams — same exercise). With RabbitMQ 4.x via definitions.json:

{
  "queues": [
    {
      "name": "orders.create",
      "vhost": "/",
      "durable": true,
      "auto_delete": false,
      "arguments": { "x-queue-type": "quorum" }
    },
    {
      "name": "orders.ship",
      "vhost": "/",
      "durable": true,
      "auto_delete": false,
      "arguments": { "x-queue-type": "quorum" }
    },
    {
      "name": "changes.bulletin",
      "vhost": "/",
      "durable": true,
      "auto_delete": false,
      "arguments": { "x-queue-type": "stream", "x-max-age": "1h" }
    }
  ]
}

Quorum queues for work-queue semantics (one consumer per message), stream queues for broadcast (every consumer sees every message). The library makes no assumption about how many you declare.
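
Because missing topology fails silently, it can help to compare the addresses a service uses against the definitions file before deploying. The following is a minimal sketch of such a check, assuming the definitions.json shape shown above. The `missingQueues` helper and the `Definitions` type are hypothetical and not part of the library.

```typescript
// Hypothetical pre-deploy check; not part of @softwarity/nestjs-amqp.
interface QueueDefinition {
  name: string;
  vhost: string;
}

interface Definitions {
  queues: QueueDefinition[];
}

// Returns every address the service uses that is not declared as a queue
// in the given vhost of the definitions file.
function missingQueues(
  definitions: Definitions,
  addresses: string[],
  vhost = '/',
): string[] {
  const declared = new Set(
    definitions.queues.filter((q) => q.vhost === vhost).map((q) => q.name),
  );
  return addresses.filter((address) => !declared.has(address));
}

const definitions: Definitions = {
  queues: [
    { name: 'orders.create', vhost: '/' },
    { name: 'orders.ship', vhost: '/' },
    { name: 'changes.bulletin', vhost: '/' },
  ],
};

const missing = missingQueues(definitions, ['orders.create', 'orders.cancel']);
console.log(missing); // [ 'orders.cancel' ]
```

A non-empty result could fail a CI step, so that a typo in a decorator address is caught before the broker rejects the link at runtime.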

2. Register the module

import { Module } from '@nestjs/common';
import { AmqpModule } from '@softwarity/nestjs-amqp';

@Module({
  imports: [
    AmqpModule.forRoot({
      url: 'amqp://localhost:5672',
      username: 'guest',
      password: 'guest',
    }),
  ],
})
export class AppModule {}

A single broker (the name is implicit — internally 'default'). Because only one broker is configured, the brokerName argument is optional on every decorator and on the locator — the library resolves the lone broker automatically. If you want a custom name (visible as the AMQP container ID on the broker management UI), wrap in an array even with one entry: AmqpModule.forRoot([{ name: 'my-svc', url, ... }]).

3. Publish — fire and forget

import { Injectable } from '@nestjs/common';
import { AmqpQueue, AmqpTopic } from '@softwarity/nestjs-amqp';

@Injectable()
export class OrdersService {
  @AmqpQueue('orders.create')
  private readonly create!: AmqpQueue<OrderBody>;

  @AmqpQueue('orders.ship')
  private readonly ship!: AmqpQueue<OrderShipped>;

  @AmqpTopic('changes.bulletin')
  private readonly changes!: AmqpTopic<BulletinChange>;

  newOrder(body: OrderBody): void {
    this.create.emit(body);                       // fire-and-forget
  }

  notifyShipped(body: OrderShipped): void {
    this.ship.emit(body);
    this.changes.emit({ type: 'shipped', orderId: body.id, when: new Date().toISOString() });
  }
}

@AmqpQueue is for work-queues (point-to-point) and @AmqpTopic is for broadcast. emit() synchronously returns a boolean: true if the message was handed off to the sender, false if the broker is disabled or not connected. The boolean lets the caller fall back (e.g. NestJS EventEmitter2 for in-process delivery, a local outbox, …):

if (!this.orders.emit(body)) {
  this.bus.emit('orders.create', body);   // in-process fallback
}

Each handle is generic on the payload type — every call site is type-checked at compile time.

4. Consume

import { Injectable } from '@nestjs/common';
import { Consume, Subscribe } from '@softwarity/nestjs-amqp';

@Injectable()
export class OrdersListener {
  // The single un-annotated argument is bound to the JSON-decoded body.
  // Equivalent to writing @AmqpBody() explicitly.
  @Consume('orders.create')
  onCreate(order: OrderBody): void {
    this.svc.handle(order);
  }

  @Consume('orders.ship')
  onShip(shipped: OrderShipped): void {
    this.svc.markShipped(shipped);
  }

  @Subscribe('changes.bulletin')
  onChange(change: BulletinChange): void {
    this.realtime.publish(change);
  }
}

Start the app — you'll see a boot log section like broker 'default': 3 consumer(s) followed by one line per binding (each tagged @Consume or @Subscribe). You're done.

What's NOT in the 90% case

The bootstrap above intentionally skips three optional features. Add them à la carte:

| Feature | What you gain | What you have to do |
|---|---|---|
| Request / reply (send()) | Wait for a reply Observable (RPC-style). | Declare a stream queue broker-side, add replyStreamAddress to the broker config. |
| Retry & DLQ | Auto-retry on handler error, then route the failed message to a DLQ. | Declare a DLX + DLQ broker-side, set { maxDelivery, dlq: true } on the decorator. |
| Multiple brokers | Speak to several brokers from one service. | Pass an array to forRoot, pass brokerName on each decorator. |


Request / reply — opt-in

AmqpQueue.send() returns an Observable that emits the peer's reply. Setting it up takes four steps:

1. Declare a stream queue broker-side

{
  "queues": [{
    "name": "my-service.replies",
    "vhost": "/",
    "durable": true,
    "auto_delete": false,
    "arguments": { "x-queue-type": "stream", "x-max-age": "5m" }
  }]
}

2. Set replyStreamAddress on the broker options

AmqpModule.forRoot({
  url: 'amqp://localhost:5672',
  username: 'guest', password: 'guest',
  replyStreamAddress: 'my-service.replies',   // ← REQUIRED for send()
});

3. Call send() on the publisher side

createOrder(body: OrderBody): Observable<OrderConfirmation> {
  return this.orders.send<OrderConfirmation>(body, { timeoutMs: 5000 });
}

4. Return a value from the consumer to auto-reply

@Consume('orders.create')
onCreate(body: OrderBody): Observable<OrderConfirmation> {
  return this.svc.create(body);   // resolved value -> auto-shipped on reply_to
}

The library generates a per-process correlation prefix at boot and filters incoming replies on the shared reply stream — every instance sees every reply but only routes its own. Trade-off: N× bandwidth per reply (negligible for low-volume RPC on a LAN).
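
The prefix-based filtering described above can be pictured with a small sketch. It is illustrative only: the `ReplyRouter` class is hypothetical, and the `<prefix>:<sequence>` correlation-id format is an assumption, since the library's actual format is not documented here.

```typescript
// Illustrative only: "<prefix>:<sequence>" is an assumed correlation-id
// format, and ReplyRouter is a hypothetical name, not a library class.
class ReplyRouter {
  private readonly prefix: string;
  private sequence = 0;
  private readonly pending = new Map<string, (body: unknown) => void>();

  constructor(prefix: string) {
    this.prefix = prefix;
  }

  // Registers a pending call and returns the correlation id to send with it.
  register(onReply: (body: unknown) => void): string {
    const correlationId = `${this.prefix}:${++this.sequence}`;
    this.pending.set(correlationId, onReply);
    return correlationId;
  }

  // Called for every reply seen on the shared stream. Returns true only when
  // the reply belongs to this process and was routed to its pending call.
  route(correlationId: string, body: unknown): boolean {
    if (!correlationId.startsWith(`${this.prefix}:`)) return false; // another instance's reply
    const onReply = this.pending.get(correlationId);
    if (!onReply) return false; // unknown or already completed
    this.pending.delete(correlationId);
    onReply(body);
    return true;
  }
}

const instanceA = new ReplyRouter('a1b2');
const instanceB = new ReplyRouter('c3d4');

const received: unknown[] = [];
const id = instanceA.register((body) => received.push(body));

const routedByB = instanceB.route(id, { ok: true }); // false: wrong prefix, ignored
const routedByA = instanceA.route(id, { ok: true }); // true: delivered to the pending call
```

Both instances inspect the same reply, which is the N× bandwidth cost mentioned above; only the instance whose prefix matches does any further work.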

Without replyStreamAddress set on the broker, send() throws AmqpConnectionError at the call site. emit() and @Consume continue to work unchanged.

📚 Full details: doc site → Request / reply


Retry & DLQ — opt-in

Retry and DLQ are off by default (maxDelivery: 1, dlq: false) — handler errors silently drop the message.

The lib never publishes to a DLQ itself. On terminal failure with dlq: true, it calls delivery.reject() and the broker routes the message via its own DLX configuration. If the queue has no DLX broker-side, dlq: true is silently ignored (the broker discards rejected messages).

Setup with RabbitMQ 4.x

1. Declare DLX + DLQ broker-side:

{
  "exchanges": [{
    "name": "my-service.dlx",
    "vhost": "/",
    "type": "direct",
    "durable": true,
    "auto_delete": false
  }],

  "queues": [
    {
      "name": "payments.process",
      "vhost": "/",
      "durable": true,
      "arguments": {
        "x-queue-type": "quorum",
        "x-dead-letter-exchange": "my-service.dlx",
        "x-dead-letter-routing-key": "payments.process"
      }
    },
    {
      "name": "my-service.dlq",
      "vhost": "/",
      "durable": true,
      "arguments": { "x-queue-type": "quorum" }
    }
  ],

  "bindings": [{
    "source": "my-service.dlx",
    "vhost": "/",
    "destination": "my-service.dlq",
    "destination_type": "queue",
    "routing_key": "payments.process",
    "arguments": {}
  }]
}

2. Set defaultDlqAddress on the broker options (used by the DLQ admin UI as a pre-fill):

AmqpModule.forRoot({
  url: 'amqp://localhost:5672',
  username: 'guest', password: 'guest',
  defaultDlqAddress: 'my-service.dlq',
});

3. Enable the policy on the decorator:

@Consume('payments.process', { maxDelivery: 5, dlq: true })
onPayment(body: Payment): Observable<Result> {
  return this.svc.process(body);
}

Run-time behaviour: handler throws → modified(delivery_failed:true) × 4 retries → on the 5th failure → reject() → broker routes to my-service.dlx with routing key payments.process → my-service.dlq.
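
The decision taken after each handler failure can be summarised as a small function. This is a sketch of the behaviour described in this section, not the library's code. The `onHandlerFailure` name and the `Outcome` labels are hypothetical, and the 'drop' outcome for maxDelivery > 1 with dlq: false is an assumption extrapolated from the documented defaults.

```typescript
// Hypothetical model of the retry / DLQ decision; names are illustrative.
type Outcome = 'retry' | 'dead-letter' | 'drop';

interface RetryOptions {
  maxDelivery: number; // total attempts allowed; 1 means no retry
  dlq: boolean;        // true: reject() so the broker can route via its DLX
}

// deliveryCount is the 1-based attempt number of the delivery that just failed.
function onHandlerFailure(deliveryCount: number, options: RetryOptions): Outcome {
  if (deliveryCount < options.maxDelivery) {
    return 'retry'; // corresponds to modified(delivery_failed: true)
  }
  // Terminal failure: reject() when dlq is on, otherwise the message is dropped
  // (assumed for maxDelivery > 1; documented for the defaults).
  return options.dlq ? 'dead-letter' : 'drop';
}

const outcomes = [1, 2, 3, 4, 5].map((attempt) =>
  onHandlerFailure(attempt, { maxDelivery: 5, dlq: true }),
);
console.log(outcomes.join(',')); // retry,retry,retry,retry,dead-letter

const defaults = onHandlerFailure(1, { maxDelivery: 1, dlq: false });
console.log(defaults); // drop
```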

retryPolicy — delayed retries (in 0.3.x)

The decorator accepts a retryPolicy option that defines the timing between retries. In 0.2.x only 'immediate' is functional; the fixed / exponential shapes are accepted by the type system for forward-compatibility (runtime falls back to immediate with a boot warning). Client-side scheduled republish is planned for 0.3.x.

type RetryPolicy =
  | 'immediate'
  | { kind: 'fixed';       delayMs: number }
  | { kind: 'exponential'; initialMs: number; multiplier: number; maxMs: number };
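
To make the three shapes concrete, here is one plausible reading of how a delay could be derived from a policy. It is only a sketch: the library currently falls back to immediate, the `delayBeforeRetry` helper is hypothetical, and the exponential formula (initialMs × multiplier^(retry − 1), capped at maxMs) is an assumption about the intended semantics.

```typescript
type RetryPolicy =
  | 'immediate'
  | { kind: 'fixed'; delayMs: number }
  | { kind: 'exponential'; initialMs: number; multiplier: number; maxMs: number };

// Hypothetical helper. retryNumber is 1 for the first retry, 2 for the second, ...
function delayBeforeRetry(policy: RetryPolicy, retryNumber: number): number {
  if (policy === 'immediate') return 0;
  if (policy.kind === 'fixed') return policy.delayMs;
  // Assumed semantics: geometric growth from initialMs, capped at maxMs.
  const raw = policy.initialMs * Math.pow(policy.multiplier, retryNumber - 1);
  return Math.min(raw, policy.maxMs);
}

const policy: RetryPolicy = {
  kind: 'exponential',
  initialMs: 100,
  multiplier: 2,
  maxMs: 1000,
};

const delays = [1, 2, 3, 4, 5].map((n) => delayBeforeRetry(policy, n));
console.log(delays); // [ 100, 200, 400, 800, 1000 ]
```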

📚 Full details: doc site → Retry & DLQ


Multi-broker

Pass an array to forRoot and pass the broker name on each decorator. Each broker is independent — its own connection, reply stream, DLQ, body codec, enabled flag.

AmqpModule.forRoot([
  {
    name: 'primary',
    url: 'amqp://broker-a:5672',
    username: 'svc', password: '...',
    replyStreamAddress: 'my-svc.replies',
    defaultDlqAddress: 'my-svc.dlq',
  },
  {
    name: 'analytics',
    url: 'amqp://broker-b:5672',
    username: 'svc', password: '...',
    enabled: false,                       // per-broker kill switch
    // No reply stream / DLQ — analytics is emit-only.
  },
]);

@Injectable()
export class MixedService {
  @AmqpQueue('orders.create', 'primary')        private orders!: AmqpQueue<OrderBody>;
  @AmqpTopic('metrics.collected', 'analytics')  private metrics!: AmqpTopic<Metric>;
}

@Injectable()
export class MixedListener {
  @Consume('orders.create', 'primary', { dlq: true })
  onOrder(o: OrderBody): void { ... }

  @Subscribe('events.tick', 'analytics')
  onTick(e: TickEvent): void { ... }
}

The 2nd argument on @Consume / @Subscribe is detected at runtime — string = broker name, object = options. The forms (addr), (addr, options), (addr, brokerName), (addr, brokerName, options) are all valid.
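
The runtime detection of the second argument amounts to a `typeof` check. A minimal sketch, assuming only the four call forms listed above; `resolveArgs`, `ConsumeOptions`, and `ResolvedArgs` are hypothetical names rather than library exports.

```typescript
// Hypothetical illustration of the (addr, brokerName?, options?) overloads.
interface ConsumeOptions {
  maxDelivery?: number;
  dlq?: boolean;
}

interface ResolvedArgs {
  address: string;
  brokerName?: string;
  options: ConsumeOptions;
}

function resolveArgs(
  address: string,
  second?: string | ConsumeOptions,
  third?: ConsumeOptions,
): ResolvedArgs {
  if (typeof second === 'string') {
    // (addr, brokerName) or (addr, brokerName, options)
    return { address, brokerName: second, options: third ?? {} };
  }
  // (addr) or (addr, options): the broker name is left for later resolution
  return { address, options: second ?? {} };
}

const onlyAddress = resolveArgs('orders.create');
const withOptions = resolveArgs('orders.create', { dlq: true });
const withBroker = resolveArgs('orders.create', 'primary');
const withBoth = resolveArgs('orders.create', 'primary', { dlq: true });
```

In this sketch a missing brokerName is simply left undefined. Per the text above, the library resolves it to the lone broker in a single-broker setup and throws at boot otherwise.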

Forgetting the broker name in a multi-broker setup throws clearly at boot.

📚 Full details: doc site → Multi-broker


Quick reference

Decorators

@AmqpQueue(address, brokerName?)        // Property → AmqpQueue<T> (emit + send)
@AmqpTopic(address, brokerName?)        // Property → AmqpTopic<T> (emit only)

@Consume(address, brokerName?, options?)        // Method, work-queue consumer
@Subscribe(address, brokerName?, options?)   // Method, stream/topic consumer

brokerName is optional when a single broker is configured. With several brokers, omitting it throws at boot.

Parameter decorators

@AmqpBody()                // T — decoded body (also: a single un-annotated param is implicit @AmqpBody())
@AmqpAddress()             // string — the @Subscribe address
@AmqpDeliveryCount()       // number — 1-based attempt count
@AmqpHeader()              // MessageHeader — durable, priority, ttl, delivery_count
@AmqpProperties()          // MessageProperties — full standard properties
@AmqpProperty(name)        // one field of message.properties
@AmqpAppProperties()       // Record<string, unknown> — full application_properties
@AmqpAppProperty(name)     // one field of application_properties
@AmqpSettler()             // AmqpSettler — manual accept/release/reject
@AmqpContext()             // AmqpContext — full envelope + settle helpers

Runtime resolution — AmqpDestinations

Inject AmqpDestinations to resolve a publish handle dynamically (tenant-scoped queues, dispatchers):

@Injectable()
export class DynamicPublisher {
  constructor(private readonly amqp: AmqpDestinations) {}

  publish(tenantId: string, body: OrderBody): void {
    this.amqp.queue<OrderBody>(`orders.${tenantId}`).emit(body);
  }
}

DLQ browser — DlqAdminModule (opt-in)

@Module({
  imports: [
    AmqpModule.forRoot({ url: '...', /* ... */ }),
    DlqAdminModule,   // adds /admin/dlq/... routes
  ],
})
export class AppModule {}

Routes (single-broker shortcut):

POST /admin/dlq/sessions                            { dlqAddress, pageSize? }
GET  /admin/dlq/sessions/:token
POST /admin/dlq/sessions/:token/next-page
POST /admin/dlq/sessions/:token/messages/:idx/replay
POST /admin/dlq/sessions/:token/messages/:idx/drop
POST /admin/dlq/sessions/:token/close

Multi-broker variant: POST /admin/dlq/:broker/sessions { ... } to scope the open-session to a specific broker. Other routes work off the session token (the session knows its broker).

⚠️ Auth not included. The controller is unguarded — wrap with your own Guard, or sub-class and redeclare with your decorators. openedBy is read from req.user.username ?? req.user.id ?? 'anonymous'.

Serialization / Deserialization — per broker

AmqpModule.forRoot([
  { name: 'primary',   url: '...', /* default JSON codec */ },
  { name: 'analytics', url: '...', bodyCodec: new MsgpackCodec() },
]);

Default JsonBodyCodec:

  • UTF-8 JSON
  • Round-trips Date via { "$date": "<ISO>" }
  • Encodes ObjectId-like values as { "$oid": "<hex>" }; decode auto-detects mongoose / bson and returns a real ObjectId instance if installed, else the marker object
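
The Date round-trip can be illustrated with plain JSON.stringify / JSON.parse. This sketch covers only the `$date` marker convention and is not the library's JsonBodyCodec; the `encode` and `decode` functions are hypothetical.

```typescript
// Sketch of the "$date" marker convention only; not the library's codec.
function encode(value: unknown): string {
  // A regular function (not an arrow) so that `this` is the holder object:
  // JSON.stringify calls Date.prototype.toJSON before the replacer runs,
  // so the raw Date has to be read back from the holder.
  return JSON.stringify(
    value,
    function (this: Record<string, unknown>, key: string, current: unknown) {
      const raw = this[key];
      return raw instanceof Date ? { $date: raw.toISOString() } : current;
    },
  );
}

function decode(text: string): unknown {
  return JSON.parse(text, (_key: string, current: unknown) => {
    if (current !== null && typeof current === 'object') {
      const marker = (current as { $date?: unknown }).$date;
      // Only an object whose sole key is "$date" is treated as a marker.
      if (Object.keys(current).length === 1 && typeof marker === 'string') {
        return new Date(marker);
      }
    }
    return current;
  });
}

const original = { id: 42, when: new Date('2024-01-02T03:04:05.000Z') };
const wire = encode(original);
// wire === '{"id":42,"when":{"$date":"2024-01-02T03:04:05.000Z"}}'
const decoded = decode(wire) as { id: number; when: Date };
```

A custom bodyCodec would presumably apply the same idea to its own wire format. The codec interface itself is documented on the doc site rather than here.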

Errors

| Class | Where it surfaces |
|---|---|
| AmqpConnectionError | Connection-level issues, send() when AMQP is disabled or no reply stream is configured on the broker |
| AmqpTimeoutError | send() Observable when no reply arrives in time. Carries address, correlationId, timeoutMs |
| AmqpHandlerError | Reserved for future use |
| AmqpError | Abstract base: if (err instanceof AmqpError) … |

Known limitations

  • In-flight send() across reconnects — if a reconnect happens between sending and receiving the reply, the reply is lost (we re-subscribe with streamOffset: 'next'). The pending call times out.
  • topic.send() (scatter-gather RPC) — not supported. Build aggregation in user code on top of emit() if needed.
  • @Subscribe replay — hardcoded to streamOffset: 'next'. PR welcome for a dedicated @SubscribeStream exposing the option.
  • Delayed retry (retryPolicy) — only 'immediate' is functional in 0.2.x. fixed / exponential shapes accepted by the type system; runtime falls back to immediate with a boot warning.

License

MIT © François ACHACHE

Contributing

PRs welcome. Run npm test && npm run lint && npm run build before submitting.