@resilientmq/core
v2.3.1
Published
Core logic for resilient RabbitMQ event processing
Maintainers
Readme
🆕 Ignoring events with controlled errors
You can throw the IgnoredEventError from an event handler to indicate that the event should be ignored and marked as successfully processed, even if the normal logic was not executed. This is useful for cases where certain events should not be retried or sent to the DLQ.
import { IgnoredEventError } from '@resilientmq/core/dist/resilience';
const consumer = new ResilientConsumer({
// ...configuration
eventsToProcess: [
{
type: 'user.created',
handler: async (event) => {
if (event.payload.ignore) {
throw new IgnoredEventError('Event ignored by business rule');
}
// normal logic...
}
}
]
});When this error is thrown, the event is marked as DONE and will not be retried or sent to the DLQ.
@resilientmq/core
Core logic for the resilient message queue system built on top of RabbitMQ, providing middleware support, retry logic, dead-letter handling, and persistent event lifecycle management.
Table of Contents
- 📦 Installation
- 📚 Purpose
- 🧩 Main Concepts
- 🔧 Config: ResilientConsumerConfig
- 🔧 Config: ResilientPublisherConfig
- 🧩 Custom Event Storage Format
- 🚀 Example: Consumer
- 🚀 Example: Publisher
- 📊 Metrics
- 🧪 Testing
- 👥 Contributors
- 📄 License
📦 Installation
npm install @resilientmq/coreMake sure to also install the core types package:
npm install @resilientmq/types__core📚 Purpose
This package contains the runtime logic for publishing and consuming resilient events. It includes:
- A pluggable consumer with retry + DLQ logic
- Publisher with persist-before-send safety
- Middleware pipeline
- Custom logger
- Full TypeScript support
v2.3.0 Highlights
- Publisher lane isolation: realtime publish lane and pending/retry lane can run on separate RabbitMQ pools.
- Automatic idempotent persistence path when your store implements
saveEventIfNotExists. - EWMA-based adaptive pending concurrency (latency + error-rate) with configurable thresholds.
- Expanded unit-test coverage for publisher architecture and adaptive controls.
🧩 Main Concepts
| Feature | Description |
|--------|-------------|
| publish(event) | Publishes a message safely to a queue or exchange |
| consume(handler) | Starts a consumer to process incoming messages |
| ResilientConsumer | Handles connection, retry, DLQ, and auto-reconnect |
| ResilientEventPublisher | Publishes events with status persistence |
| log(level, message) | Unified logging mechanism |
| Middleware | Custom logic pipeline on message consumption |
🔧 Config: ResilientConsumerConfig
| Property | Type | Required | Description | Subtype Fields |
|----------------------------|-----------------------------|----------|------------------------------------|----------------|
| connection | string \| Options.Connect | ✅ | RabbitMQ URI or connection config | – |
| consumeQueue.queue | string | ✅ | Queue name to consume | – |
| consumeQueue.options | AssertQueueOptions | ✅ | Queue assertion options | durable, arguments |
| consumeQueue.exchanges | ExchangeConfig[] | ❌ | exchanges to bind queue to | name, type, routingKey, options |
| retryQueue.queue | string | ❌ | Retry queue for failed messages | – |
| retryQueue.options | AssertQueueOptions | ❌ | Queue options | durable, arguments |
| retryQueue.exchange | ExchangeConfig | ❌ | Exchange for retry routing | name, type, routingKey, options |
| retryQueue.ttlMs | number | ❌ | Delay before retrying | – |
| retryQueue.maxAttempts | number | ❌ | Max retries before DLQ (default 5) | – |
| deadLetterQueue.queue | string | ❌ | Final destination after retries | – |
| deadLetterQueue.options | AssertQueueOptions | ❌ | DLQ queue options | durable |
| deadLetterQueue.exchange | ExchangeConfig | ❌ | DLQ exchange | name, type, routingKey, options |
| eventsToProcess | EventProcessConfig[] | ✅ | List of handled event types | type, handler |
| store | EventStore | ❌ | Persistent layer for events | saveEvent, saveEventIfNotExists (optional), getEvent, updateEventStatus, deleteEvent, getPendingEvents (optional), batchUpdateEventStatus (optional) |
| storeConnectionRetries | number | ❌ | Max retry attempts for store connection (default: 3) | – |
| storeConnectionRetryDelayMs | number | ❌ | Delay between store retry attempts in ms (default: 1000) | – |
| cleanupConsumerPrefetch | number | ❌ | Prefetch for secondary cleanup connection used when ignoreUnknownEvents is true (default: 500, set 0 to disable) | – |
| middleware | Middleware[] | ❌ | Hooks to wrap event execution | (event, next) => Promise |
| maxUptimeMs | number | ❌ | Restart consumer after X ms | – |
| exitIfIdle | boolean | ❌ | Exit process if idle | – |
| idleCheckIntervalMs | number | ❌ | Time between idle checks | – |
| maxIdleChecks | number | ❌ | How many checks until exit | – |
🔧 Config: ResilientPublisherConfig
| Property | Type | Required | Description |
|----------|------|----------|-------------|
| connection | string \| Options.Connect | ✅ | RabbitMQ URI or config |
| queue | string | ❌ | Target queue (direct publish) |
| exchange | ExchangeConfig | ❌ | Exchange for fanout/direct |
| store | EventStore | ❌* | Event metadata persistence (optional unless instantPublish is false) |
| instantPublish | boolean | ❌ | If true (default), publishes immediately. If false, stores for later delivery |
| pendingEventsCheckIntervalMs | number | ❌ | Interval to check and send pending events (ms). Only effective when instantPublish is false |
| maxConcurrentPublishes | number | ❌ | Global backpressure limit for concurrent publish operations (default: 100) |
| maxConnections | number | ❌ | Number of RabbitMQ connections in the pool for load distribution (default: 1) |
| separatePendingConnections | boolean | ❌ | Use a dedicated connection pool for pending/retry publishing (default: true) |
| pendingMaxConnections | number | ❌ | Connection count for pending/retry lane when separated (default: maxConnections) |
| pendingEventsBatchSize | number | ❌ | Default number of pending events fetched per batch when calling processPendingEvents() |
| pendingEventsMaxPublishesPerSecond | number | ❌ | Default max number of pending events dispatched per second during processPendingEvents() |
| pendingEventsMaxConcurrentPublishes | number | ❌ | Default max number of pending events published in parallel during processPendingEvents() |
| pendingAdaptiveConcurrency | boolean | ❌ | Enable adaptive pending concurrency control (default: true) |
| pendingAdaptiveEwmaAlpha | number | ❌ | EWMA alpha for adaptive signals, range (0,1] (default: 0.2) |
| pendingAdaptiveTargetLatencyMs | number | ❌ | Optional latency target for adaptive scaling (ms) |
| pendingAdaptiveErrorThresholdSoft | number | ❌ | Soft EWMA error-rate threshold for gradual backoff (default: 0.08) |
| pendingAdaptiveErrorThresholdHard | number | ❌ | Hard EWMA error-rate threshold for aggressive backoff (default: 0.2) |
| storeConnectionRetries | number | ❌ | Max retry attempts for store connection (default: 3) |
| storeConnectionRetryDelayMs | number | ❌ | Delay between store retry attempts in ms (default: 1000) |
| metricsEnabled | boolean | ❌ | Enable runtime metrics collection for the publisher |
Note: When instantPublish is set to false, a store with getPendingEvents() method is REQUIRED.
Idempotency optimization: If your store implements saveEventIfNotExists(event): Promise<boolean>, publisher automatically uses that single-write path; otherwise it falls back to getEvent + saveEvent.
🧩 Custom Event Storage Format
You can fully control how events are stored and retrieved by providing a serializer in your EventStore implementation.
This allows you to decouple the in-memory event format from the database structure — useful for legacy systems or when mapping to existing schemas.
🔄 Example: Custom Storage Serializer
const store: EventStore = {
serializer: {
toStorageFormat(event) {
return {
_id: event.id,
body: event.payload,
customStatus: event.status
};
},
fromStorageFormat(doc) {
return {
id: doc._id,
messageId: doc._id,
payload: doc.body,
status: doc.customStatus,
type: 'custom.type'
};
}
},
async saveEvent(event) {
const doc = this.serializer.toStorageFormat(event);
await db.insert(doc);
},
async getEvent(id) {
const doc = await db.findById(id);
return doc ? this.serializer.fromStorageFormat(doc) : null;
},
async updateEventStatus(id, status) {
await db.update(id, { customStatus: status });
},
async deleteEvent(id) {
await db.delete(id);
},
// Required for processPendingEvents() — supports optional `limit` for batched retrieval
async getPendingEvents(status, limit?) {
const query = db.find({ customStatus: status }).sort({ createdAt: 1 });
return limit ? query.limit(limit) : query;
}
};🚀 Example: Consumer
import { ResilientConsumer } from '@resilientmq/core';
import mongoose from 'mongoose';
const Event = mongoose.model('Event', new mongoose.Schema({ id: String }));
const store = {
saveEvent: async (e) => Event.create(e),
getEvent: async (id) => Event.findOne({ messageId: id }),
updateEventStatus: async (id, status) => Event.updateOne({ messageId: id }, { status }),
deleteEvent: async (id) => Event.deleteOne({ messageId: id })
};
const consumer = new ResilientConsumer({
connection: 'amqp://localhost',
consumeQueue: {
queue: 'user.queue',
options: { durable: true },
exchanges: [
{ name: 'orders.events', type: 'topic', routingKey: 'order.*', options: { durable: true } },
{ name: 'notifications.events', type: 'direct', routingKey: 'notification', options: { durable: true } }
]
},
eventsToProcess: [
{ type: 'user.created', handler: async (payload) => console.log('User created:', payload) },
{ type: 'order.placed', handler: async (payload) => console.log('Order placed:', payload) },
{ type: 'notification.sent', handler: async (payload) => console.log('Notification sent:', payload) }
],
store
});
await consumer.start();🚀 Example: Publisher
import { ResilientEventPublisher } from '@resilientmq/core';
const publisher = new ResilientEventPublisher({
connection: 'amqp://localhost',
exchange: {
name: 'user.events',
type: 'fanout',
options: { durable: true }
},
maxConnections: 3 // Use 3 connections for load distribution (optional, default: 1)
});
// IMPORTANT: routingKey is now taken from each event's `routingKey` field when publishing to an exchange.
// If the event does not include `routingKey`, the publisher will send the message with no routing key.
// The exchange configuration no longer provides the routing key for per-message routing.
await publisher.publish({
id: 'evt-1',
messageId: 'msg-1',
type: 'user.created',
payload: { name: 'Alice' },
status: 'PENDING_PUBLICATION',
// Optional per-message routing key;
routingKey: 'user.created'
});Publisher Without Store
import { ResilientEventPublisher } from '@resilientmq/core';
const publisher = new ResilientEventPublisher({
connection: 'amqp://localhost',
exchange: {
name: 'user.events',
type: 'fanout',
options: { durable: true }
},
maxConnections: 5 // Optional: distribute load across 5 connections
});
await publisher.publish({
id: 'evt-2',
messageId: 'msg-2',
type: 'user.updated',
payload: { name: 'Bob' },
status: 'PENDING_PUBLICATION'
});Publisher with Pending Events Processing
import { ResilientEventPublisher } from '@resilientmq/core';
const publisher = new ResilientEventPublisher({
connection: 'amqp://localhost',
exchange: {
name: 'user.events',
type: 'fanout',
options: { durable: true }
},
store: myEventStore,
// Check for pending events every 30 seconds
pendingEventsCheckIntervalMs: 30000,
// Realtime lane pool
maxConnections: 3,
// Dedicated pending/retry lane pool (default true)
separatePendingConnections: true,
pendingMaxConnections: 2,
// Adaptive pending concurrency tuning (all optional)
pendingAdaptiveConcurrency: true,
pendingAdaptiveEwmaAlpha: 0.2,
pendingAdaptiveErrorThresholdSoft: 0.08,
pendingAdaptiveErrorThresholdHard: 0.2
});
// Store event for later delivery (e.g., when offline)
await publisher.publish({
id: 'evt-3',
messageId: 'msg-3',
type: 'user.deleted',
payload: { id: '123' },
status: 'PENDING'
}, { storeOnly: true });
// The event will be automatically sent every 30 seconds if there are pending events
// Or manually process pending events
await publisher.processPendingEvents();
// Stop periodic checks when shutting down
publisher.stopPendingEventsCheck();Key Features:
storeOnly: true: Stores the event without sending it immediately (useful for offline scenarios)pendingEventsCheckIntervalMs: Configurable interval to automatically check and send pending eventsprocessPendingEvents(): Manually trigger processing of pending eventsstopPendingEventsCheck(): Stop the periodic check for graceful shutdownmaxConnections: Distribute load across multiple RabbitMQ connections (round-robin)separatePendingConnections+pendingMaxConnections: Isolate realtime vs pending/retry traffic at connection level- Events are processed in the order returned by your store (implement sorting there if needed)
- Only connects pending lane when there are pending events to process
⚡ Pending Events Processing with Rate Limiting (v2.1.5+)
processPendingEvents() supports configurable rate limiting to control throughput and prevent overwhelming your RabbitMQ broker or downstream systems.
Configuration options:
await publisher.processPendingEvents({
batchSize: 1000, // Number of events to retrieve per batch (default: 100)
maxPublishesPerSecond: 500, // Maximum events published per second (default: same as batchSize)
maxConcurrentPublishes: 10 // Maximum concurrent publish operations (default: min(10, maxPublishesPerSecond))
});How it works:
- Retrieves
batchSizepending events from the store - Publishes events using a token bucket algorithm to respect
maxPublishesPerSecond - Maintains up to
maxConcurrentPublishesparallel operations - Updates all statuses in a single batch operation
- Repeats until no more pending events
Token Bucket Algorithm:
- Provides smooth, continuous rate limiting (not fixed 1-second windows)
- Allows burst capacity while maintaining average rate
- Refills tokens continuously based on elapsed time
Performance characteristics:
- Can achieve 500-1000 msg/s with proper configuration
- Prevents broker overload with controlled throughput
- Batch status updates reduce database overhead by 90%
Important: Events are processed in the order returned by your store. If you need chronological ordering, implement sorting in your getPendingEvents() method:
async getPendingEvents(status: EventPublishStatus, limit?: number): Promise<EventMessage[]> {
const query = EventModel.find({ status }).sort({ createdAt: 1 }); // Sort by timestamp
return limit ? query.limit(limit).exec() : query.exec();
}Example: Implementing getPendingEvents with limit
// MongoDB / Mongoose example
async getPendingEvents(status: EventPublishStatus, limit?: number): Promise<EventMessage[]> {
const query = EventModel.find({ status }).sort({ createdAt: 1 });
if (limit) {
query.limit(limit);
}
return query.exec();
}
// Prisma example
async getPendingEvents(status: EventPublishStatus, limit?: number): Promise<EventMessage[]> {
return prisma.event.findMany({
where: { status },
orderBy: { createdAt: 'asc' },
...(limit ? { take: limit } : {})
});
}🚀 Critical: Implement Batch Status Updates (v2.1.2+)
For maximum performance, you MUST implement the optional batchUpdateEventStatus() method in your EventStore.
Benefits:
- 90% reduction in database calls: From 1000 individual updates/s to ~10 batched calls/s
- 10-100x throughput improvement: Process at maximum speed without database bottleneck
- Backward compatible: Falls back to individual updates if not implemented
// MongoDB / Mongoose example
async batchUpdateEventStatus(
updates: Array<{ event: EventMessage; status: EventPublishStatus }>
): Promise<void> {
const bulkOps = updates.map(({ event, status }) => ({
updateOne: {
filter: { messageId: event.messageId },
update: { $set: { status } }
}
}));
await EventModel.bulkWrite(bulkOps);
}
// Prisma example
async batchUpdateEventStatus(
updates: Array<{ event: EventMessage; status: EventPublishStatus }>
): Promise<void> {
await prisma.$transaction(
updates.map(({ event, status }) =>
prisma.event.update({
where: { messageId: event.messageId },
data: { status }
})
)
);
}
// SQL example (using Knex)
async batchUpdateEventStatus(
updates: Array<{ event: EventMessage; status: EventPublishStatus }>
): Promise<void> {
const trx = await db.transaction();
try {
for (const { event, status } of updates) {
await trx('events')
.where({ message_id: event.messageId })
.update({ status });
}
await trx.commit();
} catch (error) {
await trx.rollback();
throw error;
}
}How it works:
- The publisher batches status updates every 100ms during
processPendingEvents() - If
batchUpdateEventStatus()is implemented, it's used automatically - If not implemented or if it fails, falls back to individual
updateEventStatus()calls - No code changes needed in your publisher - just implement the method in your store
📊 Metrics
ResilientEventPublisher supports optional real-time metrics collection.
Enable it with metricsEnabled: true in the config, then call getMetrics() at any time to get
a snapshot.
import { ResilientConsumer, ResilientEventPublisher } from '@resilientmq/core';
// Consumer (metrics are disabled in runtime for minimal overhead)
const consumer = new ResilientConsumer({
connection: 'amqp://localhost',
consumeQueue: { queue: 'my.queue' },
eventsToProcess: [{ type: 'order.created', handler: async (e) => { /* ... */ } }],
metricsEnabled: true,
});
await consumer.start();
// Consumer runtime returns undefined for metrics by design
const snap = consumer.getMetrics();
console.log(snap); // undefined// Publisher with metrics
const publisher = new ResilientEventPublisher({
connection: 'amqp://localhost',
queue: 'my.queue',
metricsEnabled: true, // <-- enable metrics
});
await publisher.publish({ messageId: 'id-1', type: 'order.created', payload: {} });
const snap = publisher.getMetrics();
console.log(snap?.messagesPublished); // 1You can also use MetricsCollector standalone for custom instrumentation:
import { MetricsCollector } from '@resilientmq/core';
const metrics = new MetricsCollector();
metrics.increment('messagesReceived');
metrics.recordProcessingTime(42);
const snapshot = metrics.getSnapshot();
metrics.reset(); // reset all countersAvailable Metrics
| Field | Description |
|-------|-------------|
| messagesReceived | Total messages received by the consumer |
| messagesProcessed | Messages successfully processed |
| messagesRetried | Messages retried at least once |
| messagesFailed | Messages that failed permanently |
| messagesSentToDLQ | Messages sent to the Dead Letter Queue |
| messagesPublished | Messages published by the publisher |
| processingErrors | Total processing errors encountered |
| avgProcessingTimeMs | Average processing time in milliseconds |
| lastActivityAt | Timestamp of the last recorded activity |
getMetrics()returnsundefinedwhenmetricsEnabledis not set or isfalse.
🧪 Testing
@resilientmq/core includes a comprehensive automated testing strategy to ensure reliability, performance, and quality.
Test Suite Overview
| Test Type | Purpose | Execution Time | Coverage | |-----------|---------|----------------|----------| | Unit Tests | Fast, isolated component testing with mocks | < 30s | 70%+ code coverage (current suite at 100% for covered source set) | | Integration Tests | End-to-end testing with real RabbitMQ | < 5min | Full integration scenarios | | Stress Tests | High-volume and high-speed load testing | < 10min | Resilience validation | | Benchmarks | Performance measurement and regression detection | < 15min | Throughput & latency metrics |
Running Tests
# Run all tests
npm test
# Run specific test suites
npm run test:unit # Unit tests only
npm run test:integration # Integration tests (requires Docker)
npm run test:stress # Stress tests (requires Docker)
npm run test:benchmark # Performance benchmarks (requires Docker)
# Run with coverage
npm run test:coverage # Generate coverage report
npm run coverage:check # Validate coverage thresholds
# Quality checks
npm run benchmark:compare # Check for performance regressions
npm run quality:check # Run all quality gatesTest Infrastructure
- Testcontainers: Automatic RabbitMQ container management for integration tests
- Jest: Test framework with TypeScript support
- Mocks & Helpers: Comprehensive test utilities for all components
- Metrics Collection: Automated performance and resource usage tracking
CI/CD Integration
All tests run automatically on:
- Every push to main/master/develop branches
- Every pull request
- Matrix testing across Node.js 18, 20, 22, 24 and 25
Quality Gates:
- ✅ Minimum 70% code coverage
- ✅ All tests must pass
- ✅ Performance regression < 10%
- ✅ Error rate under load < 1%
For detailed testing documentation, see test/README.md.
