@asar-studio/nestjs-event-sourcing
v3.1.0
NestJS event sourcing module with projections, CQRS integration, and transactional multi-aggregate commits.
NestJS event sourcing module with:
- event store persistence in MongoDB
- projection pipeline for materialized views
- transactional multi-aggregate commits
- optimistic concurrency protection
- CQRS integration (@nestjs/cqrs)
Install
npm i @asar-studio/nestjs-event-sourcing
Peer dependencies:
- @nestjs/common
- @nestjs/core
- @nestjs/cqrs
- @nestjs/mongoose
- @nestjs/platform-express
Quick start
1) Register event sourcing module
import { Module } from "@nestjs/common";
import { EventSourcingModule } from "@asar-studio/nestjs-event-sourcing";

@Module({
  imports: [
    EventSourcingModule.forRoot({
      mongoURL: "mongodb://mongo1:27017,mongo2:27017,mongo3:27017/es?replicaSet=rs0",
      connectOptions: {},
      collectionsOptions: {
        eventsCollectionName: "event-store",
        snapshotsCollectionName: "event-snapshots",
        transactionsCollectionName: "event-transactions",
      },
    }),
    EventSourcingModule.forFeature(),
  ],
})
export class AppModule {}
2) Define a storable domain event
import { StorableEvent } from "@asar-studio/nestjs-event-sourcing";

export class OrderPlacedEvent extends StorableEvent {
  eventAggregate = "order";
  eventVersion = 1;

  constructor(
    public readonly id: string,
    public readonly customerId: string,
    public readonly total: number,
  ) {
    super();
  }
}
3) Commit via aggregate root
import { AggregateRoot } from "@nestjs/cqrs";
// OrderPlacedEvent is the event class defined in step 2.

export class Order extends AggregateRoot {
  constructor(public readonly id: string) {
    super();
  }

  place(customerId: string, total: number) {
    this.apply(new OrderPlacedEvent(this.id, customerId, total));
  }
}

// Inside a command handler or service that has an injected event publisher:
const order = this.eventPublisher.mergeObjectContext(new Order(orderId));
order.place(customerId, total);
order.commit();
Projections
Use @ProjectionHandler(...) and IProjection<TEvent> to update materialized view (MV) collections:
import { InjectModel } from "@nestjs/mongoose";
import { Model } from "mongoose";
import { IProjection, ProjectionHandler } from "@asar-studio/nestjs-event-sourcing";

@ProjectionHandler(OrderPlacedEvent)
export class OrderProjection implements IProjection<OrderPlacedEvent> {
  constructor(
    @InjectModel("OrderView")
    private readonly orderViewModel: Model<any>,
  ) {}

  async handle(event: OrderPlacedEvent): Promise<void> {
    await this.orderViewModel.updateOne(
      { orderId: event.id },
      {
        $set: {
          orderId: event.id,
          customerId: event.customerId,
          total: event.total,
          status: "PLACED",
        },
      },
      { upsert: true },
    );
  }
}
Multiple projections for the same event are supported and executed in registration order.
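To make "registration order" concrete, here is a minimal in-memory sketch. ProjectionRegistry, the OrderPlaced shape, and both views are hypothetical stand-ins and not the package's actual pipeline. The handlers are synchronous for brevity; real projection handlers return a Promise and would be awaited one after another.

```typescript
// Hypothetical stand-ins for illustration; not the package's internal pipeline.
interface OrderPlaced {
  id: string;
  customerId: string;
  total: number;
}

type Handler<TEvent> = (event: TEvent) => void;

class ProjectionRegistry<TEvent> {
  private readonly handlers: Handler<TEvent>[] = [];

  // Handlers are stored in the order they are registered.
  register(handler: Handler<TEvent>): void {
    this.handlers.push(handler);
  }

  // Run every handler for the event, first registered first.
  dispatch(event: TEvent): void {
    for (const handler of this.handlers) {
      handler(event);
    }
  }
}

// Two in-memory "views" fed by the same event type.
const orderView = new Map<string, { customerId: string; total: number; status: string }>();
const spendByCustomer = new Map<string, number>();
const callOrder: string[] = [];

const registry = new ProjectionRegistry<OrderPlaced>();
registry.register((e) => {
  callOrder.push("orderView");
  orderView.set(e.id, { customerId: e.customerId, total: e.total, status: "PLACED" });
});
registry.register((e) => {
  callOrder.push("spendByCustomer");
  spendByCustomer.set(e.customerId, (spendByCustomer.get(e.customerId) ?? 0) + e.total);
});

registry.dispatch({ id: "o-1", customerId: "c-1", total: 40 });
registry.dispatch({ id: "o-2", customerId: "c-1", total: 2 });
```

Because each view is updated by its own handler, a second projection can be added for an existing event without touching the first.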
Transactional multi-aggregate commit
If one command updates multiple aggregate roots, use AggregateTransactionService:
import { AggregateTransactionService } from "@asar-studio/nestjs-event-sourcing";

// Inject the service into the command handler or application service.
constructor(private readonly aggregateTransactionService: AggregateTransactionService) {}

// Commit the pending events of all aggregates as one unit.
await this.aggregateTransactionService.commit([
  orderAggregate,
  paymentAggregate,
  inventoryAggregate,
]);
Behavior:
- all event streams are committed in one DB transaction
- if one stream fails, all streams are rolled back
- events are published only after successful persistence
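The three guarantees above can be modeled with a small in-memory sketch. InMemoryEventStore, PendingStream, and the event names are hypothetical; the real service relies on a MongoDB transaction, whereas this model gets the same all-or-nothing effect by validating every stream before writing any of them. It assumes each aggregate appears at most once per commit.

```typescript
// Hypothetical in-memory model; the real service uses a database transaction.
interface PendingStream {
  aggregateId: string;
  expectedRevision: number; // revision the aggregate was loaded at
  events: string[]; // event names only, for brevity
}

class InMemoryEventStore {
  private readonly streams = new Map<string, string[]>();
  readonly published: string[] = [];

  revisionOf(aggregateId: string): number {
    return this.streams.get(aggregateId)?.length ?? 0;
  }

  commitAll(pending: PendingStream[]): void {
    // Phase 1: validate every stream before writing anything.
    for (const p of pending) {
      if (this.revisionOf(p.aggregateId) !== p.expectedRevision) {
        throw new Error("Concurrency conflict: aggregate revision already committed");
      }
    }
    // Phase 2: persist all streams. Nothing has been written if phase 1 threw.
    for (const p of pending) {
      const stream = this.streams.get(p.aggregateId) ?? [];
      this.streams.set(p.aggregateId, [...stream, ...p.events]);
    }
    // Phase 3: publish only after persistence succeeded.
    for (const p of pending) {
      this.published.push(...p.events);
    }
  }
}

const store = new InMemoryEventStore();
store.commitAll([
  { aggregateId: "order-1", expectedRevision: 0, events: ["OrderPlaced"] },
  { aggregateId: "payment-1", expectedRevision: 0, events: ["PaymentAuthorized"] },
]);

// Second commit: the order stream is stale, so the inventory stream must not be written either.
let rejected = false;
try {
  store.commitAll([
    { aggregateId: "inventory-1", expectedRevision: 0, events: ["StockReserved"] },
    { aggregateId: "order-1", expectedRevision: 0, events: ["OrderConfirmed"] },
  ]);
} catch {
  rejected = true;
}
```

After the failed second commit, the inventory stream is still empty and nothing from that batch has been published.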
Concurrency behavior
Stale writes are rejected with:
Concurrency conflict: aggregate revision already committed
Catch this error to implement retries or idempotent command responses.
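A retry wrapper might look like the sketch below. retryOnConflict and isConcurrencyConflict are hypothetical helpers, and matching on the message text is an assumption, since the error class thrown by the package is not documented here. The version shown is synchronous to keep it short; an async variant would await the command inside the loop.

```typescript
const CONFLICT_MESSAGE = "Concurrency conflict: aggregate revision already committed";

// Assumption: conflicts are recognised by message text, not by a dedicated error class.
function isConcurrencyConflict(error: unknown): boolean {
  return error instanceof Error && error.message.includes(CONFLICT_MESSAGE);
}

// Re-run the whole command (reload aggregate, decide, commit) up to maxAttempts times.
function retryOnConflict<T>(command: () => T, maxAttempts = 3): T {
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return command();
    } catch (error) {
      if (!isConcurrencyConflict(error)) throw error; // unrelated failure: do not retry
      lastError = error;
    }
  }
  throw lastError; // every attempt hit a conflict
}

// Simulated command that hits a stale revision twice, then succeeds.
let attempts = 0;
const result = retryOnConflict(() => {
  attempts++;
  if (attempts < 3) throw new Error(CONFLICT_MESSAGE);
  return "committed";
});
```

The command passed in should reload the aggregate on every attempt; retrying with the same stale revision would only reproduce the conflict.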
Public API highlights
- EventSourcingModule
- EventStore
- StoreEventBus
- StoreEventPublisher
- AggregateRepository
- AggregateTransactionService
- ProjectionHandler
- Projection
- IProjection
- StorableEvent
- ReconstructViewDb
Testing (production-like)
Run e2e against a real dockerized MongoDB replica set:
npm run test:e2e:docker
This validates event-store writes and projection updates against real MV collections.
Migration
See MIGRATION.md for upgrade notes (3.0.0 and 3.1.0).
