@message-in-the-middle/persistence-core
v0.1.3
Persistence interfaces and replay manager for message-middleware
⚠️ Work in Progress
Is this library production-ready? No.
Is this library safe? No.
When will it be ready? Soon™ (maybe tomorrow, maybe never).
Why is it public? As an experiment.
message-in-the-middle is to message queue processing what Express.js is to HTTP request processing. Just as Express provides a middleware pattern for HTTP requests, this library provides a middleware pattern for processing queue messages.
Why This Exists
Processing queue messages usually means copy-pasting the same boilerplate: parse JSON, validate, log, retry, deduplicate, route to handlers. This library lets you compose that logic as middlewares.
This package adds persistence to message-in-the-middle: store, query, and replay messages for debugging, audit trails, and compliance.
It provides the core persistence interfaces, the persistence middleware, and the replay manager, but no storage backend. For actual storage, use a store package such as @message-in-the-middle/store-memory or @message-in-the-middle/store-mysql.
Features
- 📦 Store Interfaces - Standard interfaces for message persistence
- 🔄 Message Replay - Replay failed or stored messages
- 🎯 Persistence Middleware - Automatically store messages during processing
- 🔍 Query API - Find messages by status, error type, date range
- 📊 Message Status Tracking - Track PROCESSING, SUCCEEDED, FAILED, ARCHIVED states
- 🎨 TypeScript - Full type safety
Installation
# npm
npm install @message-in-the-middle/persistence-core @message-in-the-middle/core
# pnpm
pnpm add @message-in-the-middle/persistence-core @message-in-the-middle/core
# yarn
yarn add @message-in-the-middle/persistence-core @message-in-the-middle/core
Note: This package does not include a storage backend. You also need a store implementation:
# For development/testing
pnpm add @message-in-the-middle/store-memory
# For production
pnpm add @message-in-the-middle/store-mysql
Quick Start
1. Setup Store and Middleware
import { MessageMiddlewareManager } from '@message-in-the-middle/core';
import { PersistenceInboundMiddleware } from '@message-in-the-middle/persistence-core';
import { InMemoryMessageStore } from '@message-in-the-middle/store-memory';
// Create store
const store = new InMemoryMessageStore();
// Create manager and add persistence
const manager = new MessageMiddlewareManager();
manager.addInboundMiddleware(
new PersistenceInboundMiddleware(store, {
storeOn: ['error'] // Store only failed messages
})
);
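// Add your other middlewares...
// (ParseJsonInboundMiddleware is not imported above; import it from the package that provides it)
manager.addInboundMiddleware(new ParseJsonInboundMiddleware());
2. Query Stored Messages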
import { MessageStatus } from '@message-in-the-middle/persistence-core';
// Find failed messages
const failed = await store.findByStatus(MessageStatus.FAILED);
// Find by error type
const validationErrors = await store.findByError('ValidationError');
// Find specific message
const message = await store.findById('msg-123');
3. Replay Failed Messages
import { MessageReplayManager } from '@message-in-the-middle/persistence-core';
// Create replay manager, passing the MessageMiddlewareManager configured in step 1
const replayManager = new MessageReplayManager(store, manager);
// Replay all failed messages
const result = await replayManager.replayFailed({ limit: 50 });
console.log(`Replayed: ${result.succeeded} succeeded, ${result.failed} failed`);
// Replay specific error types
await replayManager.replayByErrorType('ValidationError');
// Replay single message
await replayManager.replayOne('message-id-123');
Storage Stages
Control when messages are stored using the storeOn option:
Store Only Errors (Default)
new PersistenceInboundMiddleware(store, {
storeOn: ['error'] // Store only failed messages
})
Use case: Debugging production issues, minimal storage overhead
Store Only Successful Messages
new PersistenceInboundMiddleware(store, {
storeOn: ['success'] // Store only successful messages
})
Use case: Audit trail of completed operations
Store on Entry
new PersistenceInboundMiddleware(store, {
storeOn: ['entry'] // Store as PROCESSING when message arrives
})
Use case: Track in-progress messages
Full Audit Trail
new PersistenceInboundMiddleware(store, {
storeOn: ['always'] // Store on entry and update on completion
})
Use case: Complete audit trail, compliance requirements
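The four stages can be read as a rule for which status writes reach the store. The sketch below is one interpretation of the descriptions above, not the middleware's actual code: `StoreStage`, `Status`, and `writesFor` are hypothetical names, and it assumes that 'entry' on its own writes only the initial PROCESSING record.

```typescript
// Hypothetical illustration; none of these names are exports of
// @message-in-the-middle/persistence-core.
type StoreStage = 'error' | 'success' | 'entry' | 'always';
type Status = 'PROCESSING' | 'SUCCEEDED' | 'FAILED';

// Returns the status writes, in order, that a store would receive for one
// message, given the storeOn configuration and how processing ended.
function writesFor(storeOn: StoreStage[], outcome: 'success' | 'error'): Status[] {
  // 'always' is treated as shorthand for every stage (assumption).
  const enabled = (stage: StoreStage): boolean =>
    storeOn.includes('always') || storeOn.includes(stage);

  const writes: Status[] = [];
  if (enabled('entry')) writes.push('PROCESSING');
  if (outcome === 'success' && enabled('success')) writes.push('SUCCEEDED');
  if (outcome === 'error' && enabled('error')) writes.push('FAILED');
  return writes;
}
```

Under this reading, `['error']` produces no writes at all for a message that succeeds, which is why it has the lowest storage overhead.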
Message Status Lifecycle
Messages progress through these states:
PROCESSING → SUCCEEDED → ARCHIVED
    ↓
  FAILED
Status Descriptions:
- PROCESSING - Message is currently being processed
- SUCCEEDED - Message processed successfully
- FAILED - Message processing failed
- ARCHIVED - Message archived after successful processing
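The lifecycle above can be written down as a small transition table, which is handy when a custom store wants to reject impossible updates. This is a sketch that encodes only the arrows in the diagram; `Status`, `ALLOWED`, and `canTransition` are hypothetical names, and the string values mirror the MessageStatus enum values.

```typescript
// Hypothetical helper; the string values mirror the MessageStatus enum.
type Status = 'PROCESSING' | 'SUCCEEDED' | 'FAILED' | 'ARCHIVED';

// Allowed next states, taken directly from the lifecycle diagram.
const ALLOWED: Record<Status, Status[]> = {
  PROCESSING: ['SUCCEEDED', 'FAILED'],
  SUCCEEDED: ['ARCHIVED'],
  FAILED: [],   // terminal in the diagram; a replay may well reset it (not shown there)
  ARCHIVED: [], // terminal
};

// True when the diagram contains an arrow from `from` to `to`.
function canTransition(from: Status, to: Status): boolean {
  return ALLOWED[from].includes(to);
}
```

If replaying a message is expected to move it from FAILED back to PROCESSING, that arrow would need to be added to the table; the diagram does not say either way.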
Query API
Find by Status
import { MessageStatus } from '@message-in-the-middle/persistence-core';
// Find all failed messages
const failed = await store.findByStatus(MessageStatus.FAILED);
// With pagination
const recent = await store.findByStatus(MessageStatus.FAILED, {
limit: 20,
offset: 0,
sortBy: 'created',
sortOrder: 'desc'
});
// With date range
const lastWeek = await store.findByStatus(MessageStatus.FAILED, {
startDate: new Date(Date.now() - 7 * 24 * 60 * 60 * 1000),
endDate: new Date()
});
Find by Error Type
// Find all validation errors
const validationErrors = await store.findByError('ValidationError');
// With options
const recentErrors = await store.findByError('ValidationError', {
limit: 10,
startDate: new Date('2024-01-01')
});
Find by ID
const message = await store.findById('msg-123');
if (message) {
console.log('Status:', message.status);
console.log('Attempts:', message.retryCount);
console.log('Error:', message.errorMessage);
}
Count Messages
// Count by status
const failedCount = await store.count({ status: MessageStatus.FAILED });
// Count by error type
const validationErrorCount = await store.count({
errorType: 'ValidationError'
});
// Count in date range
const todayCount = await store.count({
startDate: new Date(Date.now() - 24 * 60 * 60 * 1000)
});
Replay Operations
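Conceptually, a replay pass takes a set of failed messages, pushes each one through the processing pipeline again, and tallies the outcomes. The following is a minimal sketch of that loop with a concurrency cap. It assumes nothing about how MessageReplayManager is actually implemented: `replayAll`, `FailedMessage`, and the `handle` callback are hypothetical, and only the ReplayResult shape is taken from this README.

```typescript
// Hypothetical shapes for illustration only.
interface FailedMessage { id: string; payload: unknown }
interface ReplayResult { total: number; succeeded: number; failed: number; skipped: number }

// Re-runs `handle` for every message, with at most `concurrency` in flight.
async function replayAll(
  messages: FailedMessage[],
  handle: (message: FailedMessage) => Promise<void>,
  concurrency: number
): Promise<ReplayResult> {
  // `skipped` is kept for shape parity; this sketch never skips a message.
  const result: ReplayResult = { total: messages.length, succeeded: 0, failed: 0, skipped: 0 };
  let next = 0;

  // Each worker claims the next unprocessed index until the list is exhausted.
  const worker = async (): Promise<void> => {
    while (next < messages.length) {
      const message = messages[next++];
      try {
        await handle(message);
        result.succeeded++;
      } catch {
        result.failed++; // a failed replay is counted, not rethrown
      }
    }
  };

  const workerCount = Math.max(1, Math.min(concurrency, messages.length));
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return result;
}
```

A fixed pool of workers pulling from a shared index keeps the number of in-flight messages bounded without any extra library, which is the behavior the `concurrency` option later in this section suggests.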
Replay Failed Messages
// `manager` is the MessageMiddlewareManager that processes your messages
const replayManager = new MessageReplayManager(store, manager);
// Replay all failed messages
const result = await replayManager.replayFailed({ limit: 100 });
console.log(`Total: ${result.total}`);
console.log(`Succeeded: ${result.succeeded}`);
console.log(`Failed: ${result.failed}`);
console.log(`Skipped: ${result.skipped}`);
Replay by Error Type
// Replay specific error types
await replayManager.replayByErrorType('ValidationError');
// After fixing a bug, replay those failures
await replayManager.replayByErrorType('PaymentGatewayError');
Replay Single Message
// Replay one message by ID
const success = await replayManager.replayOne('message-id-123');
if (success) {
console.log('Message replayed successfully');
} else {
console.log('Message replay failed');
}
Advanced Replay Options
await replayManager.replayFailed({
limit: 50,
offset: 0,
startDate: new Date('2024-01-01'),
endDate: new Date('2024-01-31'),
sortBy: 'created',
sortOrder: 'desc',
concurrency: 5 // Process 5 messages concurrently
});
Custom ID Generation
Customize how message IDs are generated:
new PersistenceInboundMiddleware(store, {
storeOn: ['error'],
idGenerator: (ctx) => `${ctx.metadata.queueName}-${Date.now()}`
})
Source Tracking
Track message source for multi-queue systems:
new PersistenceInboundMiddleware(store, {
storeOn: ['error'],
sourceExtractor: (ctx) => ({
queue: ctx.metadata.queueName,
exchange: ctx.metadata.exchange,
routingKey: ctx.metadata.routingKey
})
})
Implementing a Custom Store
Implement the MessageStore interface to create a custom storage backend:
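// QueryOptions and QueryFilters are used below; they are assumed to be exported from the same package
import { MessageStore, StoredMessage, MessageStatus, QueryOptions, QueryFilters } from '@message-in-the-middle/persistence-core';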
export class CustomMessageStore implements MessageStore {
async save(message: StoredMessage): Promise<void> {
// Your storage logic
}
async findById(id: string): Promise<StoredMessage | null> {
// Your query logic
}
async findByStatus(
status: MessageStatus,
options?: QueryOptions
): Promise<StoredMessage[]> {
// Your query logic
}
async findByError(
errorType: string,
options?: QueryOptions
): Promise<StoredMessage[]> {
// Your query logic
}
async updateStatus(
id: string,
updates: Partial<StoredMessage>
): Promise<void> {
// Your update logic
}
async count(filters?: QueryFilters): Promise<number> {
// Your count logic
}
async delete(id: string): Promise<void> {
// Your delete logic
}
async destroy(): Promise<void> {
// Cleanup resources
}
}
TypeScript Types
Core Types
// Message status enum
enum MessageStatus {
PROCESSING = 'PROCESSING',
SUCCEEDED = 'SUCCEEDED',
FAILED = 'FAILED',
ARCHIVED = 'ARCHIVED'
}
// Stored message
interface StoredMessage {
id: string;
status: MessageStatus;
message: any;
raw?: any;
metadata: Record<string, any>;
attributes: Record<string, any>;
source?: Record<string, any>;
errorMessage?: string;
errorStack?: string;
errorType?: string;
retryCount: number;
created: Date;
updated: Date;
completed?: Date;
}
// Query options
interface QueryOptions {
limit?: number;
offset?: number;
sortBy?: 'created' | 'updated' | 'completed';
sortOrder?: 'asc' | 'desc';
startDate?: Date;
endDate?: Date;
}
// Replay result
interface ReplayResult {
total: number;
succeeded: number;
failed: number;
skipped: number;
}
Available Store Implementations
In-Memory Store (Development/Testing)
pnpm add @message-in-the-middle/store-memory
Features:
- Fast, in-memory storage
- No external dependencies
- Bounded size with automatic eviction
- Perfect for development and testing
⚠️ Not for production - Data is lost on restart
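To make "bounded size with automatic eviction" concrete, here is a minimal sketch of a size-capped in-memory map that drops its oldest entry when full. It is an illustration only: `BoundedStore` is a hypothetical class, and the eviction policy that @message-in-the-middle/store-memory actually uses is not described in this README.

```typescript
// Hypothetical class; not the InMemoryMessageStore implementation.
class BoundedStore<T extends { id: string }> {
  private readonly items = new Map<string, T>();
  private readonly maxSize: number;

  constructor(maxSize: number) {
    this.maxSize = maxSize;
  }

  save(item: T): void {
    // Deleting first moves a re-saved id to the end of the insertion order.
    this.items.delete(item.id);
    this.items.set(item.id, item);
    if (this.items.size > this.maxSize) {
      // A Map iterates in insertion order, so the first key is the oldest.
      const oldest = this.items.keys().next().value;
      if (oldest !== undefined) this.items.delete(oldest);
    }
  }

  findById(id: string): T | null {
    return this.items.get(id) ?? null;
  }

  get size(): number {
    return this.items.size;
  }
}
```

Whatever the real policy is, the consequence is the same: older messages can disappear without notice once the cap is reached, which is a second reason to keep this kind of store out of production.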
MySQL Store (Production)
pnpm add @message-in-the-middle/store-mysql
Features:
- Persistent storage
- ACID transactions
- Indexing for fast queries
- Schema versioning
Best for: Production workloads, compliance requirements
See individual package READMEs for detailed setup instructions.
Complete Example
import { MessageMiddlewareManager } from '@message-in-the-middle/core';
import {
PersistenceInboundMiddleware,
MessageReplayManager,
MessageStatus
} from '@message-in-the-middle/persistence-core';
import { InMemoryMessageStore } from '@message-in-the-middle/store-memory';
// Create store
const store = new InMemoryMessageStore();
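// Create manager with persistence.
// ParseJsonInboundMiddleware, ValidateInboundMiddleware, `validator`, and `message`
// are not defined in this snippet; import or define them before running it.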
const manager = new MessageMiddlewareManager();
manager
.addInboundMiddleware(new PersistenceInboundMiddleware(store, {
storeOn: ['error']
}))
.addInboundMiddleware(new ParseJsonInboundMiddleware())
.addInboundMiddleware(new ValidateInboundMiddleware(validator));
// Process messages...
try {
await manager.processInbound(message);
} catch (error) {
// Failed messages are automatically stored
}
// Later, query failed messages
const failed = await store.findByStatus(MessageStatus.FAILED);
console.log(`Found ${failed.length} failed messages`);
// Replay them
const replayManager = new MessageReplayManager(store, manager);
const result = await replayManager.replayFailed({ limit: 10 });
console.log(`Replayed ${result.succeeded} successfully`);
Best Practices
- Choose the Right Store: Use the in-memory store for development and testing, and a persistent store such as MySQL for production
- Storage Strategy: Store only what you need (errors vs full audit trail)
- Cleanup Old Messages: Implement retention policies
- Monitor Storage Size: Set up alerts for storage growth
- Test Replay Logic: Ensure replayed messages can be processed correctly
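As an example of a retention policy, the sketch below deletes messages with a given status once they are older than a cutoff, working in small batches. It relies only on `findByStatus` and `delete` as listed in the MessageStore interface; `purgeOlderThan`, `RetentionStore`, and `StoredLike` are hypothetical names, and it assumes that `endDate` filters on the `created` timestamp and that `delete` really removes the row (otherwise the loop would not terminate).

```typescript
// Local stand-ins for the parts of the MessageStore interface used here.
interface StoredLike { id: string; status: string; created: Date }
interface RetentionStore {
  findByStatus(status: string, options?: { endDate?: Date; limit?: number }): Promise<StoredLike[]>;
  delete(id: string): Promise<void>;
}

// Deletes every message with `status` created at or before now - maxAgeMs.
// Returns the number of messages deleted.
async function purgeOlderThan(
  store: RetentionStore,
  status: string,
  maxAgeMs: number,
  now: Date = new Date(),
  batchSize: number = 100
): Promise<number> {
  const cutoff = new Date(now.getTime() - maxAgeMs);
  let deleted = 0;
  for (;;) {
    // Assumption: endDate is an upper bound on the `created` timestamp.
    const batch = await store.findByStatus(status, { endDate: cutoff, limit: batchSize });
    if (batch.length === 0) return deleted;
    for (const message of batch) {
      await store.delete(message.id);
      deleted++;
    }
  }
}
```

Running something like this on a schedule, for example once a day with a 30-day window for SUCCEEDED messages, also keeps storage growth predictable, which makes the monitoring item above easier to act on.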
Related Packages
- @message-in-the-middle/core - Core library (required)
- @message-in-the-middle/store-memory - In-memory store for dev/testing
- @message-in-the-middle/store-mysql - MySQL store for production
Documentation
- Main README - Complete documentation
- SQL Schema - SQL database schema
- Architecture - Design patterns
- Contributing - How to contribute
License
MIT
