@edular/queue
v3.0.0
Published

Readme
@edular/queue
Node version required:
>=18.0.6
Installation
yarn add @edular/queueConnect
import { queue, QueueList, EventType } from "@edular/queue"
await queue.connect({
microserviceName: "tasks-api",
hostname: "localhost",
port: 5672,
username: "guest",
password: "guest",
queues: [
{ name: QueueList.MicroserviceTasks },
{ name: QueueList.UpdateUserInTasks, enableDeadLetter: true, maxRetries: 5 },
{ name: QueueList.TasksApproved, enableDeadLetter: true, maxRetries: 20 },
{ name: QueueList.PostTaskApproval },
],
})QueueOptions
| Option | Type | Default | Description |
|--------------------|-------------------|-------------|--------------------------------------------------------------|
| microserviceName | string | required | Name of the microservice, included in dead letter payloads |
| queues | QueueConfig[] | required | List of queues to configure |
| hostname | string | localhost | RabbitMQ host |
| port | number | 5672 | RabbitMQ port |
| username | string | guest | RabbitMQ username |
| password | string | guest | RabbitMQ password |
| maxRetries | number | 10 | Default max retries before dead lettering (per-queue overrides this) |
| maxReconnectDelay| number | 900 | Max delay in seconds between reconnection attempts to RabbitMQ |
| prefetch | number | 1 | Default prefetch count for all consumers |
QueueConfig
| Option | Type | Default | Description |
|--------------------|------------|-----------|------------------------------------------------------|
| name | QueueList| required | Queue name |
| enableDeadLetter | boolean | false | Whether to send to dead letter after max retries |
| maxRetries | number | inherited | Max retries before dead lettering (overrides global) |
Send a message
queue.send({
queue: QueueList.MicroserviceTasks,
message: JSON.stringify({ type: "CREATE_TASK", payload: { ... } }),
})Send to multiple queues
queue.sendMultiple(
[QueueList.Email, QueueList.Sms],
JSON.stringify({ type: "NOTIFY", payload: { ... } })
)Consume messages
The callback must return true on success, false or an Error on failure.
queue.on(EventType.Connected, () => {
queue.consume(QueueList.MicroserviceTasks, async (msg) => {
try {
const { type, payload } = JSON.parse(msg.message)
await handleMessage(type, payload)
return true
} catch (e) {
logger.error(e)
return e // real error is forwarded to dead letter if enabled
}
})
})Retry behavior
When the callback returns false or an Error, the message is re-queued with exponential backoff:
| Attempt | Delay | |---------|--------------| | 1 | ~2s | | 2 | ~4s | | 3 | ~8s | | 4 | ~16s | | 5+ | ~30s (cap) |
A random jitter of up to 1s is added to each delay to avoid thundering herd.
If enableDeadLetter: true is set for the queue and maxRetries is exceeded, the message is sent to the dead_letter queue instead of being retried.
Dead Letter Queue
When a message exceeds maxRetries, it is sent to QueueList.DeadLetter with the following payload:
{
"microserviceName": "tasks-api",
"originalQueue": "microservice_tasks",
"originalMessageId": "uuid",
"payload": "{...original message...}",
"retryCount": 10,
"failedAt": "2026-04-10T15:30:00.000Z",
"error": {
"message": "Something went wrong",
"stack": "Error: Something went wrong\n at ..."
},
"originalHeaders": {}
}To consume dead letter messages:
queue.consume(QueueList.DeadLetter, async (msg) => {
const data = JSON.parse(msg.message)
logger.error(`Dead letter from ${data.microserviceName}/${data.originalQueue}`, data)
return true
})Other methods
pause(queue) / resume(queue)
Temporarily stop and restart a consumer without losing its configuration.
await queue.pause(QueueList.MicroserviceTasks)
await queue.resume(QueueList.MicroserviceTasks)setPrefetch(queue, count)
Dynamically change the prefetch count for a queue. Restarts the consumer if active.
await queue.setPrefetch(QueueList.MicroserviceTasks, 5)getQueueStats(queue)
Get message counts for a queue.
const stats = await queue.getQueueStats(QueueList.MicroserviceTasks)
// { ready: 10, unacked: 2, total: 12 }disconnect()
Gracefully close the connection.
await queue.disconnect()How to publish a new version
- Commit your changes
yarn version major|minor|patch -m "bump %s"npm publish
