npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@resilientmq/core

v2.3.1

Published

Core logic for resilient RabbitMQ event processing

Readme

🆕 Ignoring events with controlled errors

You can throw the IgnoredEventError from an event handler to indicate that the event should be ignored and marked as successfully processed, even if the normal logic was not executed. This is useful for cases where certain events should not be retried or sent to the DLQ.

import { IgnoredEventError } from '@resilientmq/core/dist/resilience';

const consumer = new ResilientConsumer({
  // ...configuration
  eventsToProcess: [
    {
      type: 'user.created',
      handler: async (event) => {
        if (event.payload.ignore) {
          throw new IgnoredEventError('Event ignored by business rule');
        }
        // normal logic...
      }
    }
  ]
});

When this error is thrown, the event is marked as DONE and will not be retried or sent to the DLQ.

@resilientmq/core

CI/CD Pipeline npm version Node.js Version TypeScript License: MIT

Core logic for the resilient message queue system built on top of RabbitMQ, providing middleware support, retry logic, dead-letter handling, and persistent event lifecycle management.

Table of Contents

📦 Installation

npm install @resilientmq/core

Make sure to also install the core types package:

npm install @resilientmq/types__core

📚 Purpose

This package contains the runtime logic for publishing and consuming resilient events. It includes:

  • A pluggable consumer with retry + DLQ logic
  • Publisher with persist-before-send safety
  • Middleware pipeline
  • Custom logger
  • Full TypeScript support

v2.3.0 Highlights

  • Publisher lane isolation: realtime publish lane and pending/retry lane can run on separate RabbitMQ pools.
  • Automatic idempotent persistence path when your store implements saveEventIfNotExists.
  • EWMA-based adaptive pending concurrency (latency + error-rate) with configurable thresholds.
  • Expanded unit-test coverage for publisher architecture and adaptive controls.

🧩 Main Concepts

| Feature | Description | |--------|-------------| | publish(event) | Publishes a message safely to a queue or exchange | | consume(handler) | Starts a consumer to process incoming messages | | ResilientConsumer | Handles connection, retry, DLQ, and auto-reconnect | | ResilientEventPublisher | Publishes events with status persistence | | log(level, message) | Unified logging mechanism | | Middleware | Custom logic pipeline on message consumption |


🔧 Config: ResilientConsumerConfig

| Property | Type | Required | Description | Subtype Fields | |----------------------------|-----------------------------|----------|------------------------------------|----------------| | connection | string \| Options.Connect | ✅ | RabbitMQ URI or connection config | – | | consumeQueue.queue | string | ✅ | Queue name to consume | – | | consumeQueue.options | AssertQueueOptions | ✅ | Queue assertion options | durable, arguments | | consumeQueue.exchanges | ExchangeConfig[] | ❌ | exchanges to bind queue to | name, type, routingKey, options | | retryQueue.queue | string | ❌ | Retry queue for failed messages | – | | retryQueue.options | AssertQueueOptions | ❌ | Queue options | durable, arguments | | retryQueue.exchange | ExchangeConfig | ❌ | Exchange for retry routing | name, type, routingKey, options | | retryQueue.ttlMs | number | ❌ | Delay before retrying | – | | retryQueue.maxAttempts | number | ❌ | Max retries before DLQ (default 5) | – | | deadLetterQueue.queue | string | ❌ | Final destination after retries | – | | deadLetterQueue.options | AssertQueueOptions | ❌ | DLQ queue options | durable | | deadLetterQueue.exchange | ExchangeConfig | ❌ | DLQ exchange | name, type, routingKey, options | | eventsToProcess | EventProcessConfig[] | ✅ | List of handled event types | type, handler | | store | EventStore | ❌ | Persistent layer for events | saveEvent, saveEventIfNotExists (optional), getEvent, updateEventStatus, deleteEvent, getPendingEvents (optional), batchUpdateEventStatus (optional) | | storeConnectionRetries | number | ❌ | Max retry attempts for store connection (default: 3) | – | | storeConnectionRetryDelayMs | number | ❌ | Delay between store retry attempts in ms (default: 1000) | – | | cleanupConsumerPrefetch | number | ❌ | Prefetch for secondary cleanup connection used when ignoreUnknownEvents is true (default: 500, set 0 to disable) | – | | middleware | Middleware[] | ❌ | Hooks to wrap event execution | (event, next) => Promise | | maxUptimeMs | number | ❌ | Restart consumer after X ms | – | | exitIfIdle | boolean | ❌ | Exit process if idle | – | | idleCheckIntervalMs | number | ❌ | Time between idle checks | – | | maxIdleChecks | number | ❌ | How many checks until exit | – |


