npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

@pixellot/pxlt-rabbit-handler

v12.0.8

Published

A generic class that handles RabbitMQ connection, consume and produce functionality.

Downloads

77

Readme

pxlt-rabbit-handler

Description

A generic class which handles RabbitMQ connect, consume and produce functionalities.

npm install pxlt-rabbit-handler

Overview

A library that uses amqplib and provides out-of-the-box RabbitMQ related functionality: connect, consume, publish, disconnection recovery, and thus allowing for decoupling of business logic from Rabbit-related code, by emitting events to notify clients on any Rabbit related occurrences.

The 'connect' logic creates a single connection based on parameters provided in the object initialization.
Upon connect failure or connection closed event, an exponential backoff retry mechanism will be applied to re-obtain a connection. The maximum number of retry attempts and other relevant parameters can be set in the constructor. If not provided, defaults will be used as described in Class parameters section below.

The 'consume' logic contains a creation of a channel and consume on that channel. It accepts a callback function to be executed upon every message arrival.
It is based on a user-defined prefetch value (defaults to 1). The channel created during consume defaults to confirmation mode (using createConfirmChannel). To create a channel using createChannel, set 'confirm' parameter to false when calling consume.
One channel will be created per consumed queue.
Upon a message arrival, the provided processing callback will be invoked. Upon a connection closed event, all non-cancelled consumed queues will be automatically restored by the class if connection successfully re-established. In case of consume failure after a reconnection, connection will be closed and 'connectionClosed' event will be emitted, so it's important to listen to this event and is recommended to exit the app.

The 'publish' method accepts, apart from the mandatory parameters of exchange, routing key and message, an optional 'options' object that complies with amqplib's documentation. In addition, a 'timeout' option can be provided - if provided, an exception will be thrown if the timeout has passed before the server sent an ack/nack.
All publish/sendMessage calls will use one channel, which will be a separate channel than any consume activity.

The 'getMessage' method supports RabbitMQ's pull api, meaning pulling single messages from a specific queue.
All getMessage calls will use once channel, same as publish/sendMessage.

The 'getConnectionState' method returns the connection state: 'active', 'closed', 'error', 'blocked'.

Class parameters

  1. connString (Required): string containing connection information (username, password, endpoint, virtual host).
  2. options (Optional): an object containing any supported rabbit options, and additional custom parameters:
    • 'maxBackoffIntervalMs', the maximum interval in ms of the exponential backoff retry mechanism for the connect function. Defaults to 60000 ms if not provided
    • 'backoffFactor', the multiplication factor of the base-2 exponent. Defaults to 50 if not provided
    • 'maxBackoffCount', the maximum connect retry count desired. Defaults to 5 if not provided.
    • 'logLevel', the log level from which logging to console is enabled. Defaults to 'info' if not provided.

Emitted events

  • connectionCreated: upon connection success, will send a boolean denoting whether a disconnection preceded the newly created connection
  • connectionClosed: upon failure to achieve connection after maxBackoffCount retries
  • connectionError: upon connection error
  • connectionBlocked: upon connection blocked
  • connectionUnblocked: upon connection unblocked

Promise API example

const rabbit = require('pxlt-rabbit-handler');

const rabbitInst = new rabbit('amqp://<USERNAME>:<PASSWORD>@<CLUSTER_ENDPOINT>/<VHOST>', { heartbeat: 10, maxBackoffIntervalMs: 60000, logLevel: 'debug' });

// Consumer
// New incoming message
function processMessage1(message) {
    logger.info(Buffer.from(message.message.content).toString('utf8'));
    return Promise.resolve() // do business logic of msgs from queue1
        .then(() => rabbitInst.ack(message))
        .catch((err) => rabbitInst.nack(message));
}
function processMessage2(message) {
    logger.info(Buffer.from(message.message.content).toString('utf8'));
    return Promise.resolve() // do business logic of msgs from queue2
        .then(() => rabbitInst.ack(message))
        .catch((err) => rabbitInst.nack(message));
}

// If connection was closed and could not be re-established after maxBackoffCount or re-consume failed, exit
rabbitInst.on('connectionClosed', (err) => {
    logger.error(`Failed to connect to rabbit: ${err}, Exiting...`);
    process.exit(1);
});

rabbitInst.connect()
    .then(() => {
        const queues = [{ q: 'queueName1', cb: processMessage1 }, { q: 'queueName2', cb: processMessage2 }];
        const options = { prefetch: 10 };
        return Promise.all(queues.map((q) => rabbitInst.consume(q.q, q.cb, options).catch((err) => { logger.info(err); })));
    })
    .then(() => rabbitInst.cancelConsumer('queueName1'))
    .catch((err) => {
        logger.info(err);
    });

// Publisher
rabbitInst.connect()
    .then(() => rabbitInst.checkExchange('<EXCHANGE_NAME>'))
    .then(() => rabbitInst.publish('<EXCHANGE_NAME>', '<ROUTING_KEY>', '<MESSAGE_BUFFER>', { timeout: 10000 }))
    .then((res) => {
        console.log(res);
        return rabbitInst.gracefullyDisconnect();
    })
    .catch((err) => {
        // error logic
    });

Running Tests

Unit

npm run test

Integration

npm run test:integraion

Integration tests require an RMQ instance. There is a docker-compose.yaml that can lauch one. Alternativelly, any other instance may be used, but the needed configuration should be provided

For minikube users: PXLT_RMQ_HOST=$(minikube ip) npm run test:integration