@message-in-the-middle/nestjs
v0.1.3
NestJS integration for message-middleware with decorators and modules
⚠️ Work in Progress
- Is this library production-ready? No.
- Is this library safe? No.
- When will it be ready? Soon™ (maybe tomorrow, maybe never).
- Why is it public? Experiment.
message-in-the-middle is to message queue processing what Express.js is to HTTP request processing. Just as Express provides a middleware pattern for HTTP requests, this library provides a middleware pattern for processing queue messages.
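To make the Express analogy concrete, here is a minimal sketch of a middleware pipeline for queue messages. It is not the library's implementation: `Context`, `Middleware`, `compose`, and the three middlewares are hypothetical names chosen for illustration only.

```typescript
// Illustrative sketch only: these names are NOT the @message-in-the-middle API.
interface Context {
  raw: string;        // message body as received from the queue
  message?: unknown;  // parsed payload, filled in by a middleware
  log: string[];      // trace of what ran, kept for demonstration
}

type Next = () => Promise<void>;
type Middleware = (ctx: Context, next: Next) => Promise<void>;

// Compose middlewares so each one can run code before and after the rest.
function compose(middlewares: Middleware[]): (ctx: Context) => Promise<void> {
  return async (ctx) => {
    const run = async (i: number): Promise<void> => {
      const current = middlewares[i];
      if (!current) return; // end of the chain
      await current(ctx, () => run(i + 1));
    };
    await run(0);
  };
}

// Records an entry before and after the downstream middlewares run.
const logger: Middleware = async (ctx, next) => {
  ctx.log.push('start');
  await next();
  ctx.log.push('end');
};

// Parses the raw body and exposes it to later middlewares.
const parseJson: Middleware = async (ctx, next) => {
  ctx.message = JSON.parse(ctx.raw);
  await next();
};

// Terminal middleware: acts on the parsed message.
const handler: Middleware = async (ctx) => {
  const { action } = ctx.message as { action: string };
  ctx.log.push(`handled ${action}`);
};

async function demo(): Promise<Context> {
  const ctx: Context = { raw: '{"action":"CREATE_ORDER"}', log: [] };
  await compose([logger, parseJson, handler])(ctx);
  return ctx;
}
```

Because each middleware receives `next`, cross-cutting steps such as logging can wrap everything that follows them, which is the same shape Express uses for HTTP requests.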
Why This Exists
Processing queue messages usually means copy-pasting the same boilerplate: parse JSON, validate, log, retry, deduplicate, route to handlers. This library lets you compose that logic as middlewares.
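As a rough illustration of the boilerplate listed above, the sketch below writes deduplication and retry once as reusable wrappers around a handler. The names `deduplicate`, `retry`, and `flaky` are hypothetical and the code does not reflect how the library implements these steps.

```typescript
// Illustrative sketch only; names here are hypothetical, not the library's API.
type Handler = (message: { id: string }) => Promise<void>;

// Skip messages whose id has already been handled successfully.
function deduplicate(seen: Set<string>, handler: Handler): Handler {
  return async (message) => {
    if (seen.has(message.id)) return;
    await handler(message);
    seen.add(message.id); // mark as seen only after success
  };
}

// Run the handler once, then up to maxRetries more times on failure.
function retry(maxRetries: number, handler: Handler): Handler {
  return async (message) => {
    let lastError: unknown;
    for (let attempt = 0; attempt <= maxRetries; attempt++) {
      try {
        await handler(message);
        return;
      } catch (error) {
        lastError = error;
      }
    }
    throw lastError;
  };
}

// A handler that fails on its first two calls, to exercise the retry wrapper.
let calls = 0;
const flaky: Handler = async () => {
  calls++;
  if (calls < 3) throw new Error('transient failure');
};

const processMessage = deduplicate(new Set<string>(), retry(3, flaky));

async function demo(): Promise<number> {
  await processMessage({ id: 'msg-1' }); // fails twice, succeeds on the third call
  await processMessage({ id: 'msg-1' }); // duplicate id: handler is not called again
  return calls;
}
```

Once each concern is a separate wrapper, the order in which they are stacked becomes an explicit design decision rather than something buried inside every consumer.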
This package integrates message-middleware into NestJS applications through decorators, modules, and dependency injection.
Features
- 🎯 @MessageHandler - Decorator for marking message handler methods
- 🚦 @MessageRoute - Decorator for defining message routes
- 📦 MessageMiddlewareModule - NestJS module for dependency injection
- 💉 Dependency Injection - Full DI support for managers and dispatchers
- 🎨 TypeScript - Complete type safety
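The handler and route features above rest on one idea: a message is routed to a handler by an identifier read from one of its fields. The sketch below shows that idea in isolation, assuming an `action` field as the identifier. `SketchDispatcher` and `QueueMessage` are hypothetical names, not the library's `MessageDispatcher`.

```typescript
// Illustrative sketch only; this is not the MessageDispatcher implementation.
type QueueMessage = Record<string, unknown>;
type Handler = (message: QueueMessage) => Promise<string>;

class SketchDispatcher {
  private readonly handlers = new Map<string, Handler>();
  private readonly identifierField: string;

  constructor(identifierField: string) {
    this.identifierField = identifierField;
  }

  // Returns `this` so registrations can be chained.
  register(identifier: string, handler: Handler): this {
    this.handlers.set(identifier, handler);
    return this;
  }

  // Look up the handler by the value of the identifier field and invoke it.
  async dispatch(message: QueueMessage): Promise<string> {
    const identifier = message[this.identifierField];
    if (typeof identifier !== 'string') {
      throw new Error(`Message has no string "${this.identifierField}" field`);
    }
    const handler = this.handlers.get(identifier);
    if (!handler) {
      throw new Error(`No handler registered for "${identifier}"`);
    }
    return handler(message);
  }
}

const dispatcher = new SketchDispatcher('action')
  .register('CREATE_ORDER', async (m) => `created ${String(m['orderId'])}`)
  .register('UPDATE_ORDER', async (m) => `updated ${String(m['orderId'])}`);
```

Calling `dispatcher.dispatch({ action: 'CREATE_ORDER', orderId: 42 })` resolves through the first registered handler, while an unregistered action rejects with an error.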
Installation
# npm
npm install @message-in-the-middle/nestjs @message-in-the-middle/core
# pnpm
pnpm add @message-in-the-middle/nestjs @message-in-the-middle/core
# yarn
yarn add @message-in-the-middle/nestjs @message-in-the-middle/core
Peer Dependencies:
- @nestjs/common: ^9.0.0 || ^10.0.0
- @nestjs/core: ^9.0.0 || ^10.0.0
- reflect-metadata: ^0.1.13 || ^0.2.0
Quick Start
1. Import the Module
import { Module } from '@nestjs/common';
import { MessageMiddlewareModule } from '@message-in-the-middle/nestjs';

@Module({
  imports: [
    MessageMiddlewareModule.forRoot({
      isGlobal: true // Make available globally
    })
  ]
})
export class AppModule {}
2. Inject and Use in Services
import { Injectable } from '@nestjs/common';
import {
  MessageMiddlewareManager,
  ParseJsonInboundMiddleware,
  LogInboundMiddleware
} from '@message-in-the-middle/core';

@Injectable()
export class OrderService {
  constructor(
    private readonly manager: MessageMiddlewareManager
  ) {
    // Configure pipeline in constructor
    this.manager
      .addInboundMiddleware(new ParseJsonInboundMiddleware())
      .addInboundMiddleware(new LogInboundMiddleware(console));
  }

  async processMessage(messageBody: string, rawMessage?: any) {
    const context = await this.manager.processInbound(messageBody, rawMessage);
    return context.message;
  }
}
3. Use with Message Handlers
import { Injectable, Logger } from '@nestjs/common';
import {
  MessageContext,
  MessageDispatcher,
  MessageMiddlewareManager,
  DispatcherMiddleware,
  ParseJsonInboundMiddleware
} from '@message-in-the-middle/core';
import { MessageHandler } from '@message-in-the-middle/nestjs';

@Injectable()
export class OrderMessageService {
  private readonly logger = new Logger(OrderMessageService.name);
  private readonly dispatcher: MessageDispatcher;
  private readonly manager: MessageMiddlewareManager;

  constructor() {
    // Setup dispatcher
    this.dispatcher = new MessageDispatcher({ identifierField: 'action' });
    // Register handlers
    this.dispatcher
      .register('CREATE_ORDER', this.handleCreateOrder.bind(this))
      .register('UPDATE_ORDER', this.handleUpdateOrder.bind(this));
    // Setup manager with dispatcher
    this.manager = new MessageMiddlewareManager();
    this.manager
      .addInboundMiddleware(new ParseJsonInboundMiddleware())
      .addInboundMiddleware(new DispatcherMiddleware(this.dispatcher));
  }

  async processMessage(message: string) {
    return this.manager.processInbound(message);
  }

  @MessageHandler('CREATE_ORDER')
  private async handleCreateOrder(ctx: MessageContext) {
    this.logger.log(`Creating order: ${ctx.message.orderId}`);
    // Your order creation logic
  }

  @MessageHandler('UPDATE_ORDER')
  private async handleUpdateOrder(ctx: MessageContext) {
    this.logger.log(`Updating order: ${ctx.message.orderId}`);
    // Your order update logic
  }
}
Module Configuration
Basic Configuration
import { Module } from '@nestjs/common';
import { MessageMiddlewareModule } from '@message-in-the-middle/nestjs';

@Module({
  imports: [
    MessageMiddlewareModule.forRoot()
  ]
})
export class AppModule {}
Global Module
Make the module available globally:
@Module({
  imports: [
    MessageMiddlewareModule.forRoot({
      isGlobal: true
    })
  ]
})
export class AppModule {}
Async Configuration
Configure with async factory:
// ConfigModule and ConfigService come from '@nestjs/config'
@Module({
  imports: [
    MessageMiddlewareModule.forRootAsync({
      imports: [ConfigModule],
      inject: [ConfigService],
      useFactory: (configService: ConfigService) => ({
        isGlobal: true,
        // Additional configuration from ConfigService
      })
    })
  ]
})
export class AppModule {}
Decorators
@MessageHandler
Mark methods as message handlers:
import { MessageContext } from '@message-in-the-middle/core';
import { MessageHandler } from '@message-in-the-middle/nestjs';

class OrderService {
  @MessageHandler('CREATE_ORDER')
  async handleCreateOrder(ctx: MessageContext) {
    // Handler logic
  }
}
@MessageRoute
Define message routes (alternative syntax):
import { MessageContext } from '@message-in-the-middle/core';
import { MessageRoute } from '@message-in-the-middle/nestjs';

class OrderService {
  @MessageRoute('orders', 'CREATE')
  async createOrder(ctx: MessageContext) {
    // Handler logic
  }
}
Complete Example
Message Consumer Service
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import {
  MessageMiddlewareManager,
  MessageDispatcher,
  DispatcherMiddleware,
  ParseJsonInboundMiddleware,
  LogInboundMiddleware,
  RetryInboundMiddleware,
  DeduplicateInboundMiddleware,
  InMemoryDeduplicationStore,
  MessageContext
} from '@message-in-the-middle/core';
// Assumed paths: OrderService and PaymentService are your own application services
import { OrderService } from './order.service';
import { PaymentService } from './payment.service';

@Injectable()
export class MessageConsumerService implements OnModuleInit {
  private readonly logger = new Logger(MessageConsumerService.name);
  private readonly manager: MessageMiddlewareManager;
  private readonly dispatcher: MessageDispatcher;

  constructor(
    private readonly orderService: OrderService,
    private readonly paymentService: PaymentService
  ) {
    // Create dispatcher
    this.dispatcher = new MessageDispatcher({
      identifierField: 'action'
    });
    // Register handlers
    this.dispatcher.registerMany({
      'CREATE_ORDER': this.handleCreateOrder.bind(this),
      'PROCESS_PAYMENT': this.handleProcessPayment.bind(this)
    });
    // Setup middleware pipeline
    this.manager = new MessageMiddlewareManager();
    const deduplicationStore = new InMemoryDeduplicationStore();
    this.manager
      .addInboundMiddleware(new ParseJsonInboundMiddleware())
      .addInboundMiddleware(new LogInboundMiddleware(this.logger))
      .addInboundMiddleware(new DeduplicateInboundMiddleware(deduplicationStore))
      .addInboundMiddleware(new RetryInboundMiddleware({ maxRetries: 3 }))
      .addInboundMiddleware(new DispatcherMiddleware(this.dispatcher));
  }

  onModuleInit() {
    this.logger.log('Message consumer service initialized');
  }

  async processMessage(messageBody: string, rawMessage?: any) {
    try {
      const context = await this.manager.processInbound(messageBody, rawMessage);
      this.logger.log(`Message processed successfully: ${context.message.action}`);
      return context;
    } catch (error) {
      // Catch variables are `unknown` under strict TypeScript settings
      const err = error as Error;
      this.logger.error(`Failed to process message: ${err.message}`, err.stack);
      throw error;
    }
  }

  private async handleCreateOrder(ctx: MessageContext) {
    this.logger.log(`Creating order: ${ctx.message.orderId}`);
    await this.orderService.create(ctx.message);
  }

  private async handleProcessPayment(ctx: MessageContext) {
    this.logger.log(`Processing payment: ${ctx.message.paymentId}`);
    await this.paymentService.process(ctx.message);
  }
}
SQS Consumer Integration
import { Injectable, Logger } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { SQSClient, ReceiveMessageCommand, DeleteMessageCommand } from '@aws-sdk/client-sqs';
import { MessageConsumerService } from './message-consumer.service';

@Injectable()
export class SqsConsumerService {
  private readonly logger = new Logger(SqsConsumerService.name);
  private readonly sqsClient: SQSClient;
  private readonly queueUrl: string;

  constructor(
    private readonly messageConsumer: MessageConsumerService,
    configService: ConfigService
  ) {
    this.sqsClient = new SQSClient({
      region: configService.get('AWS_REGION')
    });
    // Fail fast at startup if the queue URL is not configured
    this.queueUrl = configService.getOrThrow<string>('SQS_QUEUE_URL');
  }

  async pollMessages() {
    while (true) {
      try {
        // Long-poll for up to 10 messages at a time
        const command = new ReceiveMessageCommand({
          QueueUrl: this.queueUrl,
          MaxNumberOfMessages: 10,
          WaitTimeSeconds: 20
        });
        const response = await this.sqsClient.send(command);
        if (response.Messages) {
          await Promise.all(
            response.Messages.map(msg => this.processMessage(msg))
          );
        }
      } catch (error) {
        this.logger.error(`Error polling messages: ${(error as Error).message}`);
        // Back off for 5 seconds before polling again
        await new Promise(resolve => setTimeout(resolve, 5000));
      }
    }
  }

  private async processMessage(sqsMessage: any) {
    try {
      // Process through middleware pipeline
      await this.messageConsumer.processMessage(sqsMessage.Body, sqsMessage);
      // Delete message from queue
      await this.sqsClient.send(new DeleteMessageCommand({
        QueueUrl: this.queueUrl,
        ReceiptHandle: sqsMessage.ReceiptHandle
      }));
    } catch (error) {
      this.logger.error(`Failed to process message: ${(error as Error).message}`);
      // The message is not deleted, so SQS redelivers it after the visibility timeout
    }
  }
}
Best Practices
1. Use Dependency Injection
Inject the MessageMiddlewareManager into services rather than creating multiple instances:
@Injectable()
export class OrderService {
  constructor(private readonly manager: MessageMiddlewareManager) {}
}
2. Configure Pipeline in Constructor
Set up your middleware pipeline in the constructor:
constructor(private readonly manager: MessageMiddlewareManager) {
  this.manager
    .addInboundMiddleware(new ParseJsonInboundMiddleware())
    .addInboundMiddleware(new LogInboundMiddleware(console));
}
3. Use NestJS Logger
Use NestJS's built-in logger for consistency:
import { Injectable, Logger } from '@nestjs/common';
import { MessageMiddlewareManager, LogInboundMiddleware } from '@message-in-the-middle/core';

@Injectable()
export class OrderService {
  private readonly logger = new Logger(OrderService.name);

  constructor(manager: MessageMiddlewareManager) {
    manager.addInboundMiddleware(
      new LogInboundMiddleware(this.logger)
    );
  }
}
4. Handle Lifecycle Events
Implement lifecycle hooks for cleanup:
import { Injectable, OnModuleDestroy } from '@nestjs/common';
import { MessageMiddlewareManager } from '@message-in-the-middle/core';

@Injectable()
export class OrderService implements OnModuleDestroy {
  constructor(private readonly manager: MessageMiddlewareManager) {}

  async onModuleDestroy() {
    await this.manager.destroy();
  }
}
Examples
See complete NestJS examples in the examples/frameworks/nestjs/ directory:
- Basic NestJS integration
- NestJS with message routing
- SQS consumer with NestJS
- Complete e-commerce example
Related Packages
- @message-in-the-middle/core - Core library (required)
- @message-in-the-middle/store-mysql - MySQL store for persistence
- @message-in-the-middle/testing - Testing utilities
Documentation
- Main README - Complete documentation
- Core Package - Core library docs
- Examples - Usage examples
- Contributing - How to contribute
License
MIT