🔧 Config: ResilientPublisherConfig

| Property | Type | Required | Description | |----------|------|----------|-------------| | connection | string \| Options.Connect | ✅ | RabbitMQ URI or config | | queue | string | ❌ | Target queue (direct publish) | | exchange | ExchangeConfig | ❌ | Exchange for fanout/direct | | store | EventStore | ❌* | Event metadata persistence (optional unless instantPublish is false) | | instantPublish | boolean | ❌ | If true (default), publishes immediately. If false, stores for later delivery | | pendingEventsCheckIntervalMs | number | ❌ | Interval to check and send pending events (ms). Only effective when instantPublish is false | | maxConcurrentPublishes | number | ❌ | Global backpressure limit for concurrent publish operations (default: 100) | | maxConnections | number | ❌ | Number of RabbitMQ connections in the pool for load distribution (default: 1) | | separatePendingConnections | boolean | ❌ | Use a dedicated connection pool for pending/retry publishing (default: true) | | pendingMaxConnections | number | ❌ | Connection count for pending/retry lane when separated (default: maxConnections) | | pendingEventsBatchSize | number | ❌ | Default number of pending events fetched per batch when calling processPendingEvents() | | pendingEventsMaxPublishesPerSecond | number | ❌ | Default max number of pending events dispatched per second during processPendingEvents() | | pendingEventsMaxConcurrentPublishes | number | ❌ | Default max number of pending events published in parallel during processPendingEvents() | | pendingAdaptiveConcurrency | boolean | ❌ | Enable adaptive pending concurrency control (default: true) | | pendingAdaptiveEwmaAlpha | number | ❌ | EWMA alpha for adaptive signals, range (0,1] (default: 0.2) | | pendingAdaptiveTargetLatencyMs | number | ❌ | Optional latency target for adaptive scaling (ms) | | pendingAdaptiveErrorThresholdSoft | number | ❌ | Soft EWMA error-rate threshold for gradual backoff (default: 0.08) | | pendingAdaptiveErrorThresholdHard | number | ❌ | Hard EWMA error-rate threshold for aggressive backoff (default: 0.2) | | storeConnectionRetries | number | ❌ | Max retry attempts for store connection (default: 3) | | storeConnectionRetryDelayMs | number | ❌ | Delay between store retry attempts in ms (default: 1000) | | metricsEnabled | boolean | ❌ | Enable runtime metrics collection for the publisher |

Note: When instantPublish is set to false, a store with getPendingEvents() method is REQUIRED.

Idempotency optimization: If your store implements saveEventIfNotExists(event): Promise<boolean>, publisher automatically uses that single-write path; otherwise it falls back to getEvent + saveEvent.


🧩 Custom Event Storage Format

You can fully control how events are stored and retrieved by providing a serializer in your EventStore implementation.

This allows you to decouple the in-memory event format from the database structure — useful for legacy systems or when mapping to existing schemas.

🔄 Example: Custom Storage Serializer

const store: EventStore = {
  serializer: {
    toStorageFormat(event) {
      return {
        _id: event.id,
        body: event.payload,
        customStatus: event.status
      };
    },
    fromStorageFormat(doc) {
      return {
        id: doc._id,
        messageId: doc._id,
        payload: doc.body,
        status: doc.customStatus,
        type: 'custom.type'
      };
    }
  },

  async saveEvent(event) {
    const doc = this.serializer.toStorageFormat(event);
    await db.insert(doc);
  },

  async getEvent(id) {
    const doc = await db.findById(id);
    return doc ? this.serializer.fromStorageFormat(doc) : null;
  },

  async updateEventStatus(id, status) {
    await db.update(id, { customStatus: status });
  },

  async deleteEvent(id) {
    await db.delete(id);
  },

  // Required for processPendingEvents() — supports optional `limit` for batched retrieval
  async getPendingEvents(status, limit?) {
    const query = db.find({ customStatus: status }).sort({ createdAt: 1 });
    return limit ? query.limit(limit) : query;
  }
};

