@message-in-the-middle/persistence-core

v0.1.3

Persistence interfaces and replay manager for message-middleware

⚠️ Work in Progress

  • Is this library production-ready? No.
  • Is this library safe? No.
  • When will it be ready? Soon™ (maybe tomorrow, maybe never).
  • Why is it public? Experiment.

message-in-the-middle is to message queue processing what Express.js is to HTTP request processing. Just as Express provides a middleware pattern for HTTP requests, this library provides a middleware pattern for processing queue messages.

Why This Exists

Processing queue messages usually means copy-pasting the same boilerplate: parse JSON, validate, log, retry, deduplicate, route to handlers. This library lets you compose that logic as middlewares.
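
To make the pattern concrete, here is a minimal, self-contained sketch of middleware composition applied to a queue message. It does not use this library's API: `Context`, `Middleware`, `compose`, `parseJson`, and `logger` are hypothetical names chosen for illustration only.

```typescript
// Hypothetical types for illustration; not this library's API.
interface Context {
  raw: string;
  message?: unknown;
  log: string[];
}

type Next = () => Promise<void>;
type Middleware = (ctx: Context, next: Next) => Promise<void>;

// Compose middlewares so each one can run code before and after the rest of the chain.
function compose(middlewares: Middleware[]): (ctx: Context) => Promise<void> {
  return async (ctx) => {
    const dispatch = async (i: number): Promise<void> => {
      const middleware = middlewares[i];
      if (!middleware) return; // end of the chain
      await middleware(ctx, () => dispatch(i + 1));
    };
    await dispatch(0);
  };
}

// Parses the raw body, then hands over to the next middleware.
const parseJson: Middleware = async (ctx, next) => {
  ctx.message = JSON.parse(ctx.raw);
  await next();
};

// Wraps the rest of the chain with log entries.
const logger: Middleware = async (ctx, next) => {
  ctx.log.push("start");
  await next();
  ctx.log.push("end");
};

const handle = compose([logger, parseJson]);
```

Each step stays small and reusable, and the order of the array decides the order of execution.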


Store, query, and replay messages for debugging, audit trails, and compliance.

This package provides the core persistence interfaces, middleware, and replay functionality. For actual storage implementations, use specific store packages like @message-in-the-middle/store-memory or @message-in-the-middle/store-mysql.

Features

  • 📦 Store Interfaces - Standard interfaces for message persistence
  • 🔄 Message Replay - Replay failed or stored messages
  • 🎯 Persistence Middleware - Automatically store messages during processing
  • 🔍 Query API - Find messages by status, error type, date range
  • 📊 Message Status Tracking - Track PROCESSING, SUCCEEDED, FAILED, ARCHIVED states
  • 🎨 TypeScript - Full type safety

Installation

# npm
npm install @message-in-the-middle/persistence-core @message-in-the-middle/core

# pnpm
pnpm add @message-in-the-middle/persistence-core @message-in-the-middle/core

# yarn
yarn add @message-in-the-middle/persistence-core @message-in-the-middle/core

Note: This package does not include a storage backend. You also need a store implementation:

# For development/testing
pnpm add @message-in-the-middle/store-memory

# For production
pnpm add @message-in-the-middle/store-mysql

Quick Start

1. Setup Store and Middleware

import { MessageMiddlewareManager } from '@message-in-the-middle/core';
import { PersistenceInboundMiddleware } from '@message-in-the-middle/persistence-core';
import { InMemoryMessageStore } from '@message-in-the-middle/store-memory';

// Create store
const store = new InMemoryMessageStore();

// Create manager and add persistence
const manager = new MessageMiddlewareManager();
manager.addInboundMiddleware(
  new PersistenceInboundMiddleware(store, {
    storeOn: ['error']  // Store only failed messages
  })
);

// Add your other middlewares...
// (ParseJsonInboundMiddleware is not imported above; import it from the package that provides it)
manager.addInboundMiddleware(new ParseJsonInboundMiddleware());

2. Query Stored Messages

import { MessageStatus } from '@message-in-the-middle/persistence-core';

// Find failed messages
const failed = await store.findByStatus(MessageStatus.FAILED);

// Find by error type
const validationErrors = await store.findByError('ValidationError');

// Find specific message
const message = await store.findById('msg-123');

3. Replay Failed Messages

import { MessageReplayManager } from '@message-in-the-middle/persistence-core';

// Create replay manager (pass the MessageMiddlewareManager from step 1)
const replayManager = new MessageReplayManager(store, manager);

// Replay all failed messages
const result = await replayManager.replayFailed({ limit: 50 });
console.log(`Replayed: ${result.succeeded} succeeded, ${result.failed} failed`);

// Replay specific error types
await replayManager.replayByErrorType('ValidationError');

// Replay single message
await replayManager.replayOne('message-id-123');

Storage Stages

Control when messages are stored using the storeOn option:

Store Only Errors (Default)

new PersistenceInboundMiddleware(store, {
  storeOn: ['error']  // Store only failed messages
})

Use case: Debugging production issues, minimal storage overhead

Store Only Successful Messages

new PersistenceInboundMiddleware(store, {
  storeOn: ['success']  // Store only successful messages
})

Use case: Audit trail of completed operations

Store on Entry

new PersistenceInboundMiddleware(store, {
  storeOn: ['entry']  // Store as PROCESSING when message arrives
})

Use case: Track in-progress messages

Full Audit Trail

new PersistenceInboundMiddleware(store, {
  storeOn: ['always']  // Store on entry and update on completion
})

Use case: Complete audit trail, compliance requirements
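
One way to picture how the four `storeOn` values map to store writes is the sketch below. It is an interpretation of the descriptions above, not the middleware's actual source: `StoreStage`, `PipelineEvent`, and `shouldStore` are hypothetical names, and the assumption that `'entry'` alone does not trigger a write on completion is mine.

```typescript
// Hypothetical names; the real middleware may decide differently.
type StoreStage = "entry" | "success" | "error" | "always";
type PipelineEvent = "entry" | "success" | "error";

// Returns true when a message should be written (or updated) for the given event.
function shouldStore(storeOn: StoreStage[], event: PipelineEvent): boolean {
  // 'always' covers every event: store on entry, update on completion.
  if (storeOn.includes("always")) return true;
  return storeOn.includes(event);
}
```

Under these assumptions, `shouldStore(['error'], 'success')` is `false`, which is why the default keeps storage overhead low.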

Message Status Lifecycle

Messages progress through these states:

PROCESSING → SUCCEEDED → ARCHIVED
    ↓
  FAILED

Status Descriptions:

  • PROCESSING - Message is currently being processed
  • SUCCEEDED - Message processed successfully
  • FAILED - Message processing failed
  • ARCHIVED - Message archived after successful processing
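
The lifecycle can also be read as a small transition table. The sketch below encodes only the arrows shown in the diagram; `Status`, `allowed`, and `canTransition` are hypothetical names. The diagram does not say what happens to a FAILED message on replay, so the sketch leaves FAILED without outgoing transitions.

```typescript
type Status = "PROCESSING" | "SUCCEEDED" | "FAILED" | "ARCHIVED";

// Transitions taken from the lifecycle diagram; anything else is rejected.
const allowed: Record<Status, Status[]> = {
  PROCESSING: ["SUCCEEDED", "FAILED"],
  SUCCEEDED: ["ARCHIVED"],
  FAILED: [],
  ARCHIVED: [],
};

function canTransition(from: Status, to: Status): boolean {
  return allowed[from].includes(to);
}
```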

Query API

Find by Status

import { MessageStatus } from '@message-in-the-middle/persistence-core';

// Find all failed messages
const failed = await store.findByStatus(MessageStatus.FAILED);

// With pagination
const recent = await store.findByStatus(MessageStatus.FAILED, {
  limit: 20,
  offset: 0,
  sortBy: 'created',
  sortOrder: 'desc'
});

// With date range
const lastWeek = await store.findByStatus(MessageStatus.FAILED, {
  startDate: new Date(Date.now() - 7 * 24 * 60 * 60 * 1000),
  endDate: new Date()
});
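
For readers writing their own store, the options above can be applied to an in-memory list roughly as follows. This is a sketch, not the behavior of any published store: `Row`, `Options`, and `applyOptions` are hypothetical names, and the inclusive date bounds and the `desc` default are assumptions.

```typescript
interface Row {
  id: string;
  created: Date;
}

interface Options {
  limit?: number;
  offset?: number;
  sortOrder?: "asc" | "desc";
  startDate?: Date;
  endDate?: Date;
}

// Filter by date range, sort by `created`, then paginate.
function applyOptions(rows: Row[], opts: Options = {}): Row[] {
  const { limit, offset = 0, sortOrder = "desc", startDate, endDate } = opts;

  // Assumption: both bounds are inclusive.
  const filtered = rows.filter(
    (r) =>
      (!startDate || r.created.getTime() >= startDate.getTime()) &&
      (!endDate || r.created.getTime() <= endDate.getTime())
  );

  const sign = sortOrder === "asc" ? 1 : -1;
  const sorted = [...filtered].sort(
    (a, b) => sign * (a.created.getTime() - b.created.getTime())
  );

  // No limit means "everything from offset onward".
  return sorted.slice(offset, limit === undefined ? undefined : offset + limit);
}
```

Filtering before sorting and paginating keeps `offset` and `limit` relative to the matching set rather than the whole table.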

Find by Error Type

// Find all validation errors
const validationErrors = await store.findByError('ValidationError');

// With options
const recentErrors = await store.findByError('ValidationError', {
  limit: 10,
  startDate: new Date('2024-01-01')
});

Find by ID

const message = await store.findById('msg-123');

if (message) {
  console.log('Status:', message.status);
  console.log('Attempts:', message.retryCount);
  console.log('Error:', message.errorMessage);
}

Count Messages

// Count by status
const failedCount = await store.count({ status: MessageStatus.FAILED });

// Count by error type
const validationErrorCount = await store.count({
  errorType: 'ValidationError'
});

// Count messages from the last 24 hours
const last24hCount = await store.count({
  startDate: new Date(Date.now() - 24 * 60 * 60 * 1000)
});

Replay Operations

Replay Failed Messages

// `pipeline` is your MessageMiddlewareManager instance
const replayManager = new MessageReplayManager(store, pipeline);

// Replay all failed messages
const result = await replayManager.replayFailed({ limit: 100 });
console.log(`Total: ${result.total}`);
console.log(`Succeeded: ${result.succeeded}`);
console.log(`Failed: ${result.failed}`);
console.log(`Skipped: ${result.skipped}`);

Replay by Error Type

// Replay specific error types
await replayManager.replayByErrorType('ValidationError');

// After fixing a bug, replay those failures
await replayManager.replayByErrorType('PaymentGatewayError');

Replay Single Message

// Replay one message by ID
const success = await replayManager.replayOne('message-id-123');

if (success) {
  console.log('Message replayed successfully');
} else {
  console.log('Message replay failed');
}

Advanced Replay Options

await replayManager.replayFailed({
  limit: 50,
  offset: 0,
  startDate: new Date('2024-01-01'),
  endDate: new Date('2024-01-31'),
  sortBy: 'created',
  sortOrder: 'desc',
  concurrency: 5  // Process 5 messages concurrently
});
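
To illustrate what a `concurrency` limit means in practice, here is a self-contained sketch of bounded-concurrency replay. It is not the replay manager's implementation: `replayAll` and its `replay` callback are hypothetical, and because the source does not define when a message counts as skipped, `skipped` simply stays at 0 here.

```typescript
interface ReplayResult {
  total: number;
  succeeded: number;
  failed: number;
  skipped: number;
}

// Run `replay` over `ids` with at most `concurrency` calls in flight.
async function replayAll(
  ids: string[],
  replay: (id: string) => Promise<boolean>,
  concurrency: number
): Promise<ReplayResult> {
  const result: ReplayResult = { total: ids.length, succeeded: 0, failed: 0, skipped: 0 };
  let cursor = 0;

  // Each worker pulls the next unclaimed ID until the list is exhausted.
  const worker = async (): Promise<void> => {
    while (cursor < ids.length) {
      const id = ids[cursor++];
      if (id === undefined) break;
      try {
        if (await replay(id)) result.succeeded++;
        else result.failed++;
      } catch {
        // A thrown error counts as a failed replay.
        result.failed++;
      }
    }
  };

  const workerCount = Math.max(1, Math.min(concurrency, ids.length));
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return result;
}
```

A fixed pool of workers keeps the number of in-flight replays bounded without batching, so one slow message does not hold up the others.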

Custom ID Generation

Customize how message IDs are generated:

new PersistenceInboundMiddleware(store, {
  storeOn: ['error'],
  idGenerator: (ctx) => `${ctx.metadata.queueName}-${Date.now()}`
})
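
Note that `Date.now()` has millisecond resolution, so two messages from the same queue arriving in the same millisecond would get the same ID with the generator above. A possible variant, sketched below, appends a per-process counter. `makeIdGenerator` and the `Ctx` type are hypothetical; only the `ctx.metadata.queueName` shape is taken from the example.

```typescript
// Minimal stand-in for the middleware context used by `idGenerator`.
interface Ctx {
  metadata: { queueName: string };
}

// Returns a generator whose IDs stay unique within one process,
// even when several messages arrive in the same millisecond.
function makeIdGenerator(): (ctx: Ctx) => string {
  let counter = 0;
  return (ctx) => `${ctx.metadata.queueName}-${Date.now()}-${counter++}`;
}
```

The counter is local to one process, so deployments with several consumers would still need something globally unique, such as a UUID.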

Source Tracking

Track message source for multi-queue systems:

new PersistenceInboundMiddleware(store, {
  storeOn: ['error'],
  sourceExtractor: (ctx) => ({
    queue: ctx.metadata.queueName,
    exchange: ctx.metadata.exchange,
    routingKey: ctx.metadata.routingKey
  })
})

Implementing a Custom Store

Implement the MessageStore interface to create a custom storage backend:

import {
  MessageStore,
  StoredMessage,
  MessageStatus,
  QueryOptions,
  QueryFilters
} from '@message-in-the-middle/persistence-core';

export class CustomMessageStore implements MessageStore {
  async save(message: StoredMessage): Promise<void> {
    // Your storage logic
  }

  async findById(id: string): Promise<StoredMessage | null> {
    // Your query logic
  }

  async findByStatus(
    status: MessageStatus,
    options?: QueryOptions
  ): Promise<StoredMessage[]> {
    // Your query logic
  }

  async findByError(
    errorType: string,
    options?: QueryOptions
  ): Promise<StoredMessage[]> {
    // Your query logic
  }

  async updateStatus(
    id: string,
    updates: Partial<StoredMessage>
  ): Promise<void> {
    // Your update logic
  }

  async count(filters?: QueryFilters): Promise<number> {
    // Your count logic
  }

  async delete(id: string): Promise<void> {
    // Your delete logic
  }

  async destroy(): Promise<void> {
    // Cleanup resources
  }
}
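
As a rough idea of what the method bodies might look like, here is a Map-backed sketch. It declares simplified local stand-ins (`Status`, `Stored`, `Filters`) instead of importing the package's types so that it compiles on its own. The `Filters` shape is inferred from the `count` examples above, query options are omitted, and ignoring unknown IDs in `updateStatus` is an assumption.

```typescript
// Local stand-ins for the package's types so the sketch is self-contained.
type Status = "PROCESSING" | "SUCCEEDED" | "FAILED" | "ARCHIVED";

interface Stored {
  id: string;
  status: Status;
  errorType?: string;
  created: Date;
  updated: Date;
}

interface Filters {
  status?: Status;
  errorType?: string;
  startDate?: Date;
  endDate?: Date;
}

class MapMessageStore {
  private rows = new Map<string, Stored>();

  async save(message: Stored): Promise<void> {
    this.rows.set(message.id, { ...message });
  }

  async findById(id: string): Promise<Stored | null> {
    return this.rows.get(id) ?? null;
  }

  async findByStatus(status: Status): Promise<Stored[]> {
    return Array.from(this.rows.values()).filter((m) => m.status === status);
  }

  async findByError(errorType: string): Promise<Stored[]> {
    return Array.from(this.rows.values()).filter((m) => m.errorType === errorType);
  }

  async updateStatus(id: string, updates: Partial<Stored>): Promise<void> {
    const current = this.rows.get(id);
    if (!current) return; // assumption: unknown IDs are ignored
    this.rows.set(id, { ...current, ...updates, id, updated: new Date() });
  }

  async count(filters: Filters = {}): Promise<number> {
    const { status, errorType, startDate, endDate } = filters;
    return Array.from(this.rows.values()).filter(
      (m) =>
        (!status || m.status === status) &&
        (!errorType || m.errorType === errorType) &&
        (!startDate || m.created.getTime() >= startDate.getTime()) &&
        (!endDate || m.created.getTime() <= endDate.getTime())
    ).length;
  }

  async delete(id: string): Promise<void> {
    this.rows.delete(id);
  }

  async destroy(): Promise<void> {
    this.rows.clear();
  }
}
```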

TypeScript Types

Core Types

// Message status enum
enum MessageStatus {
  PROCESSING = 'PROCESSING',
  SUCCEEDED = 'SUCCEEDED',
  FAILED = 'FAILED',
  ARCHIVED = 'ARCHIVED'
}

// Stored message
interface StoredMessage {
  id: string;
  status: MessageStatus;
  message: any;
  raw?: any;
  metadata: Record<string, any>;
  attributes: Record<string, any>;
  source?: Record<string, any>;
  errorMessage?: string;
  errorStack?: string;
  errorType?: string;
  retryCount: number;
  created: Date;
  updated: Date;
  completed?: Date;
}

// Query options
interface QueryOptions {
  limit?: number;
  offset?: number;
  sortBy?: 'created' | 'updated' | 'completed';
  sortOrder?: 'asc' | 'desc';
  startDate?: Date;
  endDate?: Date;
}

// Replay result
interface ReplayResult {
  total: number;
  succeeded: number;
  failed: number;
  skipped: number;
}

Available Store Implementations

In-Memory Store (Development/Testing)

pnpm add @message-in-the-middle/store-memory

Features:

  • Fast, in-memory storage
  • No external dependencies
  • Bounded size with automatic eviction
  • Perfect for development and testing

⚠️ Not for production - Data is lost on restart

MySQL Store (Production)

pnpm add @message-in-the-middle/store-mysql

Features:

  • Persistent storage
  • ACID transactions
  • Indexing for fast queries
  • Schema versioning

Best for: Production workloads, compliance requirements

See individual package READMEs for detailed setup instructions.

Complete Example

import { MessageMiddlewareManager } from '@message-in-the-middle/core';
import {
  PersistenceInboundMiddleware,
  MessageReplayManager,
  MessageStatus
} from '@message-in-the-middle/persistence-core';
import { InMemoryMessageStore } from '@message-in-the-middle/store-memory';

// Create store
const store = new InMemoryMessageStore();

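// Create manager with persistence.
// ParseJsonInboundMiddleware, ValidateInboundMiddleware, and `validator` are not
// imported or defined in this snippet; bring them in from your own pipeline setup.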
const manager = new MessageMiddlewareManager();
manager
  .addInboundMiddleware(new PersistenceInboundMiddleware(store, {
    storeOn: ['error']
  }))
  .addInboundMiddleware(new ParseJsonInboundMiddleware())
  .addInboundMiddleware(new ValidateInboundMiddleware(validator));

// Process messages...
try {
  await manager.processInbound(message);
} catch (error) {
  // Failed messages are automatically stored
}

// Later, query failed messages
const failed = await store.findByStatus(MessageStatus.FAILED);
console.log(`Found ${failed.length} failed messages`);

// Replay them
const replayManager = new MessageReplayManager(store, manager);
const result = await replayManager.replayFailed({ limit: 10 });
console.log(`Replayed ${result.succeeded} successfully`);

Best Practices

  1. Choose the Right Store: Use in-memory for dev/testing, MySQL (or a custom store for another database) for production
  2. Storage Strategy: Store only what you need (errors vs full audit trail)
  3. Cleanup Old Messages: Implement retention policies
  4. Monitor Storage Size: Set up alerts for storage growth
  5. Test Replay Logic: Ensure replayed messages can be processed correctly
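
As an example of point 3, a retention job could be sketched along these lines. It relies only on the `findByStatus` and `delete` method shapes shown earlier, declared locally as `RetentionStore`. `purgeOlderThan` is a hypothetical helper, and it assumes that `endDate` filters on creation time and that `delete` really removes the row (otherwise the loop would never finish).

```typescript
type Status = "PROCESSING" | "SUCCEEDED" | "FAILED" | "ARCHIVED";

// The slice of a store this job needs, declared locally for the sketch.
interface RetentionStore {
  findByStatus(
    status: Status,
    options?: { endDate?: Date; limit?: number }
  ): Promise<{ id: string }[]>;
  delete(id: string): Promise<void>;
}

// Delete messages with the given status that are older than `maxAgeMs`, in batches.
async function purgeOlderThan(
  store: RetentionStore,
  status: Status,
  maxAgeMs: number,
  batchSize = 100
): Promise<number> {
  const cutoff = new Date(Date.now() - maxAgeMs);
  let deleted = 0;
  for (;;) {
    const batch = await store.findByStatus(status, { endDate: cutoff, limit: batchSize });
    if (batch.length === 0) return deleted;
    for (const { id } of batch) {
      await store.delete(id);
      deleted++;
    }
  }
}
```

Running it on a schedule, for example `purgeOlderThan(store, 'ARCHIVED', 30 * 24 * 60 * 60 * 1000)`, would also help with point 4 by keeping storage growth bounded.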

License

MIT
