@cobranza-apps/events-toolkit
v0.8.0
NestJS library for standardized NATS+JetStream event handling across Cobranza App microservices
NATS + JetStream event handling library for the Cobranza App microservices platform.
Quickstart (for AI agents)
- Install: npm install @cobranza-apps/events-toolkit
- Register NATS + subsystems in AppModule:
EventsToolkitModule.forRoot({
  nats: { servers: ['nats://localhost:4222'] },
  discovery: {
    enabled: true,
    registerOnStartup: true,
    service: { name: 'payment-service', version: '1.0.0' },
  },
})
- Define an event DTO — extend EventEnvelope<T>, decorate every field with class-validator.
- Emit: decorate a service method with @EmitEvent('domain.entity.action', { version, description, payloadExample }).
- Consume: decorate a handler with @OnEvent('domain.entity.action', { version, description, payloadExample }).
- Run: npm run start.
See the Onboarding Flow section for the full 11-step path (architecture → deploy).
Onboarding Flow
- Architecture — NATS + JetStream, event envelope, actors, tenant isolation → Core Concepts · Architecture
- Install & configure — EventsToolkitModule.forRoot() → Installation · Setup (Unified Module)
- Define an event DTO — EventEnvelope<T> + class-validator → Defining an Event
- Produce an event — @EmitEvent() · ProducerService.emit() → Producer
- Consume an event — @OnEvent() · DLQ routing → Consumer · Error Handling & DLQ
- Request-reply — request() / sendRequest() + @OnRequestReply() → Request-Reply Pattern
- Outbox — OutboxService.saveToOutbox() · sendAsyncRequestThroughOutbox() → Outbox Pattern
- Service discovery — manifests · GET /discovery/manifest · platform events → Discovery
- Schema generation — auto JSON Schema from DTOs · payloadSchemaRef → Event Discovery & Service Registry
- Testing — EventsToolkitTestModule · mock services · assertion helpers → Testing Utilities
- Deployment — JetStream stream config · env vars · health checks → Deployment (new section)
Overview
events-toolkit encapsulates all NATS/JetStream event infrastructure concerns into reusable NestJS modules, services, and decorators. It enforces the Event & Messaging Convention at compile-time and runtime, ensuring every microservice in the platform produces and consumes events consistently.
What it provides
- Event Envelope: Strongly typed EventEnvelope<T> extending abstract EventBase with built-in class-validator validation
- Subject Builder: Single entry point for all NATS subject generation following the convention
- Producer Module: @EmitEvent() decorator and ProducerService for fire-and-forget publishing
- Consumer Module: @OnEvent() decorator with automatic validation, error handling, and DLQ routing
- Request-Reply: RequestReplyService for sync (request()) and async (sendRequest() + @OnRequestReply) request-reply patterns
- Outbox Module: Persistent outbox with SQLite or PostgreSQL backends, background processor for transactional safety
- Event Logger: Winston-based structured logging with trace and correlation IDs
- Discovery Module: DiscoveryModule, DiscoveryService, @EmitEvent / @OnEvent / @OnRequestReply manifest annotation, schema auto-generation from class-validator DTOs, service registration via platform.service.register.v1 events, periodic heartbeats, and HTTP endpoints for manifest/schema retrieval
Non-goals
- Does NOT define domain-specific event payloads — each microservice owns its events.
- Does NOT replace the main PostgreSQL outbox in ms-db-gateway — it supplements with SQLite for other services. However, a shared PostgreSQL backend is available for services that already have one.
- Is NOT a standalone service — it is a library consumed by NestJS microservices.
Table of Contents
- Quickstart (for AI agents)
- Onboarding Flow
- Overview
- Installation
- Core Concepts
- Usage
- Architecture
- Guidelines for AI Agents
- Development
- Deployment
- Related Documentation
- License
Installation
npm install @cobranza-apps/events-toolkit
Peer Dependencies
The following must be installed in the consuming microservice:
{
"@nestjs/common": "^11.1.0",
"@nestjs/core": "^11.1.0",
"@nestjs/microservices": "^11.1.0",
"class-transformer": "^0.5.1",
"class-validator": "^0.14.0",
"nats": "^2.29.0"
}
Requirements
- Node.js >= 20
- NATS server >= 2.10 with JetStream enabled
Core Concepts
Event Envelope
All messages follow a standardized envelope structure. The toolkit provides EventBase as the abstract base class defining common envelope fields, and EventEnvelope<T> as the concrete generic class that extends it with a typed data payload:
{
"id": "evt_01JXYZABC123456789012345",
"type": "payment.proof.uploaded",
"version": "1.0.0",
"produced_at": "2025-06-08T16:45:12.345Z",
"producer": "payment-service",
"company_id": "550e8400-e29b-41d4-a716-446655440000",
"actor_type": "client",
"actor_id": "clt_123e4567-e89b-12d3-a456-426614174000",
"correlation_id": "987fcdeb-51a2-43e8-9c4f-123456789abc",
"causation_id": "evt_01JXYZABC987654321098765",
"trace_id": "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
"reply_to": null,
"data": { }
}
Key fields:
- id — UUIDv7 with evt_ prefix
- company_id — UUID for tenant isolation (mandatory)
- actor_type / actor_id — who performed the action (mandatory for audit)
- correlation_id — links events across a transaction chain
- data — domain-specific payload (typed per microservice)
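As a rough illustration of the key fields listed above, the following standalone sketch checks an envelope-shaped object by hand. The `EnvelopeLike` interface and the `findMissingFields` helper are hypothetical names invented for this example; the toolkit performs its own validation through class-validator, and treating `id` and `correlation_id` as required here is an assumption of the sketch.

```typescript
// Hypothetical shape mirroring the envelope fields shown above (not the toolkit's own type).
interface EnvelopeLike<T> {
  id: string;
  type: string;
  version: string;
  produced_at: string;
  producer: string;
  company_id: string;
  actor_type: string;
  actor_id: string;
  correlation_id: string;
  causation_id?: string | null;
  trace_id?: string | null;
  reply_to?: string | null;
  data: T;
}

// company_id, actor_type and actor_id are marked mandatory in the list above.
// Requiring id and correlation_id as well is an assumption made for this sketch.
const MANDATORY_FIELDS = ['id', 'company_id', 'actor_type', 'actor_id', 'correlation_id'] as const;

// Returns the names of mandatory fields that are absent or empty strings.
function findMissingFields(candidate: Partial<EnvelopeLike<unknown>>): string[] {
  return MANDATORY_FIELDS.filter((field) => {
    const value = candidate[field];
    return typeof value !== 'string' || value.length === 0;
  });
}
```

A consumer could call `findMissingFields(parsedMessage)` before doing any work and reject the message when the returned array is not empty.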
Subject Naming Convention
All NATS subjects follow this pattern:
company.{company_id}.{domain}.{entity}.{action}.v{version}
| Token | Description | Example |
| ----- | ----------- | ------ |
| company_id | UUID without dashes | 550e8400e29b41d4a716446655440000 |
| domain | Business domain | payment, debt, bank, notification |
| entity | Main entity | proof, statement, schedule |
| action | Past-tense verb | uploaded, created, processed, sent |
| version | Major version | v1, v2 |
Examples:
- company.550e8400e29b41d4a716446655440000.debt.created.v1
- company.550e8400e29b41d4a716446655440000.payment.proof.uploaded.v1
- company.550e8400e29b41d4a716446655440000.bank.statement.processed.v1
For request then async response, append .response to the base subject, before the version token:
- Request: company.{id}.payment.proof.uploaded.v1
- Response: company.{id}.payment.proof.uploaded.response.v1
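The naming rules above can be sketched as a few pure functions. This is a hypothetical stand-in written for illustration, not the toolkit's SubjectBuilder: the function names, the dash-stripping step, and the token check are assumptions. The `dlq.` prefix follows the DLQ subject pattern given under Error Handling & DLQ.

```typescript
// Hypothetical stand-in for subject generation; the real SubjectBuilder may differ.
interface SubjectParts {
  companyId: string; // UUID, with or without dashes
  domain: string;
  entity: string;
  action: string;
  version: string; // major version, e.g. '1'
}

// Assumption: tokens are lowercase alphanumerics or underscores, so a token can
// never contain '.', '*' or '>' (which have special meaning in NATS subjects).
const TOKEN = /^[a-z0-9_]+$/;

function buildBaseSubject(parts: SubjectParts): string {
  const companyToken = parts.companyId.replace(/-/g, '').toLowerCase();
  const tokens = [companyToken, parts.domain, parts.entity, parts.action];
  for (const token of tokens) {
    if (!TOKEN.test(token)) {
      throw new Error(`Invalid subject token: "${token}"`);
    }
  }
  return ['company', ...tokens].join('.');
}

// company.{company_id}.{domain}.{entity}.{action}.v{version}
function buildSubject(parts: SubjectParts): string {
  return `${buildBaseSubject(parts)}.v${parts.version}`;
}

// Same base subject with .response placed before the version token.
function buildResponseSubject(parts: SubjectParts): string {
  return `${buildBaseSubject(parts)}.response.v${parts.version}`;
}

// dlq.company.{company_id}.{domain}.{entity}.{action}.v{version}
function toDlqSubject(subject: string): string {
  return `dlq.${subject}`;
}
```

Keeping the base subject in one function means the request, response, and DLQ variants cannot drift apart, which is the same reason the guidelines ask for a single entry point rather than string concatenation.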
Actor Types
enum ActorType {
CLIENT = "client",
COMPANY_USER = "company_user",
SYSTEM = "system",
SCHEDULER = "scheduler",
EXTERNAL_API = "external_api"
}
Event Context
The EventContext provides the identity and traceability metadata required by every event. When constructing or factory-creating an event, you pass an EventContext object:
interface EventContext {
type: string; // Event type in dot-notation (e.g. 'payment.proof.uploaded')
version: string; // Schema version (e.g. '1.0.0')
producer: string; // Name of the producing microservice (e.g. 'payment-service')
companyId: string; // Tenant UUID (mandatory)
actorType: ActorType; // Who performed the action
actorId: string; // Identifier of the actor
correlationId: string; // Links events across a transaction chain
causationId?: string; // ID of the event that caused this one
traceId?: string; // Distributed tracing ID
replyTo?: string; // Subject for request-reply responses
}
The toolkit propagates companyId, actorType, and actorId into the envelope fields company_id, actor_type, and actor_id.
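The camelCase-to-snake_case propagation described above might look roughly like the sketch below. `ContextLike` and `toEnvelope` are hypothetical names, not toolkit exports, and the ID uses Node's `randomUUID()` (a UUIDv4) purely as a placeholder for the UUIDv7 the toolkit documents.

```typescript
import { randomUUID } from 'node:crypto';

type ActorTypeValue = 'client' | 'company_user' | 'system' | 'scheduler' | 'external_api';

// Hypothetical mirror of the EventContext interface shown above.
interface ContextLike {
  type: string;
  version: string;
  producer: string;
  companyId: string;
  actorType: ActorTypeValue;
  actorId: string;
  correlationId: string;
  causationId?: string;
  traceId?: string;
  replyTo?: string;
}

// Maps context fields onto envelope fields; optional fields become null when absent.
function toEnvelope<T>(data: T, ctx: ContextLike, now: Date = new Date()) {
  return {
    id: `evt_${randomUUID()}`, // placeholder: UUIDv4, whereas the toolkit documents UUIDv7
    type: ctx.type,
    version: ctx.version,
    produced_at: now.toISOString(),
    producer: ctx.producer,
    company_id: ctx.companyId,
    actor_type: ctx.actorType,
    actor_id: ctx.actorId,
    correlation_id: ctx.correlationId,
    causation_id: ctx.causationId ?? null,
    trace_id: ctx.traceId ?? null,
    reply_to: ctx.replyTo ?? null,
    data,
  };
}
```

Passing the clock in as a parameter keeps the function deterministic in unit tests; production code would normally rely on the toolkit's own createEvent factory instead.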
Usage
Setup (Individual Modules)
Import the modules you need in your NestJS application:
import { Module } from '@nestjs/common';
import { ProducerModule, ConsumerModule } from '@cobranza-apps/events-toolkit';
import { connect } from 'nats';
const natsConnection = await connect({ servers: ['nats://localhost:4222'] });
@Module({
imports: [
ProducerModule.forRoot({ connection: natsConnection }),
ConsumerModule.forRoot({ connection: natsConnection }),
]
})
export class AppModule {}
Setup (Unified Module)
Use EventsToolkitModule.forRoot() to configure all subsystems in a single call:
import { Module } from '@nestjs/common';
import { EventsToolkitModule } from '@cobranza-apps/events-toolkit';
@Module({
imports: [
EventsToolkitModule.forRoot({
nats: { servers: ['nats://localhost:4222'] },
outbox: {
type: 'sqlite',
sqlitePath: '/data/outbox.sqlite',
serviceOptions: { maxRetries: 3 },
},
logging: { level: 'info' },
}),
],
})
export class AppModule {}
Defining an Event
Extend EventEnvelope<T> with your domain-specific data type:
import { EventEnvelope } from '@cobranza-apps/events-toolkit';
import { IsUUID, IsUrl, IsNumber, IsEnum } from 'class-validator';
enum Currency {
USD = 'USD',
MXN = 'MXN',
COP = 'COP'
}
class PaymentProofUploadedData {
@IsUUID()
paymentAttemptId: string;
@IsUrl()
fileUrl: string;
@IsNumber()
amount: number;
@IsEnum(Currency)
currency: Currency;
}
class PaymentProofUploadedEvent extends EventEnvelope<PaymentProofUploadedData> {
readonly type = 'payment.proof.uploaded';
readonly version = '1.0.0';
}
Producer (Publishing Events)
Option 1 — Decorator-based (@EmitEvent())
import { EmitEvent, SubjectBuilder, EventContext } from '@cobranza-apps/events-toolkit';
class PaymentController {
constructor(private readonly subjectBuilder: SubjectBuilder) {}
@EmitEvent('payment.proof.uploaded', {
version: '1',
description: 'A payment proof file was uploaded',
payloadExample: { paymentAttemptId: 'uuid', fileUrl: 'https://...', amount: 100, currency: 'MXN' },
})
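async handleUpload(dto: UploadDto, context: EventContext): Promise<PaymentProofUploadedData> {
// Build the payload from the incoming DTO (assumes UploadDto carries these four fields)
const { paymentAttemptId, fileUrl, amount, currency } = dto;
return Object.assign(new PaymentProofUploadedData(), { paymentAttemptId, fileUrl, amount, currency });
}
}
Option 2 — Direct service injection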
import { createEvent, EventContext, ProducerService, SubjectBuilder } from '@cobranza-apps/events-toolkit';
class PaymentService {
constructor(
private readonly producerService: ProducerService,
private readonly subjectBuilder: SubjectBuilder
) {}
async processUpload(data: PaymentProofUploadedData, context: EventContext): Promise<void> {
const subject = this.subjectBuilder.build({
companyId: context.companyId,
domain: 'payment',
entity: 'proof',
action: 'uploaded',
version: '1'
});
const event = createEvent(data, context);
await this.producerService.publish(subject, event);
}
}
Consumer (Subscribing to Events)
import { OnEvent, EventEnvelope } from '@cobranza-apps/events-toolkit';
class PaymentProofConsumer {
@OnEvent('payment.proof.uploaded', {
version: '1',
description: 'Handles payment proof upload events',
payloadExample: { paymentAttemptId: 'uuid', fileUrl: 'https://...', amount: 100, currency: 'MXN' },
})
async onProofUploaded(event: EventEnvelope<PaymentProofUploadedData>): Promise<void> {
const { data, company_id, correlation_id } = event;
// Business logic — toolkit handles parsing, validation, acknowledgment
await this.processProof(data);
}
}
Error Handling & DLQ
Throw EventConsumerException to route a message to the Dead Letter Queue. The consumer service catches this exception and forwards the failed message to the DLQ subject (dlq.company.{company_id}.{domain}.{entity}.{action}.v{version}):
import { EventConsumerException, EventEnvelope, OnEvent } from '@cobranza-apps/events-toolkit';
@OnEvent('payment.proof.uploaded', {
version: '1',
description: 'Handles payment proof upload events',
payloadExample: { paymentAttemptId: 'uuid', fileUrl: 'https://...', amount: 100, currency: 'MXN' },
})
async onProofUploaded(event: EventEnvelope<PaymentProofUploadedData>): Promise<void> {
if (event.data.amount <= 0) {
throw new EventConsumerException({
message: 'Invalid amount',
eventId: event.id,
eventType: event.type,
correlationId: event.correlation_id,
});
}
// Process valid event
}
Structured Logging
EventLoggerService provides Winston-based structured logging for all event operations. It accepts optional custom transports, enabling microservices to integrate with existing logging infrastructure:
import { EventLoggerService } from '@cobranza-apps/events-toolkit';
import * as winston from 'winston';
const logger = new EventLoggerService({
transports: [new winston.transports.Console()],
level: 'info',
});
logger.logEventEmitted({
eventId: event.id,
eventType: event.type,
subject: 'company.550e8400e29b...payment.proof.uploaded.v1',
correlationId: event.correlation_id,
});
Request-Reply Pattern
The toolkit supports two request-reply patterns. For the full guide, see Request-Reply Patterns. For guidance on choosing between sync and async patterns, see Request-Reply Guidelines.
Full examples: See Sync Request-Reply Example and Async Request-Reply Example for complete, runnable code samples.
Sync — request()
Blocks until a response arrives or a timeout expires:
import { RequestReplyService, SubjectBuilder, EventContext } from '@cobranza-apps/events-toolkit';
class PaymentService {
constructor(
private readonly requestReply: RequestReplyService,
private readonly subjectBuilder: SubjectBuilder
) {}
async requestProofStatus(companyId: string, paymentId: string, context: EventContext): Promise<ProofResponse> {
const subject = this.subjectBuilder.build({
companyId,
domain: 'payment',
entity: 'proof',
action: 'requested',
version: '1'
});
const payload = new ProofRequestedData({ paymentId });
const response = await this.requestReply.request<ProofRequestedData, ProofResponse>(
subject,
payload,
{ context, timeoutMs: 10000 }
);
return response.data;
}
}
Async — sendRequest() + @OnRequestReply
Non-blocking: publish a request with reply_to, receive the response in a decorated handler:
// ── Requester: send async request ──
import {
RequestReplyService, SubjectBuilder, EventContext, EventEnvelope,
ActorType, OnEvent, OnRequestReply,
} from '@cobranza-apps/events-toolkit';
class DebtService {
constructor(
private readonly requestReply: RequestReplyService,
private readonly subjectBuilder: SubjectBuilder,
) {}
async requestCreditCheck(clientId: string, companyId: string): Promise<string> {
const replySubject = this.subjectBuilder.build({
companyId, domain: 'credit', entity: 'check',
action: 'completed', version: '1',
});
const context: EventContext = {
type: 'credit.check.requested',
version: '1.0.0',
producer: 'debt-service',
companyId,
actorType: ActorType.SYSTEM,
actorId: 'debt-service',
correlationId: '987fcdeb-51a2-43e8-9c4f-123456789abc',
replyTo: replySubject,
};
const result = await this.requestReply.sendRequest({
subject: this.subjectBuilder.build({
companyId, domain: 'credit', entity: 'check',
action: 'requested', version: '1',
}),
payload: new CreditCheckRequestedData({ clientId }),
context,
});
return result.correlationId;
}
}
// ── Responder: handle request, send response ──
class CreditCheckConsumer {
constructor(private readonly requestReply: RequestReplyService) {}
@OnEvent('credit.check.requested', {
version: '1',
description: 'Handles incoming credit check requests',
payloadExample: { clientId: 'uuid', fullName: 'Jane Doe' },
})
async onCreditCheckRequested(event: EventEnvelope<CreditCheckRequestedData>): Promise<void> {
if (!this.requestReply.isRequestReplyMessage(event)) { return; }
const responseEvent = this.requestReply.buildResponseEnvelope({
requestEvent: event,
responseContext: {
type: 'credit.check.completed', version: '1.0.0',
producer: 'credit-service', companyId: event.company_id,
actorType: ActorType.SYSTEM, actorId: 'credit-service',
correlationId: event.correlation_id,
replyTo: event.reply_to,
},
responseData: await this.performCheck(event.data),
});
await this.requestReply.sendResponse(event.correlation_id, responseEvent);
}
}
// ── Requester: handle async response ──
class DebtServiceResponseHandler {
@OnRequestReply('credit.check.completed', {
description: 'Handles credit check completion responses',
payloadExample: { clientId: 'uuid', score: 750, approved: true },
})
async handleCreditCheckResponse(
event: EventEnvelope<CreditCheckResultData>,
context: EventContext,
): Promise<void> {
await this.processCreditResult(event.data, event.correlation_id);
}
}
Outbox Pattern
For a decision-making guide on when to use the outbox, which backend to choose, and transactional vs normal persistence, see Outbox Usage Guidelines.
For transactional safety, the Outbox module persists events before publishing. It supports two backends:
| Backend | Use Case | Service Type |
| -------- | ------------------------------- | -------------------------------------------------- |
| SQLite | Lightweight, self-contained | Services without their own database |
| Postgres | Shares main application DB | ms-db-gateway and services with existing TypeORM |
For detailed configuration, see docs/outbox-configuration.md.
import { createEvent, EventContext, OutboxService, SubjectBuilder } from '@cobranza-apps/events-toolkit';
class PaymentService {
constructor(
private readonly outboxService: OutboxService,
private readonly subjectBuilder: SubjectBuilder,
) {}
async processWithOutbox(data: PaymentProofUploadedData, context: EventContext): Promise<void> {
const subject = this.subjectBuilder.build({
companyId: context.companyId,
domain: 'payment',
entity: 'proof',
action: 'uploaded',
version: '1',
});
const event = createEvent(data, context);
// Persisted to outbox, published by background processor
await this.outboxService.saveToOutbox(event, subject);
}
}
SQLite configuration:
OutboxModule.forRoot({
type: 'sqlite',
sqlite: { dbPath: '/data/outbox.sqlite' }, // Use Docker volume path
serviceOptions: {
processorIntervalMs: 5000, // Background processor interval (ms)
maxRetries: 3, // Max publish retries before DLQ routing
},
})
PostgreSQL configuration:
OutboxModule.forRoot({
type: 'postgres',
postgres: { entityManager: myTypeOrmEntityManager },
serviceOptions: { maxRetries: 3 },
})
Transactional outbox: For PostgreSQL + TypeORM services, saveInTransaction inserts outbox events within the caller's active database transaction, ensuring atomicity with business writes. See Transactional Outbox Usage Guide for details and examples.
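To make the persist-then-publish flow concrete, here is a small in-memory sketch of an outbox with a retry limit. It is not the toolkit's OutboxService: `InMemoryOutbox`, `Publish`, and the `dead` status are hypothetical names, storage is a plain array rather than SQLite or PostgreSQL, and publishing is modeled as a synchronous callback that throws on failure to keep the example short.

```typescript
type OutboxStatus = 'pending' | 'published' | 'dead';

interface OutboxRow {
  id: number;
  subject: string;
  payload: string; // serialized envelope
  attempts: number;
  status: OutboxStatus;
}

// Hypothetical publish callback: throws when the broker is unreachable.
type Publish = (subject: string, payload: string) => void;

class InMemoryOutbox {
  private readonly rows: OutboxRow[] = [];
  private readonly maxRetries: number;
  private nextId = 1;

  constructor(maxRetries = 3) {
    this.maxRetries = maxRetries;
  }

  // Step 1: persist the event; nothing is sent to the broker yet.
  save(subject: string, event: unknown): OutboxRow {
    const row: OutboxRow = {
      id: this.nextId++,
      subject,
      payload: JSON.stringify(event),
      attempts: 0,
      status: 'pending',
    };
    this.rows.push(row);
    return row;
  }

  // Step 2: one pass of the background processor over all pending rows.
  processPending(publish: Publish): void {
    for (const row of this.rows) {
      if (row.status !== 'pending') continue;
      try {
        publish(row.subject, row.payload);
        row.status = 'published';
      } catch {
        row.attempts += 1;
        // After maxRetries failed attempts the row stops being retried;
        // a real implementation would route it to the DLQ at this point.
        if (row.attempts >= this.maxRetries) row.status = 'dead';
      }
    }
  }

  countByStatus(status: OutboxStatus): number {
    return this.rows.filter((row) => row.status === status).length;
  }
}
```

In a service, `processPending` would run on a timer (compare processorIntervalMs above), so an event saved while NATS is down is still delivered once the connection returns.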
Request-Reply Through the Outbox
The outbox module supports async request-reply patterns through two APIs.
Low-Level API — sendRequestThroughOutbox
For cases where you already have a pre-built envelope, use sendRequestThroughOutbox:
import {
createEvent, OutboxService, SubjectBuilder, EventContext,
ActorType,
} from '@cobranza-apps/events-toolkit';
class DebtService {
constructor(
private readonly outboxService: OutboxService,
private readonly subjectBuilder: SubjectBuilder,
) {}
async requestCreditCheck(clientId: string, companyId: string): Promise<void> {
const context: EventContext = {
type: 'credit.check.requested',
version: '1.0.0',
producer: 'debt-service',
companyId,
actorType: ActorType.SYSTEM,
actorId: 'debt-service',
correlationId: '987fcdeb-51a2-43e8-9c4f-123456789abc',
replyTo: this.subjectBuilder.build({
companyId, domain: 'credit', entity: 'check',
action: 'completed', version: '1',
}),
};
const event = createEvent({ clientId }, context);
await this.outboxService.sendRequestThroughOutbox(
event,
this.subjectBuilder.build({
companyId, domain: 'credit', entity: 'check',
action: 'requested', version: '1',
}),
);
}
}
High-Level API — sendAsyncRequestThroughOutbox
For a simpler API that builds the envelope for you, use sendAsyncRequestThroughOutbox:
import {
OutboxService, SubjectBuilder, EventContext,
ActorType, generateUuidV7,
} from '@cobranza-apps/events-toolkit';
class DebtService {
constructor(
private readonly outboxService: OutboxService,
private readonly subjectBuilder: SubjectBuilder,
) {}
async requestCreditCheck(clientId: string, companyId: string): Promise<string> {
const requestSubject = this.subjectBuilder.build({
companyId, domain: 'credit', entity: 'check', action: 'requested', version: '1',
});
const replySubject = this.subjectBuilder.build({
companyId, domain: 'credit', entity: 'check', action: 'completed', version: '1',
});
const result = await this.outboxService.sendAsyncRequestThroughOutbox({
subject: requestSubject,
payload: { clientId },
context: {
type: 'credit.check.requested',
version: '1.0.0',
producer: 'debt-service',
companyId,
actorType: ActorType.SYSTEM,
actorId: 'debt-service',
correlationId: generateUuidV7(),
replyTo: replySubject,
},
});
return result.correlationId;
}
}
Recommended Patterns
| Pattern | Approach |
|---------|----------|
| Sync Request-Reply (request()) | Bypass the outbox for the request — you're waiting for the response anyway. Use the outbox only for side effects triggered by the response. |
| Async Request-Reply (sendRequest() / sendAsyncRequestThroughOutbox) | Route the initial request through the outbox to guarantee delivery, even if the service restarts. Use sendAsyncRequestThroughOutbox for the simplest API, or sendRequestThroughOutbox if you need to build the envelope manually. |
See Request-Reply Patterns for full async pattern documentation and Outbox Configuration for request-reply outbox guidance.
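In the async pattern, the requester keeps the correlation ID returned when the request is sent and later matches it against the correlation_id on the response. The sketch below shows one way such matching could work; `PendingReplies` and `ReplyLike` are hypothetical names for this example, and the toolkit's own handling behind @OnRequestReply may be quite different.

```typescript
// Minimal view of a response envelope: only the fields this sketch needs.
interface ReplyLike<T> {
  correlation_id: string;
  data: T;
}

// Tracks requests that are still waiting for their async response.
class PendingReplies<T> {
  private readonly waiting = new Map<string, (data: T) => void>();

  // Call right after sending the request, using the correlation ID it returned.
  expect(correlationId: string, onReply: (data: T) => void): void {
    this.waiting.set(correlationId, onReply);
  }

  // Call from the response handler. Returns false for unknown or already
  // handled correlation IDs, so redelivered responses are ignored.
  deliver(reply: ReplyLike<T>): boolean {
    const handler = this.waiting.get(reply.correlation_id);
    if (!handler) return false;
    this.waiting.delete(reply.correlation_id);
    handler(reply.data);
    return true;
  }

  get size(): number {
    return this.waiting.size;
  }
}
```

Because this map lives in process memory, pending entries are lost on restart. A service that needs to resume after a restart would have to persist the correlation ID alongside its own business state.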
Subject Builder
The SubjectBuilder is the single entry point for subject generation:
import { SubjectBuilder } from '@cobranza-apps/events-toolkit';
const subject = subjectBuilder.build({
companyId: '550e8400e29b41d4a716446655440000',
domain: 'payment',
entity: 'proof',
action: 'uploaded',
version: '1'
});
// Result: "company.550e8400e29b41d4a716446655440000.payment.proof.uploaded.v1"
Discovery
Configure the discovery subsystem to auto-register your service and generate JSON Schemas:
import { EventsToolkitModule } from '@cobranza-apps/events-toolkit';
EventsToolkitModule.forRoot({
nats: { servers: ['nats://localhost:4222'] },
discovery: {
enabled: true,
registerOnStartup: true,
heartbeatIntervalMinutes: 5,
service: { name: 'payment-service', version: '1.0.0' },
},
})
For the full guide, see Event Discovery & Service Registry.
Event Factory
Create validated event instances without the new keyword. The factory accepts the data payload and an EventContext, and returns a fully-populated EventEnvelope:
import { createEvent, ActorType, EventContext } from '@cobranza-apps/events-toolkit';
const eventContext: EventContext = {
type: 'payment.proof.uploaded',
version: '1.0.0',
producer: 'payment-service',
companyId: '550e8400-e29b-41d4-a716-446655440000',
actorType: ActorType.CLIENT,
actorId: 'clt_123e4567-e89b-12d3-a456-426614174000',
correlationId: '987fcdeb-51a2-43e8-9c4f-123456789abc',
};
const event = createEvent(paymentData, eventContext);
Architecture
src/
├── index.ts # Public API barrel exports
├── common/ # Shared across all modules
│ ├── constants.ts # Magic strings, defaults
│ ├── envelope/ # EventEnvelope<T>, ActorType, EventBase
│ │ └── validators/ # Custom class-validator decorators
│ ├── dto/ # BuildSubjectDto
│ ├── utils/ # SubjectBuilder, EventFactory, uuid.utils, date utils
│ └── errors/ # EventConsumerException
├── producer/
│ ├── decorators/ # @EmitEvent(), EmitEventInterceptor
│ ├── producer.module.ts
│ └── producer.service.ts
├── consumer/
│ ├── decorators/ # @OnEvent(), @OnRequestReply(), explorers
│ ├── consumer.module.ts
│ ├── consumer.service.ts
│ └── jetstream-consumer.service.ts
├── discovery/ # Service discovery, manifest generation, schema publishing
│ ├── dto/ # Manifest DTOs (ServiceManifestDto, ManifestConsumeEntry, etc.)
│ ├── events/ # DiscoveryEventPublisher, platform subjects and event types
│ ├── utils/ # SchemaGenerator, SchemaPersister
│ ├── discovery.controller.ts # HTTP endpoints: GET /discovery/manifest, GET /discovery/schemas
│ ├── discovery.service.ts # Orchestrates manifest generation, heartbeat, shutdown
│ ├── discovery.module.ts # DiscoveryModule DynamicModule
│ ├── manifest.service.ts # Scans decorators, builds ServiceManifestDto
│ └── manifest-entry.builder.ts # Builds manifest entries, resolves payloadSchemaRef
├── request-reply/ # RequestReplyService
├── outbox/ # OutboxModule, OutboxService, SqliteOutboxRepository, PostgresOutboxRepository
└── logging/ # EventLoggerService (Winston)
Each concern is a separate NestJS DynamicModule — microservices import only what they need.
Guidelines for AI Agents
When generating event-related code in microservices using this toolkit, follow these rules:
- Subject naming: Always use SubjectBuilder.build() — never concatenate subject strings manually.
- Event IDs: Use generateEventId() from the toolkit, which returns a UUIDv7 prefixed with evt_.
- Validation: Always decorate event data classes with class-validator decorators.
- Actor context: Always populate actor_type and actor_id in the event context.
- Tenant isolation: company_id is mandatory in every event envelope.
- Idempotency: Consumers must be idempotent — use id + correlation_id for deduplication.
- Past-tense actions: Action names must use past tense (created, uploaded, processed).
- Consumer errors: Throw EventConsumerException for business errors that should route to DLQ.
- References over objects: Prefer IDs over full object graphs in event payloads.
- Events under 256KB: Keep event payloads small.
For step-by-step instructions on creating events, naming subjects, and common pitfalls, see docs/ai-agent-guidelines.md.
For the full convention specification, see docs/event-messaging-convention.md.
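Two of the rules above, idempotent consumers and the payload size limit, lend themselves to a short sketch. The helper names below are hypothetical, the limit is read as 256 × 1024 bytes of UTF-8 encoded JSON (an assumption), and the in-memory Set only deduplicates within one process, so a real consumer would likely need a durable store.

```typescript
// Assumption: "256KB" is interpreted as 256 * 1024 bytes of serialized JSON.
const MAX_PAYLOAD_BYTES = 256 * 1024;

// Size of the payload once serialized to JSON and encoded as UTF-8.
function payloadSizeBytes(data: unknown): number {
  return new TextEncoder().encode(JSON.stringify(data)).length;
}

function isPayloadWithinLimit(data: unknown): boolean {
  return payloadSizeBytes(data) <= MAX_PAYLOAD_BYTES;
}

// In-memory deduplication keyed on id + correlation_id, as the guideline suggests.
class SeenEvents {
  private readonly keys = new Set<string>();

  // Returns true the first time an event is seen, false on redelivery.
  markIfNew(event: { id: string; correlation_id: string }): boolean {
    const key = `${event.id}:${event.correlation_id}`;
    if (this.keys.has(key)) return false;
    this.keys.add(key);
    return true;
  }
}
```

A handler would call `markIfNew(event)` first and return early when it yields false, so a redelivered message does not repeat its side effects.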
Development
Local Development Setup
git clone <repo-url>
cd events-toolkit
npm install
Scripts
npm run build # Compile TypeScript to dist/
npm test # Run unit tests (Jest)
npm run test:e2e # Run integration tests (requires NATS)
npm run lint # ESLint
npm run format # Prettier
Local NATS for testing
docker run -p 4222:4222 nats:latest -js
Testing Utilities
The toolkit provides mock services, a NestJS test module, and Jest assertion helpers for unit-testing microservices that depend on events-toolkit — no NATS connection required.
import { Test } from '@nestjs/testing';
import {
EventsToolkitTestModule,
MockProducerService,
expectEventPublished,
} from '@cobranza-apps/events-toolkit';
describe('PaymentService', () => {
let service: PaymentService;
let mockProducer: MockProducerService;
beforeEach(async () => {
const module = await Test.createTestingModule({
imports: [EventsToolkitTestModule.forRoot()],
providers: [PaymentService],
}).compile();
service = module.get(PaymentService);
mockProducer = module.get(MockProducerService);
});
it('should publish an event', async () => {
await service.uploadProof({ companyId: '...', amount: 250 });
expectEventPublished(mockProducer, 'company.550e.payment.proof.uploaded.v1');
});
});
Full documentation: docs/testing-utilities.md
Deployment
JetStream Stream Configuration
Configure event and DLQ streams with the following JetStream settings:
import { RetentionPolicy, StorageType } from 'nats';
// nc is an open NatsConnection; the JetStream manager is obtained from it
const jsm = await nc.jetstreamManager();
// Event stream
await jsm.streams.add({
  name: 'EVENTS',
  subjects: ['company.>'],
  retention: RetentionPolicy.Limits,
  max_age: 7 * 24 * 60 * 60 * 1_000_000_000, // 7 days in nanoseconds
  max_msgs_per_subject: 10_000,
  duplicate_window: 2 * 60 * 1_000_000_000, // 2-minute deduplication window in nanoseconds
  storage: StorageType.File,
});
// DLQ stream
await jsm.streams.add({
  name: 'DLQ',
  subjects: ['dlq.>'],
  retention: RetentionPolicy.Limits,
  max_age: 30 * 24 * 60 * 60 * 1_000_000_000, // 30 days in nanoseconds
  max_msgs_per_subject: 100_000,
  storage: StorageType.File,
  duplicate_window: 2 * 60 * 1_000_000_000, // 2 minutes in nanoseconds
});
// Platform events stream (for service discovery)
await jsm.streams.add({
  name: 'PLATFORM',
  subjects: ['platform.service.>'],
  retention: RetentionPolicy.Limits,
  max_age: 7 * 24 * 60 * 60 * 1_000_000_000, // 7 days in nanoseconds
  max_msgs_per_subject: 1_000,
  storage: StorageType.File,
});
Environment Variables
| Variable | Description | Example |
|----------|-------------|---------|
| NATS_URLS | Comma-separated NATS server URLs | nats://localhost:4222 |
| SERVICE_NAME | Microservice name for discovery | payment-service |
| SERVICE_VERSION | Microservice version for discovery | 1.0.0 |
| OUTBOX_DB_PATH | SQLite file path (SQLite outbox only) | /data/outbox.sqlite |
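One way to turn these variables into configuration values is sketched below. `readEnvConfig` and `ToolkitEnvConfig` are hypothetical names; whether the toolkit reads these variables itself or expects the service to pass them to forRoot() is not stated here, so treat this as an illustration of parsing only.

```typescript
// Hypothetical config shape derived from the variables in the table above.
interface ToolkitEnvConfig {
  natsServers: string[];
  serviceName: string;
  serviceVersion: string;
  outboxDbPath?: string; // only relevant for the SQLite outbox
}

function readEnvConfig(env: Record<string, string | undefined>): ToolkitEnvConfig {
  // NATS_URLS is comma-separated; trim whitespace and drop empty entries.
  const natsServers = (env['NATS_URLS'] ?? '')
    .split(',')
    .map((url) => url.trim())
    .filter((url) => url.length > 0);
  if (natsServers.length === 0) {
    throw new Error('NATS_URLS must list at least one server URL');
  }

  const serviceName = env['SERVICE_NAME'];
  const serviceVersion = env['SERVICE_VERSION'];
  if (!serviceName || !serviceVersion) {
    throw new Error('SERVICE_NAME and SERVICE_VERSION are required');
  }

  return {
    natsServers,
    serviceName,
    serviceVersion,
    outboxDbPath: env['OUTBOX_DB_PATH'],
  };
}
```

Calling `readEnvConfig(process.env)` at startup fails fast on a missing variable, and the resulting `natsServers` array has the same shape as the `nats.servers` option used in the forRoot() examples.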
Health Checks
- Liveness probe: GET /discovery/manifest — returns the service manifest JSON. A 200 response indicates the service is healthy and the discovery subsystem is active.
- Heartbeat: Set heartbeatIntervalMinutes in discovery options to enable periodic platform heartbeat events (platform.service.heartbeat.v1). Default: 5 minutes.
- SQLite persistence: When using the SQLite outbox backend in Docker, mount a persistent volume at the OUTBOX_DB_PATH directory to survive container restarts. See Outbox Configuration.
Related Documentation
- Changelog — Notable release changes and upgrade notes
- Event & Messaging Convention — Full event standard specification
- Outbox Configuration — SQLite vs Postgres setup, service options, and migration guide
- Outbox Usage Guidelines — Decision trees for outbox backend, transactional vs normal, and request-reply patterns
- Transactional Outbox Usage — TypeORM transaction examples and saveInTransaction guide
- AI Agent Guidelines — Step-by-step event creation, naming, correlation/causation, and common mistakes
- Event Discovery & Service Registry — Service manifest, schema generation, platform events, and discovery module setup
- Request-Reply Patterns — Sync vs async patterns, correlation, timeouts, and error handling
- Request-Reply Guidelines — Decision tree, timeout recommendations, performance trade-offs, and best practices
- Request-Reply Examples — Complete code examples for sync, async, and outbox request-reply patterns
- Testing Utilities — Mock services, test module, and assertion helpers
- Architecture — Module design and data flows
- Tech Stack — Technology choices and development setup
- Product Overview — Problem definition and goals
License
Unlicense
