uor-rabbit
v1.0.0
RabbitMQ utility library for UOR Budget applications
UOR Rabbit
A reusable RabbitMQ utility library for UOR Budget applications that provides both producer and consumer functionality.
Features
- Simple, configurable RabbitMQ connection management
- Message producer for sending messages to queues
- Message consumer for receiving and processing messages
- Automatic reconnection handling
- Event-based communication
- Flexible message format support
- Error handling and logging
Installation
npm install uor-rabbit
Usage
Producer Example
const { RabbitProducer } = require('uor-rabbit');

async function main() {
  // Initialize producer with custom options
  const producer = new RabbitProducer({
    url: process.env.RABBITMQ_URL || 'amqp://localhost',
    queueName: process.env.RABBITMQ_QUEUE || 'telegram_notifications',
    logger: console // or your custom logger
  });

  // Listen for events (registered before sending so none are missed)
  producer.on('messageSent', (message) => {
    console.log('Message sent successfully:', message);
  });
  producer.on('error', (error) => {
    console.error('RabbitMQ error:', error.message);
  });

  // Connect to RabbitMQ
  await producer.initialize();

  // Send a message
  await producer.sendMessage({
    receivers: ['123456789'],
    body: 'Hello from UOR Budget!',
    data: { timestamp: new Date().toISOString() }
  });

  // Or send to multiple receivers
  await producer.sendMessage({
    receivers: ['123456789', '987654321'],
    body: 'Broadcast message',
    data: { type: 'notification' }
  });

  // Close connection when done
  await producer.close();
}

// CommonJS modules do not allow top-level await, so run inside an async function
main().catch(console.error);
Consumer Example
const { RabbitConsumer } = require('uor-rabbit');

async function main() {
  // Initialize consumer with custom options
  const consumer = new RabbitConsumer({
    url: process.env.RABBITMQ_URL || 'amqp://localhost',
    queueName: process.env.RABBITMQ_QUEUE || 'telegram_notifications',
    logger: console // or your custom logger
  });

  // Listen for events (registered first so early errors are not missed)
  consumer.on('message', (message) => {
    console.log('Received message:', message);
  });
  consumer.on('error', (error) => {
    console.error('RabbitMQ error:', error.message);
  });

  // Connect to RabbitMQ
  await consumer.initialize();

  // Register message handler
  consumer.onMessage(async (receivers, body, data) => {
    console.log(`Processing message for ${receivers.length} receivers`);
    for (const userId of receivers) {
      // Process each receiver
      console.log(`Sending message to user ${userId}: ${body}`);
    }
  });

  // Start consuming messages
  await consumer.startConsuming();

  // Close connection when done, here when the process receives Ctrl+C
  process.once('SIGINT', async () => {
    await consumer.close();
  });
}

// CommonJS modules do not allow top-level await, so run inside an async function
main().catch(console.error);
Configuration Options
Both RabbitProducer and RabbitConsumer accept the following options:
| Option | Description | Default |
|--------|-------------|---------|
| url | RabbitMQ connection URL | process.env.RABBITMQ_URL or 'amqp://localhost' |
| queueName | Queue name | process.env.RABBITMQ_QUEUE or 'telegram_notifications' |
| queueOptions | Queue options | { durable: true } |
| reconnectInterval | Reconnect interval in ms | 5000 |
| maxReconnectAttempts | Maximum reconnect attempts | 3 |
| logger | Logger instance | console |
| messageOptions | Message options (Producer only) | { persistent: true } |
| consumeOptions | Consume options (Consumer only) | {} |
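To make the defaults concrete, here is a minimal sketch of how overrides could be combined with the values from the table. The `resolveOptions` helper is hypothetical and not part of uor-rabbit; the library's own merging (for example, whether nested objects such as `queueOptions` are merged deeply) may differ.

```javascript
// Hypothetical helper, not part of uor-rabbit's documented API.
// Default values are copied from the options table.
function resolveOptions(overrides = {}, env = process.env) {
  const defaults = {
    url: env.RABBITMQ_URL || 'amqp://localhost',
    queueName: env.RABBITMQ_QUEUE || 'telegram_notifications',
    queueOptions: { durable: true },
    reconnectInterval: 5000, // milliseconds
    maxReconnectAttempts: 3,
    logger: console,
    messageOptions: { persistent: true }, // producer only
    consumeOptions: {} // consumer only
  };
  // Shallow merge (an assumption): an override replaces the whole default
  // value, including nested objects such as queueOptions.
  return { ...defaults, ...overrides };
}

// An empty env object is passed here so the result does not depend on the shell.
const options = resolveOptions({ reconnectInterval: 1000 }, {});
console.log(options.url, options.reconnectInterval, options.maxReconnectAttempts);
```

Passing `env` explicitly keeps the sketch easy to test; in an application the default `process.env` would normally be used.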
Message Format
The library supports the following message format:
{
  "receivers": ["123456789", "987654321"],
  "body": "Your notification message",
  "data": { "key": "value" }
}
The receivers field must be a non-empty array of receiver IDs.
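A quick way to catch malformed messages before they reach the queue is to check the one documented rule up front. The `validateMessage` function below is a hypothetical sketch that only encodes the `receivers` requirement stated above; the library's internal validation may check more than this.

```javascript
// Hypothetical validator, not part of uor-rabbit's documented API.
// It checks only the rule stated in this README: receivers must be
// a non-empty array.
function validateMessage(message) {
  if (message === null || typeof message !== 'object') {
    return { valid: false, errors: ['message must be an object'] };
  }
  const errors = [];
  if (!Array.isArray(message.receivers) || message.receivers.length === 0) {
    errors.push('receivers must be a non-empty array');
  }
  return { valid: errors.length === 0, errors };
}

const good = validateMessage({ receivers: ['123456789'], body: 'Hi' });
const bad = validateMessage({ receivers: [], body: 'Hi' });
console.log(good.valid, bad.errors);
```

Returning a list of errors instead of throwing lets the caller decide whether to log, drop, or reject the message.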
Events
Common Events
- connected: Emitted when connected to RabbitMQ
- error: Emitted on general errors
- connectionError: Emitted on connection errors
- connectionClosed: Emitted when connection is closed unexpectedly
- reconnected: Emitted when successfully reconnected
- reconnectError: Emitted when reconnection fails
- maxReconnectAttemptsReached: Emitted when maximum reconnect attempts are reached
- closed: Emitted when connection is closed
- closeError: Emitted when there's an error closing the connection
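The common events can be wired to a logger in one place. The sketch below assumes producers and consumers expose an `on(name, listener)` method like Node's `EventEmitter`, as the usage examples suggest; `attachLifecycleLogging` is a hypothetical helper, and a plain `EventEmitter` stands in for a real client so the snippet runs without a broker. Event payloads are not documented here, so the listener only logs a `message` property when one is present.

```javascript
const { EventEmitter } = require('node:events');

// Event names taken from the Common Events list ('error' is handled separately).
const COMMON_EVENTS = [
  'connected', 'connectionError', 'connectionClosed', 'reconnected',
  'reconnectError', 'maxReconnectAttemptsReached', 'closed', 'closeError'
];

// Hypothetical helper: subscribes a logger to every common event.
function attachLifecycleLogging(client, logger = console) {
  for (const name of COMMON_EVENTS) {
    client.on(name, (payload) => {
      // Payload shape is an assumption; log its message only if it has one.
      const detail = payload && payload.message ? `: ${payload.message}` : '';
      logger.info(`[rabbit] ${name}${detail}`);
    });
  }
  // An EventEmitter with no 'error' listener throws when 'error' is emitted,
  // so always register one.
  client.on('error', (error) => {
    const text = error && error.message ? error.message : String(error);
    logger.error(`[rabbit] error: ${text}`);
  });
  return client;
}

// Stand-in for a RabbitProducer or RabbitConsumer instance.
const lines = [];
const memoryLogger = {
  info: (line) => lines.push(line),
  error: (line) => lines.push(line)
};
const fakeClient = attachLifecycleLogging(new EventEmitter(), memoryLogger);
fakeClient.emit('connected');
fakeClient.emit('error', new Error('channel closed'));
console.log(lines);
```

With a real client, the same helper would be called once right after construction and before `initialize()`, so connection events are not missed.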
Producer Events
- messageSent: Emitted when a message is sent successfully
- sendError: Emitted when there's an error sending a message
Consumer Events
- consumingStarted: Emitted when consuming starts
- consumeError: Emitted when there's an error starting to consume
- message: Emitted when a message is received and validated
- messageError: Emitted when there's an error processing a message
- invalidMessage: Emitted when a message is invalid
License
AGPL-3.0
