@gedai/nestjs-amqp

v0.0.8

Published

AMQP component for gedai project

Downloads

102

Description

This package builds on @golevelup/nestjs-rabbitmq to integrate RabbitMQ with NestJS applications. It makes it easy to subscribe to messages from an exchange and adds retrial policies, throttling, message inspection, and message validation on top.

Prerequisites

This package requires the following dependency:

  • @gedai/nestjs-core

Delayed retrials rely on RabbitMQ's X-Delayed Message Plugin, so this package needs a RabbitMQ server with that plugin installed in order to work.

Getting Started

Step 1: Installation

Install the necessary packages with your favorite package manager.

$ npm install @gedai/nestjs-core @gedai/nestjs-amqp @nestjs/config

Step 2: Configuration Setup

Create a common NestJS @Injectable() provider class for your subscription handlers.

// app.subscription.ts
import { AmqpHeaders, AmqpPayload, AmqpSubscription } from '@gedai/nestjs-amqp';
import { Injectable, Logger } from '@nestjs/common';
import { AppService } from './app.service';

@Injectable()
export class AppSubscription {
  private readonly logger = new Logger(this.constructor.name);

  constructor(private readonly appService: AppService) {}

  @AmqpSubscription({
    exchange: 'my.exchange',
    queue: 'my.consumer1',
    routingKey: '#',
    channel: 'myChannel1',
    prefetch: 5,
  })
  async getHello(@AmqpPayload() data: any, @AmqpHeaders() headers: any) {
    this.logger.log('Got a message', 'Consumer 1');
  }
}

In your app.module.ts, import the required modules and set up the necessary dependencies.

// app.module.ts
import { AmqpModule } from '@gedai/nestjs-amqp';
import { ContextModule } from '@gedai/nestjs-core';
import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { AppSubscription } from './app.subscription';

@Module({
  imports: [
    ConfigModule.forRoot({ isGlobal: true }),
    ContextModule.forRoot({}),
    AmqpModule.forRootAsync({
      inject: [ConfigService],
      useFactory: (config: ConfigService) => ({
        url: config.getOrThrow('AMQP_URL'),
        exchanges: [
          { name: 'my.exchange' },
          // ::keep layout::
        ],
      }),
    }),
  ],
  controllers: [AppController],
  providers: [AppService, AppSubscription],
})
export class AppModule {}

Features

Retrial Policy

A retrial policy lets a consumer retry failed messages before giving up on them, which makes message delivery more robust. To add a retrial policy to a subscription handler, use the @AmqpRetrialPolicy decorator as follows:

// app.subscription.ts
import { Injectable, Logger } from '@nestjs/common';
import {
  AmqpHeaders,
  AmqpPayload,
  AmqpRetrialPolicy,
  AmqpSubscription,
} from '@gedai/nestjs-amqp';
import { AppService } from './app.service';

@Injectable()
export class AppSubscription {
  private readonly logger = new Logger(this.constructor.name);

  constructor(private readonly appService: AppService) {}

  @AmqpSubscription({
    exchange: 'my.exchange',
    queue: 'my.consumer1',
    routingKey: '#',
    channel: 'myChannel1',
    prefetch: 10,
  })
  // Apply Retrial Policy, delay timing in seconds
  @AmqpRetrialPolicy({ maxAttempts: 3, delay: 5, maxDelay: 60 })
  async getHello(@AmqpPayload() data: any, @AmqpHeaders() headers: any) {
    this.logger.log('Received a message', 'Consumer 1');
  }
}
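
To make the three options more concrete, here is a minimal sketch of how a delay could be derived from them. It assumes the delay doubles after each failed attempt and is capped at maxDelay. That growth rule is an assumption, since this README does not state how the package computes the delay. The RetrialPolicy type and the retryDelaySeconds function are hypothetical names, not part of the package.

```typescript
// Hypothetical type mirroring the options passed to @AmqpRetrialPolicy.
interface RetrialPolicy {
  maxAttempts: number;
  delay: number; // seconds
  maxDelay: number; // seconds
}

// Assumption: the delay doubles after each failed attempt and never exceeds
// maxDelay. Returns null once maxAttempts has been reached.
function retryDelaySeconds(
  policy: RetrialPolicy,
  failedAttempts: number,
): number | null {
  if (failedAttempts >= policy.maxAttempts) return null;
  const exponent = Math.max(failedAttempts - 1, 0);
  return Math.min(policy.delay * 2 ** exponent, policy.maxDelay);
}

const policy: RetrialPolicy = { maxAttempts: 3, delay: 5, maxDelay: 60 };
const firstRetry = retryDelaySeconds(policy, 1); // 5
const secondRetry = retryDelaySeconds(policy, 2); // 10
const exhausted = retryDelaySeconds(policy, 3); // null
```

Under this reading, maxDelay only matters for policies with enough attempts for the doubled delay to reach the cap.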

Throttling Policy

A throttling policy limits how fast a consumer processes messages, which helps keep the system stable under heavy load. To add a throttling policy to a subscription handler, first set prefetch: 1 on the handler, then use the @AmqpThrottlePolicy decorator as follows:

// app.subscription.ts
import { Injectable, Logger } from '@nestjs/common';
import {
  AmqpHeaders,
  AmqpPayload,
  AmqpSubscription,
  AmqpThrottlePolicy,
} from '@gedai/nestjs-amqp';
import { AppService } from './app.service';

@Injectable()
export class AppSubscription {
  private readonly logger = new Logger(this.constructor.name);

  constructor(private readonly appService: AppService) {}

  @AmqpSubscription({
    exchange: 'my.exchange',
    queue: 'my.consumer1',
    routingKey: '#',
    channel: 'myChannel1',
    prefetch: 1,
  })
  // Apply Throttling Policy: at most 5 messages per second
  @AmqpThrottlePolicy(5)
  async getHello(@AmqpPayload() data: any, @AmqpHeaders() headers: any) {
    this.logger.log('Received a message', 'Consumer 1');
  }
}
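
One way to reason about the rate argument: with prefetch: 1 the consumer holds a single message at a time, so a rate of N messages per second amounts to spacing messages at least 1000 / N milliseconds apart. The sketch below illustrates that arithmetic only. It is not the package's implementation, and minIntervalMs and waitBeforeNext are hypothetical helpers.

```typescript
// Minimum spacing between two messages for a given rate (messages per second).
function minIntervalMs(ratePerSecond: number): number {
  if (ratePerSecond <= 0) throw new RangeError('rate must be positive');
  return 1000 / ratePerSecond;
}

// How long to wait before handling the next message, given when the previous
// one started and the current time (both in milliseconds).
function waitBeforeNext(
  ratePerSecond: number,
  lastStartMs: number,
  nowMs: number,
): number {
  const elapsed = nowMs - lastStartMs;
  return Math.max(0, minIntervalMs(ratePerSecond) - elapsed);
}

const spacing = minIntervalMs(5); // 200 ms between messages
const tooEarly = waitBeforeNext(5, 1000, 1050); // 150 ms still to wait
const lateEnough = waitBeforeNext(5, 1000, 1300); // 0, handle immediately
```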

Message Inspection

Message inspection gives developers insight into message content and structure, which helps with debugging and monitoring.

Configure it by passing trafficInspection: { mode: 'all' } to AmqpModule. The mode accepts the values all, none, inbound, or outbound.
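
As a rough illustration of the four modes, the sketch below shows how a mode could map onto traffic direction. The shouldInspect helper and both type names are hypothetical. The options object assumes that trafficInspection sits next to url and exchanges in the object returned by useFactory, and the URL is a placeholder.

```typescript
// Hypothetical types for illustration; the mode values come from the text above.
type InspectionMode = 'all' | 'none' | 'inbound' | 'outbound';
type Direction = 'inbound' | 'outbound';

// Returns true when a message travelling in `direction` would be inspected.
function shouldInspect(mode: InspectionMode, direction: Direction): boolean {
  if (mode === 'all') return true;
  if (mode === 'none') return false;
  return mode === direction;
}

// Assumed placement of the option inside the AmqpModule configuration.
const amqpOptions = {
  url: 'amqp://localhost:5672', // placeholder, normally read from AMQP_URL
  exchanges: [{ name: 'my.exchange' }],
  trafficInspection: { mode: 'inbound' as InspectionMode },
};

const inspectsIncoming = shouldInspect(amqpOptions.trafficInspection.mode, 'inbound');
const inspectsOutgoing = shouldInspect(amqpOptions.trafficInspection.mode, 'outbound');
```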

Message Validation

Message validation checks that incoming messages adhere to predefined schemas or criteria, which protects data integrity and system reliability. To set up validation, define your DTOs with validation decorators and use them in your subscription handlers as follows:

// app.subscription.ts
import { Injectable, Logger } from '@nestjs/common';
import { IsString } from 'class-validator';
import { AmqpHeaders, AmqpPayload, AmqpSubscription } from '@gedai/nestjs-amqp';
import { AppService } from './app.service';

// Define DTOs with validation decorators
class DogDTO {
  @IsString()
  name: string;

  @IsString()
  breed: string;
}

@Injectable()
export class AppSubscription {
  private readonly logger = new Logger(this.constructor.name);

  constructor(private readonly appService: AppService) {}

  @AmqpSubscription({
    exchange: 'my.exchange',
    queue: 'my.consumer1',
    routingKey: '#',
    channel: 'myChannel1',
  })
  async getHello(
    // Use the DTO as the type of the parameter decorated with @AmqpPayload()
    @AmqpPayload() data: DogDTO,
    @AmqpHeaders() headers: any,
  ) {
    // Your message handling logic here
    this.logger.log('Received a message', 'Consumer 1');
  }
}

If a message fails validation, it goes directly to the dead letter queue, ignoring any retrial policy.

Retrial Architecture

This module uses the RabbitMQ X-Delayed-Message plugin to implement delayed retrials.

When an error is detected, the message is published to delayed.retrial.v1.exchange with the original queue name as the routing key. After the specified delay, it is forwarded to delayed.retrial.v1.rerouter.queue. This queue uses the AMQP Default Exchange as its dead letter exchange and expires messages immediately upon receipt.

As a result, each message expires as soon as it reaches the queue and is sent to the dead letter exchange with the original queue name as the routing key. The Default Exchange then routes the message back into the original queue for consumption.

If the message still fails once the maximum number of attempts is reached, it is redirected to the Dead Letter Queue (DLQ). If no retrial policy is provided, or if the message fails validation, it is routed directly to the DLQ.
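
The routing rules above can be summarized as a small decision function. This is a sketch for illustration, not the package's code: routeFailedMessage, FailureKind, and Route are hypothetical names, and only the exchange name and the routing rules come from the description above.

```typescript
// Hypothetical types for illustration.
type FailureKind = 'validation' | 'handler';

type Route =
  | { target: 'delayed.retrial.v1.exchange'; routingKey: string }
  | { target: 'DLQ' };

// Decide where a failed message goes next.
function routeFailedMessage(
  originalQueue: string,
  kind: FailureKind,
  attemptsMade: number,
  policy?: { maxAttempts: number },
): Route {
  // Validation failures and handlers without a policy skip retrials entirely.
  if (kind === 'validation' || !policy) return { target: 'DLQ' };
  // Retrials are exhausted once the attempt count reaches the maximum.
  if (attemptsMade >= policy.maxAttempts) return { target: 'DLQ' };
  // Otherwise the message is delayed, keyed by the queue it came from, so the
  // Default Exchange can route it back there after the rerouter queue expires it.
  return { target: 'delayed.retrial.v1.exchange', routingKey: originalQueue };
}

const retried = routeFailedMessage('my.consumer1', 'handler', 1, { maxAttempts: 3 });
const gaveUp = routeFailedMessage('my.consumer1', 'handler', 3, { maxAttempts: 3 });
const invalid = routeFailedMessage('my.consumer1', 'validation', 1, { maxAttempts: 3 });
const noPolicy = routeFailedMessage('my.consumer1', 'handler', 1);
```

Using the original queue name as the routing key is what lets the Default Exchange deliver the message back without any extra bindings, since every queue is bound to the Default Exchange under its own name.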

License

Gedai is MIT licensed.