npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@open-rlb/nestjs-amqp

v1.0.14

Published

Quick guide for using the npm package.

Readme

@open-rlb/nestjs-amqp

Quick guide for using the npm package.

Guide scope: rpc, handle, broadcast.

Installation

npm i @open-rlb/nestjs-amqp

Basic setup

AppModule

import { HttpModule } from '@nestjs/axios';
import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { BrokerModule, ProxyModule } from '@open-rlb/nestjs-amqp';

@Module({
  imports: [
    BrokerModule.forRootAsync({
      imports: [ConfigModule],
      inject: [ConfigService],
      useFactory: async (config: ConfigService) => ({
        options: config.get('broker'),
        topics: config.get('topics'),
        appOptions: config.get('app'),
        authOptions: config.get('auth-providers'),
        gatewayOptions: config.get('gateway'),
      }),
    }),
    HttpModule,
    ProxyModule.forRoot([]),
  ],
})
export class AppModule {}

Bootstrap

If you use parseRaw: true in gateway routes:

const app = await NestFactory.create(AppModule, { rawBody: true });

Minimal config.yaml

# yaml-language-server: $schema=./schema.json

app:
  environment: development

broker:
  uri: "amqp://localhost/<vhost>"
  defaultSubscribeErrorBehavior: "ack"
  defaultPublishErrorBehavior: "reject"
  connectionManagerOptions:
    heartbeatIntervalInSeconds: 60
    reconnectTimeInSeconds: 60
    connectionOptions:
      clientProperties:
        connection_name: "connection-name"
      credentials:
        mechanism: PLAIN
        username: guest
        password: guest

  exchanges:
    - name: users-ex
      type: direct
      createExchangeIfNotExists: true
      options:
        durable: true

  queues:
    - name: users-rpc-q
      exchange: users-ex
      routingKey: users.rpc
      createQueueIfNotExists: true
      options:
        durable: true

  replyQueues:
    users-ex: users-reply-q

topics:
  - name: users-rpc
    mode: rpc
    queue: users-rpc-q

gateway:
  mode: gateway
  paths:
    - name: users-create
      method: POST
      path: /users
      dataSource: body
      topic: users-rpc
      action: user.create
      mode: rpc
  events: []

Quick usage

RPC handler with decorators

import { Injectable } from '@nestjs/common';
import { BrokerAction, BrokerParam } from '@open-rlb/nestjs-amqp';

@Injectable()
export class UsersActionService {
  @BrokerAction('users-rpc', 'user.create', 'rpc')
  async createUser(
    @BrokerParam('body', 'email') email: string,
    @BrokerParam('body', 'role') role: string,
    @BrokerParam('header', 'X-GTW-AUTH-USERID') userId: string,
  ) {
    return { id: 'usr_1', email, role, createdBy: userId };
  }
}

RPC call from code

import { Injectable } from '@nestjs/common';
import { BrokerService } from '@open-rlb/nestjs-amqp';

@Injectable()
export class UsersClientService {
  constructor(private readonly broker: BrokerService) {}

  async createUser() {
    return this.broker.requestData(
      'users-rpc',
      'user.create',
      { email: '[email protected]', role: 'admin' },
      { 'X-Tenant': 'acme' },
      5000,
    );
  }
}

Manual consumer (without decorators)

await broker.registerRpc<{ id: string }, { ok: boolean }>('health-rpc', async (event) => {
  return { ok: !!event.payload?.id };
});

await broker.registerHandler<{ invoiceId: string }>('invoice-handle', async (event) => {
  console.log(event.payload.invoiceId);
});

Quick reference

BrokerService

| Method | Use | | ---------------------------------------------------------- | ------------------------------- | | requestData(topic, action, payload?, headers?, timeout?) | RPC request/response | | registerRpc(topic, handler) | manual RPC consumer | | registerHandler(topic, handler) | handle / broadcast consumer |

Topic types (topics[].mode)

| Type | Use it when | Minimal config | Why | | ----------- | -------------------------------------- | ------------------------------------------------------- | ------------------------------------ | | rpc | you need request/response | name, mode: rpc, queue (or exchange+routingKey) | immediate response + timeout control | | handle | you need a worker on one queue | name, mode: handle, queue | simple queue consumer | | broadcast | you need one message to many consumers | name, mode: broadcast, exchange, routingKey | fanout/topic pattern | | event | you need publish without response | name, mode: event, queue or exchange+routingKey | fire-and-forget (not covered here) |

Quick snippet:

topics:
  - name: users-rpc
    mode: rpc
    queue: users-rpc-q

  - name: invoice-handle
    mode: handle
    queue: invoice-handle-q

  - name: notify-broadcast
    mode: broadcast
    exchange: notify-ex
    routingKey: notify.#

Decorators

| Decorator | Use | | ------------------------------------------------------------- | ------------------------------------ | | @BrokerAction(topic, action, type?) | binds method to topic/action | | @BrokerParam(source, name?) | maps method params from message data | | @BrokerAuth(authName, allowAnonymous?, roles?) | auth metadata | | @BrokerHTTP(method, path, dataSource?, timeout?, parseRaw?) | HTTP metadata |

@BrokerParam sources

| Source | Injected value | | ----------- | --------------------------------- | | body | payload[name or parameter name] | | body-full | full payload | | header | headers[name or parameter name] | | tag | AMQP consumer tag | | action | message action | | topic | current topic |

gateway.paths[].dataSource

| Value | Payload | | ------------ | -------------------------------- | | body | {...params, ...body} | | query | {...params, ...query} | | params | params | | body-query | {...params, ...query, ...body} | | query-body | {...params, ...body, ...query} |

Auth providers

Type: jwks

auth-providers:
  - name: gateway-jwks
    type: jwks
    issuer: https://issuer.example.com/realms/main
    jwksUri: https://issuer.example.com/certs
    algorithms: [RS256]
    jwtMap:
      - sub:userId
      - roles:roles
    headerPrefix: X-GTW-AUTH-
    uidClaim: USERID
    usernameClaim: USERNAME
    aclTopic: acl
    aclAction: can-user-do

Type: jwt

auth-providers:
  - name: gateway-jwt
    type: jwt
    secret: your-jwt-secret
    issuer: https://issuer.example.com/realms/main
    audience: your-audience
    algorithms: [HS256]
    jwtMap:
      - sub:userId
      - roles:roles
    headerPrefix: X-GTW-AUTH-
    uidClaim: USERID
    usernameClaim: USERNAME
    aclTopic: acl
    aclAction: can-user-do

Type: str-compare

auth-providers:
  - name: gateway-str
    type: str-compare
    secret: your-static-token
    headerPrefix: Bearer

Gateway path (RPC)

- name: users-create
  method: POST
  path: /users
  dataSource: body
  topic: users-rpc
  action: user.create
  mode: rpc
  timeout: 7000

Common errors

  • Topic <name> not found in configuration: check topics[].name, @BrokerAction, requestData, gateway.paths[].topic.
  • Queue <name> not found in configuration: check that topics[].queue exists in broker.queues[].
  • 401/403 from gateway: check gateway.paths[].auth, auth-providers[], ACL service when using roles.