@bernierllc/message-queue
v1.0.2
Atomic message queuing utilities for message ordering and persistence
Features
- Priority-based Message Ordering: Messages are processed in priority order (URGENT > HIGH > NORMAL > LOW)
- Configurable Retry Logic: Automatic retry with exponential backoff for failed message processing
- Message Expiration: TTL (Time To Live) support for automatic message cleanup
- Event System: Real-time events for queue state changes
- Message Filtering: Filter messages by priority, timestamp, metadata, and more
- Queue Statistics: Comprehensive metrics and monitoring
- Queue Management: Pause, resume, and clear queue operations
- TypeScript Support: Full type safety with comprehensive interfaces
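The priority ordering listed above (URGENT > HIGH > NORMAL > LOW) can be illustrated with a small standalone comparator. This is a minimal sketch, not the package's implementation: SketchMessage, compareMessages, and inProcessingOrder are hypothetical names, the priority values are redeclared as a const object so the block runs on its own, and the oldest-first tie-break for equal priorities is an assumption that the README does not state.

```typescript
// Mirrors the documented MessagePriority values; redeclared as a const object
// so the sketch is self-contained (the package itself documents an enum).
const MessagePriority = { LOW: 0, NORMAL: 1, HIGH: 2, URGENT: 3 } as const;
type MessagePriority = (typeof MessagePriority)[keyof typeof MessagePriority];

// Hypothetical minimal message shape for this sketch (not the package's Message type).
interface SketchMessage {
  id: string;
  priority: MessagePriority;
  timestamp: Date;
}

// Higher priority sorts first.
// Assumption: messages with equal priority fall back to oldest-first.
function compareMessages(a: SketchMessage, b: SketchMessage): number {
  if (a.priority !== b.priority) {
    return b.priority - a.priority;
  }
  return a.timestamp.getTime() - b.timestamp.getTime();
}

// Returns a new array in processing order without mutating the input.
function inProcessingOrder(messages: SketchMessage[]): SketchMessage[] {
  return [...messages].sort(compareMessages);
}
```

Sorting a copy keeps the caller's array untouched. A real queue would more likely keep messages ordered on insert than re-sort on every read.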
Installation
npm install @bernierllc/message-queue
Dependencies:
This package depends on @bernierllc/retry-policy for exponential backoff calculations. It will be installed automatically.
Quick Start
import { MessageQueue, MessagePriority } from '@bernierllc/message-queue';
// Create a queue
const queue = new MessageQueue({
  name: 'my-queue',
  maxSize: 1000,
  enableRetries: true,
  maxRetries: 3
});
// Add messages
queue.addMessage('Hello World', MessagePriority.NORMAL);
queue.addMessage('Urgent task', MessagePriority.URGENT);
// Process messages
const processor = async (message) => {
  console.log(`Processing: ${message.content}`);
  // Do some work...
  return true; // Success
};
await queue.processMessage(processor);
API Reference
MessageQueue
The main class for managing message queues.
Constructor
new MessageQueue(config: QueueConfig, options?: QueueOptions)
QueueConfig:
- name: string - Queue name (required)
- maxSize?: number - Maximum number of messages in queue
- defaultPriority?: MessagePriority - Default priority for messages
- defaultTtl?: number - Default time-to-live in milliseconds
- enableRetries?: boolean - Enable automatic retry on failure
- maxRetries?: number - Maximum number of retry attempts
- retryDelay?: number - Base delay between retries in milliseconds (default: 1000)
- retryMaxDelay?: number - Maximum delay between retries in milliseconds (default: 10x retryDelay)
- retryJitter?: boolean - Whether to add jitter to retry delays (default: true)
QueueOptions:
- enableEvents?: boolean - Enable event system (default: true)
- enableStats?: boolean - Enable statistics tracking (default: true)
- enablePersistence?: boolean - Enable persistence (default: false)
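To make the interaction between retryDelay, retryMaxDelay, and retryJitter more concrete, here is a minimal sketch of how an exponential backoff delay might be derived from them. It is an illustration under stated assumptions, not the code in @bernierllc/retry-policy: backoffDelay and BackoffSettings are hypothetical names, the doubling factor is inferred from the 1s / 2s / 4s progression shown in the retry example, and the jitter strategy (keeping the delay between 50% and 100% of the capped value) is one common choice, not something this README specifies.

```typescript
// Hypothetical settings shape; field names follow the documented QueueConfig options.
interface BackoffSettings {
  retryDelay: number; // base delay in milliseconds
  retryMaxDelay?: number; // cap in milliseconds; documented default is 10x retryDelay
  retryJitter?: boolean; // documented default is true
}

// attempt is 1-based: attempt 1 waits the base delay, each later attempt doubles it.
// The random source is injectable so the jittered result can be tested deterministically.
function backoffDelay(
  attempt: number,
  settings: BackoffSettings,
  random: () => number = Math.random,
): number {
  const maxDelay = settings.retryMaxDelay ?? settings.retryDelay * 10;
  const exponential = settings.retryDelay * Math.pow(2, attempt - 1);
  const capped = Math.min(exponential, maxDelay);
  if (settings.retryJitter === false) {
    return capped;
  }
  // Assumption: "equal jitter", i.e. half fixed plus half random.
  return Math.round(capped / 2 + random() * (capped / 2));
}
```

With retryDelay set to 1000, retryMaxDelay to 10000, and jitter disabled, this sketch yields 1000, 2000, and 4000 ms for attempts 1 to 3, and every attempt from the fifth onward is capped at 10000 ms.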
Methods
addMessage(content, priority?, metadata?)
Add a message to the queue.
const result = queue.addMessage('message content', MessagePriority.HIGH, { userId: '123' });
// Returns: { success: boolean, messageId?: string, error?: string, stats?: QueueStats }
getNextMessage()
Get the next message from the queue (removes it from queue).
const message = queue.getNextMessage();
// Returns: Message | null
processMessage(processor)
Process a message with the provided processor function.
const processor = async (message: Message) => {
  // Process the message
  return true; // Return true for success, false for failure
};
const result = await queue.processMessage(processor);
// Returns: { success: boolean, error?: string, stats?: QueueStats }
getMessages(filter?)
Get messages matching the filter criteria.
const messages = queue.getMessages({
  priority: MessagePriority.HIGH,
  fromTimestamp: new Date('2023-01-01'),
  limit: 10,
  offset: 0
});
removeMessage(messageId)
Remove a specific message from the queue.
const result = queue.removeMessage('msg_1234567890_abc123');
// Returns: { success: boolean, error?: string, stats?: QueueStats }
clear()
Clear all messages from the queue.
const result = queue.clear();
// Returns: { success: boolean, stats?: QueueStats }
getStats()
Get queue statistics.
const stats = queue.getStats();
// Returns: QueueStats
getInfo()
Get comprehensive queue information.
const info = queue.getInfo();
// Returns: QueueInfo
onEvent(handler)
Add an event handler.
queue.onEvent((event: QueueEvent) => {
  console.log(`Event: ${event.type} - ${event.messageId}`);
});
offEvent(handler)
Remove an event handler.
// handler should be the function previously registered with onEvent
queue.offEvent(handler);
pause()
Pause the queue (no new messages can be added).
queue.pause();
resume()
Resume the queue.
queue.resume();
Properties
- size: number - Current number of messages in queue
- isEmpty: boolean - Whether queue is empty
- isFull: boolean - Whether queue is at maximum capacity
Types
MessagePriority
enum MessagePriority {
  LOW = 0,
  NORMAL = 1,
  HIGH = 2,
  URGENT = 3
}
Message
interface Message {
  id: string;
  content: any;
  priority: MessagePriority;
  timestamp: Date;
  expiresAt?: Date;
  metadata?: Record<string, any>;
  retryCount?: number;
  maxRetries?: number;
}
QueueEvent
interface QueueEvent {
  type: 'message_added' | 'message_processed' | 'message_failed' | 'queue_full' | 'queue_empty';
  messageId?: string;
  timestamp: Date;
  data?: any;
}
MessageFilter
interface MessageFilter {
  priority?: MessagePriority;
  fromTimestamp?: Date;
  toTimestamp?: Date;
  metadata?: Record<string, any>;
  limit?: number;
  offset?: number;
}
Examples
Basic Usage
import { MessageQueue, MessagePriority } from '@bernierllc/message-queue';
const queue = new MessageQueue({
  name: 'email-queue',
  maxSize: 1000,
  enableRetries: true,
  maxRetries: 3
});
// Add messages
queue.addMessage({ to: '[email protected]', subject: 'Welcome' }, MessagePriority.NORMAL);
queue.addMessage({ to: '[email protected]', subject: 'Alert' }, MessagePriority.HIGH);
// Process messages
const emailProcessor = async (message) => {
  console.log(`Sending email: ${message.content.subject}`);
  // Send email logic here
  return true;
};
while (!queue.isEmpty) {
  await queue.processMessage(emailProcessor);
}
Event-Driven Processing
const queue = new MessageQueue({ name: 'event-queue' });
// Set up event handlers
queue.onEvent((event) => {
  switch (event.type) {
    case 'message_added':
      console.log(`Message added: ${event.messageId}`);
      break;
    case 'message_processed':
      console.log(`Message processed: ${event.messageId}`);
      break;
    case 'queue_full':
      console.log('Queue is full!');
      break;
  }
});
// Add messages
queue.addMessage('Event message 1');
queue.addMessage('Event message 2');
Message Filtering
const queue = new MessageQueue({ name: 'filter-queue' });
// Add messages with metadata
queue.addMessage('User notification', MessagePriority.HIGH, {
  type: 'notification',
  userId: '123'
});
queue.addMessage('System log', MessagePriority.LOW, {
  type: 'log',
  system: 'auth'
});
// Filter by metadata
const notifications = queue.getMessages({
  metadata: { type: 'notification' }
});
// Filter by priority
const highPriority = queue.getMessages({
  priority: MessagePriority.HIGH
});
// Filter with pagination
const paginated = queue.getMessages({
  limit: 10,
  offset: 0
});
Retry Logic with Exponential Backoff
const queue = new MessageQueue({
  name: 'retry-queue',
  enableRetries: true,
  maxRetries: 3,
  retryDelay: 1000, // Base delay: 1 second
  retryMaxDelay: 10000, // Max delay: 10 seconds
  retryJitter: true // Add jitter to prevent thundering herd
});
queue.addMessage('Failing task');
const processor = async (message) => {
  // Simulate unreliable processing
  if (Math.random() < 0.7) {
    throw new Error('Processing failed');
  }
  return true;
};
await queue.processMessage(processor);
// Message will be retried with exponential backoff:
// Attempt 1: ~1 second delay
// Attempt 2: ~2 seconds delay
// Attempt 3: ~4 seconds delay
// Uses @bernierllc/retry-policy for backoff calculation
Error Handling
Queue operations return a result object whose success flag and error message indicate whether the call failed:
const result = queue.addMessage('test');
if (!result.success) {
  console.error('Failed to add message:', result.error);
}
const processResult = await queue.processMessage(processor);
if (!processResult.success) {
  console.error('Failed to process message:', processResult.error);
}
Performance Considerations
- Memory Usage: Messages are stored in memory by default. For large queues, consider implementing persistence.
- Processing Speed: The queue processes messages sequentially. For high-throughput scenarios, consider using multiple queues or workers.
- Event Handlers: Keep event handlers lightweight to avoid blocking queue operations.
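For the multiple-queue suggestion above, one possible approach is to route each message to one of several queues by hashing a stable key, so that messages sharing a key always land in the same queue. The following is a minimal sketch under stated assumptions: shardIndex and groupByShard are hypothetical helpers that are not part of this package, and the FNV-1a-style hash over UTF-16 code units is only one reasonable choice.

```typescript
// Hypothetical helper, not part of @bernierllc/message-queue.
// Maps a routing key to a shard index in [0, shardCount) using a
// 32-bit FNV-1a-style hash over the key's UTF-16 code units.
function shardIndex(key: string, shardCount: number): number {
  if (!Number.isInteger(shardCount) || shardCount <= 0) {
    throw new RangeError('shardCount must be a positive integer');
  }
  let hash = 0x811c9dc5; // FNV offset basis
  for (let i = 0; i < key.length; i++) {
    hash ^= key.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193); // multiply by the FNV prime, wrapping to 32 bits
  }
  return (hash >>> 0) % shardCount; // reinterpret as unsigned before taking the modulus
}

// Groups keys by the shard they would be routed to.
function groupByShard(keys: string[], shardCount: number): string[][] {
  const shards: string[][] = Array.from({ length: shardCount }, (): string[] => []);
  for (const key of keys) {
    shards[shardIndex(key, shardCount)].push(key);
  }
  return shards;
}
```

In practice the index would select one of several MessageQueue instances, each drained by its own worker loop. Priority ordering then holds only within each queue, not across all of them.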
License
This file is licensed to the client under a limited-use license. The client may use and modify this code only within the scope of the project it was delivered for. Redistribution or use in other products or commercial offerings is not permitted without written consent from Bernier LLC.
