@resilientmq/core

v1.2.6

Core logic for resilient RabbitMQ event processing

Core logic for the resilient message queue system built on top of RabbitMQ, providing middleware support, retry logic, dead-letter handling, and persistent event lifecycle management.

📦 Installation

npm install @resilientmq/core

Make sure to also install the core types package:

npm install @resilientmq/types__core

📚 Purpose

This package contains the runtime logic for publishing and consuming resilient events. It includes:

  • A pluggable consumer with retry + DLQ logic
  • Publisher with persist-before-send safety
  • Middleware pipeline
  • Custom logger
  • Full TypeScript support

🧩 Main Concepts

| Feature | Description |
|---------|-------------|
| publish(event) | Publishes a message safely to a queue or exchange |
| consume(handler) | Starts a consumer to process incoming messages |
| ResilientConsumer | Handles connection, retry, DLQ, and auto-reconnect |
| ResilientEventPublisher | Publishes events with status persistence |
| log(level, message) | Unified logging mechanism |
| Middleware | Custom logic pipeline on message consumption |
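To make the middleware concept concrete, here is a minimal, self-contained sketch of how a pipeline of `(event, next) => Promise` hooks can wrap a handler. It does not use the package: `SketchEvent`, `runPipeline`, `timing`, and `guard` are hypothetical names chosen for illustration, and the library's actual composition order and event type may differ.

```typescript
// Hypothetical minimal event shape; the real event type has more fields.
interface SketchEvent {
  type: string;
  payload: unknown;
}

type Next = () => Promise<void>;
type Middleware = (event: SketchEvent, next: Next) => Promise<void>;
type Handler = (event: SketchEvent) => Promise<void>;

// Compose middleware so that the first entry is the outermost wrapper.
function runPipeline(
  middleware: Middleware[],
  handler: Handler,
  event: SketchEvent
): Promise<void> {
  const dispatch = (index: number): Promise<void> => {
    const current = middleware[index];
    // Past the last middleware: run the actual event handler.
    if (!current) return handler(event);
    return current(event, () => dispatch(index + 1));
  };
  return dispatch(0);
}

const trace: string[] = [];

// Records when processing starts and ends around the rest of the chain.
const timing: Middleware = async (event, next) => {
  trace.push(`start ${event.type}`);
  await next();
  trace.push(`end ${event.type}`);
};

// Short-circuits the chain when the payload is missing.
const guard: Middleware = async (event, next) => {
  if (event.payload == null) {
    trace.push('skipped');
    return;
  }
  await next();
};

const handler: Handler = async (event) => {
  trace.push(`handled ${event.type}`);
};

const done = runPipeline([timing, guard], handler, {
  type: 'user.created',
  payload: { name: 'Alice' },
});
```

Because each middleware decides whether to call `next()`, a hook can add behaviour before and after the handler (as `timing` does) or skip it entirely (as `guard` does).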


🔧 Config: ResilientConsumerConfig

| Property | Type | Required | Description | Subtype Fields |
|----------|------|----------|-------------|----------------|
| connection | string \| Options.Connect | ✅ | RabbitMQ URI or connection config | – |
| consumeQueue.queue | string | ✅ | Queue name to consume | – |
| consumeQueue.options | AssertQueueOptions | ✅ | Queue assertion options | durable, arguments |
| consumeQueue.exchanges | ExchangeConfig[] | ❌ | Exchanges to bind queue to | name, type, routingKey, options |
| retryQueue.queue | string | ❌ | Retry queue for failed messages | – |
| retryQueue.options | AssertQueueOptions | ❌ | Queue options | durable, arguments |
| retryQueue.exchange | ExchangeConfig | ❌ | Exchange for retry routing | name, type, routingKey, options |
| retryQueue.ttlMs | number | ❌ | Delay before retrying | – |
| retryQueue.maxAttempts | number | ❌ | Max retries before DLQ (default 5) | – |
| deadLetterQueue.queue | string | ❌ | Final destination after retries | – |
| deadLetterQueue.options | AssertQueueOptions | ❌ | DLQ queue options | durable |
| deadLetterQueue.exchange | ExchangeConfig | ❌ | DLQ exchange | name, type, routingKey, options |
| eventsToProcess | EventProcessConfig[] | ✅ | List of handled event types | type, handler |
| store | EventStore | ❌ | Persistent layer for events | saveEvent, getEvent, updateEventStatus, deleteEvent |
| storeConnectionRetries | number | ❌ | Max retry attempts for store connection (default: 3) | – |
| storeConnectionRetryDelayMs | number | ❌ | Delay between store retry attempts in ms (default: 1000) | – |
| middleware | Middleware[] | ❌ | Hooks to wrap event execution | (event, next) => Promise |
| maxUptimeMs | number | ❌ | Restart consumer after X ms | – |
| exitIfIdle | boolean | ❌ | Exit process if idle | – |
| idleCheckIntervalMs | number | ❌ | Time between idle checks | – |
| maxIdleChecks | number | ❌ | How many checks until exit | – |
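The interaction between `retryQueue.maxAttempts` and the dead-letter queue can be pictured with a small decision function. This is only a sketch under stated assumptions: `nextDestination` is a hypothetical helper, and it assumes the counter tracks failed attempts so far and that a message moves to the DLQ once that count reaches `maxAttempts`. The library's exact counting rule is not documented here and may differ by one.

```typescript
type Destination = 'retry' | 'dead-letter';

// Default taken from the config table above (retryQueue.maxAttempts).
const DEFAULT_MAX_ATTEMPTS = 5;

// Decide where a failed message goes next.
// Assumption: `failedAttempts` counts failures so far, including the current one.
function nextDestination(
  failedAttempts: number,
  maxAttempts: number = DEFAULT_MAX_ATTEMPTS
): Destination {
  return failedAttempts < maxAttempts ? 'retry' : 'dead-letter';
}

console.log(nextDestination(1));    // 'retry'
console.log(nextDestination(5));    // 'dead-letter'
console.log(nextDestination(2, 2)); // 'dead-letter'
```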


🔧 Config: ResilientPublisherConfig

| Property | Type | Required | Description |
|----------|------|----------|-------------|
| connection | string \| Options.Connect | ✅ | RabbitMQ URI or config |
| queue | string | ❌ | Target queue (direct publish) |
| exchange | ExchangeConfig | ❌ | Exchange for fanout/direct |
| store | EventStore | ❌* | Event metadata persistence (optional unless instantPublish is false) |
| instantPublish | boolean | ❌ | If true (default), publishes immediately. If false, stores for later delivery |
| pendingEventsCheckIntervalMs | number | ❌ | Interval to check and send pending events (ms). Only effective when instantPublish is false |
| storeConnectionRetries | number | ❌ | Max retry attempts for store connection (default: 3) |
| storeConnectionRetryDelayMs | number | ❌ | Delay between store retry attempts in ms (default: 1000) |

Note: When instantPublish is set to false, a store with getPendingEvents() method is REQUIRED.
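The `storeConnectionRetries` and `storeConnectionRetryDelayMs` options describe a fixed-delay retry around the store connection. The sketch below shows that general pattern in isolation. `withRetries` and `flakyConnect` are hypothetical names, and the sketch assumes the retry count means total attempts; the package may count the first attempt separately.

```typescript
// Run `operation` up to `retries` times, waiting `delayMs` between attempts.
// Defaults mirror the documented values (3 attempts, 1000 ms).
async function withRetries<T>(
  operation: () => Promise<T>,
  retries: number = 3,
  delayMs: number = 1000
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= retries; attempt++) {
    try {
      return await operation();
    } catch (error) {
      lastError = error;
      // Only wait when another attempt will follow.
      if (attempt < retries) {
        await new Promise<void>((resolve) => setTimeout(resolve, delayMs));
      }
    }
  }
  throw lastError;
}

// Simulated store connection that fails twice before succeeding.
let calls = 0;
const flakyConnect = async (): Promise<string> => {
  calls += 1;
  if (calls < 3) throw new Error(`store unavailable (call ${calls})`);
  return 'connected';
};

// A short delay keeps the demonstration fast.
const connection = withRetries(flakyConnect, 3, 10);
```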


🧩 Custom Event Storage Format

You can fully control how events are stored and retrieved by providing a serializer in your EventStore implementation.

This allows you to decouple the in-memory event format from the database structure — useful for legacy systems or when mapping to existing schemas.

🔄 Example: Custom Storage Serializer

// `db` stands in for your own database client; it is not provided by this package.
const store: EventStore = {
  serializer: {
    toStorageFormat(event) {
      return {
        _id: event.id,
        body: event.payload,
        customStatus: event.status
      };
    },
    fromStorageFormat(doc) {
      return {
        id: doc._id,
        messageId: doc._id,
        payload: doc.body,
        status: doc.customStatus,
        type: 'custom.type'
      };
    }
  },

  async saveEvent(event) {
    const doc = this.serializer.toStorageFormat(event);
    await db.insert(doc);
  },

  async getEvent(id) {
    const doc = await db.findById(id);
    return doc ? this.serializer.fromStorageFormat(doc) : null;
  },

  async updateEventStatus(id, status) {
    await db.update(id, { customStatus: status });
  },

  async deleteEvent(id) {
    await db.delete(id);
  }
};
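One consequence of a custom storage format is that any field not written by `toStorageFormat` cannot be recovered by `fromStorageFormat`. The standalone sketch below reuses the two serializer functions from the example to show this round trip; the `SketchEvent` and `StoredDoc` interfaces are hypothetical stand-ins for the real types.

```typescript
// Hypothetical in-memory event shape, limited to the fields used above.
interface SketchEvent {
  id: string;
  messageId: string;
  type: string;
  payload: unknown;
  status: string;
}

// Hypothetical database document shape.
interface StoredDoc {
  _id: string;
  body: unknown;
  customStatus: string;
}

const serializer = {
  toStorageFormat(event: SketchEvent): StoredDoc {
    return { _id: event.id, body: event.payload, customStatus: event.status };
  },
  fromStorageFormat(doc: StoredDoc): SketchEvent {
    return {
      id: doc._id,
      messageId: doc._id,
      payload: doc.body,
      status: doc.customStatus,
      type: 'custom.type',
    };
  },
};

const original: SketchEvent = {
  id: 'evt-1',
  messageId: 'msg-1',
  type: 'user.created',
  payload: { name: 'Alice' },
  status: 'PENDING_PUBLICATION',
};

const restored = serializer.fromStorageFormat(serializer.toStorageFormat(original));

console.log(restored.type);      // 'custom.type', not 'user.created'
console.log(restored.messageId); // 'evt-1', not 'msg-1'
```

If the original `type` or `messageId` matters after a reload, the storage document needs its own fields for them.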

🚀 Example: Consumer

import { ResilientConsumer } from '@resilientmq/core';
import mongoose from 'mongoose';

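// Assumes mongoose.connect(...) has already been called.
// strict: false lets Mongoose persist event fields (type, payload, ...) that the schema does not list;
// with the default strict mode, fields missing from the schema such as status would be dropped.
const Event = mongoose.model(
  'Event',
  new mongoose.Schema({ id: String, messageId: String, status: String }, { strict: false })
);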
const store = {
  saveEvent: async (e) => Event.create(e),
  getEvent: async (id) => Event.findOne({ messageId: id }),
  updateEventStatus: async (id, status) => Event.updateOne({ messageId: id }, { status }),
  deleteEvent: async (id) => Event.deleteOne({ messageId: id })
};

const consumer = new ResilientConsumer({
  connection: 'amqp://localhost',
  consumeQueue: {
    queue: 'user.queue',
    options: { durable: true },
    exchanges: [
      { name: 'orders.events', type: 'topic', routingKey: 'order.*', options: { durable: true } },
      { name: 'notifications.events', type: 'direct', routingKey: 'notification', options: { durable: true } }
    ]
  },
  eventsToProcess: [
    { type: 'user.created', handler: async (payload) => console.log('User created:', payload) },
    { type: 'order.placed', handler: async (payload) => console.log('Order placed:', payload) },
    { type: 'notification.sent', handler: async (payload) => console.log('Notification sent:', payload) }
  ],
  store
});

await consumer.start();
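The `eventsToProcess` list pairs each event type with a handler. As a rough mental model, routing by type can be sketched as a lookup table. This is not the library's code: `EventProcessEntry` and `buildDispatcher` are hypothetical, and ignoring unregistered types is an assumption, since what the consumer actually does with them is not stated here.

```typescript
// Hypothetical shape mirroring the documented `type` and `handler` fields.
interface EventProcessEntry {
  type: string;
  handler: (payload: unknown) => Promise<void>;
}

// Build a lookup from event type to handler and return a dispatch function.
function buildDispatcher(entries: EventProcessEntry[]) {
  const handlers = new Map<string, EventProcessEntry['handler']>();
  for (const entry of entries) {
    handlers.set(entry.type, entry.handler);
  }
  // Resolves to true when a handler ran, false when the type is not registered.
  return async (type: string, payload: unknown): Promise<boolean> => {
    const handler = handlers.get(type);
    if (!handler) return false;
    await handler(payload);
    return true;
  };
}

const seen: string[] = [];
const dispatch = buildDispatcher([
  { type: 'user.created', handler: async () => { seen.push('user.created'); } },
  { type: 'order.placed', handler: async () => { seen.push('order.placed'); } },
]);

const outcomes = Promise.all([
  dispatch('user.created', { name: 'Alice' }),
  dispatch('payment.failed', {}),
]);
```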

🚀 Example: Publisher

import { ResilientEventPublisher } from '@resilientmq/core';

const publisher = new ResilientEventPublisher({
  connection: 'amqp://localhost',
  exchange: {
    name: 'user.events',
    type: 'fanout',
    options: { durable: true }
  }
});

// IMPORTANT: routingKey is now taken from each event's `routingKey` field when publishing to an exchange.
// If the event does not include `routingKey`, the publisher will send the message with no routing key.
// The exchange configuration no longer provides the routing key for per-message routing.

await publisher.publish({
  id: 'evt-1',
  messageId: 'msg-1',
  type: 'user.created',
  payload: { name: 'Alice' },
  status: 'PENDING_PUBLICATION',
  // Optional per-message routing key (a fanout exchange ignores it)
  routingKey: 'user.created'
});

Publisher Without Store

import { ResilientEventPublisher } from '@resilientmq/core';

const publisher = new ResilientEventPublisher({
  connection: 'amqp://localhost',
  exchange: {
    name: 'user.events',
    type: 'fanout',
    options: { durable: true }
  }
});

await publisher.publish({
  id: 'evt-2',
  messageId: 'msg-2',
  type: 'user.updated',
  payload: { name: 'Bob' },
  status: 'PENDING_PUBLICATION'
});

Publisher with Pending Events Processing

import { ResilientEventPublisher } from '@resilientmq/core';

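const publisher = new ResilientEventPublisher({
  connection: 'amqp://localhost',
  exchange: {
    name: 'user.events',
    type: 'fanout',
    options: { durable: true }
  },
  // myEventStore is your own EventStore implementation; it must provide getPendingEvents()
  store: myEventStore,
  // Per the config table, the periodic check only takes effect when instantPublish is false
  instantPublish: false,
  // Check for pending events every 30 seconds
  pendingEventsCheckIntervalMs: 30000
});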

// Store event for later delivery (e.g., when offline)
await publisher.publish({
  id: 'evt-3',
  messageId: 'msg-3',
  type: 'user.deleted',
  payload: { id: '123' },
  status: 'PENDING'
}, { storeOnly: true });

// Pending events are checked and sent automatically every 30 seconds

// Or manually process pending events
await publisher.processPendingEvents();

// Stop periodic checks when shutting down
publisher.stopPendingEventsCheck();

Key Features:

  • storeOnly: true: Stores the event without sending it immediately (useful for offline scenarios)
  • pendingEventsCheckIntervalMs: Configurable interval to automatically check and send pending events
  • processPendingEvents(): Manually trigger processing of all pending events
  • stopPendingEventsCheck(): Stop the periodic check for graceful shutdown
  • Events are automatically sorted and sent in chronological order (oldest first)
  • Only connects to RabbitMQ when there are actually pending events to process
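For local experiments, a store that supports pending-event processing can be sketched in memory. The class below is illustrative only: the method names follow the config tables, but the `getPendingEvents()` signature, the `createdAt` timestamp field, and the `'PENDING'` and `'PUBLISHED'` status values are assumptions, so check them against the actual `EventStore` type before relying on them.

```typescript
// Hypothetical stored event shape; `createdAt` is an assumed timestamp field.
interface PendingEvent {
  messageId: string;
  type: string;
  payload: unknown;
  status: string;
  createdAt: number;
}

class InMemoryEventStore {
  private readonly events = new Map<string, PendingEvent>();

  async saveEvent(event: PendingEvent): Promise<void> {
    this.events.set(event.messageId, { ...event });
  }

  async getEvent(id: string): Promise<PendingEvent | null> {
    return this.events.get(id) ?? null;
  }

  async updateEventStatus(id: string, status: string): Promise<void> {
    const event = this.events.get(id);
    if (event) event.status = status;
  }

  async deleteEvent(id: string): Promise<void> {
    this.events.delete(id);
  }

  // Return events still waiting to be sent, oldest first.
  async getPendingEvents(): Promise<PendingEvent[]> {
    return Array.from(this.events.values())
      .filter((event) => event.status === 'PENDING')
      .sort((a, b) => a.createdAt - b.createdAt);
  }
}

async function demo(): Promise<string[]> {
  const store = new InMemoryEventStore();
  await store.saveEvent({ messageId: 'msg-2', type: 'user.updated', payload: {}, status: 'PENDING', createdAt: 2000 });
  await store.saveEvent({ messageId: 'msg-1', type: 'user.created', payload: {}, status: 'PENDING', createdAt: 1000 });
  await store.saveEvent({ messageId: 'msg-3', type: 'user.deleted', payload: {}, status: 'PUBLISHED', createdAt: 500 });
  const pending = await store.getPendingEvents();
  return pending.map((event) => event.messageId);
}

const pendingOrder = demo();
```

Sorting inside the store keeps the oldest-first guarantee independent of insertion order, and already-published events are filtered out before ordering.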

🧪 Testing

@resilientmq/core ships with an automated test suite that covers unit, integration, stress, and benchmark tests.

Test Suite Overview

| Test Type | Purpose | Execution Time | Coverage |
|-----------|---------|----------------|----------|
| Unit Tests | Fast, isolated component testing with mocks | < 30s | 70%+ code coverage |
| Integration Tests | End-to-end testing with real RabbitMQ | < 5min | Full integration scenarios |
| Stress Tests | High-volume and high-speed load testing | < 10min | Resilience validation |
| Benchmarks | Performance measurement and regression detection | < 15min | Throughput & latency metrics |

Running Tests

# Run all tests
npm test

# Run specific test suites
npm run test:unit              # Unit tests only
npm run test:integration       # Integration tests (requires Docker)
npm run test:stress           # Stress tests (requires Docker)
npm run test:benchmark        # Performance benchmarks (requires Docker)

# Run with coverage
npm run test:coverage         # Generate coverage report
npm run coverage:check        # Validate coverage thresholds

# Quality checks
npm run benchmark:compare     # Check for performance regressions
npm run quality:check         # Run all quality gates

Test Infrastructure

  • Testcontainers: Automatic RabbitMQ container management for integration tests
  • Jest: Test framework with TypeScript support
  • Mocks & Helpers: Comprehensive test utilities for all components
  • Metrics Collection: Automated performance and resource usage tracking

CI/CD Integration

All tests run automatically on:

  • Every push to main/master/develop branches
  • Every pull request
  • Matrix testing across Node.js 18, 20, 22 and 24

Quality Gates:

  • ✅ Minimum 70% code coverage
  • ✅ All tests must pass
  • ✅ Performance regression < 10%
  • ✅ Error rate under load < 1%

For detailed testing documentation, see test/README.md.


📄 License

MIT