🚀 Example: Consumer

import { ResilientConsumer } from '@resilientmq/core';
import mongoose from 'mongoose';

const Event = mongoose.model('Event', new mongoose.Schema({ id: String }));
const store = {
  saveEvent: async (e) => Event.create(e),
  getEvent: async (id) => Event.findOne({ messageId: id }),
  updateEventStatus: async (id, status) => Event.updateOne({ messageId: id }, { status }),
  deleteEvent: async (id) => Event.deleteOne({ messageId: id })
};

const consumer = new ResilientConsumer({
  connection: 'amqp://localhost',
  consumeQueue: {
    queue: 'user.queue',
    options: { durable: true },
    exchanges: [
      { name: 'orders.events', type: 'topic', routingKey: 'order.*', options: { durable: true } },
      { name: 'notifications.events', type: 'direct', routingKey: 'notification', options: { durable: true } }
    ]
  },
  eventsToProcess: [
    { type: 'user.created', handler: async (payload) => console.log('User created:', payload) },
    { type: 'order.placed', handler: async (payload) => console.log('Order placed:', payload) },
    { type: 'notification.sent', handler: async (payload) => console.log('Notification sent:', payload) }
  ],
  store
});

await consumer.start();

🚀 Example: Publisher

import { ResilientEventPublisher } from '@resilientmq/core';

const publisher = new ResilientEventPublisher({
  connection: 'amqp://localhost',
  exchange: {
    name: 'user.events',
    type: 'fanout',
    options: { durable: true }
  },
  maxConnections: 3  // Use 3 connections for load distribution (optional, default: 1)
});

// IMPORTANT: routingKey is now taken from each event's `routingKey` field when publishing to an exchange.
// If the event does not include `routingKey`, the publisher will send the message with no routing key.
// The exchange configuration no longer provides the routing key for per-message routing.

await publisher.publish({
  id: 'evt-1',
  messageId: 'msg-1',
  type: 'user.created',
  payload: { name: 'Alice' },
  status: 'PENDING_PUBLICATION',
  // Optional per-message routing key;
  routingKey: 'user.created'
});

Publisher Without Store

import { ResilientEventPublisher } from '@resilientmq/core';

const publisher = new ResilientEventPublisher({
  connection: 'amqp://localhost',
  exchange: {
    name: 'user.events',
    type: 'fanout',
    options: { durable: true }
  },
  maxConnections: 5  // Optional: distribute load across 5 connections
});

await publisher.publish({
  id: 'evt-2',
  messageId: 'msg-2',
  type: 'user.updated',
  payload: { name: 'Bob' },
  status: 'PENDING_PUBLICATION'
});

Publisher with Pending Events Processing

import { ResilientEventPublisher } from '@resilientmq/core';

const publisher = new ResilientEventPublisher({
  connection: 'amqp://localhost',
  exchange: {
    name: 'user.events',
    type: 'fanout',
    options: { durable: true }
  },
  store: myEventStore,
  // Check for pending events every 30 seconds
  pendingEventsCheckIntervalMs: 30000,
  // Realtime lane pool
  maxConnections: 3,
  // Dedicated pending/retry lane pool (default true)
  separatePendingConnections: true,
  pendingMaxConnections: 2,
  // Adaptive pending concurrency tuning (all optional)
  pendingAdaptiveConcurrency: true,
  pendingAdaptiveEwmaAlpha: 0.2,
  pendingAdaptiveErrorThresholdSoft: 0.08,
  pendingAdaptiveErrorThresholdHard: 0.2
});

// Store event for later delivery (e.g., when offline)
await publisher.publish({
  id: 'evt-3',
  messageId: 'msg-3',
  type: 'user.deleted',
  payload: { id: '123' },
  status: 'PENDING'
}, { storeOnly: true });

// The event will be automatically sent every 30 seconds if there are pending events

// Or manually process pending events
await publisher.processPendingEvents();

// Stop periodic checks when shutting down
publisher.stopPendingEventsCheck();

