@guillermolg00/distributed-workflow
v0.2.1
Distributed Workflow Engine for NestJS Microservices with Saga Pattern
Distributed Workflow Engine
A TypeScript/NestJS library that brings durability, reliability, and observability to distributed microservice workflows. Built on the Saga pattern, it compensates completed steps automatically when a later step fails, dispatches steps over several transports (RabbitMQ, HTTP, local), and persists workflow state in Redis, PostgreSQL, or memory.
✨ Features
- ✅ Declarative Workflow Definition - Define complex workflows with simple decorators
- ✅ Automatic Compensation - Built-in saga pattern with automatic rollback on failures
- ✅ Multi-Transport Support - RabbitMQ, HTTP, Local execution with extensibility
- ✅ Retry & Circuit Breaking - Configurable retries with exponential backoff, plus per-step circuit breakers
- ✅ State Persistence - Redis/PostgreSQL/In-memory state management
- ✅ Observability First - Automatic tracing, metrics, and structured logging
- ✅ Idempotency - Built-in idempotency keys for safe retries
- ✅ Type-Safe - Full TypeScript support with generics
- ✅ NestJS Integration - Native NestJS module with dependency injection
📦 Installation
npm install @nestjs-distributed-workflow/distributed-workflow
# Or with yarn
yarn add @nestjs-distributed-workflow/distributed-workflow
# Or with pnpm
pnpm add @nestjs-distributed-workflow/distributed-workflow
Optional Dependencies
For specific features, install these optional dependencies:
# For HTTP transport
npm install @nestjs/axios axios
# For message transport (RabbitMQ, Kafka, etc.)
npm install @nestjs/microservices
# For Redis state store
npm install ioredis
# For PostgreSQL state store
npm install pg
🚀 Quick Start
1. Setup Module
import { Module } from '@nestjs/common';
import { DistributedWorkflowModule } from '@nestjs-distributed-workflow/distributed-workflow';
import { UserOnboardingWorkflow } from './workflows/user-onboarding.workflow';
@Module({
imports: [
DistributedWorkflowModule.forRoot({
stateStore: {
type: 'memory', // or 'redis', 'postgres'
},
observability: {
logging: { enabled: true, level: 'info' },
metrics: { enabled: true },
tracing: { enabled: true },
},
}),
// Register workflows as providers
DistributedWorkflowModule.forFeature([UserOnboardingWorkflow]),
],
})
export class AppModule {}
2. Define a Workflow
import { Injectable } from '@nestjs/common';
import {
Workflow,
Step,
Compensate,
WorkflowInput
} from '@nestjs-distributed-workflow/distributed-workflow';
@Injectable()
export class UserOnboardingWorkflow {
@Workflow('user-onboarding', {
timeout: '5m',
idempotent: true,
})
async execute(@WorkflowInput() input: { email: string; name: string }) {
// Step 1: Create user
const user = await this.createUser(input);
// Step 2: Send welcome email
await this.sendWelcomeEmail(user);
return { userId: user.id, status: 'completed' };
}
@Step({
target: 'ms-users.create',
timeout: '30s',
retries: 3,
critical: true,
})
async createUser(data: any) {
return data;
}
@Compensate('createUser')
async compensateCreateUser(user: { id: string }) {
// Rollback: delete user
console.log(`Compensating user creation: ${user.id}`);
}
@Step({
target: 'http:POST:https://api.email.com/send',
timeout: '15s',
retries: 2,
critical: false,
})
async sendWelcomeEmail(user: any) {
return { sent: true };
}
}
3. Execute the Workflow
import { Injectable } from '@nestjs/common';
import { WorkflowEngine } from '@nestjs-distributed-workflow/distributed-workflow';
@Injectable()
export class OnboardingService {
constructor(private readonly workflowEngine: WorkflowEngine) {}
async onboardUser(email: string, name: string) {
const result = await this.workflowEngine.execute('user-onboarding', {
email,
name,
});
if (result.success) {
console.log('Onboarding completed:', result.output);
} else {
console.error('Onboarding failed:', result.error);
}
return result;
}
}
📖 Core Concepts
Workflow
A Workflow is a series of steps executed in order. Define it with the @Workflow decorator:
@Workflow('my-workflow', {
timeout: '10m',
idempotent: true,
stateStore: 'redis',
})
async execute(@WorkflowInput() input: MyInput) {
// Your workflow steps
}
Step
A Step is an atomic operation that either succeeds or fails as a whole. Configure its target, transport, timeout, and retry policy with @Step.
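To make the `retries` and `backoff: 'exponential'` options more concrete, here is a minimal sketch of retrying with exponential backoff. It is not the library's implementation: `RetryOptions`, `backoffDelay`, `withRetry`, and the `baseDelayMs`/`maxDelayMs` parameters are hypothetical names chosen for illustration, and the doubling schedule is the conventional one rather than a documented behaviour of this package.

```typescript
// Hypothetical option names; the library's real retry settings may differ.
interface RetryOptions {
  retries: number;     // extra attempts after the first call
  baseDelayMs: number; // delay before the first retry
  maxDelayMs: number;  // upper bound on any single delay
}

// Delay before retry number `attempt` (1-based): base * 2^(attempt - 1), capped.
function backoffDelay(attempt: number, options: RetryOptions): number {
  return Math.min(options.baseDelayMs * 2 ** (attempt - 1), options.maxDelayMs);
}

const defaultSleep = (ms: number): Promise<void> =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

// Calls `operation` once, then up to `retries` more times, waiting between attempts.
// The sleep function is injectable so that tests do not have to wait in real time.
async function withRetry<T>(
  operation: () => Promise<T>,
  options: RetryOptions,
  sleep: (ms: number) => Promise<void> = defaultSleep,
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt <= options.retries; attempt += 1) {
    if (attempt > 0) {
      await sleep(backoffDelay(attempt, options));
    }
    try {
      return await operation();
    } catch (error) {
      lastError = error; // remember the failure and try again if attempts remain
    }
  }
  throw lastError;
}
```

With `baseDelayMs: 100`, the waits before successive retries would be 100 ms, 200 ms, 400 ms, and so on up to `maxDelayMs`. In the library itself these concerns are declared on the step rather than coded by hand, as in the configuration that follows.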
@Step({
target: 'ms-service.method', // Service target
transport: 'MESSAGE', // MESSAGE, HTTP, or LOCAL
timeout: '30s',
retries: 3,
backoff: 'exponential',
critical: true,
})
async myStep(data: any) {
return data;
}
Compensation
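Compensation is the rollback logic for a step. When a workflow fails, the compensations for the steps that already completed run in reverse order. Attach one to a step by name with @Compensate.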
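The reverse-order rule can be sketched in a few lines. The code below is a generic saga runner written for illustration only; `SagaStep`, `SagaResult`, and `runSaga` are hypothetical names and are not part of this library's API. It also ignores options such as `critical`, whose exact effect on compensation is not described here.

```typescript
// Hypothetical types for illustration; these are not the library's API.
interface SagaStep<T> {
  name: string;
  run: (ctx: T) => Promise<void>;
  compensate?: (ctx: T) => Promise<void>;
}

interface SagaResult {
  success: boolean;
  compensated: string[]; // names of the steps rolled back, in rollback order
  error?: unknown;
}

async function runSaga<T>(steps: SagaStep<T>[], ctx: T): Promise<SagaResult> {
  const completed: SagaStep<T>[] = [];
  for (const step of steps) {
    try {
      await step.run(ctx);
      completed.push(step);
    } catch (error) {
      const compensated: string[] = [];
      // Undo only the steps that finished, newest first.
      // The failed step itself is not compensated because it never completed.
      for (const done of [...completed].reverse()) {
        if (done.compensate) {
          await done.compensate(ctx);
          compensated.push(done.name);
        }
      }
      return { success: false, compensated, error };
    }
  }
  return { success: true, compensated: [] };
}
```

If steps `a`, `b`, and `c` run in that order and `c` throws, this sketch undoes `b` and then `a`. With the library, the equivalent rollback is declared through the decorator: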
@Compensate('myStep')
async compensateMyStep(data: any) {
// Undo the step's changes
}
Transports
Transports define how steps communicate:
- MESSAGE: RabbitMQ, Kafka, Redis (via @nestjs/microservices)
- HTTP: REST APIs (internal or external)
- LOCAL: In-process function calls
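The step examples in this README use two target shapes: `'ms-users.create'` for a message target and `'http:POST:https://api.email.com/send'` for an HTTP target. How the library parses these strings internally is not documented here, so the following is only a sketch of one plausible approach. `ParsedTarget` and `parseTarget` are hypothetical names, and LOCAL targets are left out because no LOCAL target string appears above.

```typescript
// Hypothetical result type; the library may represent targets differently.
type ParsedTarget =
  | { transport: 'HTTP'; method: string; url: string }
  | { transport: 'MESSAGE'; service: string; pattern: string };

function parseTarget(target: string): ParsedTarget {
  // HTTP form: 'http:<METHOD>:<url>'. Only the first separator after the
  // prefix is used as a delimiter, because the URL contains colons itself.
  if (target.startsWith('http:')) {
    const rest = target.slice('http:'.length);
    const sep = rest.indexOf(':');
    if (sep <= 0 || sep === rest.length - 1) {
      throw new Error(`Malformed HTTP target: ${target}`);
    }
    return {
      transport: 'HTTP',
      method: rest.slice(0, sep).toUpperCase(),
      url: rest.slice(sep + 1),
    };
  }
  // Message form: '<service>.<pattern>', split on the first dot.
  const dot = target.indexOf('.');
  if (dot <= 0 || dot === target.length - 1) {
    throw new Error(`Unrecognised step target: ${target}`);
  }
  return {
    transport: 'MESSAGE',
    service: target.slice(0, dot),
    pattern: target.slice(dot + 1),
  };
}
```

Returning a discriminated union lets the caller switch on `transport` and have the compiler check that `method` and `url` are only read for HTTP targets.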
🔧 Configuration
State Stores
Memory (Development/Testing)
stateStore: {
type: 'memory',
}
Redis
import Redis from 'ioredis';
const redis = new Redis({
host: 'localhost',
port: 6379,
});
stateStore: {
type: 'redis',
options: {
client: redis,
keyPrefix: 'workflow:',
ttl: 86400, // 24 hours
},
}
PostgreSQL
import { Pool } from 'pg';
const pool = new Pool({
host: 'localhost',
port: 5432,
database: 'workflows',
user: 'postgres',
password: 'password',
});
stateStore: {
type: 'postgres',
options: {
pool,
tableName: 'workflow_states',
},
}
Transports
Message Transport (RabbitMQ)
import { ClientProxyFactory, Transport } from '@nestjs/microservices';
transports: {
message: {
services: [
{
name: 'ms-users',
// ClientProxyFactory is the factory class exported by @nestjs/microservices
client: ClientProxyFactory.create({
transport: Transport.RMQ,
options: {
urls: ['amqp://localhost:5672'],
queue: 'users_queue',
},
}),
},
],
},
}
🧪 Testing
Use the built-in testing utilities:
import {
createMockWorkflowEngine,
createMockTransportRegistry,
getMockTransport,
} from '@nestjs-distributed-workflow/distributed-workflow/testing';
describe('MyWorkflow', () => {
it('should execute successfully', async () => {
const { engine, transportRegistry } = createMockWorkflowEngine();
const mockTransport = getMockTransport(transportRegistry);
// Mock responses
mockTransport.mockResponse('ms-users.create', { id: 'user-123' });
// Execute workflow
const result = await engine.execute('my-workflow', { data: 'test' });
expect(result.success).toBe(true);
});
});
📊 Observability
Logging
import { Injectable } from '@nestjs/common';
import { WorkflowLogger } from '@nestjs-distributed-workflow/distributed-workflow';
@Injectable()
export class MyService {
constructor(private logger: WorkflowLogger) {}
async myMethod() {
this.logger.info('Workflow started', { workflowId: '123' });
}
}
Metrics
import { Injectable } from '@nestjs/common';
import { WorkflowMetrics } from '@nestjs-distributed-workflow/distributed-workflow';
@Injectable()
export class MyService {
constructor(private metrics: WorkflowMetrics) {}
async myMethod() {
this.metrics.recordWorkflowExecution('my-workflow', 'success', 1500);
}
}
Tracing
import { Injectable } from '@nestjs/common';
import { WorkflowTracer } from '@nestjs-distributed-workflow/distributed-workflow';
@Injectable()
export class MyService {
constructor(private tracer: WorkflowTracer) {}
async myMethod() {
const span = this.tracer.startTrace('my-operation');
// ... do work
this.tracer.endSpan(span.spanId);
}
}
🛠️ Advanced Features
Sleep/Scheduling
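Durations appear throughout this README as strings such as `'30s'`, `'5m'`, and `'7 days'`. The exact grammar the library accepts is not specified, so the parser below is only a sketch of how such strings could be turned into milliseconds. `UNITS` and `parseDuration` are hypothetical names, and the set of accepted units is an assumption.

```typescript
// Assumed unit table; the library may accept a different set of units.
const UNITS = new Map<string, number>([
  ['ms', 1],
  ['s', 1000], ['sec', 1000], ['second', 1000],
  ['m', 60 * 1000], ['min', 60 * 1000], ['minute', 60 * 1000],
  ['h', 60 * 60 * 1000], ['hour', 60 * 60 * 1000],
  ['d', 24 * 60 * 60 * 1000], ['day', 24 * 60 * 60 * 1000],
]);

// Converts strings like '30s', '5m', or '7 days' into milliseconds.
function parseDuration(text: string): number {
  const match = /^(\d+(?:\.\d+)?)\s*([a-z]+)$/.exec(text.trim().toLowerCase());
  if (!match) {
    throw new Error(`Invalid duration: ${text}`);
  }
  const [, amount = '', rawUnit = ''] = match;
  // Try the unit as written first, then without a plural 's' ('days' -> 'day').
  const factor = UNITS.get(rawUnit) ?? UNITS.get(rawUnit.replace(/s$/, ''));
  if (factor === undefined) {
    throw new Error(`Unknown duration unit in: ${text}`);
  }
  return Number(amount) * factor;
}
```

A seven-day pause is 604,800,000 ms, which fits within the roughly 24.8-day limit of a single `setTimeout`, but a durable engine would more plausibly persist the wake-up time than hold a timer in memory for that long. The library's own `sleep` helper is used like this: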
import { sleep } from '@nestjs-distributed-workflow/distributed-workflow';
@Workflow('newsletter')
async execute(@WorkflowInput() input: any) {
await this.sendEmail(input.email, 'welcome');
await sleep('7 days'); // Pause for 7 days
await this.sendEmail(input.email, 'follow-up');
}
Circuit Breaker
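The option names used in this section (`failureThreshold`, `successThreshold`, `timeout`) match the conventional circuit-breaker state machine: closed, open, and half-open. The class below sketches that conventional behaviour so the options are easier to reason about. It is an illustration under that assumption, not the library's implementation, and `CircuitBreaker`, `BreakerOptions`, and the injectable `now` clock are hypothetical.

```typescript
type BreakerState = 'closed' | 'open' | 'half-open';

interface BreakerOptions {
  failureThreshold: number; // consecutive failures before the breaker opens
  successThreshold: number; // consecutive half-open successes before it closes
  timeout: number;          // ms to stay open before allowing a trial call
}

class CircuitBreaker {
  private state: BreakerState = 'closed';
  private failures = 0;
  private successes = 0;
  private openedAt = 0;
  private readonly options: BreakerOptions;
  private readonly now: () => number;

  // The clock is injectable so the breaker can be tested without waiting.
  constructor(options: BreakerOptions, now: () => number = () => Date.now()) {
    this.options = options;
    this.now = now;
  }

  // Returns true if a call may be attempted right now.
  allowRequest(): boolean {
    if (this.state === 'open' && this.now() - this.openedAt >= this.options.timeout) {
      this.state = 'half-open';
      this.successes = 0;
    }
    return this.state !== 'open';
  }

  recordSuccess(): void {
    if (this.state === 'half-open') {
      this.successes += 1;
      if (this.successes >= this.options.successThreshold) {
        this.state = 'closed';
        this.failures = 0;
      }
    } else {
      this.failures = 0; // a success resets the failure streak
    }
  }

  recordFailure(): void {
    if (this.state === 'half-open') {
      this.trip(); // any failure during the trial period reopens the breaker
      return;
    }
    this.failures += 1;
    if (this.failures >= this.options.failureThreshold) {
      this.trip();
    }
  }

  getState(): BreakerState {
    return this.state;
  }

  private trip(): void {
    this.state = 'open';
    this.openedAt = this.now();
    this.failures = 0;
  }
}
```

A caller would check `allowRequest()` before each call and report the outcome with `recordSuccess()` or `recordFailure()`. In the library, the breaker is configured per step: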
@Step({
target: 'unreliable-service.call',
circuitBreaker: {
enabled: true,
failureThreshold: 5,
successThreshold: 2,
timeout: 60000,
},
})
async callUnreliableService(data: any) {
return data;
}
Idempotency
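An idempotent workflow returns the earlier result instead of running again when it is called with the same input. How the library derives and stores its idempotency keys is not described here. The sketch below assumes a key built from the workflow name plus a canonical JSON form of the input, held in an in-memory map. `stableStringify` and `IdempotentRunner` are hypothetical names, and the input is assumed to be JSON-serialisable.

```typescript
// Deterministic JSON: object keys are sorted so that { a, b } and { b, a }
// produce the same string and therefore the same idempotency key.
function stableStringify(value: unknown): string {
  if (value === null || typeof value !== 'object') {
    return JSON.stringify(value) ?? 'null'; // undefined is treated as null
  }
  if (Array.isArray(value)) {
    return `[${value.map(stableStringify).join(',')}]`;
  }
  const record = value as Record<string, unknown>;
  const entries = Object.keys(record)
    .sort()
    .map((key) => `${JSON.stringify(key)}:${stableStringify(record[key])}`);
  return `{${entries.join(',')}}`;
}

class IdempotentRunner {
  private readonly results = new Map<string, Promise<unknown>>();

  // Runs `execute` at most once per (workflow, input) pair. Concurrent callers
  // with the same key share one in-flight promise.
  run<I, O>(workflow: string, input: I, execute: (input: I) => Promise<O>): Promise<O> {
    const key = `${workflow}:${stableStringify(input)}`;
    const existing = this.results.get(key);
    if (existing) {
      return existing as Promise<O>;
    }
    const pending = execute(input);
    this.results.set(key, pending);
    // Forget failed runs so that a later call can retry them.
    pending.catch(() => this.results.delete(key));
    return pending;
  }
}
```

A process-local map only deduplicates calls within one instance. Across several instances the key would have to live in a shared store such as the Redis or PostgreSQL state store. In the library, idempotency is switched on per workflow: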
@Workflow('payment-processing', {
idempotent: true,
})
async execute(@WorkflowInput() input: { orderId: string }) {
// Will not re-execute if called with same orderId
}
📚 Documentation
For comprehensive documentation, visit our docs.
🤝 Contributing
Contributions are welcome! Please read our Contributing Guide for details.
📄 License
MIT License - see the LICENSE file for details.
🙏 Acknowledgments
Inspired by Vercel's Workflow DevKit and adapted for distributed microservices architectures.
📧 Support
Made with ❤️ for the NestJS community
