@message-in-the-middle/core
v0.1.3
Framework-agnostic middleware pattern for message queue processing. Core package with all middlewares.
⚠️ Work in Progress
- Is this library production-ready? No.
- Is this library safe? No.
- When will it be ready? Soon™ (maybe tomorrow, maybe never).
- Why is it public? Experiment.
message-in-the-middle is to message queue processing what Express.js is to HTTP request processing: just as Express provides a middleware pattern for HTTP requests, this library provides a middleware pattern for processing queue messages.
Why This Exists
Processing queue messages usually means copy-pasting the same boilerplate: parse JSON, validate, log, retry, deduplicate, route to handlers. This library lets you compose that logic as middlewares.
Framework-agnostic middleware pattern for message queue processing. The core package with all middlewares included.
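To make the middleware idea concrete, here is a minimal conceptual sketch of how Express-style composition can work for queue messages. Every name in it (`SketchContext`, `SketchMiddleware`, `composeSketch`, `parseJsonSketch`, `timingSketch`) is hypothetical and written for illustration; it is not this library's implementation or API.

```typescript
// Conceptual sketch only: hypothetical names, NOT the library's API.
interface SketchContext {
  message: unknown; // current (possibly transformed) message
  raw?: unknown;    // original queue message, left untouched
}

type SketchNext = () => Promise<void>;
type SketchMiddleware = (ctx: SketchContext, next: SketchNext) => Promise<void>;

// Compose middlewares so that each one wraps the rest of the chain.
function composeSketch(middlewares: SketchMiddleware[]) {
  return async (ctx: SketchContext): Promise<SketchContext> => {
    let lastIndex = -1;
    const run = async (i: number): Promise<void> => {
      // Guard against a middleware calling next() twice.
      if (i <= lastIndex) throw new Error("next() called more than once");
      lastIndex = i;
      const middleware = middlewares[i];
      if (middleware) await middleware(ctx, () => run(i + 1));
    };
    await run(0);
    return ctx;
  };
}

// Parses string bodies as JSON, then hands over to the next middleware.
const parseJsonSketch: SketchMiddleware = async (ctx, next) => {
  if (typeof ctx.message === "string") ctx.message = JSON.parse(ctx.message);
  await next();
};

// Measures how long the rest of the chain takes and reports it to `log`.
const timingSketch = (log: (line: string) => void): SketchMiddleware =>
  async (_ctx, next) => {
    const start = Date.now();
    await next();
    log(`processed in ${Date.now() - start} ms`);
  };
```

Calling `composeSketch([timingSketch(console.log), parseJsonSketch])` yields a single function that takes a context and runs both steps in order, which is the boilerplate-sharing effect described above.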
Features
- 🚀 Framework Agnostic - Works with any framework or standalone
- 🔌 Queue Agnostic - Works with any message queue system (SQS, RabbitMQ, Redis, Kafka, etc.)
- 🎯 Simple & Powerful - Clean middleware pattern inspired by Express.js
- 📦 Zero Dependencies - No external runtime dependencies
- 🎨 Full TypeScript Support - Complete type safety with generics
- 🔧 11 Built-in Middlewares - JSON, logging, validation, retry, deduplication, and more
- 🎭 Message Routing - Built-in dispatcher for routing messages to handlers
- 🎛️ Quick Setup Presets - Pre-configured pipelines for common scenarios
Installation
# npm
npm install @message-in-the-middle/core
# pnpm
pnpm add @message-in-the-middle/core
# yarn
yarn add @message-in-the-middle/core
From GitHub (Private Repo)
# Latest from main branch
npm install github:gorannovosel/message-in-the-middle
# Specific version tag (recommended)
npm install github:gorannovosel/message-in-the-middle#v0.3.0
Quick Start
Option 1: Using Presets (Recommended)
import { createStandardPipeline } from '@message-in-the-middle/core/presets';
// Create a production-ready pipeline with one line
const manager = createStandardPipeline({
  logging: console,
  retry: { maxRetries: 3 }
});
// Process any message from any queue
await manager.processInbound(messageBody, rawMessage);
Option 2: Manual Configuration
import {
  MessageMiddlewareManager,
  ParseJsonInboundMiddleware,
  LogInboundMiddleware
} from '@message-in-the-middle/core';
// Create manager
const manager = new MessageMiddlewareManager();
// Add middlewares
manager
  .addInboundMiddleware(new ParseJsonInboundMiddleware())
  .addInboundMiddleware(new LogInboundMiddleware(console));
// Process message
const context = await manager.processInbound(messageBody, rawMessage);
console.log('Processed:', context.message);
Queue Integration Examples
Works with any message queue - just pass the message body and raw message:
AWS SQS
import { SQSClient, ReceiveMessageCommand } from '@aws-sdk/client-sqs';
import { MessageMiddlewareManager, ParseJsonInboundMiddleware } from '@message-in-the-middle/core';
const manager = new MessageMiddlewareManager();
manager.addInboundMiddleware(new ParseJsonInboundMiddleware());
// Process SQS message
async function processSqsMessage(sqsMessage) {
  const context = await manager.processInbound(
    sqsMessage.Body,
    sqsMessage // Raw SQS message available as context.raw
  );
  console.log('Order:', context.message);
  console.log('MessageId:', context.raw?.MessageId);
}
RabbitMQ
import * as amqp from 'amqplib';
import { MessageMiddlewareManager, ParseJsonInboundMiddleware } from '@message-in-the-middle/core';
const manager = new MessageMiddlewareManager();
manager.addInboundMiddleware(new ParseJsonInboundMiddleware());
// Process RabbitMQ message
async function processRabbitMessage(channel, msg) {
  const context = await manager.processInbound(
    msg.content.toString(),
    msg // Raw RabbitMQ message available as context.raw
  );
  console.log('Message:', context.message);
  channel.ack(msg);
}
Redis, Kafka, etc.
Same pattern works everywhere:
const context = await manager.processInbound(messageBody, rawMessage);
Built-in Middlewares
This package includes 11 built-in middleware types. Counting inbound and outbound variants separately, that comes to the 18 classes listed below:
| Middleware | Purpose |
|------------|---------|
| ParseJsonInboundMiddleware | Parse JSON strings automatically |
| StringifyJsonOutboundMiddleware | Stringify objects to JSON |
| LogInboundMiddleware | Log messages with timing |
| LogOutboundMiddleware | Log outbound messages |
| ValidateInboundMiddleware | Validate message schemas |
| ValidateOutboundMiddleware | Validate outbound messages |
| RetryInboundMiddleware | Retry with exponential backoff |
| DeadLetterInboundMiddleware | Handle failed messages |
| DeduplicateInboundMiddleware | Prevent duplicate processing |
| RateLimitOutboundMiddleware | Control throughput with token bucket |
| EncryptOutboundMiddleware | Encrypt sensitive data |
| DecryptInboundMiddleware | Decrypt encrypted messages |
| TransformInboundMiddleware | Transform message format |
| TransformOutboundMiddleware | Transform outbound format |
| MetricsInboundMiddleware | Collect processing metrics |
| MetricsOutboundMiddleware | Collect outbound metrics |
| TracingInboundMiddleware | Distributed tracing support |
| TracingOutboundMiddleware | Trace outbound messages |
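As an illustration of the "retry with exponential backoff" idea in the table, the sketch below shows one common way to compute the delays and drive the retries. The helper names (`BackoffOptions`, `backoffDelayMs`, `retryWithBackoff`) and every option except `maxRetries` are assumptions made for this example; the actual options and behaviour of `RetryInboundMiddleware` may differ.

```typescript
// Illustrative sketch; hypothetical helpers, not RetryInboundMiddleware itself.
interface BackoffOptions {
  maxRetries: number;  // retries allowed after the first attempt
  baseDelayMs: number; // delay before the first retry (assumed option)
  factor?: number;     // growth factor per retry, defaults to 2 (assumed option)
  maxDelayMs?: number; // optional upper bound on the delay (assumed option)
}

// Delay before retry number `attempt` (0-based): base * factor^attempt, capped.
function backoffDelayMs(attempt: number, opts: BackoffOptions): number {
  const factor = opts.factor ?? 2;
  const delay = opts.baseDelayMs * Math.pow(factor, attempt);
  return Math.min(delay, opts.maxDelayMs ?? Infinity);
}

// Runs `task`, retrying on failure with growing delays until retries run out.
async function retryWithBackoff<T>(
  task: () => Promise<T>,
  opts: BackoffOptions,
  sleep: (ms: number) => Promise<void> = (ms) =>
    new Promise<void>((resolve) => setTimeout(resolve, ms)),
): Promise<T> {
  for (let attempt = 0; ; attempt++) {
    try {
      return await task();
    } catch (err) {
      // Out of retries: surface the last error to the caller.
      if (attempt >= opts.maxRetries) throw err;
      await sleep(backoffDelayMs(attempt, opts));
    }
  }
}
```

With `baseDelayMs: 100` and the default factor of 2, the waits before successive retries are 100 ms, 200 ms, and 400 ms. The injectable `sleep` parameter is a design choice that keeps the helper testable without real timers.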
Message Routing
Route messages to different handlers based on an identifier field:
import { MessageDispatcher, DispatcherMiddleware } from '@message-in-the-middle/core';
// Create dispatcher
const dispatcher = new MessageDispatcher({
  identifierField: 'action' // or 'command', 'type', 'event'
});
// Register handlers
dispatcher
  .register('CREATE_ORDER', async (ctx) => {
    console.log('Creating order:', ctx.message);
  })
  .register('UPDATE_ORDER', async (ctx) => {
    console.log('Updating order:', ctx.message);
  });
// Use as middleware
manager.addInboundMiddleware(new DispatcherMiddleware(dispatcher));
// Messages are automatically routed
await manager.processInbound(JSON.stringify({
  action: 'CREATE_ORDER',
  orderId: '123'
}));
Pre-configured Presets
Quick setup helpers for common scenarios:
import {
  createDevelopmentPipeline,
  createProductionPipeline,
  createHighThroughputPipeline,
  createCompliancePipeline,
  createResilientPipeline,
  createObservablePipeline
} from '@message-in-the-middle/core/presets';
// Development: Minimal setup for local development
const devManager = createDevelopmentPipeline({ logging: console });
// Production: Balanced pipeline for most use cases
const prodManager = createProductionPipeline({
  logging: logger,
  deduplication: redisStore,
  validation: validator,
  retry: { maxRetries: 3 }
});
// High-Throughput: Optimized for maximum speed
const fastManager = createHighThroughputPipeline({ dispatcher });
// And more...
TypeScript Support
Full type safety with generics:
interface OrderMessage {
  orderId: string;
  amount: number;
}
interface SQSMessage {
  MessageId: string;
  Body: string;
}
const manager = new MessageMiddlewareManager<OrderMessage>();
// Type-safe message handling
const context = await manager.processInbound<OrderMessage, SQSMessage>(
  message.Body,
  message
);
// context.message is OrderMessage
// context.raw is SQSMessage
API Reference
Core Classes
- MessageMiddlewareManager - Main orchestrator for middleware pipelines
- MessageDispatcher - Route messages to handlers based on identifier
- MiddlewarePipeline - Low-level pipeline execution
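To illustrate what "route messages to handlers based on identifier" can mean in practice, here is a small self-contained sketch of identifier-based dispatch. `SketchDispatcher` and `SketchHandler` are hypothetical names, and throwing on a missing identifier or an unregistered handler is an assumption of this example; the real `MessageDispatcher` may behave differently.

```typescript
// Illustrative sketch; SketchDispatcher is hypothetical, not MessageDispatcher.
type SketchHandler = (message: Record<string, unknown>) => Promise<void> | void;

class SketchDispatcher {
  private readonly handlers = new Map<string, SketchHandler>();
  private readonly identifierField: string;

  constructor(identifierField: string) {
    this.identifierField = identifierField;
  }

  // Returns `this` so registrations can be chained.
  register(identifier: string, handler: SketchHandler): this {
    this.handlers.set(identifier, handler);
    return this;
  }

  // Looks up the handler named by the identifier field and invokes it.
  async dispatch(message: Record<string, unknown>): Promise<void> {
    const identifier = message[this.identifierField];
    if (typeof identifier !== "string") {
      // Assumption: a missing or non-string identifier is treated as an error.
      throw new Error(`Missing string field "${this.identifierField}"`);
    }
    const handler = this.handlers.get(identifier);
    if (!handler) {
      // Assumption: an unknown identifier is treated as an error.
      throw new Error(`No handler registered for "${identifier}"`);
    }
    await handler(message);
  }
}
```

A dispatcher built with `new SketchDispatcher("action")` would send `{ action: "CREATE_ORDER", orderId: "123" }` to whichever handler was registered under `CREATE_ORDER`.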
Interfaces
- InboundMiddleware - Interface for inbound middleware
- OutboundMiddleware - Interface for outbound middleware
- MessageContext - Context object passed through pipeline
Types
- MessageHandlerFn - Type for message handler functions
- MessageContext<T, R> - Generic context type
For complete API documentation, see the main README.
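As a rough picture of how a generic context type such as `MessageContext<T, R>` can carry both the parsed message and the raw queue message, consider the sketch below. The `message` and `raw` fields follow the usage shown earlier in this README; the type names (`SketchMessageContext`, `SketchOrder`, `SketchSqsMessage`), the optional `raw`, and the `describeOrder` helper are assumptions made for illustration, not the library's actual definitions.

```typescript
// Illustrative sketch; these types are hypothetical stand-ins for the real ones.
interface SketchMessageContext<T, R = unknown> {
  message: T; // parsed, typed message body
  raw?: R;    // original queue message, if one was supplied (assumed optional)
}

interface SketchOrder {
  orderId: string;
  amount: number;
}

interface SketchSqsMessage {
  MessageId: string;
  Body: string;
}

// Both type parameters flow through, so field access is checked at compile time.
function describeOrder(
  ctx: SketchMessageContext<SketchOrder, SketchSqsMessage>,
): string {
  const id = ctx.raw?.MessageId ?? "unknown";
  return `${id}: order ${ctx.message.orderId} for ${ctx.message.amount}`;
}
```

The first type parameter describes the message payload and the second describes the queue-specific envelope, so a handler can read queue metadata without casting.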
Examples
Complete examples are available in the examples/ directory:
- Basic usage - Getting started with the library
- Queue integrations - AWS SQS, RabbitMQ examples
- Framework integrations - NestJS, Express examples
- Custom middlewares - How to create your own middlewares
- Advanced pipelines - Complex multi-middleware setups
Related Packages
This is the core package. For additional functionality:
- @message-in-the-middle/nestjs - NestJS integration helpers
- @message-in-the-middle/persistence-core - Message persistence interfaces
- @message-in-the-middle/store-memory - In-memory stores (dev/testing)
- @message-in-the-middle/store-mysql - MySQL message store (production)
- @message-in-the-middle/testing - Testing utilities
Documentation
- Main README - Complete documentation
- Architecture - Design patterns and decisions
- Changelog - Version history
- Contributing - How to contribute
License
MIT
