@message-in-the-middle/store-mysql

v0.1.3

MySQL/MariaDB message store implementation for message-middleware

⚠️ Work in Progress

  • Is this library production-ready? No.
  • Is this library safe? No.
  • When will it be ready? Soon™ (maybe tomorrow, maybe never).
  • Why is it public? Experiment.

message-in-the-middle is to message queue processing what Express.js is to HTTP request processing. Just as Express provides a middleware pattern for HTTP requests, this library provides a middleware pattern for processing queue messages.

Why This Exists

Processing queue messages usually means copy-pasting the same boilerplate: parse JSON, validate, log, retry, deduplicate, route to handlers. This library lets you compose that logic as middlewares.
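
As a rough illustration of the middleware idea, the sketch below composes two steps (logging and JSON parsing) into one handler. The `Context` and `Middleware` types and the `compose` function are hypothetical and written only for this example; they are not the library's actual API.

```typescript
// Hypothetical types for illustration; not the library's actual API.
type Context = { body: string; parsed?: unknown; log: string[] };
type Middleware = (ctx: Context, next: () => void) => void;

// Run middlewares in order; each one decides when to call next().
function compose(middlewares: Middleware[]): (ctx: Context) => void {
  return (ctx) => {
    const dispatch = (index: number): void => {
      const middleware = middlewares[index];
      if (!middleware) return; // end of the chain
      middleware(ctx, () => dispatch(index + 1));
    };
    dispatch(0);
  };
}

// Records when processing starts and ends, wrapping everything after it.
const logging: Middleware = (ctx, next) => {
  ctx.log.push('start');
  next();
  ctx.log.push('end');
};

// Parses the raw body before handing over to the next step.
const parseJson: Middleware = (ctx, next) => {
  ctx.parsed = JSON.parse(ctx.body);
  next();
};

const runPipeline = compose([logging, parseJson]);
const exampleContext: Context = { body: '{"orderId": 42}', log: [] };
runPipeline(exampleContext);
```

Each step stays small and reusable, and the order of the array decides the order of processing.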


MySQL/MariaDB message store implementation for message-middleware. Production-ready persistent storage with full ACID guarantees.

This package provides a robust, production-ready message store backed by MySQL or MariaDB with features like transactions, indexing, and schema versioning.

Features

  • 💾 Persistent Storage - Data survives restarts
  • 🔒 ACID Transactions - Full transactional support
  • 🚀 Indexed Queries - Fast lookups by status, error type, date
  • 📊 Schema Versioning - Safe migrations and upgrades
  • 🛡️ SQL Injection Protection - Parameterized queries and table name validation
  • 🎨 Full TypeScript Support - Complete type safety
  • 💉 Connection Pooling - Efficient connection management

Installation

# npm
npm install @message-in-the-middle/store-mysql mysql2

# pnpm
pnpm add @message-in-the-middle/store-mysql mysql2

# yarn
yarn add @message-in-the-middle/store-mysql mysql2

Required Peer Dependency: mysql2 ^3.0.0

Quick Start

1. Create Database and Tables

-- Create database
CREATE DATABASE message_store;

-- Use the database
USE message_store;

-- Create messages table
CREATE TABLE messages (
  id VARCHAR(255) PRIMARY KEY,
  status ENUM('PROCESSING', 'SUCCEEDED', 'FAILED', 'ARCHIVED') NOT NULL,
  message JSON NOT NULL,
  raw JSON,
  metadata JSON NOT NULL,
  attributes JSON NOT NULL,
  source JSON,
  error_message TEXT,
  error_stack TEXT,
  error_type VARCHAR(255),
  retry_count INT NOT NULL DEFAULT 0,
  created TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
  updated TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  completed TIMESTAMP NULL,

  INDEX idx_status (status),
  INDEX idx_error_type (error_type),
  INDEX idx_created (created),
  INDEX idx_updated (updated),
  INDEX idx_status_created (status, created)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;

-- Create metadata table for schema versioning
CREATE TABLE messages_metadata (
  meta_key VARCHAR(255) PRIMARY KEY,
  meta_value VARCHAR(255) NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;

-- Insert schema version
INSERT INTO messages_metadata (meta_key, meta_value) VALUES ('schema_version', '1');

Complete schema available: See ../../docs/persistence-sql-schema.md

2. Setup Connection Pool

import { createPool } from 'mysql2/promise';
import { MySQLMessageStore } from '@message-in-the-middle/store-mysql';

// Create MySQL connection pool
const pool = createPool({
  host: 'localhost',
  port: 3306,
  user: 'your_user',
  password: 'your_password',
  database: 'message_store',
  waitForConnections: true,
  connectionLimit: 10,
  queueLimit: 0
});

// Create store
const store = new MySQLMessageStore(pool);

3. Use with Persistence Middleware

import { MessageMiddlewareManager } from '@message-in-the-middle/core';
import { PersistenceInboundMiddleware } from '@message-in-the-middle/persistence-core';

const manager = new MessageMiddlewareManager();
manager.addInboundMiddleware(
  new PersistenceInboundMiddleware(store, {
    storeOn: ['error']  // Store failed messages
  })
);

Configuration

Basic Configuration

import { createPool } from 'mysql2/promise';
import { MySQLMessageStore } from '@message-in-the-middle/store-mysql';

const pool = createPool({
  host: 'localhost',
  user: 'user',
  password: 'password',
  database: 'message_store'
});

const store = new MySQLMessageStore(pool);

Custom Table Name

const store = new MySQLMessageStore(pool, {
  tableName: 'custom_messages'  // Default: 'messages'
});

Note: The table name is validated to prevent SQL injection. Only alphanumeric characters and underscores are allowed.
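
If you want to check a table name before passing it to the store, a validator along these lines would apply the same rule. This is a minimal sketch under the assumption that "alphanumeric" means ASCII letters and digits; `isSafeTableName` and `assertSafeTableName` are hypothetical helpers, not functions exported by this package.

```typescript
// Hypothetical validator: accepts only ASCII letters, digits, and underscores.
const SAFE_TABLE_NAME = /^[A-Za-z0-9_]+$/;

function isSafeTableName(tableName: string): boolean {
  return SAFE_TABLE_NAME.test(tableName);
}

// Throws instead of returning false, which is convenient at startup.
function assertSafeTableName(tableName: string): string {
  if (!isSafeTableName(tableName)) {
    throw new Error(`Invalid table name: ${JSON.stringify(tableName)}`);
  }
  return tableName;
}
```

Table names cannot be sent as query parameters, so they end up interpolated into the SQL text. That is why an allowlist check like this matters.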

Production Configuration

import { readFileSync } from 'node:fs';

const pool = createPool({
  host: process.env.DB_HOST,
  port: parseInt(process.env.DB_PORT || '3306', 10),
  user: process.env.DB_USER,
  password: process.env.DB_PASSWORD,
  database: process.env.DB_NAME,

  // Connection pooling
  waitForConnections: true,
  connectionLimit: 10,
  queueLimit: 0,

  // Connection timeout (milliseconds)
  connectTimeout: 10000,

  // Keep multiple statements disabled unless you really need them
  multipleStatements: false,

  // SSL/TLS (for production)
  ssl: {
    ca: readFileSync('/path/to/ca-cert.pem')
  }
});

const store = new MySQLMessageStore(pool, {
  tableName: 'messages'
});
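
Reading settings straight from the environment fails late and with unclear errors when a variable is missing. One possible approach is to validate them up front. The sketch below assumes the same DB_* variable names as the configuration above; `poolSettingsFromEnv` and `PoolSettings` are hypothetical names, not part of this package or mysql2.

```typescript
type Env = Record<string, string | undefined>;

interface PoolSettings {
  host: string;
  port: number;
  user: string;
  password: string;
  database: string;
}

// Hypothetical helper: fail fast when a required variable is missing.
function poolSettingsFromEnv(env: Env): PoolSettings {
  const required = (key: string): string => {
    const value = env[key];
    if (value === undefined || value === '') {
      throw new Error(`Missing environment variable ${key}`);
    }
    return value;
  };

  // Default to the standard MySQL port when DB_PORT is not set.
  const port = Number.parseInt(env['DB_PORT'] ?? '3306', 10);
  if (!Number.isInteger(port) || port < 1 || port > 65535) {
    throw new Error(`Invalid DB_PORT: ${env['DB_PORT']}`);
  }

  return {
    host: required('DB_HOST'),
    port,
    user: required('DB_USER'),
    password: required('DB_PASSWORD'),
    database: required('DB_NAME'),
  };
}
```

The result can be spread into the createPool options, for example `createPool({ ...poolSettingsFromEnv(process.env), connectionLimit: 10 })`.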

Querying Messages

Find by Status

import { MessageStatus } from '@message-in-the-middle/persistence-core';

// Find all failed messages
const failed = await store.findByStatus(MessageStatus.FAILED);

// With pagination and sorting
const recent = await store.findByStatus(MessageStatus.FAILED, {
  limit: 20,
  offset: 0,
  sortBy: 'created',
  sortOrder: 'desc'
});

// With date range
const lastWeek = await store.findByStatus(MessageStatus.FAILED, {
  startDate: new Date(Date.now() - 7 * 24 * 60 * 60 * 1000),
  endDate: new Date()
});
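
For page-based UIs, the limit and offset options can be derived from a page number. This is a small sketch: the option names (limit, offset, sortBy, sortOrder) come from the examples above, while `pageOptions` and the default page size of 20 are assumptions made for illustration.

```typescript
interface PageOptions {
  limit: number;
  offset: number;
  sortBy: 'created';
  sortOrder: 'asc' | 'desc';
}

// Hypothetical helper: convert a 1-based page number into query options.
function pageOptions(page: number, pageSize = 20): PageOptions {
  if (!Number.isInteger(page) || page < 1) {
    throw new RangeError('page must be a positive integer');
  }
  if (!Number.isInteger(pageSize) || pageSize < 1) {
    throw new RangeError('pageSize must be a positive integer');
  }
  return {
    limit: pageSize,
    offset: (page - 1) * pageSize, // page 1 starts at offset 0
    sortBy: 'created',
    sortOrder: 'desc',
  };
}
```

The result can be passed as the second argument, for example `store.findByStatus(MessageStatus.FAILED, pageOptions(3))`.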

Find by Error Type

// Find all validation errors
const errors = await store.findByError('ValidationError');

// With options
const recentErrors = await store.findByError('ValidationError', {
  limit: 10,
  startDate: new Date('2024-01-01')
});

Find by ID

const message = await store.findById('msg-123');

if (message) {
  console.log('Status:', message.status);
  console.log('Error:', message.errorMessage);
  console.log('Retry count:', message.retryCount);
}

Count Messages

// Total messages
const total = await store.count();

// Count failed messages
const failedCount = await store.count({
  status: MessageStatus.FAILED
});

// Count by error type
const validationErrors = await store.count({
  errorType: 'ValidationError'
});

// Count messages created in the last 24 hours
const last24hCount = await store.count({
  startDate: new Date(Date.now() - 24 * 60 * 60 * 1000)
});

Message Replay

import { MessageReplayManager } from '@message-in-the-middle/persistence-core';

// `pipeline` is the MessageMiddlewareManager that should reprocess the messages
const replayManager = new MessageReplayManager(store, pipeline);

// Replay failed messages
const result = await replayManager.replayFailed({ limit: 100 });
console.log(`Replayed ${result.succeeded} successfully`);

// Replay specific error type
await replayManager.replayByErrorType('PaymentGatewayError');

Complete Example

import { createPool } from 'mysql2/promise';
import {
  MessageMiddlewareManager,
  ParseJsonInboundMiddleware,
  RetryInboundMiddleware
} from '@message-in-the-middle/core';
import {
  PersistenceInboundMiddleware,
  MessageReplayManager,
  MessageStatus
} from '@message-in-the-middle/persistence-core';
import { MySQLMessageStore } from '@message-in-the-middle/store-mysql';

// Setup database connection
const pool = createPool({
  host: 'localhost',
  user: 'user',
  password: 'password',
  database: 'message_store',
  connectionLimit: 10
});

// Create store
const store = new MySQLMessageStore(pool);

// Create manager with persistence
const manager = new MessageMiddlewareManager();
manager
  .addInboundMiddleware(new PersistenceInboundMiddleware(store, {
    storeOn: ['error']  // Store failed messages
  }))
  .addInboundMiddleware(new ParseJsonInboundMiddleware())
  .addInboundMiddleware(new RetryInboundMiddleware({ maxRetries: 3 }));

// Process messages
try {
  await manager.processInbound(messageBody);
} catch (error) {
  console.error('Processing failed, message stored in MySQL');
}

// Query failed messages
const failed = await store.findByStatus(MessageStatus.FAILED, {
  limit: 10,
  sortBy: 'created',
  sortOrder: 'desc'
});

console.log(`Found ${failed.length} failed messages`);

// Replay failures
const replayManager = new MessageReplayManager(store, manager);
const result = await replayManager.replayFailed({ limit: 10 });
console.log(`Replayed: ${result.succeeded} succeeded, ${result.failed} failed`);

// Cleanup on shutdown
process.on('SIGTERM', async () => {
  await store.destroy();
  await pool.end();
  process.exit(0);
});

Schema Management

Schema Versioning

The store tracks schema versions in a metadata table:

-- Check current schema version
SELECT meta_value FROM messages_metadata WHERE meta_key = 'schema_version';

Current schema version: 1
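
An application could run the query above at startup and refuse to start on a mismatch. The sketch below assumes the rows have the shape returned by that SELECT (a single meta_value column holding the version as text); `readSchemaVersion` and `isSchemaCompatible` are hypothetical helpers, not part of this package.

```typescript
// Version this README documents as current.
const EXPECTED_SCHEMA_VERSION = 1;

interface MetadataRow {
  meta_value: string;
}

// Hypothetical helper: extract the numeric version from the query result.
function readSchemaVersion(rows: MetadataRow[]): number {
  const first = rows[0];
  if (!first) {
    throw new Error('schema_version row not found in messages_metadata');
  }
  const version = Number.parseInt(first.meta_value, 10);
  if (Number.isNaN(version)) {
    throw new Error(`schema_version is not a number: ${first.meta_value}`);
  }
  return version;
}

function isSchemaCompatible(
  rows: MetadataRow[],
  expected: number = EXPECTED_SCHEMA_VERSION
): boolean {
  return readSchemaVersion(rows) === expected;
}
```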

Migrations

When upgrading the library, check for schema changes in the changelog. Migration scripts will be provided for breaking schema changes.

Custom Schema

If you need to customize the schema:

  1. Start with the base schema from docs/persistence-sql-schema.md
  2. Add your custom columns
  3. Ensure core columns remain unchanged
  4. Update indexes for your query patterns

Performance Optimization

Indexes

The default schema includes these indexes:

  • PRIMARY KEY (id) - Fast lookups by ID
  • INDEX idx_status (status) - Fast status queries
  • INDEX idx_error_type (error_type) - Fast error type queries
  • INDEX idx_created (created) - Date range queries
  • INDEX idx_status_created (status, created) - Combined queries

Connection Pooling

Configure pool size based on your workload:

const pool = createPool({
  // ...
  connectionLimit: 10,  // Max connections
  queueLimit: 0,        // Unlimited queue
  waitForConnections: true
});

Guidelines:

  • Low traffic: 5-10 connections
  • Medium traffic: 10-20 connections
  • High traffic: 20-50 connections

Query Optimization

For large tables, consider:

  1. Partition by date - Partition the messages table by created date
  2. Archive old data - Move old SUCCEEDED messages to an archive table
  3. Limit queries - Always use limit in production
  4. Use indexes - Ensure queries use indexes (check with EXPLAIN)

Data Retention

Manual Cleanup

// Delete old succeeded messages
await pool.execute(`
  DELETE FROM messages
  WHERE status = 'SUCCEEDED'
    AND created < DATE_SUB(NOW(), INTERVAL 30 DAY)
  LIMIT 1000
`);
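
A single DELETE with LIMIT 1000 removes at most one batch. To clear a larger backlog, the statement can be repeated until a batch comes back short. The sketch below is one way to do that, assuming the default messages table; `deleteInBatches` and `ExecuteDelete` are hypothetical names, and the executor is injected so the loop does not depend on a live database.

```typescript
// Minimal shape the loop needs: a function resolving to [result, fields],
// where the result of a DELETE carries affectedRows (as mysql2 does).
type ExecuteDelete = (sql: string) => Promise<[{ affectedRows: number }, unknown]>;

// Hypothetical helper: repeat the cleanup DELETE until a batch is not full.
async function deleteInBatches(
  execute: ExecuteDelete,
  batchSize = 1000,
  maxBatches = 100
): Promise<number> {
  if (!Number.isInteger(batchSize) || batchSize < 1) {
    throw new RangeError('batchSize must be a positive integer');
  }
  // batchSize is a validated integer, so interpolating it here is safe.
  const sql =
    "DELETE FROM messages WHERE status = 'SUCCEEDED' " +
    'AND created < DATE_SUB(NOW(), INTERVAL 30 DAY) ' +
    `LIMIT ${batchSize}`;

  let totalDeleted = 0;
  for (let batch = 0; batch < maxBatches; batch++) {
    const [result] = await execute(sql);
    totalDeleted += result.affectedRows;
    if (result.affectedRows < batchSize) break; // nothing left to delete
  }
  return totalDeleted;
}
```

With mysql2 this could be called as `deleteInBatches((sql) => pool.execute<ResultSetHeader>(sql))`. Small batches keep each transaction short, and maxBatches caps how long one cleanup run can take.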

Automated Cleanup (Scheduled Event)

-- Events only run when the event scheduler is enabled
SET GLOBAL event_scheduler = ON;

-- Create event for daily cleanup (MySQL 5.1+)
CREATE EVENT IF NOT EXISTS cleanup_old_messages
ON SCHEDULE EVERY 1 DAY
DO
  DELETE FROM messages
  WHERE status = 'SUCCEEDED'
    AND created < DATE_SUB(NOW(), INTERVAL 30 DAY)
  LIMIT 10000;

High Availability

Master-Replica Setup

Use read replicas for queries:

// Write pool (master)
const writePool = createPool({
  host: 'master.db.example.com',
  // ...
});

// Read pool (replica)
const readPool = createPool({
  host: 'replica.db.example.com',
  // ...
});

// Use write pool for store
const store = new MySQLMessageStore(writePool);

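// Use read pool for queries; query() resolves to [rows, fields].
// query() is used instead of execute() because a numeric LIMIT placeholder
// can fail in server-side prepared statements on some MySQL 8 versions.
const [messages] = await readPool.query(
  'SELECT * FROM messages WHERE status = ? LIMIT ?',
  [MessageStatus.FAILED, 100]
);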
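
Rows read directly from the table use the snake_case column names from the schema (error_message, retry_count), not the camelCase fields returned by the store (errorMessage, retryCount). A small mapper can bridge the two. In this sketch, `MessageRow`, `MessageSummary`, and `toSummary` are hypothetical, cover only a few columns, and assume `errorType` is the camelCase name for error_type.

```typescript
// Subset of the messages table columns, as they come back from a raw query.
interface MessageRow {
  id: string;
  status: string;
  error_message: string | null;
  error_type: string | null;
  retry_count: number;
  created: Date;
}

// camelCase view of the same data (hypothetical shape for illustration).
interface MessageSummary {
  id: string;
  status: string;
  errorMessage: string | undefined;
  errorType: string | undefined;
  retryCount: number;
  created: Date;
}

function toSummary(row: MessageRow): MessageSummary {
  return {
    id: row.id,
    status: row.status,
    errorMessage: row.error_message ?? undefined, // SQL NULL becomes undefined
    errorType: row.error_type ?? undefined,
    retryCount: row.retry_count,
    created: row.created,
  };
}
```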

Backup Strategy

  1. Regular backups: Daily full backups
  2. Incremental backups: Hourly binlog backups
  3. Test restores: Monthly restore tests
  4. Offsite storage: Store backups in a different region

Security

SQL Injection Prevention

The store uses:

  • Parameterized queries - All user input is parameterized
  • Table name validation - Only safe table names allowed
  • No dynamic SQL - No string concatenation of queries

Connection Security

const pool = createPool({
  // ...
  ssl: {
    ca: fs.readFileSync('/path/to/ca-cert.pem'),
    cert: fs.readFileSync('/path/to/client-cert.pem'),
    key: fs.readFileSync('/path/to/client-key.pem')
  }
});

Access Control

-- Create dedicated user with minimal permissions
CREATE USER 'msg_store_user'@'%' IDENTIFIED BY 'secure_password';
GRANT SELECT, INSERT, UPDATE, DELETE ON message_store.messages TO 'msg_store_user'@'%';
GRANT SELECT, INSERT ON message_store.messages_metadata TO 'msg_store_user'@'%';
FLUSH PRIVILEGES;

Troubleshooting

Connection Issues

// Add error handling
pool.on('error', (err) => {
  console.error('MySQL pool error:', err);
  if (err.code === 'PROTOCOL_CONNECTION_LOST') {
    // Handle reconnection
  }
});

Slow Queries

-- Enable slow query log and write it to the mysql.slow_log table
SET GLOBAL slow_query_log = 'ON';
SET GLOBAL log_output = 'TABLE';
SET GLOBAL long_query_time = 2;

-- Check slow queries
SELECT * FROM mysql.slow_log ORDER BY start_time DESC LIMIT 10;

Large Tables

If the messages table grows very large:

  1. Add partitioning by date
  2. Archive old data
  3. Increase innodb_buffer_pool_size
  4. Use LIMIT in all queries

MariaDB Compatibility

This package works with both MySQL and MariaDB. Tested versions:

  • MySQL: 5.7, 8.0, 8.1
  • MariaDB: 10.6, 10.11, 11.0

License

MIT
