npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2024 – Pkg Stats / Ryan Hefner

rab-q

v2.1.0

Published

A tiny (opinionated) wrapper over amqplib for RabbitMQ publish/subscribe pattern

Downloads

119

Readme

Table of Contents

Features

  • minimal dependencies: only amqplib and uuid are needed
  • promises-based: async/await support makes it easy to use
  • small api: 4 methods only, there's not much to learn :wink:
  • instantiation-able: the library itself is a Class ; get as many independent instance as needed
  • events logging: choose your own log transport
  • disconnections-proof: auto reconnect/resend when errors happen

Our use case at @RadioFrance

This wrapper was tailored made for our microservices architecture.

We use RabbitMQ as the bus in the publish-subscribe pattern. A message is never sent directly to a queue, it is always sent to a configured exchange.

Every message should be acknowledged by returning a value : ACK, NACK, REJECT (message.ACK, message.NACK or message.REJECT in the message object).

A non-acknowledged message will be redelivered one time to the exchange. A rejected message will be directly deleted or redirected to the dead letter exchange if configured.

Our queues and exchanges are defined by a external engine. rab-q doesn't provide API to create, assert or delete these.

Installation

$ npm install rab-q

Usage

const RabQ = require('rab-q');

const rabQ = new RabQ({
  exchange: 'your_topic_exchange',
  queues: 'queue_bind_to_exchange'
});

rabQ.on('error', err => {
  console.error(err);
});
rabQ.on('log', log => {
  console[log.level](log.msg, log.err || '');
});

rabQ.subscribesTo(/.*/, message => {
  // Do stuff!

  // Then acknowledge message by returning a constant
  return message.ACK;
});

rabQ.start()
  .then(() => {
    rabQ.publish('yourRoutingKeyHere', {your: 'message'});
  });

API

new RabQ(options)

options

username

Type: string Default: guest

User to connect on RabbitMQ.

password

Type: string Default: guest

Password associate to username.

protocol

Type: amqp|amqps Default: amqp

Connection protocol.

hostname

Type: string Default: localhost

Hostname to trying to get connection.

port

Type: number Default: 5672

Port of connection.

socketOptions

Type: object Default: null

socketOptions for amqplib

https://amqp-node.github.io/amqplib/channel_api.html#connect

vhost

Type: string Default: /

Selected virtual host.

exchange

Required Type: string

Exchange to publish messages.

queues

Type: Array<string> string

Consumes messages from these queues.

If this option is omitted or has a falsey value, server will create random names.

maxMessages

Type: number Default: 10

Defines the number of messages prefetched by channel. Once the max number of messages is reached, RabbitMQ will wait for some messages to be acknowledged before proceeding with new messages.

nackDelay

Type: number Default: 0

Defines a delay in milliseconds before a message is rejected with NACK.

reconnectInterval

Type: number (milliseconds) Default: 0

Time in milliseconds before trying to reconnect when connection is lost.

autoAck

Type: boolean Default: false

Enables auto acknowledgment.

autoReconnect

Type: boolean Default: true

Enables auto reconnection if an error happens while connecting to the server.

validators

Type: Object of function

validators.consumer

Type: boolean Default: return true

Function run before each message treatment. If it return a false value, the message is reject.

beforeHook

Type: function Default: () => {} Parameters: message (see below)

Function run before each message treatment, can modify message.

afterHook

Type: function Default: () => {} Parameters: message (see below), subscriberResult (ACK/NACK/REJECT)

Function run after each message treatment.

prePublish

Type: function Default: null Parameters: routingKey, content, properties (see publish method) Must return an object with {routingKey, content, properties}

Function run before each publish call

rabQ.start()

Starts a connection.

Returns a promise resolved with true for a successful connection or false if a connection already exists.

rabQ.stop()

Stops and closes a connection.

rabQ.publish(routingKey, content, properties)

Publishes a message on exchange rabQ.exchange.

routingKey

Required Type: string

A regular expression to match the routing keys of consumed messages.

content

Required Type: Object

A message object to send to the exchange.

properties

Type: Object Default: {}

Adds RabbitMQ properties to the message. See http://www.squaremobius.net/amqp.node/channel_api.html#channel_publish (options)

If properties does not have a headers key, properties is considered to be headers.

You can provide a property x-query-token in headers to trace the lifecyle of a request. If not provided a new UUID will be generated.

rabQ.subscribesTo(patternMatch, action)

Adds a Subscriber on consumed messages filtered by the routing key

patternMatch

Required Type: RegExp

A regular expression to match on the routing keys of consumed messages.

action(message)

Required Type: Function

A function should acknowledge (or not) messages by returning either a value or a resolved promise with value. ('ACK', 'NACK' or 'REJECT')

message is a object with following properties:

  • ACK string: the constant returned for a positive acknowledgment
  • NACK string: the constant returned for a negative acknowledgment. NACK will re-queuing message one time.
  • REJECT string: the constant returned for a rejection without redelivery.
  • content Object: content of the received message
  • rk string: routing key of the message
  • queue string: queue name where the message is consumed
  • token string: a UUID to identify the message
  • consumeAt number: timestamp when the message is consumed

Events

'log'

Emits a log object with the following properties:

  • level string: can be info, warn, error
  • token string: the UUID to identify the message
  • msg string: the message,
  • err Object: the original error if log level is error

'error'

Emits an Error if something goes wrong with the connection. If you don't catch this event the process will sto

See Also

License

CECILL-B