iamqp

v2.2.1

RabbitMQ wrapper for NodeJS with several produce(reply)/consume scenarios

iamqp.js

A simple AMQP (Advanced Message Queuing Protocol) wrapper built on amqplib. It provides several AMQP flavours (wrappers):

  • basic producer with the class PlainProducer
  • basic consumer with the class PlainConsumer
  • fanout exchange producer with the class FanoutProducer
  • fanout exchange consumer with the class FanoutConsumer
  • RPC client with the class RPCClient
  • RPC server with the class RPCServer

Typical uses are (fast) communication between:

  • processes
  • instances in distributed systems
  • web and worker instances

Each flavour has two sides: a producer and a consumer. The producer (for example a worker) publishes or distributes data to the consumers. The RPC flavour is a special case in which the two sides form a client/server relation.

Prerequisites

A guide to installing RabbitMQ and some introduction:

Some AMQP service providers:

WoW

Install with:

npm install iamqp --save
yarn add iamqp

Common

All instances have:

  • openConnection() to establish the connection to the RabbitMQ instance
  • closeConnection() to close the established connection
  • isReady() to check whether the connection is ready
  • getEventer() to get the EventEmitter instance that the code uses to signal messages and information

Several things can be listened to on the eventer that are common to all instances:

  • error - emitted when an error occurs. If you don't listen for it, the error bubbles up and crashes your process:
producer.getEventer().on('error', (err) => {
  console.log('ERROR ' + err.message);
});
  • connected - emitted when the instance connects to the server:
producer.getEventer().on('connected', () => {
  console.log('... connected');
});
  • closed - emitted when the connection closes:
producer.getEventer().on('closed', () => {
  console.log('... closed');
});

Individual instances then have their own events for data handling. Take care not to attach too many listeners to a given event, as that can cause a memory leak.
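The two warnings above (unhandled errors and too many listeners) can be reproduced with a plain Node.js EventEmitter. The following is a minimal sketch, assuming the object returned by getEventer() behaves like a standard EventEmitter. The eventer variable is a stand-in and ensureErrorListener is a hypothetical helper, neither is part of iamqp.

```javascript
const { EventEmitter } = require('events');

// Stand-in for the object returned by getEventer(). Assumption: it behaves
// like a standard Node.js EventEmitter.
const eventer = new EventEmitter();

// 1. With no 'error' listener attached, emitting 'error' throws the error.
let thrown = null;
try {
  eventer.emit('error', new Error('connection lost'));
} catch (err) {
  thrown = err;
}
console.log('thrown without a listener:', thrown.message);

// 2. Attach a named handler so that it can be counted and removed later.
function onError(err) {
  console.log('ERROR ' + err.message);
}

// Hypothetical helper: registers the handler only if it is not attached yet.
function ensureErrorListener(emitter) {
  if (!emitter.listeners('error').includes(onError)) {
    emitter.on('error', onError);
  }
}

for (let i = 0; i < 20; i++) {
  ensureErrorListener(eventer); // safe to call repeatedly
}
const attached = eventer.listenerCount('error');
console.log('error listeners attached:', attached);

// 3. Remove the handler when it is no longer needed.
eventer.off('error', onError);
const remaining = eventer.listenerCount('error');
```

Registering handlers once at start-up, rather than inside per-message code paths, is usually enough to keep the listener count stable.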

The connection must be established by calling openConnection(). If the connection later fails, the instance tries to reconnect automatically. The automatic reconnect is not triggered after you call closeConnection() yourself.
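Since opening the connection takes time, one cautious pattern is to defer work until the instance reports that it is ready. The sketch below uses only isReady() and getEventer() as described in this README. whenReady and StubInstance are hypothetical names introduced for illustration, and the stub merely imitates an iamqp instance so that the example runs without a RabbitMQ server.

```javascript
const { EventEmitter } = require('events');

// Hypothetical helper: runs `action` immediately if the instance is ready,
// otherwise once on the next 'connected' event.
function whenReady(instance, action) {
  if (instance.isReady()) {
    action();
    return;
  }
  instance.getEventer().once('connected', action);
}

// Hypothetical stub that imitates the common iamqp instance methods.
class StubInstance {
  constructor() {
    this.ready = false;
    this.eventer = new EventEmitter();
  }
  isReady() {
    return this.ready;
  }
  getEventer() {
    return this.eventer;
  }
  // Test-only: pretend that the connection has just been established.
  simulateConnect() {
    this.ready = true;
    this.eventer.emit('connected');
  }
}

const stub = new StubInstance();
const calls = [];

whenReady(stub, () => calls.push('first'));  // deferred, not connected yet
stub.simulateConnect();                      // triggers 'first'
whenReady(stub, () => calls.push('second')); // runs immediately
```

With a real instance, the action would typically be a publish, fanout or callRemote call.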

Init/configuration

All instances require at least two parameters on initialization:

  • URI of the AMQP
  • name of the channel

The same URI and channel name must be set on both instances that communicate with each other:

  • PlainConsumer <-> PlainProducer
  • FanoutConsumer <-> FanoutProducer
  • RPCClient <-> RPCServer

let instance = new iamqp['...'](amqpUri, channelName);

All instances accept an optional third argument which configures several things:

  • printMessages - if true, messages are printed when logging is enabled. Defaults to true.
  • maxMessageLengthToPrint - if messages are printed, the maximum size in bytes of a message that will be printed. Defaults to 257.
  • queueArguments - queue configuration for the pairs:
    • PlainConsumer <-> PlainProducer
    • RPCClient <-> RPCServer
  • connectionOptions - configuration of the connection

QueueArguments

queueArguments are related to the configuration of the Queue (over which they are communicating):

  • messageTtl - 0 <= n < 2^32 expires messages arriving in the queue after n milliseconds
  • expires - 0 < n < 2^32 the queue will be destroyed after n milliseconds of disuse, where use means having consumers, being declared
  • maxLength - sets a maximum number of messages the queue will hold

If a queueArguments option is not set, its default is to be considered infinite (no limit).
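The ranges listed above can be checked before the options are passed to an instance. This is a minimal sketch, not part of iamqp: validateQueueArguments is a hypothetical helper, and the requirements that the values be integers and that maxLength be non-negative are assumptions added here.

```javascript
const UPPER_BOUND = 2 ** 32; // exclusive upper bound for messageTtl and expires

function isIntegerInRange(value, min, maxExclusive) {
  return Number.isInteger(value) && value >= min && value < maxExclusive;
}

// Hypothetical helper: returns a list of problems, empty if all set options
// are within the documented ranges. Unset options are skipped.
function validateQueueArguments(queueArguments = {}) {
  const problems = [];
  const { messageTtl, expires, maxLength } = queueArguments;

  if (messageTtl !== undefined && !isIntegerInRange(messageTtl, 0, UPPER_BOUND)) {
    problems.push('messageTtl must satisfy 0 <= n < 2^32');
  }
  if (expires !== undefined && !isIntegerInRange(expires, 1, UPPER_BOUND)) {
    problems.push('expires must satisfy 0 < n < 2^32');
  }
  // Assumption: maxLength is a non-negative integer.
  if (maxLength !== undefined && !(Number.isInteger(maxLength) && maxLength >= 0)) {
    problems.push('maxLength must be a non-negative integer');
  }
  return problems;
}

const okProblems = validateQueueArguments({ maxLength: 100, messageTtl: 10000, expires: 60000 });
const badProblems = validateQueueArguments({ messageTtl: 2 ** 32, expires: 0 });
console.log(okProblems, badProblems);
```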

When you set any of the queueArguments on one instance, you must set the same value on its sister instance. Both instances try to assert a Queue on the same channel, and if one has already asserted the Queue, the other will fail if it provides different options.
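One way to keep both sides in sync is to define the queueArguments once and reuse the same object for both instances. The sketch below only builds and compares option objects. haveMatchingQueueArguments is a hypothetical helper, and the option values are examples.

```javascript
const { isDeepStrictEqual } = require('util');

// Defined once and shared by both sides of a pair.
const sharedQueueArguments = Object.freeze({
  maxLength: 100,
  messageTtl: 10000,
  expires: 60000
});

// Options that are not queue related may differ between the two sides.
const consumerOptions = { printMessages: false, queueArguments: sharedQueueArguments };
const producerOptions = { printMessages: true, queueArguments: sharedQueueArguments };

// Hypothetical helper: true if both option objects carry equal queueArguments.
function haveMatchingQueueArguments(optionsA, optionsB) {
  return isDeepStrictEqual(optionsA.queueArguments || {}, optionsB.queueArguments || {});
}

const matching = haveMatchingQueueArguments(consumerOptions, producerOptions);
const mismatching = haveMatchingQueueArguments(consumerOptions, {
  queueArguments: { maxLength: 50, messageTtl: 10000, expires: 60000 }
});
console.log(matching, mismatching);
```

When the two sides run in separate processes, the shared object could live in a small configuration module that both import.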

Note on this

In general, if your Queues are persistent, you need to manually delete the Queue in question before changing any of its options, so that the code can re-assert it.

ConnectionOptions

Configuration of the connection:

  • heartbeat - period of the connection heartbeat, in seconds. Defaults to 60.

Configuration options

const iamqp = require('iamqp');
const instance = new iamqp['...']('...', '...', {
  printMessages: true,
  maxMessageLengthToPrint: 512,
  queueArguments: {
    maxLength: 100,     // max number of queue entries
    messageTtl: 10000,  // message TTL
    expires: 60000      // remove queue after dis-use
  },
  connectionOptions: {
    heartbeat: 60
  }
});

Plain

The plain producer/consumer pair is for when one or more producers (on the same channel) publish to a single consumer. An example is multiple web instances that periodically publish data to a central aggregator.

For detailed documentation check the classes:

  • PlainConsumer
  • PlainProducer

Plain Consumer

const iamqp              = require('iamqp');
const amqpUri            = 'amqp://localhost';
const channel            = 'amqp-channel-b';
let plainConsumer        = new iamqp.PlainConsumer(amqpUri, channel);

// This needs to be done or the errors will bubble up.
plainConsumer.getEventer().on('error', (err) => {
    console.log('consumer error:' + err.message);
});

// For when the connection is established.
plainConsumer.getEventer().on('connected', () => {
    console.log('consumer connected');
});

// Get messages.
plainConsumer.getEventer().on('message', (message) => {
    console.log('consumer message:' + message);
});

// Open the connection - this takes some time and is not sync
plainConsumer.openConnection();

// ...

// Close the connection
if (plainConsumer.closeConnection()) {
    console.log('consumer connection closing ...');
}

Plain Producer

const iamqp              = require('iamqp');
const amqpUri            = 'amqp://localhost';
const channel            = 'amqp-channel-b';
let plainProducer        = new iamqp.PlainProducer(amqpUri, channel);

// This needs to be done or the errors will bubble up.
plainProducer.getEventer().on('error', (err) => {
    console.log('producer error:' + err.message);
});

// For when the connection is established.
plainProducer.getEventer().on('connected', () => {
    console.log('producer connected');
});

// Open the connection - this takes some time and is not sync
plainProducer.openConnection();

// Publish a single message.
let isPublishing = plainProducer.publish({
    'a': 3
});

// Close the connection
if (plainProducer.closeConnection()) {
    console.log('producer connection closing ...');
}

Fanout

Fanout is when each producer (there can be many on the same channel) publishes a message that gets "fanned out" to all the fanout consumers on the same channel. Every consumer on the channel gets a copy of the message.

Multiple producers thus publish their messages to multiple consumers. Example could be a worker instance (or many) that publishes information to many web instances.
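The delivery semantics described above can be illustrated with a toy in-memory model. This is not how iamqp or RabbitMQ is implemented. ToyFanoutChannel is a hypothetical class that only shows that each consumer receives its own copy of every message.

```javascript
// Toy model of fanout delivery, for illustration only.
class ToyFanoutChannel {
  constructor() {
    this.consumers = [];
  }

  addConsumer(onMessage) {
    this.consumers.push(onMessage);
  }

  // Delivers an independent copy of the message to every consumer and
  // returns the number of deliveries.
  fanout(message) {
    const serialized = JSON.stringify(message);
    for (const onMessage of this.consumers) {
      onMessage(JSON.parse(serialized));
    }
    return this.consumers.length;
  }
}

const channel = new ToyFanoutChannel();
const receivedByWebA = [];
const receivedByWebB = [];

channel.addConsumer((message) => receivedByWebA.push(message));
channel.addConsumer((message) => receivedByWebB.push(message));

const delivered = channel.fanout({ a: 3 });
console.log(delivered, receivedByWebA, receivedByWebB);
```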

For detailed documentation check the classes:

  • FanoutConsumer
  • FanoutProducer

Fanout Consumer

const iamqp              = require('iamqp');
const amqpUri            = 'amqp://localhost';
const fanoutChannel      = 'fanout-channel-a';
let fanoutConsumer       = new iamqp.FanoutConsumer(amqpUri, fanoutChannel);

// This needs to be done or the errors will bubble up.
fanoutConsumer.getEventer().on('error', (err) => {
    console.log('fanout consumer error:' + err.message);
});

// For when the connection is established.
fanoutConsumer.getEventer().on('connected', () => {
    console.log('fanout consumer connected');
});

// Get messages as fanout by the producer.
fanoutConsumer.getEventer().on('message', (message) => {
    console.log('fanout consumer message:' + message);
});

// Open the connection - this takes some time and is not sync
fanoutConsumer.openConnection();

// ...

// Close the connection
if (fanoutConsumer.closeConnection()) {
    console.log('fanout consumer connection closing ...');
}

Fanout Producer

const iamqp              = require('iamqp');
const amqpUri            = 'amqp://localhost';
const fanoutChannel      = 'fanout-channel-a';
let fanoutProducer       = new iamqp.FanoutProducer(amqpUri, fanoutChannel);

// This needs to be done or the errors will bubble up.
fanoutProducer.getEventer().on('error', (err) => {
    console.log('fanout producer error:' + err.message);
});

// For when the connection is established.
fanoutProducer.getEventer().on('connected', () => {
    console.log('fanout producer connected');
});

// Open the connection - this takes some time and is not sync
fanoutProducer.openConnection();

// ...

// Fanout a single message.
let isFanout = fanoutProducer.fanout({
    'a': 3
});

// ...

// Close the connection
if (fanoutProducer.closeConnection()) {
    console.log('fanout producer connection closing ...');
}

RPC

A Remote Procedure Call (RPC) is communication of the type "ask -> reply". The RPC client calls the RPC server, which (after some time) replies to the message. Each call (on each client) produces a unique ID string, called the correlation ID, which the user also receives with the reply so that each reply can be matched to its call.

Multiple RPC clients can thus perform multiple calls to the server. An example of this usage is a server that queries a DB on behalf of the clients. Take care not to overload the server instance.

For example, create an RPC client:

const iamqp              = require('iamqp');
const amqpUri            = 'amqp://localhost';
const rpcChannel         = 'rpc-channel-a';
let rpcClient            = new iamqp.RPCClient(amqpUri, rpcChannel);

... set up listeners:

rpcClient.getEventer().on('error', (err) => {
    console.log('RPC client error:' + err.message);
});

rpcClient.getEventer().on('connected', () => {
    console.log('RPC client connected');
});

rpcClient.getEventer().on('reply', (reply, correlationId) => {
    console.log('RPC client got a reply (' + correlationId + '): ' + reply);
});

rpcClient.openConnection();

Each reply carries the correlation ID - the same one that we get when making the call. Each call produces its own unique correlation ID.

let correlationIdA = rpcClient.callRemote({'a': 5});
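Because replies may arrive in a different order than the calls were made, it can help to keep a map of pending calls keyed by correlation ID. The following is a minimal sketch that relies only on the 'reply' event signature (reply, correlationId) shown above. ReplyRouter is a hypothetical class, and a plain EventEmitter stands in for rpcClient.getEventer() so that the example runs without a server.

```javascript
const { EventEmitter } = require('events');

// Hypothetical helper: routes each reply to the handler registered for its
// correlation ID and forgets the ID afterwards.
class ReplyRouter {
  constructor(eventer) {
    this.pending = new Map();
    eventer.on('reply', (reply, correlationId) => {
      const handler = this.pending.get(correlationId);
      if (handler === undefined) {
        return; // unknown or already handled correlation ID
      }
      this.pending.delete(correlationId);
      handler(reply);
    });
  }

  expect(correlationId, handler) {
    this.pending.set(correlationId, handler);
  }
}

// Stand-in for rpcClient.getEventer().
const eventer = new EventEmitter();
const router = new ReplyRouter(eventer);
const results = [];

// With a real client the IDs would come from rpcClient.callRemote(...).
router.expect('id-a', (reply) => results.push(['a', reply]));
router.expect('id-b', (reply) => results.push(['b', reply]));

// Simulated replies that arrive out of order.
eventer.emit('reply', { a: 7 }, 'id-b');
eventer.emit('reply', { a: 6 }, 'id-a');
console.log(results);
```

A single 'reply' listener shared by all calls also avoids attaching one listener per call.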

On the RPC server side we then have:

const iamqp              = require('iamqp');
const amqpUri            = 'amqp://localhost';
const rpcChannel         = 'rpc-channel-a';
let rpcServer            = new iamqp.RPCServer(amqpUri, rpcChannel);

... set up listeners:

rpcServer.getEventer().on('error', (err) => {
    console.log('RPC server error:' + err.message);
});

rpcServer.getEventer().on('connected', () => {
    console.log('RPC server connected');
});

rpcServer.getEventer().on('call', (callData, pCorrelationId) => {
    console.log('RPC server call (' + pCorrelationId + '): ' + callData);
    callData.a += 1;
    let isServerResponding = rpcServer.respond(callData, pCorrelationId);
});

rpcServer.openConnection();

For detailed documentation check the classes:

  • RPCClient
  • RPCServer

Logging

The module has the capability to log to:

  • the terminal
  • the Linux system log

Terminal

Terminal logging is enabled with an environment variable:

DBTRC_TERM_F=OK

Syslog

Logging to the Linux system log is enabled with an environment variable:

DBTRC_SYSLOG_F=OK

For this to work, open /etc/rsyslog.conf and enable UDP reception on port 514. The relevant lines are usually present but commented out, and only need uncommenting:

# provides UDP syslog reception
module(load="imudp")
input(type="imudp" port="514")

... and restart the service with /etc/init.d/rsyslog restart. The logs should then appear in /var/log/syslog. This was tested on Ubuntu 16.04.

Documentation

JSDoc generated documentation is in artifacts/docs/index.html.

Source

The repository is located on Bitbucket.

Test

A Snyk test badge is included; the report is available on Snyk.

Integration tests:

yarn run test
DBTRC_TERM_F=OK yarn run test

Lint testing via:

yarn run test:lint

License

The MIT License (MIT) (look for the LICENSE file in the root of the module).