@message-in-the-middle/pipe-core
v0.1.2
Core pipe abstractions for chaining message queues
⚠️ Work in Progress
- Is this library production-ready? No.
- Is this library safe? No.
- When will it be ready? Soon™ (maybe tomorrow, maybe never).
- Why is it public? Experiment.
message-in-the-middle is to message queue processing what Express.js is to HTTP request processing. Just as Express provides a middleware pattern for HTTP requests, this library provides a middleware pattern for processing queue messages.
Why This Exists
Processing queue messages usually means copy-pasting the same boilerplate: parse JSON, validate, log, retry, deduplicate, route to handlers. This library lets you compose that logic as middlewares.
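To make the middleware idea concrete, here is a minimal sketch of Express-style composition applied to a queue message. The MessageContext, Middleware, and compose names are hypothetical and exist only for this illustration; they are not this library's actual API, and the sketch is synchronous to keep it short.

```typescript
// Hypothetical shapes for illustration only; not this library's real types.
interface MessageContext {
  raw: string;    // message body as received from the queue
  body?: unknown; // parsed payload, filled in by a middleware
  log: string[];  // collected log lines
}

type Next = () => void;
type Middleware = (ctx: MessageContext, next: Next) => void;

// Chain middlewares so each one decides when to hand off to the next.
function compose(middlewares: Middleware[]): (ctx: MessageContext) => void {
  return (ctx) => {
    const dispatch = (index: number): void => {
      const middleware = middlewares[index];
      if (!middleware) return; // end of the chain
      middleware(ctx, () => dispatch(index + 1));
    };
    dispatch(0);
  };
}

// Logs before and after everything downstream of it.
const logStartEnd: Middleware = (ctx, next) => {
  ctx.log.push('start');
  next();
  ctx.log.push('end');
};

// Parses the raw body once so later middlewares can read ctx.body.
const parseJsonBody: Middleware = (ctx, next) => {
  ctx.body = JSON.parse(ctx.raw);
  next();
};

const handleMessage = compose([logStartEnd, parseJsonBody]);
const ctx: MessageContext = { raw: '{"orderId":"123"}', log: [] };
handleMessage(ctx);
```

Each middleware wraps everything after it, which is why logStartEnd can record both the start and the end of processing without knowing what runs in between.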
Queue-agnostic message pipeline orchestration for chaining message queues with transformations.
Overview
@message-in-the-middle/pipe-core provides a declarative API for building message processing pipelines that span multiple queues. Chain queue operations like functional programming pipes: source → transform → destination → transform → destination.
Key Features:
- 🔗 Queue-Agnostic: Works with SQS, RabbitMQ, Redis, Kafka, or any custom queue
- 🎯 Declarative API: Express pipelines as fluent chains
- 🔒 Type-Safe: Full TypeScript support with generics
- 📊 Observable: Rich event system for monitoring
- 🛡️ Resilient: Built-in retry, error handling, health checks
- 🔌 Middleware Compatible: Seamlessly integrates with existing middleware ecosystem
- 🚀 Production-Oriented: Graceful shutdown, statistics, health endpoints
Installation
npm install @message-in-the-middle/pipe-core
Note: You'll also need queue adapters:
# For AWS SQS
npm install @message-in-the-middle/pipe-adapters-sqs @aws-sdk/client-sqs
# For RabbitMQ
npm install @message-in-the-middle/pipe-adapters-rabbitmq amqplib
Quick Start
import { MessagePipelineOrchestrator } from '@message-in-the-middle/pipe-core';
import { SQSPipeAdapter } from '@message-in-the-middle/pipe-adapters-sqs';
import { SQSClient } from '@aws-sdk/client-sqs';
const sqsClient = new SQSClient({ region: 'us-east-1' });
// start() returns a Promise, so await it to get the running pipeline
// (top-level await requires an ES module)
const pipeline = await new MessagePipelineOrchestrator({ logger: console })
  // Pull from source queue
  .source('raw-orders', new SQSPipeAdapter({
    client: sqsClient,
    queueUrl: process.env.RAW_ORDERS_QUEUE_URL!,
  }))
  // Transform messages
  .pipe('parse', (body) => JSON.parse(body))
  .pipe('validate', (order) => {
    if (!order.customerId) throw new Error('Invalid order');
    return order;
  })
  .pipe('enrich', async (order) => {
    // fetchCustomer is your own lookup function
    const customer = await fetchCustomer(order.customerId);
    return { ...order, customer };
  })
  // Send to destination queue
  .destination('enriched-orders', new SQSPipeAdapter({
    client: sqsClient,
    queueUrl: process.env.ENRICHED_ORDERS_QUEUE_URL!,
  }))
  // Start the pipeline
  .start();
// Graceful shutdown
process.on('SIGTERM', () => pipeline.stop());
Core Concepts
1. Pipeline Segments
Pipelines consist of segments chained together:
- Source - Entry point (polling from a queue)
- Transform - Message transformation function
- Destination - Exit point (sending to a queue)
- Filter - Conditional message filtering (future)
- Branch - Conditional routing (future)
- Parallel - Fan-out to multiple destinations (future)
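As a rough illustration of how the three currently supported segment kinds could chain together, the sketch below models a pipeline as plain data and pushes one message through its transforms. The Segment union and the runTransforms and describePipeline helpers are assumptions made for this example, not types exported by pipe-core.

```typescript
// Hypothetical model of the segment kinds listed above (future kinds omitted).
type Segment =
  | { kind: 'source'; label: string }
  | { kind: 'transform'; label: string; apply: (message: unknown) => unknown }
  | { kind: 'destination'; label: string };

// Apply every transform segment, in order, to a single message.
function runTransforms(segments: Segment[], message: unknown): unknown {
  let current = message;
  for (const segment of segments) {
    if (segment.kind === 'transform') {
      current = segment.apply(current);
    }
  }
  return current;
}

// Render the chain as "kind:label → kind:label" text.
function describePipeline(segments: Segment[]): string {
  return segments.map((segment) => `${segment.kind}:${segment.label}`).join(' → ');
}

const orderSegments: Segment[] = [
  { kind: 'source', label: 'raw-orders' },
  { kind: 'transform', label: 'parse', apply: (message) => JSON.parse(String(message)) },
  {
    kind: 'transform',
    label: 'tag',
    apply: (message) => ({ ...(message as object), seen: true }),
  },
  { kind: 'destination', label: 'enriched-orders' },
];

const processed = runTransforms(orderSegments, '{"customerId":"456"}');
const summary = describePipeline(orderSegments);
```

Modelling segments as a discriminated union keeps the door open for the future kinds (filter, branch, parallel) without changing how the existing ones are handled.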
2. Queue Adapters
Queue adapters implement the QueueAdapter interface to support any queue system:
interface QueueAdapter {
  poll(options?: PollOptions): AsyncIterableIterator<QueueMessage>;
  send(message: any, options?: SendOptions): Promise<void>;
  sendBatch(messages: any[], options?: SendOptions): Promise<void>;
  ack(message: QueueMessage): Promise<void>;
  nack(message: QueueMessage, requeue?: boolean): Promise<void>;
  connect(): Promise<void>;
  disconnect(): Promise<void>;
  // ... more methods
}
Available Adapters:
- @message-in-the-middle/pipe-adapters-sqs - AWS SQS
- @message-in-the-middle/pipe-adapters-rabbitmq - RabbitMQ (AMQP)
- More coming soon...
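For a custom queue, an adapter mostly has to map these methods onto the underlying system. The sketch below is an in-memory stand-in that covers only the methods listed above. The QueueMessage shape (id and body), the omission of the option parameters, and the InMemoryAdapter and drain names are all assumptions for illustration; the real interface has more methods than shown here.

```typescript
// Assumed message shape; the real QueueMessage type may differ.
interface QueueMessage {
  id: string;
  body: unknown;
}

// Trimmed copy of the methods listed above (option parameters omitted).
interface MinimalQueueAdapter {
  poll(): AsyncIterableIterator<QueueMessage>;
  send(message: unknown): Promise<void>;
  sendBatch(messages: unknown[]): Promise<void>;
  ack(message: QueueMessage): Promise<void>;
  nack(message: QueueMessage, requeue?: boolean): Promise<void>;
  connect(): Promise<void>;
  disconnect(): Promise<void>;
}

class InMemoryAdapter implements MinimalQueueAdapter {
  private pending: QueueMessage[] = [];
  private inFlight = new Map<string, QueueMessage>();
  private nextId = 1;
  private connected = false;

  async connect(): Promise<void> { this.connected = true; }
  async disconnect(): Promise<void> { this.connected = false; }

  async send(message: unknown): Promise<void> {
    this.pending.push({ id: String(this.nextId++), body: message });
  }

  async sendBatch(messages: unknown[]): Promise<void> {
    for (const message of messages) await this.send(message);
  }

  // Yields queued messages until the queue is empty, then finishes.
  async *poll(): AsyncIterableIterator<QueueMessage> {
    while (this.connected && this.pending.length > 0) {
      const message = this.pending.shift()!;
      this.inFlight.set(message.id, message);
      yield message;
    }
  }

  async ack(message: QueueMessage): Promise<void> {
    this.inFlight.delete(message.id);
  }

  // Drops the message, or puts it back on the queue when requeue is true.
  async nack(message: QueueMessage, requeue = false): Promise<void> {
    this.inFlight.delete(message.id);
    if (requeue) this.pending.push(message);
  }

  get unacknowledged(): number { return this.inFlight.size; }
}

// Drain the adapter, acknowledging each message, and collect the bodies.
async function drain(adapter: InMemoryAdapter): Promise<unknown[]> {
  const bodies: unknown[] = [];
  await adapter.connect();
  for await (const message of adapter.poll()) {
    bodies.push(message.body);
    await adapter.ack(message);
  }
  await adapter.disconnect();
  return bodies;
}
```

A real adapter would keep polling instead of finishing when the queue is empty; ending the iterator here simply makes the sketch easy to run to completion.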
3. Message Flow
┌─────────────┐
│   Source    │ ──▶ Poll messages
│   Adapter   │
└─────────────┘
       │
       ▼
┌─────────────┐
│ Transform 1 │ ──▶ Parse JSON
└─────────────┘
       │
       ▼
┌─────────────┐
│ Transform 2 │ ──▶ Validate
└─────────────┘
       │
       ▼
┌─────────────┐
│ Destination │ ──▶ Send to queue
│   Adapter   │
└─────────────┘
       │
       ▼
┌─────────────┐
│     ACK     │ ──▶ Acknowledge source
└─────────────┘
API Reference
MessagePipelineOrchestrator
Constructor
new MessagePipelineOrchestrator(options?: PipelineOrchestratorOptions)
Options:
- logger?: Logger - Logger instance (console, Winston, Pino, etc.)
- name?: string - Pipeline name for identification
- autoStart?: boolean - Auto-start when the first segment is added (default: false)
- maxConcurrency?: number - Max concurrent messages (default: 100)
- slowProcessingThreshold?: number - Emit a warning if a segment's processing time exceeds this threshold (ms)
- errorHandler?: PipeErrorHandler - Global error handler
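The documented defaults can be pictured as a small normalization step. This is only a sketch: OrchestratorOptionsSketch, ResolvedOptions, and resolveOptions are hypothetical names, logger and errorHandler are left out, and the range check on maxConcurrency is an assumption rather than documented behaviour.

```typescript
// Hypothetical mirror of some options listed above; logger and errorHandler are omitted.
interface OrchestratorOptionsSketch {
  name?: string;
  autoStart?: boolean;
  maxConcurrency?: number;
  slowProcessingThreshold?: number;
}

interface ResolvedOptions {
  name: string | undefined;
  autoStart: boolean;
  maxConcurrency: number;
  slowProcessingThreshold: number | undefined;
}

// Fill in the documented defaults for anything the caller left out.
function resolveOptions(options: OrchestratorOptionsSketch = {}): ResolvedOptions {
  const maxConcurrency = options.maxConcurrency ?? 100; // documented default
  // Validation is an assumption of this sketch, not documented behaviour.
  if (!Number.isInteger(maxConcurrency) || maxConcurrency < 1) {
    throw new RangeError('maxConcurrency must be a positive integer');
  }
  return {
    name: options.name,
    autoStart: options.autoStart ?? false, // documented default
    maxConcurrency,
    slowProcessingThreshold: options.slowProcessingThreshold,
  };
}

const defaults = resolveOptions();
const tuned = resolveOptions({ name: 'orders', maxConcurrency: 10, slowProcessingThreshold: 500 });
```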
Methods
.source(name: string, adapter: QueueAdapter): this
Define a source queue (entry point).
pipeline.source('orders', new SQSPipeAdapter({ ... }))
.pipe<TIn, TOut>(name: string, transform: TransformFunction<TIn, TOut>): this
Add a transformation function.
pipeline.pipe('uppercase', (msg: string) => msg.toUpperCase())
.destination(name: string, adapter: QueueAdapter): this
Define a destination queue (exit point).
pipeline.destination('processed', new SQSPipeAdapter({ ... }))
.filter<T>(name: string, filterFn: FilterFunction<T>): this
Add a filter function. Filtering is listed as future work under Core Concepts and in the Roadmap.
pipeline.filter('high-priority', (msg) => msg.priority > 5)
.start(): Promise<this>
Start the pipeline.
.stop(): Promise<void>
Stop the pipeline gracefully (waits for in-flight messages).
.pause(): void
Pause the pipeline (no new messages, in-flight complete).
.resume(): void
Resume a paused pipeline.
.getStats(): PipelineStats
Get pipeline statistics.
.getHealth(): PipelineHealth
Get pipeline health status.
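One way to reason about start, pause, resume, and stop is as a small state machine. The sketch below is an assumption about how such a lifecycle could behave (for example, that a stopped pipeline cannot be restarted); the state names and the nextState and runActions helpers are hypothetical and do not describe the orchestrator's documented internals.

```typescript
// Assumed lifecycle states; the orchestrator's real internal states are not documented here.
type LifecycleState = 'idle' | 'running' | 'paused' | 'stopped';
type LifecycleAction = 'start' | 'pause' | 'resume' | 'stop';

// Allowed transitions for each state; anything missing is rejected.
const transitions: Record<LifecycleState, Partial<Record<LifecycleAction, LifecycleState>>> = {
  idle: { start: 'running' },
  running: { pause: 'paused', stop: 'stopped' },
  paused: { resume: 'running', stop: 'stopped' },
  stopped: {},
};

function nextState(current: LifecycleState, action: LifecycleAction): LifecycleState {
  const target = transitions[current][action];
  if (target === undefined) {
    throw new Error(`Cannot ${action} a pipeline that is ${current}`);
  }
  return target;
}

// Apply a sequence of actions starting from the idle state.
function runActions(actions: LifecycleAction[]): LifecycleState {
  let state: LifecycleState = 'idle';
  for (const action of actions) {
    state = nextState(state, action);
  }
  return state;
}

const finalState = runActions(['start', 'pause', 'resume', 'stop']);
```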
Events
The orchestrator emits events for observability:
pipeline
  .on('pipeline:started', () => {
    console.log('Pipeline started');
  })
  .on('message:complete', (message, duration) => {
    console.log(`Message processed in ${duration}ms`);
  })
  .on('message:failed', (message, error, segmentId) => {
    console.error(`Message failed in ${segmentId}:`, error);
  })
  .on('segment:complete', (segmentId, message, duration) => {
    console.log(`Segment ${segmentId} completed in ${duration}ms`);
  })
  .on('slow:processing', (segmentId, duration, threshold) => {
    console.warn(`Slow processing detected in ${segmentId}: ${duration}ms`);
  });
Available Events:
- pipeline:started, pipeline:stopped, pipeline:paused, pipeline:resumed
- message:enter, message:exit, message:complete, message:failed
- segment:start, segment:complete, segment:error
- adapter:connected, adapter:disconnected, adapter:error
- slow:processing
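If you want compile-time checks on handler arguments, an event map is one option. The sketch below is a tiny self-contained emitter, not the orchestrator's implementation: TypedEmitter and PipelineEvents are hypothetical names, only a few events are modelled, and the payload types are guesses based on the handler signatures in the example above.

```typescript
// Assumed payloads, inferred from the handler signatures in the example above.
interface PipelineEvents {
  'pipeline:started': [];
  'message:complete': [message: unknown, durationMs: number];
  'message:failed': [message: unknown, error: Error, segmentId: string];
  'slow:processing': [segmentId: string, durationMs: number, thresholdMs: number];
}

type Listener<Args extends unknown[]> = (...args: Args) => void;

// Minimal emitter whose on/emit signatures are checked against an event map.
class TypedEmitter<Events extends Record<keyof Events, unknown[]>> {
  private listeners = new Map<keyof Events, Listener<any>[]>();

  on<K extends keyof Events>(event: K, listener: Listener<Events[K]>): this {
    const existing = this.listeners.get(event) ?? [];
    existing.push(listener);
    this.listeners.set(event, existing);
    return this; // allows chained .on(...) calls
  }

  emit<K extends keyof Events>(event: K, ...args: Events[K]): void {
    for (const listener of this.listeners.get(event) ?? []) {
      listener(...args);
    }
  }
}

const seen: string[] = [];
const emitter = new TypedEmitter<PipelineEvents>()
  .on('message:complete', (_message, durationMs) => {
    seen.push(`complete in ${durationMs}ms`);
  })
  .on('slow:processing', (segmentId, durationMs, thresholdMs) => {
    seen.push(`${segmentId} took ${durationMs}ms (threshold ${thresholdMs}ms)`);
  });

emitter.emit('message:complete', { orderId: '123' }, 42);
emitter.emit('slow:processing', 'enrich', 1500, 1000);
```

With this shape, subscribing to a misspelled event name or passing the wrong argument types to emit fails at compile time instead of silently doing nothing at runtime.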
Examples
Example 1: Cross-Queue Integration (SQS → RabbitMQ)
import { MessagePipelineOrchestrator } from '@message-in-the-middle/pipe-core';
import { SQSPipeAdapter } from '@message-in-the-middle/pipe-adapters-sqs';
import { RabbitMQPipeAdapter } from '@message-in-the-middle/pipe-adapters-rabbitmq';
// sqsClient and sqsQueueUrl are assumed to be defined as in the Quick Start
const pipeline = await new MessagePipelineOrchestrator()
  .source('sqs-events', new SQSPipeAdapter({
    client: sqsClient,
    queueUrl: sqsQueueUrl,
  }))
  .pipe('transform', (sqsMsg) => ({
    type: sqsMsg.eventType,
    payload: sqsMsg.data,
    timestamp: new Date().toISOString(),
  }))
  .destination('rabbitmq-events', new RabbitMQPipeAdapter({
    connection: 'amqp://localhost',
    queue: 'events',
  }))
  .start();
Example 2: Multi-Stage Processing
// start() returns a Promise, so await it to get the running pipeline
const pipeline = await new MessagePipelineOrchestrator()
  // Stage 1: Validate orders
  .source('raw-orders', rawOrdersAdapter)
  .pipe('validate', validateOrder)
  .destination('validated-orders', validatedOrdersAdapter)
  // Stage 2: Process payments
  .source('validated-orders', validatedOrdersAdapter)
  .pipe('charge', chargeCustomer)
  .destination('paid-orders', paidOrdersAdapter)
  // Stage 3: Fulfill orders
  .source('paid-orders', paidOrdersAdapter)
  .pipe('fulfill', fulfillOrder)
  .destination('fulfilled-orders', fulfilledOrdersAdapter)
  .start();
Example 3: With Middleware
import { MessagePipelineOrchestrator } from '@message-in-the-middle/pipe-core';
import { parseJson, validate, log } from '@message-in-the-middle/core';
const pipeline = await new MessagePipelineOrchestrator()
  .source('orders', ordersAdapter)
  .use(parseJson()) // Existing middleware!
  .use(log(logger)) // Existing middleware!
  .use(validate(orderSchema)) // Existing middleware!
  .pipe('enrich', enrichOrder)
  .destination('enriched', enrichedAdapter)
  .start();
Error Handling
Per-Segment Error Handling
pipeline
  .pipe('risky-operation', riskyTransform)
  .continueOnError = true; // Continue pipeline even if this fails
Global Error Handler
const pipeline = new MessagePipelineOrchestrator({
  errorHandler: async (error, context, segment) => {
    await logToErrorTracking(error, context);
    await sendAlertToSlack(error, segment.name);
  },
});
Retry Policy (Future)
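The retryPolicy fields used in this section's example (maxRetries, initialDelayMs, maxDelayMs, multiplier) read like exponential backoff with a cap. Since the feature is still planned, the library has not fixed the exact formula; the sketch below assumes delay = initialDelayMs × multiplier^attempt, capped at maxDelayMs, and retryDelays is a hypothetical helper.

```typescript
// Field names copied from the retryPolicy example in this section.
interface RetryPolicy {
  maxRetries: number;
  initialDelayMs: number;
  maxDelayMs: number;
  multiplier: number;
}

// Delay before each retry: initialDelayMs * multiplier^attempt, capped at maxDelayMs.
// The formula is an assumption; the library may choose a different one.
function retryDelays(policy: RetryPolicy): number[] {
  const delays: number[] = [];
  for (let attempt = 0; attempt < policy.maxRetries; attempt++) {
    const uncapped = policy.initialDelayMs * Math.pow(policy.multiplier, attempt);
    delays.push(Math.min(uncapped, policy.maxDelayMs));
  }
  return delays;
}

const delays = retryDelays({
  maxRetries: 3,
  initialDelayMs: 1000,
  maxDelayMs: 10000,
  multiplier: 2,
});
// delays is [1000, 2000, 4000]

const cappedDelays = retryDelays({
  maxRetries: 6,
  initialDelayMs: 1000,
  maxDelayMs: 10000,
  multiplier: 2,
});
// cappedDelays is [1000, 2000, 4000, 8000, 10000, 10000]
```

Under this assumption, the policy values shown in this section would never reach the 10-second cap; the cap only matters with more retries or a larger multiplier.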
pipeline.pipe('flaky-api', callFlakyAPI, {
  retryPolicy: {
    maxRetries: 3,
    initialDelayMs: 1000,
    maxDelayMs: 10000,
    multiplier: 2,
  },
});
Monitoring & Observability
Statistics
const stats = pipeline.getStats();
console.log(`Messages processed: ${stats.messagesProcessed}`);
console.log(`Messages in flight: ${stats.messagesInFlight}`);
console.log(`Errors: ${stats.errors}`);
for (const [segmentId, segmentStats] of stats.segmentStats) {
  console.log(`${segmentId}: ${segmentStats.messagesProcessed} processed`);
  console.log(`  Avg time: ${segmentStats.avgProcessingTime}ms`);
}
Health Checks
app.get('/health', (req, res) => {
  const health = pipeline.getHealth();
  res.status(health.status === 'healthy' ? 200 : 503).json(health);
});
Health Response:
{
  "status": "healthy",
  "segments": {
    "transform-validate-123": {
      "segmentId": "transform-validate-123",
      "status": "healthy",
      "errorRate": 0.5
    }
  },
  "adapters": {
    "orders": {
      "name": "orders",
      "connected": true,
      "status": "healthy"
    }
  }
}
Testing
Unit Testing Pipelines
import { MessagePipelineOrchestrator } from '@message-in-the-middle/pipe-core';
import { MockQueueAdapter } from '@message-in-the-middle/testing';
describe('Order Pipeline', () => {
  it('should enrich orders', async () => {
    const mockAdapter = new MockQueueAdapter();
    // start() returns a Promise, so await it before sending messages
    const pipeline = await new MessagePipelineOrchestrator()
      .source('orders', mockAdapter)
      .pipe('enrich', enrichOrder)
      .destination('enriched', mockAdapter)
      .start();
    mockAdapter.sendMessage({ orderId: '123', customerId: '456' });
    await waitForProcessing();
    const enrichedMessage = mockAdapter.getLastSentMessage();
    expect(enrichedMessage).toHaveProperty('customer');
    // Stop the pipeline so the test run can exit cleanly
    await pipeline.stop();
  });
});
Architecture
Design Principles
- Queue-Agnostic Abstraction - Works with any queue via adapters
- Type Safety - Full TypeScript support with generics
- Event-Driven - Rich event system for extensibility
- Graceful Degradation - Continues operating despite partial failures
- Observable - Metrics, stats, and health checks built-in
Package Structure
packages/pipe-core/
├── src/
│ ├── types/ # Core type definitions
│ ├── errors/ # Error classes
│ ├── orchestrator/ # Main orchestrator
│ ├── segment/ # Segment implementations (future)
│ └── adapter/ # Base adapter helpers (future)
├── tests/
└── RFC.md # Detailed design document
Roadmap
Phase 1: Core Infrastructure ✅ (Completed)
- [x] Basic linear pipelines (source → transform → destination)
- [x] QueueAdapter interface
- [x] MessagePipelineOrchestrator
- [x] Event emission
- [x] Statistics tracking
Phase 2: Queue Adapters 🚧 (In Progress)
- [ ] SQS Adapter
- [ ] RabbitMQ Adapter
- [ ] Example applications
Phase 3: Advanced Routing 🔮 (Future)
- [ ] Conditional branching (fan-out)
- [ ] Parallel destinations
- [ ] Message filtering
- [ ] Aggregation (fan-in)
Phase 4: Enterprise Features 🔮 (Future)
- [ ] Saga pattern support
- [ ] Circuit breaker integration
- [ ] Backpressure management
- [ ] Dynamic pipeline reconfiguration
Related Packages
- @message-in-the-middle/core - Core middleware library
- @message-in-the-middle/aws - AWS SQS poller
- @message-in-the-middle/rabbitmq - RabbitMQ poller
- @message-in-the-middle/pipe-adapters-sqs - SQS pipe adapter
- @message-in-the-middle/pipe-adapters-rabbitmq - RabbitMQ pipe adapter
Contributing
Contributions are welcome! Please read the RFC for design details and integration plans.
License
MIT © Message in the Middle
