@bernierllc/message-queue

v1.0.2

Atomic message queuing utilities for message ordering and persistence

Features

  • Priority-based Message Ordering: Messages are processed in priority order (URGENT > HIGH > NORMAL > LOW)
  • Configurable Retry Logic: Automatic retry with exponential backoff for failed message processing
  • Message Expiration: TTL (Time To Live) support for automatic message cleanup
  • Event System: Real-time events for queue state changes
  • Message Filtering: Filter messages by priority, timestamp, metadata, and more
  • Queue Statistics: Comprehensive metrics and monitoring
  • Queue Management: Pause, resume, and clear queue operations
  • TypeScript Support: Full type safety with comprehensive interfaces

Installation

npm install @bernierllc/message-queue

Dependencies: This package depends on @bernierllc/retry-policy for exponential backoff calculations. It will be installed automatically.

Quick Start

import { MessageQueue, MessagePriority } from '@bernierllc/message-queue';

// Create a queue
const queue = new MessageQueue({
  name: 'my-queue',
  maxSize: 1000,
  enableRetries: true,
  maxRetries: 3
});

// Add messages
queue.addMessage('Hello World', MessagePriority.NORMAL);
queue.addMessage('Urgent task', MessagePriority.URGENT);

// Process messages
const processor = async (message) => {
  console.log(`Processing: ${message.content}`);
  // Do some work...
  return true; // Success
};

await queue.processMessage(processor);

API Reference

MessageQueue

The main class for managing message queues.

Constructor

new MessageQueue(config: QueueConfig, options?: QueueOptions)

QueueConfig:

  • name: string - Queue name (required)
  • maxSize?: number - Maximum number of messages in queue
  • defaultPriority?: MessagePriority - Default priority for messages
  • defaultTtl?: number - Default time-to-live in milliseconds
  • enableRetries?: boolean - Enable automatic retry on failure
  • maxRetries?: number - Maximum number of retry attempts
  • retryDelay?: number - Base delay between retries in milliseconds (default: 1000)
  • retryMaxDelay?: number - Maximum delay between retries in milliseconds (default: 10x retryDelay)
  • retryJitter?: boolean - Whether to add jitter to retry delays (default: true)

QueueOptions:

  • enableEvents?: boolean - Enable event system (default: true)
  • enableStats?: boolean - Enable statistics tracking (default: true)
  • enablePersistence?: boolean - Enable persistence (default: false)

Methods

addMessage(content, priority?, metadata?)

Add a message to the queue.

const result = queue.addMessage('message content', MessagePriority.HIGH, { userId: '123' });
// Returns: { success: boolean, messageId?: string, error?: string, stats?: QueueStats }

getNextMessage()

Remove and return the next message from the queue, or null if there is none.

const message = queue.getNextMessage();
// Returns: Message | null

processMessage(processor)

Process a message with the provided processor function.

const processor = async (message: Message) => {
  // Process the message
  return true; // Return true for success, false for failure
};

const result = await queue.processMessage(processor);
// Returns: { success: boolean, error?: string, stats?: QueueStats }

getMessages(filter?)

Get messages matching the filter criteria.

const messages = queue.getMessages({
  priority: MessagePriority.HIGH,
  fromTimestamp: new Date('2023-01-01'),
  limit: 10,
  offset: 0
});

removeMessage(messageId)

Remove a specific message from the queue.

const result = queue.removeMessage('msg_1234567890_abc123');
// Returns: { success: boolean, error?: string, stats?: QueueStats }

clear()

Clear all messages from the queue.

const result = queue.clear();
// Returns: { success: boolean, stats?: QueueStats }

getStats()

Get queue statistics.

const stats = queue.getStats();
// Returns: QueueStats

getInfo()

Get comprehensive queue information.

const info = queue.getInfo();
// Returns: QueueInfo

onEvent(handler)

Add an event handler.

queue.onEvent((event: QueueEvent) => {
  console.log(`Event: ${event.type} - ${event.messageId}`);
});

offEvent(handler)

Remove an event handler.

// handler is the function previously passed to onEvent
queue.offEvent(handler);

pause()

Pause the queue (no new messages can be added).

queue.pause();

resume()

Resume the queue.

queue.resume();

Properties

  • size: number - Current number of messages in queue
  • isEmpty: boolean - Whether queue is empty
  • isFull: boolean - Whether queue is at maximum capacity

Types

MessagePriority

enum MessagePriority {
  LOW = 0,
  NORMAL = 1,
  HIGH = 2,
  URGENT = 3
}
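
As a rough illustration of the URGENT > HIGH > NORMAL > LOW ordering, the sketch below sorts by priority and breaks ties by arrival time. The README does not say how ties are resolved, so the oldest-first rule is an assumption, and Priority, QueuedItem and orderForProcessing are hypothetical names that are not part of the package.

```typescript
// Mirrors the documented MessagePriority values as a plain object so the
// sketch has no dependency on the package.
const Priority = { LOW: 0, NORMAL: 1, HIGH: 2, URGENT: 3 } as const;
type PriorityValue = (typeof Priority)[keyof typeof Priority];

// Hypothetical minimal shape; the real Message type has more fields.
interface QueuedItem {
  id: string;
  priority: PriorityValue;
  timestamp: Date;
}

// Higher priority first; within one priority, older messages first
// (assumed FIFO tie-break).
function orderForProcessing(items: QueuedItem[]): QueuedItem[] {
  return [...items].sort(
    (a, b) =>
      b.priority - a.priority ||
      a.timestamp.getTime() - b.timestamp.getTime(),
  );
}
```

Copying the array before sorting leaves the caller's list untouched, which keeps the helper safe to call on live data.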

Message

interface Message {
  id: string;
  content: any;
  priority: MessagePriority;
  timestamp: Date;
  expiresAt?: Date;
  metadata?: Record<string, any>;
  retryCount?: number;
  maxRetries?: number;
}
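
The expiresAt field is presumably derived from a TTL such as defaultTtl. The sketch below shows one plausible rule, expiresAt = timestamp + TTL in milliseconds. That rule and the helper names withTtl and isExpired are assumptions for illustration, not the package's confirmed behavior.

```typescript
// Hypothetical subset of the Message interface above.
interface Expirable {
  timestamp: Date;
  expiresAt?: Date;
}

// Assumed rule: expiresAt = timestamp + ttlMs. No TTL means the message
// never expires.
function withTtl<T extends Expirable>(message: T, ttlMs?: number): T {
  if (ttlMs === undefined) return message;
  return {
    ...message,
    expiresAt: new Date(message.timestamp.getTime() + ttlMs),
  };
}

// A message counts as expired once `now` reaches expiresAt.
function isExpired(message: Expirable, now: Date = new Date()): boolean {
  return (
    message.expiresAt !== undefined &&
    now.getTime() >= message.expiresAt.getTime()
  );
}
```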

QueueEvent

interface QueueEvent {
  type: 'message_added' | 'message_processed' | 'message_failed' | 'queue_full' | 'queue_empty';
  messageId?: string;
  timestamp: Date;
  data?: any;
}
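
To make the onEvent/offEvent contract concrete, here is a minimal stand-in for the event plumbing. It assumes handlers are removed by function reference, which is the common convention but is not stated in this README. EventHub and its emit method are hypothetical and exist only for this sketch.

```typescript
// Same shape as the QueueEvent interface above.
interface QueueEvent {
  type:
    | 'message_added'
    | 'message_processed'
    | 'message_failed'
    | 'queue_full'
    | 'queue_empty';
  messageId?: string;
  timestamp: Date;
  data?: unknown;
}

type QueueEventHandler = (event: QueueEvent) => void;

// Hypothetical stand-in for the queue's internal event handling.
class EventHub {
  private handlers = new Set<QueueEventHandler>();

  onEvent(handler: QueueEventHandler): void {
    this.handlers.add(handler);
  }

  // Removal is by reference, so keep the function that was registered.
  offEvent(handler: QueueEventHandler): void {
    this.handlers.delete(handler);
  }

  // Deliver one event to every registered handler.
  emit(event: QueueEvent): void {
    this.handlers.forEach((handler) => handler(event));
  }
}
```

Under this assumption, an inline arrow function passed to onEvent cannot be removed later, so store the handler in a variable if it needs to be detached.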

MessageFilter

interface MessageFilter {
  priority?: MessagePriority;
  fromTimestamp?: Date;
  toTimestamp?: Date;
  metadata?: Record<string, any>;
  limit?: number;
  offset?: number;
}
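
The fields of MessageFilter could combine as in the sketch below: every supplied criterion must match, then offset and limit are applied to the result. The matching rules (inclusive timestamp bounds, strict equality on each metadata key) are assumptions, and StoredMessage and applyFilter are hypothetical names.

```typescript
// Hypothetical subset of the Message interface above.
interface StoredMessage {
  id: string;
  content: unknown;
  priority: number;
  timestamp: Date;
  metadata?: Record<string, unknown>;
}

// Same fields as the MessageFilter interface above.
interface MessageFilter {
  priority?: number;
  fromTimestamp?: Date;
  toTimestamp?: Date;
  metadata?: Record<string, unknown>;
  limit?: number;
  offset?: number;
}

function applyFilter(
  messages: StoredMessage[],
  filter: MessageFilter = {},
): StoredMessage[] {
  const matches = messages.filter((m) => {
    const time = m.timestamp.getTime();
    if (filter.priority !== undefined && m.priority !== filter.priority) return false;
    if (filter.fromTimestamp && time < filter.fromTimestamp.getTime()) return false;
    if (filter.toTimestamp && time > filter.toTimestamp.getTime()) return false;
    // Assumed: every key in filter.metadata must strictly equal the
    // message's value for that key.
    for (const key of Object.keys(filter.metadata ?? {})) {
      if (m.metadata?.[key] !== filter.metadata?.[key]) return false;
    }
    return true;
  });
  // Pagination is applied after matching.
  const start = filter.offset ?? 0;
  const end = filter.limit === undefined ? undefined : start + filter.limit;
  return matches.slice(start, end);
}
```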

Examples

Basic Usage

import { MessageQueue, MessagePriority } from '@bernierllc/message-queue';

const queue = new MessageQueue({
  name: 'email-queue',
  maxSize: 1000,
  enableRetries: true,
  maxRetries: 3
});

// Add messages
queue.addMessage({ to: 'user@example.com', subject: 'Welcome' }, MessagePriority.NORMAL);
queue.addMessage({ to: 'admin@example.com', subject: 'Alert' }, MessagePriority.HIGH);

// Process messages
const emailProcessor = async (message) => {
  console.log(`Sending email: ${message.content.subject}`);
  // Send email logic here
  return true;
};

while (!queue.isEmpty) {
  await queue.processMessage(emailProcessor);
}
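
The loop above ends only when isEmpty becomes true. If a failed message were put back on the queue for retry (the README does not say whether it is), the loop could run for a long time. A bounded variant is sketched below. It relies only on the documented isEmpty property and processMessage result shape; DrainableQueue and drain are hypothetical names.

```typescript
// Structural subset of the documented MessageQueue API used by this sketch.
interface DrainableQueue<M> {
  readonly isEmpty: boolean;
  processMessage(
    processor: (message: M) => Promise<boolean>,
  ): Promise<{ success: boolean; error?: string }>;
}

// Hypothetical helper: process until the queue is empty or the iteration
// cap is reached, counting successes and failures along the way.
async function drain<M>(
  queue: DrainableQueue<M>,
  processor: (message: M) => Promise<boolean>,
  maxIterations = 10000,
): Promise<{ processed: number; failed: number }> {
  let processed = 0;
  let failed = 0;
  for (let i = 0; i < maxIterations && !queue.isEmpty; i++) {
    const result = await queue.processMessage(processor);
    if (result.success) {
      processed++;
    } else {
      failed++;
    }
  }
  return { processed, failed };
}
```

With the queue from the example this would be called as drain(queue, emailProcessor), assuming the real class is structurally compatible with DrainableQueue.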

Event-Driven Processing

const queue = new MessageQueue({ name: 'event-queue' });

// Set up event handlers
queue.onEvent((event) => {
  switch (event.type) {
    case 'message_added':
      console.log(`Message added: ${event.messageId}`);
      break;
    case 'message_processed':
      console.log(`Message processed: ${event.messageId}`);
      break;
    case 'queue_full':
      console.log('Queue is full!');
      break;
  }
});

// Add messages
queue.addMessage('Event message 1');
queue.addMessage('Event message 2');

Message Filtering

const queue = new MessageQueue({ name: 'filter-queue' });

// Add messages with metadata
queue.addMessage('User notification', MessagePriority.HIGH, { 
  type: 'notification', 
  userId: '123' 
});

queue.addMessage('System log', MessagePriority.LOW, { 
  type: 'log', 
  system: 'auth' 
});

// Filter by metadata
const notifications = queue.getMessages({ 
  metadata: { type: 'notification' } 
});

// Filter by priority
const highPriority = queue.getMessages({ 
  priority: MessagePriority.HIGH 
});

// Filter with pagination
const paginated = queue.getMessages({ 
  limit: 10, 
  offset: 0 
});

Retry Logic with Exponential Backoff

const queue = new MessageQueue({
  name: 'retry-queue',
  enableRetries: true,
  maxRetries: 3,
  retryDelay: 1000,        // Base delay: 1 second
  retryMaxDelay: 10000,    // Max delay: 10 seconds
  retryJitter: true        // Add jitter to prevent thundering herd
});

queue.addMessage('Failing task');

const processor = async (message) => {
  // Simulate unreliable processing
  if (Math.random() < 0.7) {
    throw new Error('Processing failed');
  }
  return true;
};

await queue.processMessage(processor);
// Message will be retried with exponential backoff:
// Attempt 1: ~1 second delay
// Attempt 2: ~2 seconds delay  
// Attempt 3: ~4 seconds delay
// Uses @bernierllc/retry-policy for backoff calculation
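
The delays listed above are consistent with doubling the base delay on each attempt and capping it at retryMaxDelay. The sketch below reproduces that arithmetic. It is not the @bernierllc/retry-policy implementation: the doubling factor is inferred from the 1s/2s/4s comments, the "equal jitter" strategy (half fixed, half random) is an assumption, and backoffDelay is a hypothetical name.

```typescript
// Mirrors the three retry-related QueueConfig fields.
interface BackoffConfig {
  retryDelay: number; // base delay in ms
  retryMaxDelay: number; // upper bound in ms
  retryJitter: boolean;
}

// attempt is 1-based. `random` is injectable so the result can be made
// deterministic in tests.
function backoffDelay(
  attempt: number,
  config: BackoffConfig,
  random: () => number = Math.random,
): number {
  // Assumed formula: base * 2^(attempt - 1), capped at retryMaxDelay.
  const capped = Math.min(
    config.retryDelay * 2 ** (attempt - 1),
    config.retryMaxDelay,
  );
  if (!config.retryJitter) return capped;
  // Equal jitter: keep half of the delay, randomize the other half.
  return Math.round(capped / 2 + random() * (capped / 2));
}
```

With retryDelay 1000 and retryMaxDelay 10000, the un-jittered delays are 1000, 2000, 4000 and 8000 ms, and every later attempt is capped at 10000 ms.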

Error Handling

Queue operations return a result object. Check success, and read error when it is false:

const result = queue.addMessage('test');
if (!result.success) {
  console.error('Failed to add message:', result.error);
}

const processResult = await queue.processMessage(processor);
if (!processResult.success) {
  console.error('Failed to process message:', processResult.error);
}

Performance Considerations

  • Memory Usage: Messages are stored in memory by default. For large queues, consider implementing persistence.
  • Processing Speed: The queue processes messages sequentially. For high-throughput scenarios, consider using multiple queues or workers.
  • Event Handlers: Keep event handlers lightweight to avoid blocking queue operations.
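
For the multiple-workers suggestion above, one possible shape is a small pool of concurrent loops that pull from a shared list of jobs. This sketch is independent of the package: runWorkers is a hypothetical helper, and in practice each loop might instead call processMessage on its own queue.

```typescript
// Hypothetical worker pool: `concurrency` loops pull jobs from one shared
// list until it is empty. A processor that throws counts as a failure.
async function runWorkers<T>(
  jobs: T[],
  concurrency: number,
  processor: (job: T) => Promise<boolean>,
): Promise<{ succeeded: number; failed: number }> {
  const pending = [...jobs];
  let succeeded = 0;
  let failed = 0;

  const worker = async (): Promise<void> => {
    // No await sits between the length check and shift(), so two workers
    // never take the same job.
    while (pending.length > 0) {
      const job = pending.shift() as T;
      try {
        if (await processor(job)) {
          succeeded++;
        } else {
          failed++;
        }
      } catch {
        failed++;
      }
    }
  };

  const workerCount = Math.max(1, concurrency);
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return { succeeded, failed };
}
```

This helps when processing is I/O-bound, since the loops overlap their waiting time. It gives up strict priority ordering across workers, which is the trade-off to weigh against the sequential default.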

License

This file is licensed to the client under a limited-use license. The client may use and modify this code only within the scope of the project it was delivered for. Redistribution or use in other products or commercial offerings is not permitted without written consent from Bernier LLC.