Key Features:

  • storeOnly: true: Stores the event without sending it immediately (useful for offline scenarios)
  • pendingEventsCheckIntervalMs: Configurable interval to automatically check and send pending events
  • processPendingEvents(): Manually trigger processing of pending events
  • stopPendingEventsCheck(): Stop the periodic check for graceful shutdown
  • maxConnections: Distribute load across multiple RabbitMQ connections (round-robin)
  • separatePendingConnections + pendingMaxConnections: Isolate realtime vs pending/retry traffic at connection level
  • Events are processed in the order returned by your store (implement sorting there if needed)
  • Only connects pending lane when there are pending events to process

⚡ Pending Events Processing with Rate Limiting (v2.1.5+)

processPendingEvents() supports configurable rate limiting to control throughput and prevent overwhelming your RabbitMQ broker or downstream systems.

Configuration options:

await publisher.processPendingEvents({
  batchSize: 1000,                    // Number of events to retrieve per batch (default: 100)
  maxPublishesPerSecond: 500,         // Maximum events published per second (default: same as batchSize)
  maxConcurrentPublishes: 10          // Maximum concurrent publish operations (default: min(10, maxPublishesPerSecond))
});

How it works:

  1. Retrieves batchSize pending events from the store
  2. Publishes events using a token bucket algorithm to respect maxPublishesPerSecond
  3. Maintains up to maxConcurrentPublishes parallel operations
  4. Updates all statuses in a single batch operation
  5. Repeats until no more pending events

Token Bucket Algorithm:

  • Provides smooth, continuous rate limiting (not fixed 1-second windows)
  • Allows burst capacity while maintaining average rate
  • Refills tokens continuously based on elapsed time

Performance characteristics:

  • Can achieve 500-1000 msg/s with proper configuration
  • Prevents broker overload with controlled throughput
  • Batch status updates reduce database overhead by 90%

Important: Events are processed in the order returned by your store. If you need chronological ordering, implement sorting in your getPendingEvents() method:

async getPendingEvents(status: EventPublishStatus, limit?: number): Promise<EventMessage[]> {
  const query = EventModel.find({ status }).sort({ createdAt: 1 }); // Sort by timestamp
  return limit ? query.limit(limit).exec() : query.exec();
}

Example: Implementing getPendingEvents with limit

// MongoDB / Mongoose example
async getPendingEvents(status: EventPublishStatus, limit?: number): Promise<EventMessage[]> {
  const query = EventModel.find({ status }).sort({ createdAt: 1 });
  if (limit) {
    query.limit(limit);
  }
  return query.exec();
}

// Prisma example
async getPendingEvents(status: EventPublishStatus, limit?: number): Promise<EventMessage[]> {
  return prisma.event.findMany({
    where: { status },
    orderBy: { createdAt: 'asc' },
    ...(limit ? { take: limit } : {})
  });
}

🚀 Critical: Implement Batch Status Updates (v2.1.2+)

For maximum performance, you MUST implement the optional batchUpdateEventStatus() method in your EventStore.

Benefits:

  • 90% reduction in database calls: From 1000 individual updates/s to ~10 batched calls/s
  • 10-100x throughput improvement: Process at maximum speed without database bottleneck
  • Backward compatible: Falls back to individual updates if not implemented
// MongoDB / Mongoose example
async batchUpdateEventStatus(
  updates: Array<{ event: EventMessage; status: EventPublishStatus }>
): Promise<void> {
  const bulkOps = updates.map(({ event, status }) => ({
    updateOne: {
      filter: { messageId: event.messageId },
      update: { $set: { status } }
    }
  }));
  
  await EventModel.bulkWrite(bulkOps);
}

// Prisma example
async batchUpdateEventStatus(
  updates: Array<{ event: EventMessage; status: EventPublishStatus }>
): Promise<void> {
  await prisma.$transaction(
    updates.map(({ event, status }) =>
      prisma.event.update({
        where: { messageId: event.messageId },
        data: { status }
      })
    )
  );
}

// SQL example (using Knex)
async batchUpdateEventStatus(
  updates: Array<{ event: EventMessage; status: EventPublishStatus }>
): Promise<void> {
  const trx = await db.transaction();
  try {
    for (const { event, status } of updates) {
      await trx('events')
        .where({ message_id: event.messageId })
        .update({ status });
    }
    await trx.commit();
  } catch (error) {
    await trx.rollback();
    throw error;
  }
}

