@message-in-the-middle/store-mysql
v0.1.3
MySQL/MariaDB message store implementation for message-middleware
⚠️ Work in Progress
- Is this library production-ready? No.
- Is this library safe? No.
- When will it be ready? Soon™ (maybe tomorrow, maybe never).
- Why is it public? Experiment.
message-in-the-middle is to message queue processing what Express.js is to HTTP request processing. Just as Express provides a middleware pattern for HTTP requests, this library provides a middleware pattern for processing queue messages.
Why This Exists
Processing queue messages usually means copy-pasting the same boilerplate: parse JSON, validate, log, retry, deduplicate, route to handlers. This library lets you compose that logic as middlewares.
MySQL/MariaDB message store implementation for message-middleware. It provides persistent, transactional (ACID) storage backed by MySQL or MariaDB, with indexing and schema versioning. The design targets production workloads, though the work-in-progress notice above still applies.
Features
- 💾 Persistent Storage - Data survives restarts
- 🔒 ACID Transactions - Full transactional support
- 🚀 Indexed Queries - Fast lookups by status, error type, date
- 📊 Schema Versioning - Safe migrations and upgrades
- 🛡️ SQL Injection Protection - Parameterized queries and table name validation
- 🎨 Full TypeScript Support - Complete type safety
- 💉 Connection Pooling - Efficient connection management
Installation
# npm
npm install @message-in-the-middle/store-mysql mysql2
# pnpm
pnpm add @message-in-the-middle/store-mysql mysql2
# yarn
yarn add @message-in-the-middle/store-mysql mysql2
Required Peer Dependency: mysql2 ^3.0.0
Quick Start
1. Create Database and Tables
-- Create database
CREATE DATABASE message_store;
-- Use the database
USE message_store;
-- Create messages table
CREATE TABLE messages (
id VARCHAR(255) PRIMARY KEY,
status ENUM('PROCESSING', 'SUCCEEDED', 'FAILED', 'ARCHIVED') NOT NULL,
message JSON NOT NULL,
raw JSON,
metadata JSON NOT NULL,
attributes JSON NOT NULL,
source JSON,
error_message TEXT,
error_stack TEXT,
error_type VARCHAR(255),
retry_count INT NOT NULL DEFAULT 0,
created TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
completed TIMESTAMP NULL,
INDEX idx_status (status),
INDEX idx_error_type (error_type),
INDEX idx_created (created),
INDEX idx_updated (updated),
INDEX idx_status_created (status, created)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
-- Create metadata table for schema versioning
CREATE TABLE messages_metadata (
meta_key VARCHAR(255) PRIMARY KEY,
meta_value VARCHAR(255) NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
-- Insert schema version
INSERT INTO messages_metadata (meta_key, meta_value) VALUES ('schema_version', '1');
Complete schema available: See ../../docs/persistence-sql-schema.md
2. Setup Connection Pool
import { createPool } from 'mysql2/promise';
import { MySQLMessageStore } from '@message-in-the-middle/store-mysql';
// Create MySQL connection pool
const pool = createPool({
host: 'localhost',
port: 3306,
user: 'your_user',
password: 'your_password',
database: 'message_store',
waitForConnections: true,
connectionLimit: 10,
queueLimit: 0
});
// Create store
const store = new MySQLMessageStore(pool);
3. Use with Persistence Middleware
import { MessageMiddlewareManager } from '@message-in-the-middle/core';
import { PersistenceInboundMiddleware } from '@message-in-the-middle/persistence-core';
const manager = new MessageMiddlewareManager();
manager.addInboundMiddleware(
new PersistenceInboundMiddleware(store, {
storeOn: ['error'] // Store failed messages
})
);
Configuration
Basic Configuration
import { createPool } from 'mysql2/promise';
import { MySQLMessageStore } from '@message-in-the-middle/store-mysql';
const pool = createPool({
host: 'localhost',
user: 'user',
password: 'password',
database: 'message_store'
});
const store = new MySQLMessageStore(pool);
Custom Table Name
const store = new MySQLMessageStore(pool, {
tableName: 'custom_messages' // Default: 'messages'
});
Note: The table name is validated to prevent SQL injection. Only alphanumeric characters and underscores are allowed.
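The rule in the note can be pictured with a small check. This is a minimal sketch, not the library's actual validator: the name `assertSafeTableName` and the exact regular expression are assumptions chosen to match "alphanumeric characters and underscores".

```typescript
// Hypothetical helper: illustrates the documented rule, not the library's own code.
const SAFE_TABLE_NAME = /^[A-Za-z0-9_]+$/;

// Returns the name unchanged when it is safe to interpolate into SQL, otherwise throws.
function assertSafeTableName(name: string): string {
  if (!SAFE_TABLE_NAME.test(name)) {
    throw new Error(`Invalid table name: ${JSON.stringify(name)}`);
  }
  return name;
}
```

With this check, `assertSafeTableName('custom_messages')` passes, while a value such as `'messages; DROP TABLE x'` is rejected before it can reach a query string.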
Production Configuration
import fs from 'node:fs';
const pool = createPool({
host: process.env.DB_HOST,
port: parseInt(process.env.DB_PORT || '3306', 10),
user: process.env.DB_USER,
password: process.env.DB_PASSWORD,
database: process.env.DB_NAME,
// Connection pooling
waitForConnections: true,
connectionLimit: 10,
queueLimit: 0,
// Connection timeout
connectTimeout: 10000,
// Keep multiple statements disabled unless needed (limits the impact of SQL injection)
multipleStatements: false,
// SSL/TLS (for production)
ssl: {
ca: fs.readFileSync('/path/to/ca-cert.pem')
}
});
const store = new MySQLMessageStore(pool, {
tableName: 'messages'
});
Querying Messages
Find by Status
import { MessageStatus } from '@message-in-the-middle/persistence-core';
// Find all failed messages
const failed = await store.findByStatus(MessageStatus.FAILED);
// With pagination and sorting
const recent = await store.findByStatus(MessageStatus.FAILED, {
limit: 20,
offset: 0,
sortBy: 'created',
sortOrder: 'desc'
});
// With date range
const lastWeek = await store.findByStatus(MessageStatus.FAILED, {
startDate: new Date(Date.now() - 7 * 24 * 60 * 60 * 1000),
endDate: new Date()
});
Find by Error Type
// Find all validation errors
const errors = await store.findByError('ValidationError');
// With options
const recentErrors = await store.findByError('ValidationError', {
limit: 10,
startDate: new Date('2024-01-01')
});
Find by ID
const message = await store.findById('msg-123');
if (message) {
console.log('Status:', message.status);
console.log('Error:', message.errorMessage);
console.log('Retry count:', message.retryCount);
}
Count Messages
// Total messages
const total = await store.count();
// Count failed messages
const failedCount = await store.count({
status: MessageStatus.FAILED
});
// Count by error type
const validationErrors = await store.count({
errorType: 'ValidationError'
});
// Count in date range
const todayCount = await store.count({
startDate: new Date(Date.now() - 24 * 60 * 60 * 1000)
});
Message Replay
import { MessageReplayManager } from '@message-in-the-middle/persistence-core';
// `pipeline` is your configured MessageMiddlewareManager (the Complete Example passes `manager`)
const replayManager = new MessageReplayManager(store, pipeline);
// Replay failed messages
const result = await replayManager.replayFailed({ limit: 100 });
console.log(`Replayed ${result.succeeded} successfully`);
// Replay specific error type
await replayManager.replayByErrorType('PaymentGatewayError');
Complete Example
import { createPool } from 'mysql2/promise';
import {
MessageMiddlewareManager,
ParseJsonInboundMiddleware,
RetryInboundMiddleware
} from '@message-in-the-middle/core';
import {
PersistenceInboundMiddleware,
MessageReplayManager,
MessageStatus
} from '@message-in-the-middle/persistence-core';
import { MySQLMessageStore } from '@message-in-the-middle/store-mysql';
// Setup database connection
const pool = createPool({
host: 'localhost',
user: 'user',
password: 'password',
database: 'message_store',
connectionLimit: 10
});
// Create store
const store = new MySQLMessageStore(pool);
// Create manager with persistence
const manager = new MessageMiddlewareManager();
manager
.addInboundMiddleware(new PersistenceInboundMiddleware(store, {
storeOn: ['error'] // Store failed messages
}))
.addInboundMiddleware(new ParseJsonInboundMiddleware())
.addInboundMiddleware(new RetryInboundMiddleware({ maxRetries: 3 }));
// Process messages
try {
await manager.processInbound(messageBody);
} catch (error) {
console.error('Processing failed, message stored in MySQL');
}
// Query failed messages
const failed = await store.findByStatus(MessageStatus.FAILED, {
limit: 10,
sortBy: 'created',
sortOrder: 'desc'
});
console.log(`Found ${failed.length} failed messages`);
// Replay failures
const replayManager = new MessageReplayManager(store, manager);
const result = await replayManager.replayFailed({ limit: 10 });
console.log(`Replayed: ${result.succeeded} succeeded, ${result.failed} failed`);
// Cleanup on shutdown
process.on('SIGTERM', async () => {
await store.destroy();
await pool.end();
process.exit(0);
});
Schema Management
Schema Versioning
The store tracks schema versions in a metadata table:
-- Check current schema version
SELECT meta_value FROM messages_metadata WHERE meta_key = 'schema_version';
Current schema version: 1
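An application could run the same check at startup and refuse to start on a mismatch. The sketch below assumes the rows have already been fetched with the query above. `MetaRow`, `EXPECTED_SCHEMA_VERSION`, and `checkSchemaVersion` are illustrative names, and the library may well perform its own check internally.

```typescript
// Shape of one row returned by:
//   SELECT meta_value FROM messages_metadata WHERE meta_key = 'schema_version'
interface MetaRow {
  meta_value: string;
}

// Assumption: the application pins the schema version it was written against.
const EXPECTED_SCHEMA_VERSION = '1';

// Throws when the version row is missing or differs from the expected version.
function checkSchemaVersion(
  rows: MetaRow[],
  expected: string = EXPECTED_SCHEMA_VERSION
): void {
  const [first] = rows;
  if (first === undefined) {
    throw new Error('messages_metadata has no schema_version row; was the schema created?');
  }
  if (first.meta_value !== expected) {
    throw new Error(
      `Schema version mismatch: database has ${first.meta_value}, expected ${expected}`
    );
  }
}
```

With mysql2, the rows would typically come from destructuring the query result, since `pool.query` resolves to a `[rows, fields]` pair.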
Migrations
When upgrading the library, check for schema changes in the changelog. Migration scripts will be provided for breaking schema changes.
Custom Schema
If you need to customize the schema:
- Start with the base schema from docs/persistence-sql-schema.md
- Add your custom columns
- Ensure core columns remain unchanged
- Update indexes for your query patterns
Performance Optimization
Indexes
The default schema includes these indexes:
- PRIMARY KEY (id) - Fast lookups by ID
- INDEX idx_status (status) - Fast status queries
- INDEX idx_error_type (error_type) - Fast error type queries
- INDEX idx_created (created) - Date range queries
- INDEX idx_updated (updated) - Queries by last update time
- INDEX idx_status_created (status, created) - Combined status and date queries
Connection Pooling
Configure pool size based on your workload:
const pool = createPool({
// ...
connectionLimit: 10, // Max connections
queueLimit: 0, // Unlimited queue
waitForConnections: true
});
Guidelines:
- Low traffic: 5-10 connections
- Medium traffic: 10-20 connections
- High traffic: 20-50 connections
Query Optimization
For large tables, consider:
- Partition by date - Partition the messages table by created date
- Archive old data - Move old SUCCEEDED messages to archive table
- Limit queries - Always use limit in production
- Use indexes - Ensure queries use indexes (check with EXPLAIN)
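One way to keep every query bounded is to walk a large result set page by page with the `limit` and `offset` options shown under Find by Status. The sketch is independent of the store: `fetchPage` is a hypothetical callback that you would implement with `store.findByStatus`, and `forEachPage` is an illustrative name.

```typescript
// Fetches one page. In practice this could wrap something like:
//   store.findByStatus(MessageStatus.FAILED, { limit, offset, sortBy: 'created', sortOrder: 'asc' })
type FetchPage<T> = (limit: number, offset: number) => Promise<T[]>;

// Calls handlePage for each non-empty page and resolves to the number of items seen.
// A page shorter than pageSize signals the end of the result set.
async function forEachPage<T>(
  fetchPage: FetchPage<T>,
  handlePage: (page: T[]) => Promise<void> | void,
  pageSize = 100
): Promise<number> {
  if (pageSize <= 0) throw new Error('pageSize must be positive');
  let total = 0;
  for (let offset = 0; ; offset += pageSize) {
    const page = await fetchPage(pageSize, offset);
    if (page.length > 0) await handlePage(page);
    total += page.length;
    if (page.length < pageSize) return total;
  }
}
```

Offset paging assumes the result set is stable while it is being read. If rows change status between pages (for example during a replay), some rows can be skipped or seen twice, so a stable sort order and a fixed date range help.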
Data Retention
Manual Cleanup
// Delete old succeeded messages
await pool.execute(`
DELETE FROM messages
WHERE status = 'SUCCEEDED'
AND created < DATE_SUB(NOW(), INTERVAL 30 DAY)
LIMIT 1000
`);
Automated Cleanup (Cron Job)
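A cleanup job run from application code (on a timer or from an external scheduler) can repeat the batched DELETE from Manual Cleanup until a batch comes back short. This is a sketch under assumptions: `deleteBatch` is a hypothetical callback that runs one `DELETE ... LIMIT n` and resolves to the number of rows removed, and `purgeInBatches` and its parameters are illustrative names.

```typescript
// Runs one bounded DELETE and resolves to the number of rows it removed.
// With mysql2, the affectedRows field of the result header would supply this number.
type DeleteBatch = (batchSize: number) => Promise<number>;

// Repeats bounded deletes until a short batch shows that nothing is left,
// or until maxBatches is reached. Resolves to the total number of rows deleted.
async function purgeInBatches(
  deleteBatch: DeleteBatch,
  batchSize = 1000,
  maxBatches = 100 // safety cap so a single run cannot loop forever
): Promise<number> {
  let total = 0;
  for (let i = 0; i < maxBatches; i++) {
    const deleted = await deleteBatch(batchSize);
    total += deleted;
    if (deleted < batchSize) break; // short batch: nothing left to delete
  }
  return total;
}
```

Small batches keep each delete short, which limits lock time on a busy table. The database can also run the cleanup itself with a scheduled event: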
-- Create event for daily cleanup (MySQL 5.1+; requires event_scheduler = ON)
CREATE EVENT IF NOT EXISTS cleanup_old_messages
ON SCHEDULE EVERY 1 DAY
DO
DELETE FROM messages
WHERE status = 'SUCCEEDED'
AND created < DATE_SUB(NOW(), INTERVAL 30 DAY)
LIMIT 10000;
High Availability
Master-Replica Setup
Use read replicas for queries:
// Write pool (master)
const writePool = createPool({
host: 'master.db.example.com',
// ...
});
// Read pool (replica)
const readPool = createPool({
host: 'replica.db.example.com',
// ...
});
// Use write pool for store
const store = new MySQLMessageStore(writePool);
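// Use read pool for queries. The result is a [rows, fields] pair, so destructure the rows.
// query() is used instead of execute() because some MySQL 8.0 versions reject
// a bound LIMIT placeholder in server-side prepared statements.
const [messages] = await readPool.query(
'SELECT * FROM messages WHERE status = ? LIMIT ?',
[MessageStatus.FAILED, 100]
);
Backup Strategy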
- Regular backups: Daily full backups
- Incremental backups: Hourly binlog backups
- Test restores: Monthly restore tests
- Offsite storage: Store backups in different region
Security
SQL Injection Prevention
The store uses:
- Parameterized queries - All user input is parameterized
- Table name validation - Only safe table names allowed
- No dynamic SQL - No string concatenation of queries
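To illustrate what parameterization means here: values travel separately from the SQL text as `?` placeholders, and only fixed fragments plus the validated table name end up in the query string. The following is a minimal sketch, not the store's internal code. `buildFindQuery`, `FindFilter`, the default limit, and the column choices are assumptions based on the schema in Quick Start.

```typescript
// Hypothetical filter shape, loosely modeled on the query options shown earlier.
interface FindFilter {
  status?: string;
  errorType?: string;
  startDate?: Date;
  limit?: number;
}

interface BuiltQuery {
  sql: string;
  params: Array<string | number | Date>;
}

function buildFindQuery(tableName: string, filter: FindFilter): BuiltQuery {
  // Identifiers cannot be bound as parameters, so the table name is checked instead.
  if (!/^[A-Za-z0-9_]+$/.test(tableName)) {
    throw new Error('Invalid table name');
  }
  const where: string[] = [];
  const params: Array<string | number | Date> = [];
  if (filter.status !== undefined) {
    where.push('status = ?');
    params.push(filter.status);
  }
  if (filter.errorType !== undefined) {
    where.push('error_type = ?');
    params.push(filter.errorType);
  }
  if (filter.startDate !== undefined) {
    where.push('created >= ?');
    params.push(filter.startDate);
  }
  let sql = `SELECT * FROM \`${tableName}\``;
  if (where.length > 0) sql += ` WHERE ${where.join(' AND ')}`;
  // Fall back to a default for missing or invalid limits, and cap large ones.
  const requested = filter.limit ?? 100;
  const limit = Number.isInteger(requested) && requested > 0 ? Math.min(requested, 1000) : 100;
  sql += ' LIMIT ?';
  params.push(limit);
  return { sql, params };
}
```

Because a hostile value such as `"FAILED' OR '1'='1"` only ever appears in `params`, the SQL text stays the same whatever the caller passes.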
Connection Security
import fs from 'node:fs';
const pool = createPool({
// ...
ssl: {
ca: fs.readFileSync('/path/to/ca-cert.pem'),
cert: fs.readFileSync('/path/to/client-cert.pem'),
key: fs.readFileSync('/path/to/client-key.pem')
}
});
Access Control
-- Create dedicated user with minimal permissions
CREATE USER 'msg_store_user'@'%' IDENTIFIED BY 'secure_password';
GRANT SELECT, INSERT, UPDATE, DELETE ON message_store.messages TO 'msg_store_user'@'%';
GRANT SELECT, INSERT ON message_store.messages_metadata TO 'msg_store_user'@'%';
FLUSH PRIVILEGES;
Troubleshooting
Connection Issues
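// Add error handling. The pool emits 'connection' for each new connection,
// and connection-level errors are reported on the connection itself.
pool.on('connection', (connection) => {
connection.on('error', (err) => {
console.error('MySQL connection error:', err);
if (err.code === 'PROTOCOL_CONNECTION_LOST') {
// Handle reconnection (the pool opens a new connection on the next request)
}
});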
});
Slow Queries
-- Enable slow query log and write it to the mysql.slow_log table
SET GLOBAL slow_query_log = 'ON';
SET GLOBAL log_output = 'TABLE';
SET GLOBAL long_query_time = 2;
-- Check slow queries
SELECT * FROM mysql.slow_log ORDER BY start_time DESC LIMIT 10;
Large Tables
If the messages table grows very large:
- Add partitioning by date
- Archive old data
- Increase innodb_buffer_pool_size
- Use LIMIT in all queries
MariaDB Compatibility
This package works with both MySQL and MariaDB. Tested versions:
- MySQL: 5.7, 8.0, 8.1
- MariaDB: 10.6, 10.11, 11.0
Related Packages
- @message-in-the-middle/persistence-core - Persistence interfaces (required)
- @message-in-the-middle/store-memory - In-memory store for dev/testing
- @message-in-the-middle/core - Core library
Documentation
- SQL Schema - Complete database schema
- Persistence Core - Complete persistence documentation
- Main README - Library documentation
- Architecture - Design patterns
- Contributing - How to contribute
License
MIT
