@enriqcg/nestjs-amqp

v2.0.4

AMQP/RabbitMQ module for NestJS with decorator support

Downloads

261

AMQP (RabbitMQ) Client for NestJS

AMQP module for NestJS with decorator support.

Note

This project is still a work-in-progress and is being actively developed. Issues and PRs are welcome!


This module injects a channel from amqp-connection-manager. Please check the Channel documentation for extra insight on how to publish messages.

Lost connections to the RabbitMQ broker are re-established automatically.

Installation

$ npm i --save @enriqcg/nestjs-amqp
$ npm i --save-dev @types/amqplib

The concept of a Service in @enriqcg/nestjs-amqp

This library was built for the use case of load balancing messages published to a 'topic' across multiple replicas of the same service. A service definition makes that possible. A service is a collection of replicas that run copies of the same codebase.

Using a service definition in @enriqcg/nestjs-amqp is entirely optional; skip it if you don't need to balance messages across replicas.

This library leverages RabbitMQ's exchanges, routing keys and queue bindings to achieve this goal. Start by defining a service when importing AMQPModule by providing a name and an exchange name.

@Module({
  imports: [
    AMQPModule.forRoot({
      hostname: 'rabbitmq',
      assertQueuesByDefault: true,
      assertExchanges: [
        // we are making sure our exchange is ready
        // this is optional
        {
          name: 'example_exchange',
          type: 'topic',
        },
      ],
      service: {
        name: 'example_service',
        exchange: 'example_exchange',
      },
    }),
  ],
})
export class AppModule {}

The service name is used to register and identify replicas of the same service. You can run multiple services using this library on the same exchange. This is powerful, because one message can be delivered to several services.

Then we can set up our consumer:

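import { Controller } from '@nestjs/common'
import { Consume, Consumer } from '@enriqcg/nestjs-amqp'
import { AppService } from './app.service' // assumed path of the injected service

@Consumer()
@Controller()
export class AppController {
  constructor(private readonly appService: AppService) {}

  @Consume('test.event')
  async testHandler(body: unknown) {
    console.log(this.appService.getHello())
    return true // acknowledge the message
  }
}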

Defining the service and using the @Consume decorator in this setup creates a queue named test.event-example_service. Any additional replicas of the same code join as consumers of that queue, so test.event messages are balanced across instances.
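The naming and balancing behaviour described above can be sketched with a small in-memory model. This is an illustration only: it does not talk to RabbitMQ or use this library, `queueNameFor` and `InMemoryExchange` are hypothetical names, the `<event>-<service>` format is inferred from the single `test.event-example_service` example, and the strict round-robin order is a simplifying assumption.

```typescript
// In-memory model of per-service queues; not the library's implementation.
type Handler = (body: string) => void

// Assumption: queue names follow `<event>-<service>`.
function queueNameFor(event: string, serviceName: string): string {
  return `${event}-${serviceName}`
}

interface Queue {
  event: string
  consumers: Handler[] // one entry per replica
  next: number // index of the consumer that receives the next message
}

class InMemoryExchange {
  private readonly queues: Record<string, Queue> = {}

  // A replica of `serviceName` starts consuming `event`.
  consume(event: string, serviceName: string, handler: Handler): void {
    const name = queueNameFor(event, serviceName)
    if (!this.queues[name]) {
      this.queues[name] = { event, consumers: [], next: 0 }
    }
    this.queues[name].consumers.push(handler)
  }

  // Every queue bound to the event gets one copy of the message (exact match
  // only, no wildcards); within a queue, consumers take turns.
  publish(event: string, body: string): void {
    Object.keys(this.queues).forEach((name) => {
      const queue = this.queues[name]
      if (queue.event !== event) return
      const consumer = queue.consumers[queue.next % queue.consumers.length]
      queue.next += 1
      consumer(body)
    })
  }
}

// Two replicas of example_service and one replica of a second service.
const received: Record<string, string[]> = { a1: [], a2: [], b1: [] }
const exchange = new InMemoryExchange()
exchange.consume('test.event', 'example_service', (m) => { received.a1.push(m) })
exchange.consume('test.event', 'example_service', (m) => { received.a2.push(m) })
exchange.consume('test.event', 'other_service', (m) => { received.b1.push(m) })

exchange.publish('test.event', 'first')
exchange.publish('test.event', 'second')
console.log(received)
```

In this model the two example_service replicas each handle one of the two messages, while other_service, which has its own queue, receives both.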

Getting Started

Register the AMQPModule in app.module.ts and pass a configuration object:

import { Module } from '@nestjs/common'
import { AMQPModule } from '@enriqcg/nestjs-amqp'

@Module({
  imports: [
    AMQPModule.forRoot({
      hostname: 'rabbitmq',
      username: 'guest',
      password: 'guest',
      assertQueuesByDefault: true,
      assertExchanges: [
        // these exchanges will be asserted on startup
        {
          name: 'example_exchange',
          type: 'topic',
        },
        {
          name: 'fanout_exchange',
          type: 'fanout',
        },
      ],
    }),
  ],
})
export class AppModule {}

See the AMQPModule options reference under Connection options.

You can also check documentation on amqplib's Exchange and Queue assertion.

Publisher

You can now inject an AMQP Channel in your services and use it to push messages into an exchange or a queue.

import { Injectable } from '@nestjs/common'
import { InjectAMQPChannel } from '@enriqcg/nestjs-amqp'
import { Channel } from 'amqplib'

@Injectable()
export class ExampleService {
  constructor(
    @InjectAMQPChannel()
    private readonly amqpChannel: Channel,
  ) {}

  async sendToExchange() {
    this.amqpChannel.publish(
      'exchange_name',
      'routing_key',
      Buffer.from(JSON.stringify({ test: true })),
    )
  }
}

Check amqplib's reference on channel.publish().

Consumer

@enriqcg/nestjs-amqp allows you to define consumer functions using decorators in your controllers.

import { Controller } from '@nestjs/common'
import { Consume, Consumer } from '@enriqcg/nestjs-amqp'

@Consumer('user') // event prefix
@Controller()
export class ExampleController {
  constructor(private readonly exampleService: ExampleService) {}

  @Consume('created') // handler for user.created
  handleCreatedEvent(content: string) {
    console.log(JSON.parse(content))
    // return false -> message will not be acked
    // return true, or no return at all -> message will be acked
    return true
  }

  // handler for user.updated.address
  @Consume({
    queueName: 'updated.address',
    noAck: false,
    assertQueue: true,
    // queue will be deleted after all consumers are dropped
    autoDelete: true,
  })
  handleUpdatedAddressEvent(content: string) {
    const payload = JSON.parse(content)

    try {
      // pass data to your services
      this.exampleService.update(payload)
    } catch (e) {
      console.error(e)
      return false // message will not be acked
    }

    // message will be automatically acked
  }
}
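The comments in the controller above imply that the @Consumer prefix and the @Consume name are joined with a dot. A minimal sketch of that rule follows; `resolveEventName` is a hypothetical helper, and the library's actual resolution logic is not shown in this README.

```typescript
// Hypothetical helper: joins a @Consumer() prefix and a @Consume() name
// with a dot, or returns the name unchanged when there is no prefix.
function resolveEventName(prefix: string | undefined, name: string): string {
  return prefix ? `${prefix}.${name}` : name
}

const resolved = [
  resolveEventName('user', 'created'), // 'user.created'
  resolveEventName('user', 'updated.address'), // 'user.updated.address'
  resolveEventName(undefined, 'test.event'), // 'test.event'
]
console.log(resolved)
```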

The message content is decoded to a string and passed to the decorated method. Depending on what you published, further deserialization might be needed. (Building decorators to help decode JSON payloads is on the TODO list.)
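Until such decorators exist, one way to handle the string content is to parse and validate it in the handler. The sketch below assumes JSON payloads; `UserCreated`, `isUserCreated` and `decodeJson` are hypothetical names and are not part of this library.

```typescript
// Hypothetical payload shape, for illustration only.
interface UserCreated {
  id: string
  email: string
}

// Runtime check that a parsed value has the expected shape.
function isUserCreated(value: unknown): value is UserCreated {
  if (typeof value !== 'object' || value === null) return false
  const record = value as Record<string, unknown>
  return typeof record['id'] === 'string' && typeof record['email'] === 'string'
}

// Returns the typed payload, or undefined when the content is not valid
// JSON or does not pass the guard.
function decodeJson<T>(
  content: string,
  guard: (value: unknown) => value is T,
): T | undefined {
  try {
    const parsed: unknown = JSON.parse(content)
    return guard(parsed) ? parsed : undefined
  } catch (error) {
    return undefined
  }
}

const valid = decodeJson('{"id":"42","email":"a@example.com"}', isUserCreated)
const malformed = decodeJson('not json', isUserCreated)
const wrongShape = decodeJson('{"id":1}', isUserCreated)
console.log(valid, malformed, wrongShape)
```

A handler could return false when `decodeJson` yields undefined, leaving the message unacknowledged. Whether that is the right policy depends on how your queues treat unacknowledged messages.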

Message Acknowledgment

If automatic acknowledgment is disabled for a queue (noAck = false), the decorated method acknowledges a message by returning any value other than false. Only an explicit false leaves the message unacknowledged; returning nothing (void) also acknowledges it.
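The return-value rule can be written as a small predicate. This is a sketch of the documented behaviour, not the library's code: `shouldAck` and `settle` are hypothetical names, and it assumes "non-false" means a strict comparison with `false`, so other falsy values such as 0 or an empty string still acknowledge.

```typescript
// Assumption: only the literal value `false` prevents acknowledgment.
function shouldAck(handlerResult: unknown): boolean {
  return handlerResult !== false
}

type AckAction = 'ack' | 'skip'

// Hypothetical dispatcher: runs a handler and reports which action follows
// from its return value.
function settle(handler: (content: string) => unknown, content: string): AckAction {
  return shouldAck(handler(content)) ? 'ack' : 'skip'
}

const outcomes = {
  returnsFalse: settle(() => false, 'payload'),
  returnsTrue: settle(() => true, 'payload'),
  returnsNothing: settle(() => undefined, 'payload'),
}
console.log(outcomes)
```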

Connection options

interface AMQPModuleOptions {
  /**
   * The host URL for the connection
   *
   * Default value: 'localhost'
   */
  hostname?: string
  /**
   * The port of the AMQP host
   *
   * Default value: 5672
   */
  port?: number
  /**
   * The name of the connection. Only really relevant in multiple
   * connection contexts
   *
   * Default value: 'default'
   */
  name?: string
  /**
   * Service definition. Please see README.md to learn about how services
   * work in @enriqcg/nestjs-amqp
   *
   * Default value: {}
   */
  service?: {
    name: string
    exchange: string
  }
  /**
   * Makes sure that the exchanges are created and are of the same
   * type on application startup.
   *
   * Default value: []
   */
  assertExchanges?: [
    {
      /**
       * Name of the exchange to bind queues to
       *
       * A value is required
       */
      name: string
      /**
       * Type of the exchange
       *
       * A value is only required if the exchange is asserted
       */
      type?: 'direct' | 'topic' | 'headers' | 'fanout' | 'match'
    },
  ]
  /**
   * Assert queues by default using the @Consume decorator
   * Consumer options defined in @Consume decorator take priority
   *
   * Default value: 'default'
   */
  assertQueuesByDefault?: boolean
  /**
   * Username used for authenticating against the server
   *
   * Default value: 'guest'
   */
  username?: string
  /**
   * Password used for authenticating against the server
   *
   * Default value: 'guest'
   */
  password?: string
  /**
   * The period of the connection heartbeat in seconds
   *
   * Default value: 0
   */
  heartbeat?: number
  /**
   * What VHost shall be used
   *
   * Default value: '/'
   */
  vhost?: string
  /**
   * Wait for a full connection to the AMQP server before continuing
   * with the rest of the NestJS app initialization.
   *
   * This prevents HTTP requests and other entry-points from reaching
   * the server until there is a valid AMQP connection.
   *
   * Default value: false
   */
  wait?: boolean
}
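To show how the documented defaults combine, the sketch below merges them with user-supplied options and renders an AMQP URI of the form `amqp://user:pass@host:port/vhost?heartbeat=N`. This is an assumption-laden illustration: `ConnectionOptions`, `DEFAULTS` and `toAmqpUrl` are hypothetical, and this README does not say whether the library builds a URI or passes an options object to the underlying client.

```typescript
// Subset of the options above that affect how a connection is opened.
interface ConnectionOptions {
  hostname?: string
  port?: number
  username?: string
  password?: string
  heartbeat?: number
  vhost?: string
}

// Defaults copied from the option reference above.
const DEFAULTS: Required<ConnectionOptions> = {
  hostname: 'localhost',
  port: 5672,
  username: 'guest',
  password: 'guest',
  heartbeat: 0,
  vhost: '/',
}

// Hypothetical helper: applies the defaults and renders an AMQP URI.
// The vhost is percent-encoded, so the default '/' becomes '%2F'.
function toAmqpUrl(options: ConnectionOptions = {}): string {
  const o = { ...DEFAULTS, ...options }
  const credentials = `${encodeURIComponent(o.username)}:${encodeURIComponent(o.password)}`
  const vhost = encodeURIComponent(o.vhost)
  return `amqp://${credentials}@${o.hostname}:${o.port}/${vhost}?heartbeat=${o.heartbeat}`
}

const defaultUrl = toAmqpUrl()
const customUrl = toAmqpUrl({ hostname: 'rabbitmq', vhost: 'staging', heartbeat: 30 })
console.log(defaultUrl)
console.log(customUrl)
```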

License

MIT License

Copyright (c) 2021-present, Enrique Carpintero