npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

mqbus

v1.0.0

Published

A simplified RabbitMQ client wrapper for Node.js based on amqplib/callback_api with advanced features and TypeScript support

Readme

MQBus - Super Simple RabbitMQ Wrapper

The easiest way to use RabbitMQ in Node.js! 🚀

A simplified wrapper around amqplib/callback_api that reduces boilerplate by 90%.

No more boilerplate code - just publish and subscribe. MQBus automatically handles:

  • ✅ Connection & Reconnection
  • ✅ Queue & Exchange Creation
  • ✅ Error Handling
  • ✅ Message Retry
  • ✅ All the boring stuff!

Installation

npm install mqbus

💡 New to MQBus? Check out QUICKSTART.md for a step-by-step tutorial with Docker setup!

Quick Start

import mqbus from 'mqbus';

const client = new mqbus({ url: 'amqp://localhost' });

// Send message - that's it!
await client.publish('my-queue', { hello: 'world' });

// Receive messages - that's it!
await client.subscribe('my-queue', (message) => {
  console.log('Got:', message);
});

Why MQBus?

❌ Without MQBus (amqplib):

import amqp from 'amqplib/callback_api';

amqp.connect('amqp://localhost', (err, connection) => {
  if (err) throw err;
  
  connection.createChannel((err, channel) => {
    if (err) throw err;
    
    const queue = 'my-queue';
    
    channel.assertQueue(queue, { durable: true }, (err) => {
      if (err) throw err;
      
      const msg = JSON.stringify({ hello: 'world' });
      channel.sendToQueue(queue, Buffer.from(msg), { persistent: true });
      console.log('Sent');
    });
  });
});

✅ With MQBus:

const client = new mqbus({ url: 'amqp://localhost' });
await client.publish('my-queue', { hello: 'world' });

90% less code! 🎉

Simple Examples

1. Basic Queue

const client = new mqbus();

// Subscribe (queue is created automatically!)
await client.subscribe('tasks', (task) => {
  console.log('Processing:', task);
});

// Publish (no setup needed!)
await client.publish('tasks', { name: 'Send email' });

2. Topic Exchange

// Subscribe to error logs only
await client.subscribe('error-handler', (log) => {
  console.log('Error:', log);
}, {
  exchange: 'logs',
  exchangeType: 'topic',
  routingKey: '*.error'  // matches app.error, db.error, etc.
});

// Publish to different topics
await client.publish('logs', { msg: 'Database failed' }, {
  exchange: 'logs',
  exchangeType: 'topic',
  routingKey: 'db.error'
});

3. Fanout (Broadcast)

// Multiple subscribers receive the same message
await client.subscribe('email-service', handleNotification, {
  exchange: 'notifications',
  exchangeType: 'fanout'
});

await client.subscribe('sms-service', handleNotification, {
  exchange: 'notifications',
  exchangeType: 'fanout'
});

// Broadcast to all
await client.publish('notifications', { msg: 'New user!' }, {
  exchange: 'notifications',
  exchangeType: 'fanout'
});

4. Auto-Retry on Errors

await client.subscribe('critical-tasks', async (task) => {
  // If this throws an error, message is automatically retried!
  await processTask(task);
}, {
  autoRetry: true,
  maxRetries: 3
});

5. High Performance

// Process 10 messages at once
await client.subscribe('fast-queue', processMessage, {
  prefetch: 10
});

API

Create Client

const client = new mqbus({
  url: 'amqp://localhost',           // RabbitMQ URL
  reconnectInterval: 5000,           // Reconnect delay (ms)
  onConnect: () => console.log('✅'), // Connection callback
  onError: (err) => console.error(err), // Error callback
  logger: customLogger               // Custom logger
});

Publish

await client.publish(queue, message, options?)

Simple:

await client.publish('my-queue', { data: 'hello' });

With Exchange:

await client.publish('my-key', { data: 'hello' }, {
  exchange: 'my-exchange',
  exchangeType: 'direct',  // 'direct' | 'fanout' | 'topic' | 'headers'
  routingKey: 'custom.key',
  priority: 5,             // 0-10
  expiration: 60000,       // TTL in ms
  persistent: true         // Survive restart (default: true)
});

Subscribe

await client.subscribe(queue, handler, options?)

Simple:

await client.subscribe('my-queue', (message) => {
  console.log(message);
});

With Options:

await client.subscribe('my-queue', async (message) => {
  await process(message);
}, {
  exchange: 'my-exchange',
  exchangeType: 'topic',
  routingKey: '*.important',
  prefetch: 5,             // Process N messages at once
  durable: true,           // Queue survives restart (default: true)
  autoRetry: true,         // Auto-retry on error
  maxRetries: 3            // Max retry attempts
});

Common Patterns

Work Queue (Load Balancing)

// Multiple workers share the load
const client = new mqbus();

// Worker 1
await client.subscribe('tasks', processTask, { prefetch: 1 });

