connection-manager-pkg
v2.0.2
Published
connection manager utility for rabbitmq
Readme
🐇 AMQP Connection Manager
A robust Node.js connection manager for RabbitMQ/AMQP with automatic reconnection, graceful shutdown, and connection pooling.
Features
- Singleton Pattern - Single shared connection across your application
- Automatic Reconnection - Configurable retry logic with exponential backoff
- Graceful Shutdown - Proper cleanup on SIGINT/SIGTERM signals
- Connection Pooling - Reuses existing connections to avoid overhead
- Event-Driven - Subscribe to connection events for custom handling
- Thread-Safe - Prevents race conditions during concurrent connection attempts
Installation
npm install connection-manager-pkgQuick Start
const ConnectionManager = require('./ConnectionManager');
// Configure the connection
ConnectionManager.configure({
url: 'amqp://localhost:5672',
maxRetries: 5,
timeout: 5000
});
// Get a channel
const channel = await ConnectionManager.getChannel();
// Use the channel
await channel.assertQueue('my-queue');
await channel.sendToQueue('my-queue', Buffer.from('Hello World'));API Reference
configure(options)
Configure the connection manager. Must be called before connecting.
Parameters:
url(string, required) - AMQP connection URL (e.g.,amqp://user:pass@localhost:5672)maxRetries(number, optional) - Maximum reconnection attempts. Default:5timeout(number, optional) - Delay between reconnection attempts in milliseconds. Default:5000
Example:
ConnectionManager.configure({
url: 'amqp://admin:[email protected]:5672',
maxRetries: 10,
timeout: 3000
});connect()
Establishes a connection to the AMQP broker. Returns the channel if successful.
Returns: Promise<Channel> - An amqplib channel instance
Example:
try {
const channel = await ConnectionManager.connect();
console.log('Connected to RabbitMQ');
} catch (error) {
console.error('Connection failed:', error);
}getChannel()
Returns the existing channel or creates a new connection if none exists.
Returns: Promise<Channel> - An amqplib channel instance
Example:
const channel = await ConnectionManager.getChannel();
await channel.publish('exchange', 'routing.key', Buffer.from('message'));on(event, listener)
Subscribe to connection manager events.
Events:
new-channel- Emitted when a new channel is created (passes the channel as an argument)
Example:
ConnectionManager.on('new-channel', (channel) => {
console.log('New channel created, re-asserting queues...');
channel.assertQueue('my-queue', { durable: true });
});Usage Examples
Basic Producer
const ConnectionManager = require('./ConnectionManager');
ConnectionManager.configure({
url: 'amqp://localhost:5672'
});
async function sendMessage(queue, message) {
const channel = await ConnectionManager.getChannel();
await channel.assertQueue(queue, { durable: true });
channel.sendToQueue(queue, Buffer.from(message), { persistent: true });
console.log(`Sent: ${message}`);
}
sendMessage('tasks', 'Process this task');Basic Consumer
const ConnectionManager = require('./ConnectionManager');
ConnectionManager.configure({
url: 'amqp://localhost:5672'
});
async function consumeMessages(queue) {
const channel = await ConnectionManager.getChannel();
await channel.assertQueue(queue, { durable: true });
channel.consume(queue, (msg) => {
if (msg) {
console.log(`Received: ${msg.content.toString()}`);
channel.ack(msg);
}
});
}
consumeMessages('tasks');Connection Lifecycle
- Configuration - Call
configure()with connection parameters - Connection - Call
connect()orgetChannel()to establish connection - Auto-Reconnect - On connection loss, automatically retries with configurable backoff
- Graceful Shutdown - On SIGINT/SIGTERM, closes connection cleanly
Error Handling
The connection manager handles errors automatically:
- Connection errors - Automatically triggers reconnection logic
- Connection closure - Reconnects unless shutting down
- Max retries exceeded - Logs failure and stops reconnection attempts
Best Practices
- Configure once - Call
configure()at application startup - Reuse channels - Use
getChannel()instead ofconnect()for subsequent calls - Listen to events - Subscribe to
new-channelif needed
Requirements
- Node.js 14.x or higher
- amqplib ^0.10.0
License
ISC
Contributing
Contributions are welcome! Please submit pull requests or open issues for bugs and feature requests.