How it works:

  • The publisher batches status updates every 100ms during processPendingEvents()
  • If batchUpdateEventStatus() is implemented, it's used automatically
  • If not implemented or if it fails, falls back to individual updateEventStatus() calls
  • No code changes needed in your publisher - just implement the method in your store

📊 Metrics

ResilientEventPublisher supports optional real-time metrics collection. Enable it with metricsEnabled: true in the config, then call getMetrics() at any time to get a snapshot.

import { ResilientConsumer, ResilientEventPublisher } from '@resilientmq/core';

// Consumer (metrics are disabled in runtime for minimal overhead)
const consumer = new ResilientConsumer({
  connection: 'amqp://localhost',
  consumeQueue: { queue: 'my.queue' },
  eventsToProcess: [{ type: 'order.created', handler: async (e) => { /* ... */ } }],
  metricsEnabled: true,
});

await consumer.start();

// Consumer runtime returns undefined for metrics by design
const snap = consumer.getMetrics();
console.log(snap); // undefined
// Publisher with metrics
const publisher = new ResilientEventPublisher({
  connection: 'amqp://localhost',
  queue: 'my.queue',
  metricsEnabled: true,   // <-- enable metrics
});

await publisher.publish({ messageId: 'id-1', type: 'order.created', payload: {} });

const snap = publisher.getMetrics();
console.log(snap?.messagesPublished); // 1

You can also use MetricsCollector standalone for custom instrumentation:

import { MetricsCollector } from '@resilientmq/core';

const metrics = new MetricsCollector();
metrics.increment('messagesReceived');
metrics.recordProcessingTime(42);
const snapshot = metrics.getSnapshot();
metrics.reset(); // reset all counters

Available Metrics

| Field | Description | |-------|-------------| | messagesReceived | Total messages received by the consumer | | messagesProcessed | Messages successfully processed | | messagesRetried | Messages retried at least once | | messagesFailed | Messages that failed permanently | | messagesSentToDLQ | Messages sent to the Dead Letter Queue | | messagesPublished | Messages published by the publisher | | processingErrors | Total processing errors encountered | | avgProcessingTimeMs | Average processing time in milliseconds | | lastActivityAt | Timestamp of the last recorded activity |

getMetrics() returns undefined when metricsEnabled is not set or is false.


🧪 Testing

@resilientmq/core includes a comprehensive automated testing strategy to ensure reliability, performance, and quality.

Test Suite Overview

| Test Type | Purpose | Execution Time | Coverage | |-----------|---------|----------------|----------| | Unit Tests | Fast, isolated component testing with mocks | < 30s | 70%+ code coverage (current suite at 100% for covered source set) | | Integration Tests | End-to-end testing with real RabbitMQ | < 5min | Full integration scenarios | | Stress Tests | High-volume and high-speed load testing | < 10min | Resilience validation | | Benchmarks | Performance measurement and regression detection | < 15min | Throughput & latency metrics |

Running Tests

# Run all tests
npm test

# Run specific test suites
npm run test:unit              # Unit tests only
npm run test:integration       # Integration tests (requires Docker)
npm run test:stress           # Stress tests (requires Docker)
npm run test:benchmark        # Performance benchmarks (requires Docker)

# Run with coverage
npm run test:coverage         # Generate coverage report
npm run coverage:check        # Validate coverage thresholds

# Quality checks
npm run benchmark:compare     # Check for performance regressions
npm run quality:check         # Run all quality gates

Test Infrastructure

  • Testcontainers: Automatic RabbitMQ container management for integration tests
  • Jest: Test framework with TypeScript support
  • Mocks & Helpers: Comprehensive test utilities for all components
  • Metrics Collection: Automated performance and resource usage tracking

CI/CD Integration

All tests run automatically on:

  • Every push to main/master/develop branches
  • Every pull request
  • Matrix testing across Node.js 18, 20, 22, 24 and 25

Quality Gates:

  • ✅ Minimum 70% code coverage
  • ✅ All tests must pass
  • ✅ Performance regression < 10%
  • ✅ Error rate under load < 1%

For detailed testing documentation, see test/README.md.


👥 Contributors


📄 License

MIT