coniglio v1.1.1
🐇 coniglio
A minimal, elegant, and robust async RabbitMQ client for Node.js
coniglio (Italian for “rabbit”) is a modern wrapper around RabbitMQ designed to be dead-simple to use, resilient in production, and fully composable with async iterators and streaming libraries. Inspired by libraries like postgres, it gives you just the right abstraction for real-world systems without hiding the power of AMQP.
🚀 Features
- ✅ `for await…of` streaming API — process messages naturally with backpressure
- ✅ Auto reconnect — connection and channel recovery handled transparently with exponential backoff + jitter
- ✅ JSON decoding by default — or opt out for raw Buffer access
- ✅ Manual `ack()`/`nack()` — total control over message flow
- ✅ Composable — works beautifully with `p-map`, `exstream.js`, `highland`, standard Node streams, and more
- ✅ Multiple isolated connections — supports multi-tenant, multi-env, and dynamic routing
- ✅ TypeScript support — full type inference over event data
- ✅ Zero dependencies (core version) — just `amqplib` under the hood
📦 Installation
```bash
npm install coniglio
```

✨ Usage

```js
import coniglio from 'coniglio'

const conn = await coniglio('amqp://localhost')

// Configure queues and exchanges
// (optional, useful for bootstrapping or tests)
await conn.configure({
  queues: [
    { name: 'jobs.email', durable: true }
  ],
  exchanges: [
    { name: 'domain.events', type: 'topic', durable: true }
  ]
})

// Consume messages from a queue
for await (const msg of conn.listen('jobs.email')) {
  await sendEmail(msg.data)
  conn.ack(msg)
}

// Publish messages to an exchange
setInterval(() => {
  conn.publish('domain.events', 'jobs.email', { message: 'hello world' })
}, 200)
```

📦 Table of contents
- Why Coniglio?
- API
- Philosophy
- Resilience
- Multiple connections
- TypeScript Integration
- Usage in Production Systems
- Coming Soon
- License
🐇 Why Coniglio?
If you’ve used amqplib, you know it’s the canonical RabbitMQ library for Node.js — powerful, stable, and low-level. But it leaves you to wire up:
- reconnect logic
- message decoding
- ack/nack control
- error-safe consumption
- backpressure handling
- channel separation for pub/sub
Coniglio wraps amqplib with a modern, minimal layer built for real apps.
You get the same underlying power, but with an API that feels natural and production-ready.
✅ A quick comparison
| Feature | amqplib | coniglio ✅ |
| ------------------------------ | ------------------- | --------------------- |
| Promise API | ⚠️ Basic (thenable) | ✅ Fully async/await |
| Manual reconnects | ❌ You handle it | ✅ Built-in |
| Message streaming | ❌ No | ✅ for await...of |
| Built-in JSON decoding | ❌ Raw Buffer | ✅ On by default |
| Safe manual ack() / nack() | ✅ Yes | ✅ Ergonomic handling |
| Channel separation (pub/sub) | ❌ Manual | ✅ Automatic |
| Backpressure-friendly | ❌ Needs plumbing | ✅ Native support |
| TypeScript types | ⚠️ Community | ✅ First-class |
🐇 Coniglio is Italian for “rabbit” — simple, fast, and alert.
```js
for await (const msg of conn.listen('my-queue')) {
  try {
    await handle(msg.data)
    conn.ack(msg)
  } catch (err) {
    conn.nack(msg)
  }
}
```

📖 API
const conn = await coniglio(url, opts?)
Creates a new connection instance.
```js
const conn = await coniglio('amqp://localhost', {
  logger: console, // optional custom logger
  json: true,      // default: true (parse JSON messages)
  prefetch: 10,    // default: 10 (number of unacknowledged messages)
})
```

If you want to use a custom logger, it should implement the `Logger` interface:
```ts
export interface Logger {
  debug(...args: any[]): void
  info(...args: any[]): void
  warn(...args: any[]): void
  error(...args: any[]): void
}
```

Example with pino:
```js
import pino from 'pino'

const log = pino({ level: 'debug' })
const conn = await coniglio('amqp://localhost', { logger: log })
```

conn.listen(queue, opts?)
Returns an async iterator of messages consumed from a queue.
```ts
interface ListenOptions {
  json?: boolean            // default: true (parse JSON)
  prefetch?: number         // default: 10
  routingKeys?: (keyof T)[] // optional filter, TypeScript-safe
}
```

Each yielded `msg: Message<T>` has:
```ts
interface Message<T> {
  raw: amqp.ConsumeMessage // original AMQP message
  content: Buffer          // raw message body
  data?: T                 // parsed JSON if json = true
  contentIsJson: boolean   // true if JSON parsed successfully
  routingKey: string       // the routing key
}
```

JSON-enabled example
```js
for await (const msg of conn.listen('my-queue', { prefetch: 100 })) {
  // msg.data is parsed JSON
  if (msg.contentIsJson) {
    processData(msg.data)
  } else {
    // handle parsing error
    console.warn('Failed to parse JSON:', msg.content.toString())
  }
  conn.ack(msg)
}
```

JSON-opt-out example
```js
for await (const msg of conn.listen('my-queue', { json: false })) {
  // msg.data === undefined
  // use msg.content (Buffer) directly
  processRaw(msg.content)
  conn.ack(msg)
}
```

conn.ack(msg) / conn.nack(msg, requeue = false)
Manually acknowledge or reject a message.
```js
conn.ack(msg)
conn.nack(msg, true) // requeue = true
```

conn.publish(exchange, routingKey, payload, opts?)

Publish a message. Payload is serialized with JSON.stringify under the hood.

```js
await conn.publish('domain.events', 'user.created', { userId: '123' }, { priority: 5 })
```

- Retry: on failure, retries indefinitely with exponential backoff + jitter (1s → 30s).
- Confirm channel: ensures broker receipt before resolving.
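For intuition, the retry strategy above (exponential backoff + jitter, 1s → 30s) can be sketched with one common "full jitter" formulation — a minimal illustration, not coniglio's actual internals, and the exact jitter scheme may differ:

```js
// Sketch of exponential backoff with "full jitter": the delay for attempt n
// is a random value in [0, min(maxMs, baseMs * 2^n)). baseMs/maxMs mirror
// the documented 1s → 30s range.
function backoffDelay (attempt, baseMs = 1000, maxMs = 30000) {
  const capped = Math.min(maxMs, baseMs * 2 ** attempt)
  return Math.floor(Math.random() * capped)
}
```

Full jitter spreads retries out, so many publishers that fail at the same moment don't all hammer the broker again in lockstep.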
conn.configure({ queues, exchanges })
Declare queues and exchanges explicitly. Useful for bootstrapping, tests, and dynamic setups.
```js
await conn.configure({
  exchanges: [
    { name: 'domain.events', type: 'topic', durable: true }
  ],
  queues: [
    {
      name: 'jobs.email',
      durable: true,
      bindTo: [{ exchange: 'domain.events', routingKey: 'jobs.email' }]
    }
  ]
})
```

```ts
interface ConfigureOptions {
  exchanges?: {
    name: string
    type: 'topic' | 'fanout' | 'direct' | 'headers'
    durable?: boolean
    autoDelete?: boolean
    internal?: boolean
    arguments?: Record<string, any>
  }[]
  queues?: {
    name: string
    durable?: boolean
    exclusive?: boolean
    autoDelete?: boolean
    deadLetterExchange?: string
    messageTtl?: number
    maxLength?: number
    arguments?: Record<string, any>
    bindTo?: {
      exchange: string
      routingKey: string
      arguments?: Record<string, any>
    }[]
  }[]
}
```

await conn.close()
Gracefully close all channels and the underlying connection. Use this during application shutdown.
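A typical shutdown hook might look like the sketch below. The `makeShutdown` helper is illustrative (not part of coniglio's API); it just guards against `close()` running twice when both SIGTERM and SIGINT arrive:

```js
// Once-only shutdown helper: returns a handler that closes the connection a
// single time, no matter how many signals arrive. `conn` is assumed to be a
// coniglio connection (anything exposing close()).
function makeShutdown (conn, exit = code => process.exit(code)) {
  let closing = false
  return async function shutdown () {
    if (closing) return
    closing = true
    await conn.close() // closes all channels and the underlying connection
    exit(0)
  }
}

// Wiring it up:
// const shutdown = makeShutdown(conn)
// process.on('SIGTERM', shutdown)
// process.on('SIGINT', shutdown)
```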
🧠 Philosophy
coniglio doesn’t manage your concurrency. It just delivers messages as an async generator. Use your favorite tool:
```js
import pMap from 'p-map'

await pMap(
  conn.listen('jobs.video'),
  async msg => {
    await transcode(msg.data)
    conn.ack(msg)
  },
  { concurrency: 5 }
)
```

Or go reactive:
```js
import { pipeline } from 'exstream'

await pipeline(
  conn.listen('metrics'),
  s => s.map(msg => parse(msg.data)),
  s => s.forEach(logMetric)
)
```

💪 Resilience by design
- Detects and recovers from connection or channel failures automatically
- Messages aren't lost or stuck unacknowledged
- Keeps the developer in control — no magic retries or swallowing errors
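The recovery loop behind these guarantees follows a well-known pattern, sketched here in generic form (the `connect` and `delayMs` parameters are illustrative stand-ins, not library API; in coniglio the delay would be the documented exponential backoff + jitter):

```js
// Keep retrying an async `connect` until it succeeds, waiting between
// attempts. delayMs(attempt) returns how long to sleep before retry n+1.
async function connectWithRetry (connect, delayMs = () => 0, maxAttempts = Infinity) {
  for (let attempt = 0; ; attempt++) {
    try {
      return await connect()
    } catch (err) {
      if (attempt + 1 >= maxAttempts) throw err
      await new Promise(resolve => setTimeout(resolve, delayMs(attempt)))
    }
  }
}
```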
✅ Multiple connections
```js
const prod = await coniglio('amqp://prod-host')
const qa = await coniglio('amqp://qa-host')

await prod.publish('events', 'prod.ready', { ok: true })
await qa.publish('events', 'qa.ready', { ok: true })
```

🧩 TypeScript Integration
If you are using TypeScript and want full type safety over your messages, you can pass a generic type map to coniglio():
```ts
type RouteKeyMap = {
  'user.created': { userId: string }
  'invoice.sent': { invoiceId: string; total: number }
}

const conn = await coniglio<RouteKeyMap>('amqp://localhost')

// Consume all events from a queue
for await (const msg of conn.listen('queue.main')) {
  switch (msg.routingKey) {
    case 'user.created': {
      msg.data.userId // ✅ typed as string
      break
    }
    case 'invoice.sent': {
      msg.data.invoiceId // ✅ typed as string
      msg.data.total     // ✅ typed as number
      break
    }
  }
}

// Or filter statically by known routing keys (with narrow typing)
for await (const msg of conn.listen('queue.main', { routingKeys: ['user.created'] })) {
  msg.data.userId // ✅ typed as string
}
```

🏗️ Usage in Production Systems
Coniglio is designed to thrive in real-world setups with high message throughput and resilience requirements.
✔️ Backpressure support
Using native for await...of, you get natural backpressure without buffering hell.
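The pull-based semantics behind this are easy to see in miniature: an async generator only advances when the consumer awaits the next value, so a slow consumer naturally slows the producer. The generator below is a stand-in for `conn.listen()`, which behaves the same way:

```js
// Record the interleaving of production and consumption to show that the
// producer never runs ahead of the consumer.
async function * producer (log) {
  for (let i = 0; i < 3; i++) {
    log.push(`produced ${i}`)
    yield i
  }
}

async function run () {
  const log = []
  for await (const i of producer(log)) {
    log.push(`consumed ${i}`)
  }
  return log
}
```

Each `produced n` is immediately followed by `consumed n` — the producer is suspended at `yield` until the consumer pulls again.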
✔️ Controlled flow
Acknowledge or retry only when you're ready — no leaking messages or auto-ack surprises.
✔️ Crash-safe reconnect
If RabbitMQ restarts, coniglio handles reconnection, re-initialization, and queue rebinds with zero config.
✔️ Works in microservices & monoliths
Use it in a Fastify server, a background worker, or a Kubernetes job — it's just an async iterator.
✔️ Easy to observe and test
You own the consumer loop. Add metrics, tracing, or mocks wherever you need — no magic, no black boxes.
🪄 Coming soon (planned)
- [ ] Test suite with real RabbitMQ integration
- [ ] `conn.stream()` for full `ReadableStream` interop (with `AbortSignal`)
- [ ] Built-in metrics and logging hooks
- [ ] Retry helpers (e.g. DLQ support, customizable backoff)
📘 License
MIT — as simple and open as the API itself.
