@anabranch/queue-rabbitmq
v0.2.12
RabbitMQ adapter for @anabranch/queue using amqplib. Supports all queue features with RabbitMQ queues for persistent messaging.
Usage
import { Queue } from '@anabranch/queue'
import { createRabbitMQ } from '@anabranch/queue-rabbitmq'
const connector = createRabbitMQ('amqp://localhost:5672')
const queue = await Queue.connect(connector).run()
await queue.send('notifications', { userId: 123, type: 'welcome' }).run()
const { successes, errors } = await queue
  .stream('notifications')
  .withConcurrency(5)
  .map(async (msg) => await sendNotification(msg.data))
  .partition()
await queue.close().run()
API
createRabbitMQ(options)
Creates a RabbitMQ queue connector.
import { createRabbitMQ } from '@anabranch/queue-rabbitmq'
const connector = createRabbitMQ({
  connection: 'amqp://localhost:5672',
  prefix: 'myapp',
  queues: {
    orders: {
      maxAttempts: 5,
      deadLetterQueue: 'orders-dlq',
    },
  },
  defaultPrefetch: 10,
})
Options:
- connection - RabbitMQ URL or amqplib connection options
- prefix - Key prefix for queue names (default: "abq")
- queues - Per-queue configuration
- defaultPrefetch - Default prefetch count (default: 10)
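To make the defaults concrete, here is a minimal sketch of how the listed options could be normalized. The RabbitMQOptions and QueueConfig interfaces and the withDefaults helper are hypothetical names reconstructed from the option list above; they are not the package's exported types.

```typescript
// Hypothetical option shapes, reconstructed from the fields listed above.
interface QueueConfig {
  maxAttempts?: number
  deadLetterQueue?: string
}

interface RabbitMQOptions {
  connection: string | Record<string, unknown>
  prefix?: string
  queues?: Record<string, QueueConfig>
  defaultPrefetch?: number
}

// Fill in the documented defaults: prefix "abq" and defaultPrefetch 10.
function withDefaults(options: RabbitMQOptions): Required<RabbitMQOptions> {
  return {
    connection: options.connection,
    prefix: options.prefix ?? 'abq',
    queues: options.queues ?? {},
    defaultPrefetch: options.defaultPrefetch ?? 10,
  }
}

const resolved = withDefaults({ connection: 'amqp://localhost:5672' })
console.log(resolved.prefix, resolved.defaultPrefetch) // prints "abq 10"
```

Only the two default values come from the option list; how the adapter combines the prefix with a queue name is not stated here, so the sketch leaves that out.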
Message Headers
Headers can be attached to messages for routing and correlation:
await queue.send('orders', order, {
  headers: {
    'x-correlation-id': 'abc-123',
    'x-source': 'checkout-service',
  },
}).run()
Headers are surfaced in metadata.headers on received messages.
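On the consuming side, reading a header back might look like the sketch below. The ReceivedMessage interface and the correlationIdOf helper are assumptions for illustration; only the data and metadata.headers fields are taken from the text above, and their exact types in the library may differ.

```typescript
// Assumed shape of a received message: `data` plus `metadata.headers`.
// The field types are guesses for illustration.
interface ReceivedMessage<T> {
  data: T
  metadata: { headers?: Record<string, string> }
}

// Hypothetical helper: read the correlation id set by the sender, if any.
function correlationIdOf<T>(msg: ReceivedMessage<T>): string | undefined {
  return msg.metadata.headers?.['x-correlation-id']
}

// A hand-built message standing in for one delivered by the queue.
const received: ReceivedMessage<{ orderId: number }> = {
  data: { orderId: 42 },
  metadata: {
    headers: { 'x-correlation-id': 'abc-123', 'x-source': 'checkout-service' },
  },
}

console.log(correlationIdOf(received)) // prints "abc-123"
```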
Delayed Messages
Note: Delayed messages require the rabbitmq-delayed-message-exchange plugin. Without it, specifying delayMs will throw an error.
await queue.send('notifications', reminder, { delayMs: 30_000 }).run()
Dead Letter Queue
const connector = createRabbitMQ({
  connection: 'amqp://localhost:5672',
  queues: {
    orders: {
      maxAttempts: 3,
      deadLetterQueue: 'orders-dlq',
    },
  },
})
When a message exceeds max delivery attempts, it is routed to the dead letter queue with metadata about the original message.
Attempt Counting
Attempt counts are tracked in the message envelope and incremented when a
message is nacked with requeue: true. This works with all RabbitMQ versions
and queue types (classic or quorum).
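The interaction between attempt counting and dead-lettering can be sketched in memory, with no broker involved. The Envelope shape, the onNack helper, and the rule that a message is dead-lettered once its attempt count reaches maxAttempts are assumptions for illustration, not the adapter's actual internals.

```typescript
// Hypothetical envelope: the attempt counter travels with the payload.
interface Envelope<T> {
  attempt: number // delivery attempts so far, starting at 1
  payload: T
}

interface QueueConfig {
  maxAttempts: number
  deadLetterQueue?: string
}

type NackOutcome<T> =
  | { kind: 'requeue'; envelope: Envelope<T> }
  | { kind: 'dead-letter'; queue: string; envelope: Envelope<T> }
  | { kind: 'drop' }

// Decide what happens to a nacked message.
// Simplification: a nack without requeue just drops the message here.
function onNack<T>(
  envelope: Envelope<T>,
  requeue: boolean,
  config: QueueConfig,
): NackOutcome<T> {
  if (!requeue) return { kind: 'drop' }
  if (envelope.attempt >= config.maxAttempts) {
    // Attempts exhausted: route to the DLQ if one is configured.
    return config.deadLetterQueue !== undefined
      ? { kind: 'dead-letter', queue: config.deadLetterQueue, envelope }
      : { kind: 'drop' }
  }
  // Otherwise requeue with the counter incremented in the envelope.
  return { kind: 'requeue', envelope: { ...envelope, attempt: envelope.attempt + 1 } }
}

const config: QueueConfig = { maxAttempts: 3, deadLetterQueue: 'orders-dlq' }
let current: Envelope<{ orderId: number }> = { attempt: 1, payload: { orderId: 42 } }
const trail: string[] = []
for (;;) {
  const outcome = onNack(current, true, config)
  trail.push(outcome.kind)
  if (outcome.kind !== 'requeue') break
  current = outcome.envelope
}
console.log(trail.join(' -> ')) // prints "requeue -> requeue -> dead-letter"
```

Because the counter lives in the envelope rather than in a broker-managed header, a scheme like this does not depend on the queue type.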
Requirements
- RabbitMQ 3.8+ (for x-delivery-count support)
- For delayed messages: rabbitmq-delayed-message-exchange plugin
Environment Variables
- RABBITMQ_URL - Default connection URL when no options are provided (default: amqp://localhost:5672)
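A fallback chain consistent with that description might look like the following sketch. The resolveConnectionUrl helper is hypothetical, and the precedence (explicit value, then RABBITMQ_URL, then the built-in default) is an assumption inferred from the text. The environment is passed in as a plain object so the function stays easy to test; in an application it would typically be process.env.

```typescript
const DEFAULT_URL = 'amqp://localhost:5672'

// Hypothetical helper: pick the connection URL.
// Assumed precedence: explicit argument > RABBITMQ_URL > built-in default.
function resolveConnectionUrl(
  explicit: string | undefined,
  env: Record<string, string | undefined>,
): string {
  return explicit ?? env['RABBITMQ_URL'] ?? DEFAULT_URL
}

console.log(resolveConnectionUrl(undefined, {}))
// prints "amqp://localhost:5672"
console.log(resolveConnectionUrl(undefined, { RABBITMQ_URL: 'amqp://broker:5672' }))
// prints "amqp://broker:5672"
```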
