mqbus
v1.0.0
Published
A simplified RabbitMQ client wrapper for Node.js based on amqplib/callback_api with advanced features and TypeScript support
Maintainers
Readme
MQBus - Super Simple RabbitMQ Wrapper
The easiest way to use RabbitMQ in Node.js! 🚀
A simplified wrapper around amqplib/callback_api that reduces boilerplate by 90%.
No more boilerplate code - just publish and subscribe. MQBus automatically handles:
- ✅ Connection & Reconnection
- ✅ Queue & Exchange Creation
- ✅ Error Handling
- ✅ Message Retry
- ✅ All the boring stuff!
Installation
npm install mqbus💡 New to MQBus? Check out QUICKSTART.md for a step-by-step tutorial with Docker setup!
Quick Start
import mqbus from 'mqbus';
const client = new mqbus({ url: 'amqp://localhost' });
// Send message - that's it!
await client.publish('my-queue', { hello: 'world' });
// Receive messages - that's it!
await client.subscribe('my-queue', (message) => {
console.log('Got:', message);
});Why MQBus?
❌ Without MQBus (amqplib):
import amqp from 'amqplib/callback_api';
amqp.connect('amqp://localhost', (err, connection) => {
if (err) throw err;
connection.createChannel((err, channel) => {
if (err) throw err;
const queue = 'my-queue';
channel.assertQueue(queue, { durable: true }, (err) => {
if (err) throw err;
const msg = JSON.stringify({ hello: 'world' });
channel.sendToQueue(queue, Buffer.from(msg), { persistent: true });
console.log('Sent');
});
});
});✅ With MQBus:
const client = new mqbus({ url: 'amqp://localhost' });
await client.publish('my-queue', { hello: 'world' });90% less code! 🎉
Simple Examples
1. Basic Queue
const client = new mqbus();
// Subscribe (queue is created automatically!)
await client.subscribe('tasks', (task) => {
console.log('Processing:', task);
});
// Publish (no setup needed!)
await client.publish('tasks', { name: 'Send email' });2. Topic Exchange
// Subscribe to error logs only
await client.subscribe('error-handler', (log) => {
console.log('Error:', log);
}, {
exchange: 'logs',
exchangeType: 'topic',
routingKey: '*.error' // matches app.error, db.error, etc.
});
// Publish to different topics
await client.publish('logs', { msg: 'Database failed' }, {
exchange: 'logs',
exchangeType: 'topic',
routingKey: 'db.error'
});3. Fanout (Broadcast)
// Multiple subscribers receive the same message
await client.subscribe('email-service', handleNotification, {
exchange: 'notifications',
exchangeType: 'fanout'
});
await client.subscribe('sms-service', handleNotification, {
exchange: 'notifications',
exchangeType: 'fanout'
});
// Broadcast to all
await client.publish('notifications', { msg: 'New user!' }, {
exchange: 'notifications',
exchangeType: 'fanout'
});4. Auto-Retry on Errors
await client.subscribe('critical-tasks', async (task) => {
// If this throws an error, message is automatically retried!
await processTask(task);
}, {
autoRetry: true,
maxRetries: 3
});5. High Performance
// Process 10 messages at once
await client.subscribe('fast-queue', processMessage, {
prefetch: 10
});API
Create Client
const client = new mqbus({
url: 'amqp://localhost', // RabbitMQ URL
reconnectInterval: 5000, // Reconnect delay (ms)
onConnect: () => console.log('✅'), // Connection callback
onError: (err) => console.error(err), // Error callback
logger: customLogger // Custom logger
});Publish
await client.publish(queue, message, options?)Simple:
await client.publish('my-queue', { data: 'hello' });With Exchange:
await client.publish('my-key', { data: 'hello' }, {
exchange: 'my-exchange',
exchangeType: 'direct', // 'direct' | 'fanout' | 'topic' | 'headers'
routingKey: 'custom.key',
priority: 5, // 0-10
expiration: 60000, // TTL in ms
persistent: true // Survive restart (default: true)
});Subscribe
await client.subscribe(queue, handler, options?)Simple:
await client.subscribe('my-queue', (message) => {
console.log(message);
});With Options:
await client.subscribe('my-queue', async (message) => {
await process(message);
}, {
exchange: 'my-exchange',
exchangeType: 'topic',
routingKey: '*.important',
prefetch: 5, // Process N messages at once
durable: true, // Queue survives restart (default: true)
autoRetry: true, // Auto-retry on error
maxRetries: 3 // Max retry attempts
});Common Patterns
Work Queue (Load Balancing)
// Multiple workers share the load
const client = new mqbus();
// Worker 1
await client.subscribe('tasks', processTask, { prefetch: 1 });
// Worker 2
await client.subscribe('tasks', processTask, { prefetch: 1 });
// Producer
await client.publish('tasks', { job: 'send-email' });Pub/Sub (Broadcast)
// Publisher
await client.publish('news', { title: 'Breaking News' }, {
exchange: 'news',
exchangeType: 'fanout'
});
// Subscriber 1
await client.subscribe('mobile-app', handleNews, {
exchange: 'news',
exchangeType: 'fanout'
});
// Subscriber 2
await client.subscribe('web-app', handleNews, {
exchange: 'news',
exchangeType: 'fanout'
});Topic Routing
// Error logs only
await client.subscribe('errors', handleError, {
exchange: 'logs',
exchangeType: 'topic',
routingKey: '*.error'
});
// All app logs
await client.subscribe('app-logs', handleLog, {
exchange: 'logs',
exchangeType: 'topic',
routingKey: 'app.*'
});
// All logs
await client.subscribe('all-logs', handleLog, {
exchange: 'logs',
exchangeType: 'topic',
routingKey: '#'
});Error Handling
MQBus handles errors automatically, but you can also add custom handling:
const client = new mqbus({
onError: (err) => {
console.error('RabbitMQ Error:', err);
// Send to monitoring service
sendToSentry(err);
},
onClose: () => {
console.log('Connection lost, reconnecting...');
},
onConnect: () => {
console.log('Connected!');
}
});Auto-Retry
Messages that fail processing can be automatically retried:
await client.subscribe('my-queue', async (message) => {
// If this throws, message is retried automatically
await riskyOperation(message);
}, {
autoRetry: true,
maxRetries: 3 // Try up to 3 times
});Retry uses exponential backoff:
- 1st retry: after 1 second
- 2nd retry: after 2 seconds
- 3rd retry: after 4 seconds
TypeScript Support
Full TypeScript support with intellisense:
import mqbus, {
PublishOptions,
SubscribeOptions,
ExchangeType
} from 'mqbus';
const options: PublishOptions = {
exchange: 'my-exchange',
exchangeType: 'topic',
priority: 5
};Technical Details
MQBus is built on top of amqplib/callback_api (the callback-based API of amqplib), providing:
- ✅ Simpler API with sensible defaults
- ✅ Automatic resource management (queues, exchanges, bindings)
- ✅ Built-in error handling and retry logic
- ✅ Full TypeScript support with type definitions
- ✅ Production-ready with automatic reconnection
All the power of amqplib, none of the boilerplate! 🚀
Comparison
| Feature | amqplib | MQBus | |---------|---------|----------| | Lines of code | 20+ | 2-3 | | Auto-create queues | ❌ | ✅ | | Auto-reconnect | ❌ | ✅ | | Error handling | Manual | ✅ Automatic | | Message retry | Manual | ✅ Automatic | | Exchange setup | Manual | ✅ Automatic | | TypeScript | Partial | ✅ Full |
Examples
See /examples folder for complete examples:
00-super-simple.ts- Ultra simple examples01-basic.ts- Basic usage02-work-queue.ts- Load balancing03-pubsub.ts- Broadcasting04-routing.ts- Topic routing05-dlx.ts- Dead letter exchange06-priority.ts- Priority queues
Running Examples
Start RabbitMQ with Docker Compose (Recommended)
# Start RabbitMQ with management UI
docker-compose up -d
# Check if it's running
docker-compose ps
# View logs
docker-compose logs -f
# Stop when done
docker-compose downRabbitMQ Management UI: http://localhost:15672 (guest/guest)
Or use Docker directly
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-managementRun Examples
# Build the project
npm run build
# Run super simple example
npx ts-node examples/00-super-simple.ts
# Run basic example
npx ts-node examples/01-basic.ts
# Run work queue example (multiple terminals)
npx ts-node examples/02-work-queue.ts producer
npx ts-node examples/02-work-queue.ts worker 1Best Practices
- Use prefetch for performance:
await client.subscribe('queue', handler, { prefetch: 10 });- Enable retry for critical tasks:
await client.subscribe('queue', handler, {
autoRetry: true,
maxRetries: 3
});- Use topic exchanges for flexibility:
await client.subscribe('queue', handler, {
exchange: 'events',
exchangeType: 'topic',
routingKey: 'user.*'
});- Handle errors gracefully:
const client = new mqbus({
onError: (err) => logger.error(err),
onClose: () => logger.warn('Reconnecting...')
});License
MIT
Contributing
Contributions welcome! Open an issue or submit a PR.
Support
- 📖 Documentation: README.md
- 🐛 Issues: GitHub Issues
- 💬 Discussions: GitHub Discussions
Made with ❤️ to simplify RabbitMQ
Star ⭐ the repo if you find it useful!
