@anabranch/queue
v0.3.6
Published
Message queue with Task/Stream semantics. In-memory adapter with delayed messages, dead letter queues, and visibility timeout.
Readme
@anabranch/queue
Queue primitives with Task/Stream semantics for error-tolerant message processing.
Description
A queue abstraction that integrates with anabranch's Task and Stream types for composable error handling, concurrent processing, and automatic resource management.
Features
- Task/Stream Integration: Leverage Task's retry/timeout and Stream's error collection
- Multiple Adapters: In-memory implementation included, Redis/RabbitMQ/SQS coming soon
- Delayed Messages: Support for scheduled/delayed message delivery
- Dead Letter Queues: Automatic routing of failed messages after max attempts
- Batch Operations: Send multiple messages, acknowledge multiple at once
Installation
# JSR
jsr add @anabranch/queue
# Deno
deno add @anabranch/queueQuick Start
import { createInMemory, Queue } from '@anabranch/queue'
const connector = createInMemory()
const queue = await Queue.connect(connector).run()
// Send a message
const id = await queue
.send('notifications', { type: 'welcome', userId: 123 })
.run()
// Process messages with error collection
const { successes, errors } = await queue
.stream('notifications', { concurrency: 5 })
.map(async (msg) => await sendNotification(msg.data))
.tapErr((err) => logError(err))
.collect()
.then((results) => {
const successes: typeof results = []
const errors: typeof results = []
for (const r of results) {
if (r.type === 'success') successes.push(r)
else errors.push(r)
}
return { successes, errors }
})API
Queue.send
Send a message to a queue with optional delay:
await queue.send('my-queue', { key: 'value' }, { delayMs: 30_000 }).run()Queue.stream
Stream messages with concurrent processing:
const { successes, errors } = await queue
.stream('orders', { count: 10, concurrency: 10 })
.map(async (msg) => await processOrder(msg.data))
.partition()Queue.ack / Queue.nack
Acknowledge successful processing or negative acknowledge with requeue:
await queue.nack('orders', msg.id, { requeue: true, delay: 5_000 }).run()
// Or route to dead letter queue
await queue.nack('orders', msg.id, { deadLetter: true }).run()Queue.sendBatch
Send multiple messages efficiently:
const ids = await queue
.sendBatch('notifications', [
{ to: '[email protected]' },
{ to: '[email protected]' },
])
.run()Configuration
In-Memory Queue Options
const connector = createInMemory({
maxBufferSize: 1000,
queues: {
orders: {
maxAttempts: 3,
deadLetterQueue: 'orders-failed',
},
},
})Error Handling
All errors are typed for catchable handling:
QueueConnectionFailed- Connection establishment failedQueueSendFailed- Send operation failedQueueReceiveFailed- Receive operation failedQueueAckFailed- Acknowledgment failed
try {
await queue.send('my-queue', data).run()
} catch (error) {
if (error instanceof QueueSendFailed) {
console.error('Failed to send:', error.message)
}
}Related
- @anabranch/anabranch - Core Task/Stream primitives
- @anabranch/db - Database adapter pattern (inspiration for this package)
