nestjs-event-bus-kit
v0.0.2
Published
Reliable event infrastructure for NestJS with standardized envelopes, retries, dead-letter queues, middleware, and hooks.
Maintainers
Readme
nestjs-event-bus-kit
Reliable event infrastructure for NestJS with standardized envelopes, decorator-based handlers, retries, dead-letter queues, middleware, and hooks.
- npm: https://www.npmjs.com/package/nestjs-event-bus-kit
- GitHub: https://github.com/shubhamjoshi99899/nestjs-event-bus-kit
Installation
npm install nestjs-event-bus-kitWhat You Get
EventBusModule.forRoot(...)for NestJS module setupEventBusServicefor publishing events@EventHandler(...)for handler registration- standardized
EventEnvelope<TPayload, TMetaExtra> - retry and DLQ support
- middleware and lifecycle hooks
createChainedPublishOptions(...)for event-to-event workflowsMemoryAdapterandMemoryIdempotencyStorefor local development
Quick Start
import { Module } from "@nestjs/common";
import {
EventBusModule,
MemoryAdapter,
MemoryIdempotencyStore,
} from "nestjs-event-bus-kit";
@Module({
imports: [
EventBusModule.forRoot({
serviceName: "order-service",
adapter: new MemoryAdapter(),
retry: {
maxAttempts: 3,
baseDelayMs: 250,
strategy: "exponential",
maxDelayMs: 2_000,
},
dlq: {
enabled: true,
topicSuffix: ".dlq",
},
idempotencyStore: new MemoryIdempotencyStore(),
}),
],
})
export class AppModule {}Publish an event
import { Injectable } from "@nestjs/common";
import {
EventBusService,
PublishableEvent,
} from "nestjs-event-bus-kit";
interface OrderPlacedPayload {
orderId: string;
userId: string;
totalAmount: number;
}
@Injectable()
export class OrderService {
constructor(private readonly eventBus: EventBusService) {}
async placeOrder(): Promise<void> {
const event: PublishableEvent<OrderPlacedPayload> = {
name: "order.placed",
payload: {
orderId: "ord_1001",
userId: "usr_1",
totalAmount: 499,
},
};
await this.eventBus.publish(event, {
correlationId: "corr-ord_1001",
idempotencyKey: "ord_1001:order.placed",
tags: ["commerce", "checkout"],
});
}
}Consume an event
import { Injectable } from "@nestjs/common";
import {
EventEnvelope,
EventHandler,
HandlerContext,
} from "nestjs-event-bus-kit";
interface OrderPlacedPayload {
orderId: string;
userId: string;
totalAmount: number;
}
@Injectable()
export class NotificationHandler {
@EventHandler("order.placed", {
retries: 3,
retryDelayMs: 300,
useExponentialBackoff: true,
dlqEnabled: true,
idempotent: true,
})
async handleOrderPlaced(
event: EventEnvelope<OrderPlacedPayload>,
context: HandlerContext,
): Promise<void> {
console.log("event", event.name);
console.log("attempt", context.attempt);
}
}Hooks
import { EventBusHooks } from "nestjs-event-bus-kit";
const hooks: EventBusHooks = {
onPublish: (event) => {
console.log("[publish]", event.name);
},
onConsumeStart: (event) => {
console.log("[consume:start]", event.name);
},
onConsumeSuccess: (event, durationMs) => {
console.log("[consume:success]", event.name, durationMs);
},
beforeHandle: ({ event, handler, context }) => {
console.log(
"[handler:before]",
event.name,
handler.methodName,
context.attempt,
);
},
afterHandle: ({ event, handler, context }) => {
console.log(
"[handler:after]",
event.name,
handler.methodName,
context.attempt,
);
},
onError: ({ event, handler, context, error }) => {
console.log(
"[handler:error]",
event.name,
handler.methodName,
context.attempt,
error.message,
);
},
};Middleware
import { EventMiddleware } from "nestjs-event-bus-kit";
const loggingMiddleware: EventMiddleware = async (
{ event, handler, context },
next,
): Promise<void> => {
console.log(
"[middleware:enter]",
event.name,
handler.methodName,
context.attempt,
);
try {
await next();
console.log("[middleware:exit]", event.name, handler.methodName);
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
console.log("[middleware:error]", event.name, handler.methodName, message);
throw error;
}
};Chained Publish Options
import {
createChainedPublishOptions,
EventBusService,
EventEnvelope,
} from "nestjs-event-bus-kit";
interface OrderPlacedPayload {
orderId: string;
}
interface InventoryReservedPayload {
orderId: string;
}
async function reserveInventory(
eventBus: EventBusService,
sourceEvent: EventEnvelope<OrderPlacedPayload>,
): Promise<void> {
await eventBus.publish<InventoryReservedPayload>(
{
name: "inventory.reserved",
payload: {
orderId: sourceEvent.payload.orderId,
},
},
createChainedPublishOptions({
sourceEvent,
topic: "inventory.workflow",
idempotencyKey: `${sourceEvent.payload.orderId}:inventory.reserved`,
appendTags: ["inventory"],
}),
);
}Event Shape
{
id: string;
name: string;
version: number;
occurredAt: string;
source: string;
payload: TPayload;
metadata: {
correlationId: string;
causationId?: string;
actorId?: string;
tenantId?: string;
requestId?: string;
idempotencyKey?: string;
retryCount: number;
tags?: string[];
extra?: TMetaExtra;
};
}Demo
The repository includes a runnable example application in event-bus-demo/.
Status
This package is still in the 0.x stage. Expect API iteration while the public surface settles.
License
ISC
