amqp-petite-connector
v1.0.4
Installation
npm install --save amqp-petite-connector
What it is
A small and simple way to connect to an AMQP instance. This package relies on amqplib for the real work. It is just a cleanup of the connection API for simple, generic use cases.
Amqp petite connector automatically sets up your queues so that messages are ack:ed, or nack:ed if (and only if!) DLX (dead letter exchange) is enabled.
Your consuming apps are expected to be idempotent, i.e. able to handle the same message over and over. Exchanges are considered events and queues commands.
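Since the same message may be delivered more than once, a consumer can guard itself by remembering what it has already processed. The following is a minimal sketch, not part of this package: `makeIdempotent` is a hypothetical helper, and it assumes the publisher sets the `messageId` property on each message. The in-memory Set is only for illustration; a real app would likely use a persistent store.

```javascript
// Hypothetical helper, NOT part of amqp-petite-connector.
// Wraps a queue callback so a message id is only processed once.
// Assumes publishers set msg.properties.messageId.
function makeIdempotent(handler, seen = new Set()) {
  return async function (msg, channel) {
    const id = msg.properties ? msg.properties.messageId : undefined
    // Without an id there is nothing to deduplicate on, so just run the handler.
    if (id === undefined || id === null) return handler(msg, channel)
    // Already processed: skip silently.
    if (seen.has(id)) return
    await handler(msg, channel)
    // Only remember the id once the handler has succeeded,
    // so a failed attempt can still be retried.
    seen.add(id)
  }
}
```

The wrapped function can be passed as the `callback` of a queue definition.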
It is all very opinionated.
Requirements
Node version supporting async/await.
How
The connect function takes three arguments: queues, exchanges and an optional connectionString. The connection string can also be set via the environment variable RABBIT_URL.
Note that in order for a queue to be created, the exchange it binds to must already exist.
.connect(queues, exchanges, [connectionString])
Connecting
The example below assumes connectionString is set via the environment variable mentioned above.
const petiteConnector = require("amqp-petite-connector")
const exchanges = []
const queues = []
petiteConnector.connect(queues, exchanges)
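How the connection string is picked can be sketched roughly like this. `resolveConnectionString` is a hypothetical helper, and the precedence (explicit argument first, then RABBIT_URL) as well as the error on a missing value are assumptions, not documented behaviour of the package.

```javascript
// Hypothetical helper, NOT part of amqp-petite-connector.
// Assumption: an explicit argument wins over the RABBIT_URL environment variable.
function resolveConnectionString(connectionString, env = process.env) {
  const url = connectionString || env.RABBIT_URL
  if (!url) {
    throw new Error("No connection string given and RABBIT_URL is not set")
  }
  return url
}
```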
Exchanges contract
An array containing the following object(s).
{
name: string,
type: string, // see amqplib for details.
options: {...} // see amqplib for details.
}
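A quick shape check of an exchange definition before connecting might look like the sketch below. `exchangeProblems` is a hypothetical helper; it only checks the fields listed above and assumes `options` may be left out. Whether a given `type` or option is valid is still decided by amqplib and the broker.

```javascript
// Hypothetical helper, NOT part of amqp-petite-connector.
// Returns a list of human readable problems; an empty list means the shape looks fine.
function exchangeProblems(exchange) {
  if (exchange === null || typeof exchange !== "object") {
    return ["exchange must be an object"]
  }
  const problems = []
  if (typeof exchange.name !== "string" || exchange.name === "") {
    problems.push("name must be a non-empty string")
  }
  if (typeof exchange.type !== "string" || exchange.type === "") {
    problems.push("type must be a non-empty string")
  }
  // Assumption: options may be omitted, but if present it must be a plain object.
  if (exchange.options !== undefined &&
      (exchange.options === null || typeof exchange.options !== "object")) {
    problems.push("options must be an object")
  }
  return problems
}
```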
Queue definitions
An array containing the following object(s).
{
name: string,
binding: string, // Name of the exchange to bind queue to.
callback: async function (msg[, channel]), // Function to process message, expected to be async. Channel is provided as a parameter if needed.
options: {...}, // Optional. See amqplib for details. Will set durable=true if not provided.
dlx: { // Optional. See below for details.
messageTtl: int, // Optional. Milliseconds until republished, default 300000 (5 min).
deadQueue: bool // Optional. Defaults to true if not set when dlx is present.
}
}
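The defaults listed above can be pictured as a small normalising step. This is only a sketch: `applyQueueDefaults` is a hypothetical helper, and it assumes that durable=true is merged into whatever options are given rather than applied only when `options` is missing entirely.

```javascript
// Hypothetical helper, NOT part of amqp-petite-connector.
// Fills in the documented defaults; explicitly set values win.
function applyQueueDefaults(queue) {
  const normalized = {
    ...queue,
    // Assumption: durable defaults to true unless options.durable is set.
    options: { durable: true, ...queue.options },
  }
  // dlx defaults only apply when a dlx object is present at all.
  if (queue.dlx) {
    normalized.dlx = { messageTtl: 300000, deadQueue: true, ...queue.dlx }
  }
  return normalized
}
```

For example, `applyQueueDefaults({ name: "q", binding: "e", dlx: {} })` yields a definition with `options.durable` set to true, `dlx.messageTtl` set to 300000 and `dlx.deadQueue` set to true.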
Dead lettering (DLX)
DLX is supported in one specific way. If you don't like it, just catch your exceptions and handle them however you like.
When DLX is enabled, a dead letter exchange named ${queue.name}_dlx is set up, to show the origin of the dead letters. It has a corresponding queue named ${queue.name}_retry. This queue has a message TTL attached, meaning messages stay in the queue until the TTL expires. Once the TTL expires, the retry queue republishes the message to the originating exchange.
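The resulting topology can be written down as plain data. The sketch below is an illustration only: `describeDlxTopology` is a hypothetical helper, and the exact options the package passes are an assumption. `messageTtl` and `deadLetterExchange` are the amqplib `assertQueue` option names for a queue TTL and a queue's dead letter exchange.

```javascript
// Hypothetical helper, NOT part of amqp-petite-connector.
// Describes the exchange and queues involved in the retry loop for one queue definition.
function describeDlxTopology(queue) {
  const dlx = queue.dlx || {}
  const messageTtl = dlx.messageTtl === undefined ? 300000 : dlx.messageTtl
  const dlxName = `${queue.name}_dlx`
  return {
    deadLetterExchange: dlxName,
    // Rejected messages leave the work queue via the dead letter exchange.
    workQueue: { name: queue.name, options: { deadLetterExchange: dlxName } },
    // Messages wait in the retry queue until the TTL expires, then are
    // dead lettered back to the exchange the work queue is bound to.
    retryQueue: {
      name: `${queue.name}_retry`,
      boundTo: dlxName,
      options: { messageTtl, deadLetterExchange: queue.binding },
    },
  }
}
```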
Dead lettering happens when an unhandled exception occurs. The message is then nack:ed with allUpTo=false and requeue=false, which means it is considered a dead letter. Since RabbitMQ does its best to keep message order, it is highly likely your message will be republished to the front of your queues for immediate consumption once its TTL expires in the retry queue.
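The ack/nack behaviour can be sketched as a wrapper around the queue callback. `wrapCallback` is a hypothetical name and the package's internals may differ; `channel.ack(msg)` and `channel.nack(msg, allUpTo, requeue)` are the amqplib channel methods.

```javascript
// Hypothetical sketch, NOT the package's actual implementation.
// Acks on success; on an unhandled exception it nacks only when dlx is enabled.
function wrapCallback(queue, channel) {
  return async function onMessage(msg) {
    try {
      await queue.callback(msg, channel)
      channel.ack(msg)
    } catch (err) {
      if (queue.dlx) {
        // allUpTo=false, requeue=false: the broker treats the message as a dead letter.
        channel.nack(msg, false, false)
      } else {
        // No DLX: the message is ack:ed and thereby dropped.
        channel.ack(msg)
      }
    }
  }
}
```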
There is also a "dead" queue, which is enabled by default if you have DLX enabled. A message is dead when it has been retried 5 times without any luck. You should monitor the length of this queue. Disable it by specifying deadQueue=false under dlx. See examples below.
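One way to tell how often a message has been retried is the `x-death` header that RabbitMQ adds to dead lettered messages. The sketch below is an assumption about how such a check could look, not a description of how this package counts retries; `deathCount` and `isDead` are hypothetical helpers, and treating "5 or more" as the threshold is a guess based on the text above.

```javascript
// Hypothetical helpers, NOT part of amqp-petite-connector.
// Sums the x-death counts recorded for the given queue.
function deathCount(msg, queueName) {
  const headers = (msg.properties && msg.properties.headers) || {}
  const deaths = Array.isArray(headers["x-death"]) ? headers["x-death"] : []
  return deaths
    .filter((death) => death.queue === queueName)
    .reduce((sum, death) => sum + Number(death.count || 0), 0)
}

// Assumption: a message counts as dead once it has been dead lettered maxRetries times.
function isDead(msg, queueName, maxRetries = 5) {
  return deathCount(msg, queueName) >= maxRetries
}
```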
Example
const petite = require("amqp-petite-connector")
const exchanges = [{
name: "my-exchange2",
type: "fanout",
options: {
durable: true,
autoDelete: false,
}
}]
const queues = [{
name: "my-first-queue",
binding: "my-exchange-number-1",
callback: async (msg) => {
const content = JSON.parse(msg.content.toString())
// do stuff
// if exception is thrown here, message will be ack:ed and silently dropped
// no dlx and no dead queue
}
}]
petite.connect(queues, exchanges)
Example with dead lettering
const petite = require("amqp-petite-connector")
const exchanges = [{
name: "my-exchange2",
type: "fanout",
options: {
durable: true,
autoDelete: false,
}
}]
const queues = [{
name: "my-first-queue",
binding: "my-exchange-number-1",
dlx: {
messageTtl: 30000 // 30 secs until the message goes back to the binding exchange specified above
},
callback: async (msg) => {
const content = JSON.parse(msg.content.toString())
// do stuff
// throw here to see dead lettering in action
// dead queue will be present here, since not explicitly set to false in dlx
petite.publish("my-exchange2", { monkey1: "apa", monkey2: "babian", monkey3: "chimpans" })
}
}]
petite.connect(queues, exchanges)