@vvlad1973/queues-manager
v2.0.1
Published
RabbitMQ-compatible in-memory message queue with priority support, TTL, retries, and exchange routing
Maintainers
Readme
@vvlad1973/queues-manager
RabbitMQ-compatible in-memory message queue with priority support, TTL, retries, and exchange routing.
Features
- RabbitMQ-compatible API for seamless migration
- Dual backend support: In-Memory and RabbitMQ
- Priority queues with configurable levels
- Message TTL (Time To Live)
- Automatic retry with backoff strategies
- Dead Letter Queue (DLQ) support
- Exchange routing (Direct, Topic, Fanout)
- Consumer management with prefetch limiting
- Message acknowledgment (ack/nack/reject)
- Metrics and statistics collection
- Full TypeScript support
Installation
npm install @vvlad1973/queues-manager
For RabbitMQ backend support:
npm install @vvlad1973/queues-manager amqplib
Quick Start
In-Memory Backend
import { InMemoryConnection } from '@vvlad1973/queues-manager';
// Create connection
const connection = new InMemoryConnection();
const channel = await connection.createChannel();
// Create queue
await channel.assertQueue('tasks', {
maxLength: 1000,
maxPriority: 10,
messageTtl: 60000, // 1 minute
});
// Send message
channel.sendToQueue('tasks', { task: 'process-data' }, { priority: 5 });
// Consume messages
await channel.consume('tasks', async (msg) => {
console.log('Received:', msg.getContent());
channel.ack(msg);
});
// Close connection
await connection.close();
RabbitMQ Backend
import { ConnectionFactory, ConnectionType } from '@vvlad1973/queues-manager';
// Create RabbitMQ connection
const connection = await ConnectionFactory.createRabbitMQ({
url: 'amqp://localhost',
});
const channel = await connection.createChannel();
// Same API as in-memory
await channel.assertQueue('tasks');
channel.sendToQueue('tasks', { task: 'process-data' });
await channel.consume('tasks', async (msg) => {
console.log('Received:', msg.getContent());
channel.ack(msg);
});
Advanced Examples
Priority Queue
const channel = await connection.createChannel();
await channel.assertQueue('priority-tasks', {
maxPriority: 10,
});
// High priority
channel.sendToQueue('priority-tasks', { urgent: true }, { priority: 9 });
// Normal priority
channel.sendToQueue('priority-tasks', { urgent: false }, { priority: 5 });
// Low priority
channel.sendToQueue('priority-tasks', { urgent: false }, { priority: 1 });
Message TTL and Dead Letter Queue
await channel.assertQueue('dlq'); // Dead letter queue
await channel.assertQueue('tasks', {
messageTtl: 30000, // Messages expire after 30 seconds
deadLetterQueue: 'dlq',
});
channel.sendToQueue('tasks', { data: 'expires soon' });
// After 30 seconds, message moves to DLQ
await channel.consume('dlq', async (msg) => {
console.log('Expired message:', msg.getContent());
channel.ack(msg);
});
Automatic Retry with Backoff
await channel.assertQueue('retry-tasks', {
maxRetries: 3,
retryDelay: 1000, // 1 second base delay
retryBackoff: 'exponential', // exponential backoff
});
await channel.consume('retry-tasks', async (msg) => {
try {
await processTask(msg.getContent());
channel.ack(msg);
} catch (error) {
// Auto-retry with exponential backoff
channel.nack(msg, false, true);
}
});
Exchange Routing
// Create exchange
await channel.assertExchange('logs', 'topic');
// Bind queues
await channel.assertQueue('error-logs');
await channel.assertQueue('info-logs');
await channel.bindQueue('error-logs', 'logs', 'error.*');
await channel.bindQueue('info-logs', 'logs', 'info.*');
// Publish messages
channel.publish('logs', 'error.database', { msg: 'DB connection failed' });
channel.publish('logs', 'info.startup', { msg: 'Server started' });
// Consume from specific queue
await channel.consume('error-logs', async (msg) => {
console.log('Error:', msg.getContent());
channel.ack(msg);
});
Consumer Prefetch Limiting
const channel = await connection.createChannel();
// Limit unacked messages per consumer
channel.prefetch(5);
await channel.consume('tasks', async (msg) => {
await processLongTask(msg.getContent());
channel.ack(msg);
});
Metrics and Monitoring
import { MetricsCollector } from '@vvlad1973/queues-manager';
const metrics = new MetricsCollector();
// Track message lifecycle
metrics.recordMessagePublished('tasks');
metrics.recordMessageConsumed('tasks');
metrics.recordMessageAcknowledged('tasks');
// Get statistics
const stats = metrics.getQueueStats('tasks');
console.log('Published:', stats.published);
console.log('Consumed:', stats.consumed);
console.log('Acknowledged:', stats.acknowledged);
console.log('Failed:', stats.failed);
API Reference
Connection
InMemoryConnection
const connection = new InMemoryConnection();
const channel = await connection.createChannel();
await connection.close();
RabbitMQConnection
import { RabbitMQConnection } from '@vvlad1973/queues-manager';
const connection = new RabbitMQConnection({
url: 'amqp://localhost',
socketOptions: { heartbeat: 30 },
});
await connection.connect();
const channel = await connection.createChannel();
await connection.close();
ConnectionFactory
import { ConnectionFactory, ConnectionType } from '@vvlad1973/queues-manager';
// In-memory
const inMemory = await ConnectionFactory.createInMemory();
// RabbitMQ
const rabbitMQ = await ConnectionFactory.createRabbitMQ({
url: 'amqp://localhost',
});
// Factory pattern
const connection = ConnectionFactory.create({
type: ConnectionType.RABBITMQ,
options: { url: 'amqp://localhost' },
});
Channel Operations
Queue Management
// Assert queue
await channel.assertQueue('queue-name', {
durable: true,
autoDelete: false,
exclusive: false,
maxLength: 1000,
maxPriority: 10,
messageTtl: 60000,
deadLetterExchange: 'dlx',
deadLetterQueue: 'dlq',
maxRetries: 3,
retryDelay: 1000,
});
// Delete queue
await channel.deleteQueue('queue-name', {
ifUnused: true,
ifEmpty: true,
});
// Purge queue
await channel.purgeQueue('queue-name');
Exchange Management
// Assert exchange
await channel.assertExchange('exchange-name', 'topic', {
durable: true,
autoDelete: false,
internal: false,
});
// Delete exchange
await channel.deleteExchange('exchange-name', {
ifUnused: true,
});
// Bind queue to exchange
await channel.bindQueue('queue-name', 'exchange-name', 'routing.key');
// Unbind queue
await channel.unbindQueue('queue-name', 'exchange-name', 'routing.key');
Publishing
// Send to queue
channel.sendToQueue('queue-name', { data: 'value' }, {
priority: 5,
persistent: true,
expiration: '60000',
headers: { 'x-custom': 'value' },
correlationId: 'abc123',
replyTo: 'reply-queue',
});
// Publish to exchange
channel.publish('exchange-name', 'routing.key', Buffer.from('data'), {
priority: 5,
persistent: true,
mandatory: true,
});
Consuming
// Consume messages
const { consumerTag } = await channel.consume(
'queue-name',
async (msg) => {
console.log(msg.getContent());
channel.ack(msg);
},
{
noAck: false,
exclusive: false,
priority: 0,
}
);
// Cancel consumer
await channel.cancel(consumerTag);
// Get single message
const msg = await channel.get('queue-name', { noAck: false });
if (msg) {
console.log(msg.getContent());
channel.ack(msg);
}
Message Handling
// Acknowledge
channel.ack(message);
channel.ack(message, true); // Ack all up to this delivery tag
// Negative acknowledge
channel.nack(message);
channel.nack(message, false, true); // Single message, requeue
// Reject
channel.reject(message);
channel.reject(message, false); // Don't requeue
Message Object
interface Message {
content: Buffer;
fields: {
deliveryTag: number;
redelivered: boolean;
exchange: string;
routingKey: string;
consumerTag?: string;
};
properties: {
contentType?: string;
contentEncoding?: string;
headers?: Record<string, any>;
priority?: number;
correlationId?: string;
replyTo?: string;
expiration?: string;
messageId?: string;
timestamp?: number;
appId?: string;
};
// Helper methods
getContent<T = any>(): T;
acknowledged: boolean;
rejected: boolean;
}
Migration from v1.x
Before (v1.x)
import Queue from '@vvlad1973/simple-queue';
const queue = new Queue<string>(10);
queue.enqueue('task1', 5);
const item = queue.dequeue();
After (v2.x)
import { InMemoryConnection } from '@vvlad1973/queues-manager';
const connection = new InMemoryConnection();
const channel = await connection.createChannel();
await channel.assertQueue('tasks', { maxLength: 10, maxPriority: 10 });
channel.sendToQueue('tasks', 'task1', { priority: 5 });
const msg = await channel.get('tasks');
if (msg) {
const item = msg.getContent();
channel.ack(msg);
}
Or continue using the old Queue class:
import Queue from '@vvlad1973/queues-manager/core/queue';
const queue = new Queue<string>(10);
queue.enqueue('task1', 5);
const item = queue.dequeue();
Testing
# Run tests
npm test
# Run tests with coverage
npm run test:coverage
# Watch mode
npm run test:watch
Performance
The in-memory backend uses a binary search insertion algorithm for O(log n) enqueue operations and O(1) dequeue operations.
License
MIT License with Commercial Use
Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
Support
For issues and questions, please use the GitHub issue tracker.