// Worker 2  
await client.subscribe('tasks', processTask, { prefetch: 1 });

// Producer
await client.publish('tasks', { job: 'send-email' });

Pub/Sub (Broadcast)

// Publisher
await client.publish('news', { title: 'Breaking News' }, {
  exchange: 'news',
  exchangeType: 'fanout'
});

// Subscriber 1
await client.subscribe('mobile-app', handleNews, {
  exchange: 'news',
  exchangeType: 'fanout'
});

// Subscriber 2
await client.subscribe('web-app', handleNews, {
  exchange: 'news',
  exchangeType: 'fanout'
});

Topic Routing

// Error logs only
await client.subscribe('errors', handleError, {
  exchange: 'logs',
  exchangeType: 'topic',
  routingKey: '*.error'
});

// All app logs
await client.subscribe('app-logs', handleLog, {
  exchange: 'logs',
  exchangeType: 'topic',
  routingKey: 'app.*'
});

// All logs
await client.subscribe('all-logs', handleLog, {
  exchange: 'logs',
  exchangeType: 'topic',
  routingKey: '#'
});

Error Handling

MQBus handles errors automatically, but you can also add custom handling:

const client = new mqbus({
  onError: (err) => {
    console.error('RabbitMQ Error:', err);
    // Send to monitoring service
    sendToSentry(err);
  },
  onClose: () => {
    console.log('Connection lost, reconnecting...');
  },
  onConnect: () => {
    console.log('Connected!');
  }
});

Auto-Retry

Messages that fail processing can be automatically retried:

await client.subscribe('my-queue', async (message) => {
  // If this throws, message is retried automatically
  await riskyOperation(message);
}, {
  autoRetry: true,
  maxRetries: 3  // Try up to 3 times
});

Retry uses exponential backoff:

  • 1st retry: after 1 second
  • 2nd retry: after 2 seconds
  • 3rd retry: after 4 seconds

TypeScript Support

Full TypeScript support with intellisense:

import mqbus, { 
  PublishOptions, 
  SubscribeOptions,
  ExchangeType 
} from 'mqbus';

const options: PublishOptions = {
  exchange: 'my-exchange',
  exchangeType: 'topic',
  priority: 5
};

Technical Details

MQBus is built on top of amqplib/callback_api (the callback-based API of amqplib), providing:

  • ✅ Simpler API with sensible defaults
  • ✅ Automatic resource management (queues, exchanges, bindings)
  • ✅ Built-in error handling and retry logic
  • ✅ Full TypeScript support with type definitions
  • ✅ Production-ready with automatic reconnection

All the power of amqplib, none of the boilerplate! 🚀

Comparison

| Feature | amqplib | MQBus | |---------|---------|----------| | Lines of code | 20+ | 2-3 | | Auto-create queues | ❌ | ✅ | | Auto-reconnect | ❌ | ✅ | | Error handling | Manual | ✅ Automatic | | Message retry | Manual | ✅ Automatic | | Exchange setup | Manual | ✅ Automatic | | TypeScript | Partial | ✅ Full |

Examples

See /examples folder for complete examples:

  • 00-super-simple.ts - Ultra simple examples
  • 01-basic.ts - Basic usage
  • 02-work-queue.ts - Load balancing
  • 03-pubsub.ts - Broadcasting
  • 04-routing.ts - Topic routing
  • 05-dlx.ts - Dead letter exchange
  • 06-priority.ts - Priority queues

Running Examples

Start RabbitMQ with Docker Compose (Recommended)

# Start RabbitMQ with management UI
docker-compose up -d

# Check if it's running
docker-compose ps

# View logs
docker-compose logs -f

# Stop when done
docker-compose down

RabbitMQ Management UI: http://localhost:15672 (guest/guest)

Or use Docker directly

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

Run Examples

# Build the project
npm run build

# Run super simple example
npx ts-node examples/00-super-simple.ts

# Run basic example
npx ts-node examples/01-basic.ts

# Run work queue example (multiple terminals)
npx ts-node examples/02-work-queue.ts producer
npx ts-node examples/02-work-queue.ts worker 1

Best Practices

  1. Use prefetch for performance:
await client.subscribe('queue', handler, { prefetch: 10 });
  1. Enable retry for critical tasks:
await client.subscribe('queue', handler, { 
  autoRetry: true, 
  maxRetries: 3 
});
  1. Use topic exchanges for flexibility:
await client.subscribe('queue', handler, {
  exchange: 'events',
  exchangeType: 'topic',
  routingKey: 'user.*'
});
  1. Handle errors gracefully:
const client = new mqbus({
  onError: (err) => logger.error(err),
  onClose: () => logger.warn('Reconnecting...')
});

License

MIT

Contributing

Contributions welcome! Open an issue or submit a PR.

Support


Made with ❤️ to simplify RabbitMQ

Star ⭐ the repo if you find it useful!