
coniglio v1.1.1
🐇 coniglio

A minimal, elegant, and robust async RabbitMQ client for Node.js

coniglio (Italian for “rabbit”) is a modern wrapper around RabbitMQ designed to be dead-simple to use, resilient in production, and fully composable with async iterators and streaming libraries. Inspired by libraries like postgres, it gives you just the right abstraction for real-world systems without hiding the power of AMQP.


🚀 Features

  • for await…of streaming API — process messages naturally with backpressure
  • Auto reconnect — connection and channel recovery handled transparently with exponential backoff + jitter
  • JSON decoding by default — or opt-out for raw Buffer access
  • Manual ack() / nack() — total control over message flow
  • Composable — works beautifully with p-map, exstream.js, highland, standard Node streams and more
  • Multiple isolated connections — supports multi-tenant, multi-env, and dynamic routing
  • TypeScript support — full type inference over event data
  • Zero dependencies (core version) — just amqplib under the hood

📦 Installation

npm install coniglio

✨ Usage

import coniglio from 'coniglio'

const conn = await coniglio('amqp://localhost')

// Configure queues and exchanges
// (optional, useful for bootstrapping or tests)
await conn.configure({
  queues: [
    { name: 'jobs.email', durable: true }
  ],
  exchanges: [
    { name: 'domain.events', type: 'topic', durable: true }
  ]
})

// Consume messages from a queue
for await (const msg of conn.listen('jobs.email')) {
  await sendEmail(msg.data)
  conn.ack(msg)
}

// Publish messages to an exchange
setInterval(() => {
  conn.publish('domain.events', 'jobs.email', { message: 'hello world' })
}, 200)

📦 Table of contents

  1. Why Coniglio?
  2. API
  3. Philosophy
  4. Resilience
  5. Multiple connections
  6. TypeScript Integration
  7. Usage in Production Systems
  8. Coming Soon
  9. License

🐇 Why Coniglio?

If you’ve used amqplib, you know it’s the canonical RabbitMQ library for Node.js — powerful, stable, and low-level. But it leaves you wiring:

  • reconnect logic
  • message decoding
  • ack/nack control
  • error-safe consumption
  • backpressure handling
  • channel separation for pub/sub

Coniglio wraps amqplib with a modern, minimal layer built for real apps. You get the same underlying power, but with an API that feels natural and production-ready.

✅ A quick comparison

| Feature | amqplib | coniglio ✅ |
| ------------------------------ | ------------------- | --------------------- |
| Promise API | ⚠️ Basic (thenable) | ✅ Fully async/await |
| Manual reconnects | ❌ You handle it | ✅ Built-in |
| Message streaming | ❌ No | ✅ for await...of |
| Built-in JSON decoding | ❌ Raw Buffer | ✅ On by default |
| Safe manual ack() / nack() | ✅ Yes | ✅ Ergonomic handling |
| Channel separation (pub/sub) | ❌ Manual | ✅ Automatic |
| Backpressure-friendly | ❌ Needs plumbing | ✅ Native support |
| TypeScript types | ⚠️ Community | ✅ First-class |

🐇 Coniglio is Italian for “rabbit” — simple, fast, and alert.

for await (const msg of conn.listen('my-queue')) {
  try {
    await handle(msg.data)
    conn.ack(msg)
  } catch (err) {
    conn.nack(msg)
  }
}

📖 API

const conn = await coniglio(url, opts?)

Creates a new connection instance.

const conn = await coniglio('amqp://localhost', {
  logger: console, // optional custom logger
  json: true,      // default: true (parse JSON messages)
  prefetch: 10,    // default: 10 (number of unacknowledged messages)
})

If you want to use a custom logger, it should implement the Logger interface:

export interface Logger {
  debug(...args: any[]): void
  info(...args: any[]): void
  warn(...args: any[]): void
  error(...args: any[]): void
}

Example with pino:

import pino from 'pino'
const log = pino({ level: 'debug' })
const conn = await coniglio('amqp://localhost', { logger: log })

conn.listen(queue, opts?)

Returns an async iterator of messages consumed from a queue.

interface ListenOptions {
  json?: boolean           // default: true (parse JSON)
  prefetch?: number        // default: 10
  routingKeys?: (keyof T)[] // optional filter, TypeScript-safe
}

Each yielded msg: Message<T> has:

interface Message<T> {
  raw: amqp.ConsumeMessage    // original AMQP message
  content: Buffer             // raw message body
  data?: T                    // parsed JSON if json = true
  contentIsJson: boolean      // true if JSON parsed successfully
  routingKey: string          // the routing key
}

JSON-enabled example

for await (const msg of conn.listen('my-queue', { prefetch: 100 })) {
  // msg.data is parsed JSON
  if (msg.contentIsJson) {
    processData(msg.data)
  } else {
    // handle parsing error
    console.warn('Failed to parse JSON:', msg.content.toString())
  }
  conn.ack(msg)
}

JSON-opt-out example

for await (const msg of conn.listen('my-queue', { json: false })) {
  // msg.data === undefined
  // use msg.content (Buffer) directly
  processRaw(msg.content)
  conn.ack(msg)
}

conn.ack(msg) / conn.nack(msg, requeue = false)

Manually acknowledge or reject a message.

conn.ack(msg)
conn.nack(msg, true) // requeue = true
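A common pattern is to ack on success and requeue only failures that look transient, while permanent failures are nacked once so a dead-letter setup can catch them. A runnable sketch of that decision, with a stubbed connection so it works without a broker (the `isTransient` heuristic and the stub are illustrative, not part of coniglio's API):

```javascript
// Heuristic: network-ish errors are worth retrying; anything else
// is nacked without requeue (illustrative, not coniglio's API).
function isTransient(err) {
  return ['ETIMEDOUT', 'ECONNRESET', 'ECONNREFUSED'].includes(err.code)
}

// Stubbed connection that records ack/nack calls.
const conn = {
  acked: [],
  nacked: [],
  ack(msg) { this.acked.push(msg) },
  nack(msg, requeue) { this.nacked.push({ msg, requeue }) },
}

async function handle(conn, msg, process) {
  try {
    await process(msg.data)
    conn.ack(msg)
  } catch (err) {
    conn.nack(msg, isTransient(err))
  }
}

// One success, one transient failure.
await handle(conn, { data: 1 }, async () => {})
await handle(conn, { data: 2 }, async () => {
  const err = new Error('timeout')
  err.code = 'ETIMEDOUT'
  throw err
})
console.log(conn.acked.length, conn.nacked[0].requeue) // 1 true
```

With the real connection, the same try/catch wraps the handler inside the `for await` loop.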

conn.publish(exchange, routingKey, payload, opts?)

Publish a message. Payload is serialized with JSON.stringify under the hood.

await conn.publish('domain.events', 'user.created', { userId: '123' }, { priority: 5 })

  • Retry: on failure, retries indefinitely with exponential backoff + jitter (1s → 30s).
  • Confirm channel: ensures broker receipt before resolving.

conn.configure({ queues, exchanges })

Declare queues and exchanges explicitly. Useful for bootstrapping, tests, and dynamic setups.

await conn.configure({
  exchanges: [
    { name: 'domain.events', type: 'topic', durable: true }
  ],
  queues: [
    {
      name: 'jobs.email',
      durable: true,
      bindTo: [{ exchange: 'domain.events', routingKey: 'jobs.email' }]
    }
  ]
})

interface ConfigureOptions {
  exchanges?: {
    name: string
    type: 'topic' | 'fanout' | 'direct' | 'headers'
    durable?: boolean
    autoDelete?: boolean
    internal?: boolean
    arguments?: Record<string, any>
  }[]
  queues?: {
    name: string
    durable?: boolean
    exclusive?: boolean
    autoDelete?: boolean
    deadLetterExchange?: string
    messageTtl?: number
    maxLength?: number
    arguments?: Record<string, any>
    bindTo?: {
      exchange: string
      routingKey: string
      arguments?: Record<string, any>
    }[]
  }[]
}

await conn.close()

Gracefully close all channels and the underlying connection. Use this during application shutdown.
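A typical shutdown sequence is: set a flag when the process receives SIGTERM, finish the in-flight message, break out of the consumer loop, then call close(). A runnable sketch with a stubbed connection and a simulated signal, so the shape of the pattern is clear without a broker:

```javascript
// Stub connection: an async generator stands in for conn.listen()
// so this runs standalone.
function stubConnection(messages) {
  let closed = false
  return {
    closed: () => closed,
    async close() { closed = true },
    ack() {},
    async *listen() {
      for (const m of messages) {
        if (closed) return
        yield m
      }
    },
  }
}

const conn = stubConnection([{ data: 'a' }, { data: 'b' }, { data: 'c' }])
let shuttingDown = false

// In a real app: process.once('SIGTERM', () => { shuttingDown = true })
const onSigterm = () => { shuttingDown = true }

const seen = []
for await (const msg of conn.listen('jobs.email')) {
  seen.push(msg.data)
  conn.ack(msg)
  if (seen.length === 1) onSigterm() // simulate SIGTERM mid-stream
  if (shuttingDown) break            // finish this message, stop consuming
}
await conn.close()
console.log(seen) // [ 'a' ]
```

Breaking out of the loop stops consumption cleanly, and close() then tears down channels and the connection.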


🧠 Philosophy

coniglio doesn’t manage your concurrency. It just delivers messages as an async generator. Use your favorite tool:

import pMap from 'p-map'

await pMap(
  conn.listen('jobs.video'),
  async msg => {
    await transcode(msg.data)
    conn.ack(msg)
  },
  { concurrency: 5 }
)

Or go reactive:

import { pipeline } from 'exstream'

await pipeline(
  conn.listen('metrics'),
  s => s.map(msg => parse(msg.data)),
  s => s.forEach(logMetric)
)

💪 Resilience by design

  • Detects and recovers from connection or channel failures automatically
  • Messages aren't lost or stuck unacknowledged
  • Keeps the developer in control — no magic retries or swallowing errors
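The reconnect timing described above (exponential backoff plus jitter, 1s → 30s) can be sketched as full-jitter backoff; the exact formula coniglio uses internally is an assumption here:

```javascript
// Exponential backoff with full jitter: wait a random amount
// between 0 and min(cap, base * 2^attempt) before reconnecting.
function backoffDelay(attempt, { baseMs = 1000, capMs = 30000 } = {}) {
  const ceiling = Math.min(capMs, baseMs * 2 ** attempt)
  return Math.random() * ceiling
}

// Ceilings grow 1s, 2s, 4s, ... and flatten at the 30s cap.
for (let attempt = 0; attempt < 8; attempt++) {
  const ceiling = Math.min(30000, 1000 * 2 ** attempt)
  console.log(`attempt ${attempt}: wait up to ${ceiling} ms`)
}
```

The jitter spreads reconnect attempts out so a broker restart doesn't get hammered by every consumer at once.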

✅ Multiple connections

const prod = await coniglio('amqp://prod-host')
const qa = await coniglio('amqp://qa-host')

await prod.publish('events', 'prod.ready', { ok: true })
await qa.publish('events', 'qa.ready', { ok: true })

🧩 TypeScript Integration

If you are using TypeScript and want full type safety over your messages, you can pass a generic type map to coniglio():

type RouteKeyMap = {
  'user.created': { userId: string }
  'invoice.sent': { invoiceId: string; total: number }
}

const conn = await coniglio<RouteKeyMap>('amqp://localhost')

// Consume all events from a queue
for await (const msg of conn.listen('queue.main')) {
  switch (msg.routingKey) {
    case 'user.created': {
      msg.data.userId // ✅ typed as string
      break
    }
    case 'invoice.sent': {
      msg.data.invoiceId // ✅ typed as string
      msg.data.total // ✅ typed as number
      break
    }
  }
}

// Or filter statically by known events (with narrow typing)
for await (const msg of conn.listen('queue.main', { routingKeys: ['user.created'] })) {
  msg.data.userId // ✅ typed as string
}

🏗️ Usage in Production Systems

Coniglio is designed to thrive in real-world setups with high message throughput and resilience requirements.

✔️ Backpressure support
Using native for await...of, you get natural backpressure without buffering hell.

✔️ Controlled flow
Acknowledge or retry only when you're ready — no leaking messages or auto-ack surprises.

✔️ Crash-safe reconnect
If RabbitMQ restarts, coniglio handles reconnection, re-initialization, and queue rebinds with zero config.

✔️ Works in microservices & monoliths
Use it in a Fastify server, a background worker, or a Kubernetes job — it's just an async iterator.

✔️ Easy to observe and test
You own the consumer loop. Add metrics, tracing, or mocks wherever you need — no magic, no black boxes.
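The backpressure point is worth making concrete: with an async generator, the producer only advances when the consumer awaits the next message, so a slow handler never causes messages to pile up in memory. A standalone sketch (a stub generator stands in for conn.listen(); this is not coniglio's internals):

```javascript
let produced = 0

// Stand-in for conn.listen(): each message is only produced
// when the consumer pulls the next one.
async function* listen() {
  while (produced < 3) {
    produced++
    yield { data: produced }
  }
}

for await (const msg of listen()) {
  // Slow consumer: the generator is suspended here, so the
  // producer and consumer stay in lockstep.
  await new Promise(resolve => setTimeout(resolve, 10))
  console.log('handled', msg.data, '| produced so far:', produced)
}
```

At every log line, `produced` equals the number of messages handled: nothing is fetched ahead of the consumer.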


🪄 Coming soon (planned)

  • [ ] test suite with real RabbitMQ integration
  • [ ] conn.stream() for full ReadableStream interop (with AbortSignal)
  • [ ] Built-in metrics and logging hooks
  • [ ] Retry helpers (e.g. DLQ support, customizable backoff)

📘 License

MIT — as simple and open as the API itself.