@betternest/workflows
v3.0.0
MongoDB-based workflow orchestration for NestJS (alternative to BullMQ)
@betternest/workflows provides a type-safe, decorator-based workflow engine for NestJS applications. Unlike BullMQ, which requires Redis, this package persists workflows in MongoDB, which makes it a natural fit for applications that already use MongoDB.
Features
- ✅ Type-Safe - Full TypeScript support with state inference
- ✅ MongoDB-Based - No Redis required
- ✅ Cron Scheduling - Built-in scheduler with timezone support
- ✅ Stateful Workflows - Preserve state between tasks
- ✅ Automatic Retries - Configurable retry logic
- ✅ Concurrent Execution - Multiple workflows run in parallel
- ✅ Heartbeat Monitoring - Detects and recovers stale workflows
- ✅ Progress Tracking - Track workflow execution progress
- ✅ Task Timeouts - Configurable per-task and per-workflow timeouts
- ✅ Non-Blocking Retries - Retries don't block other workflows
- ✅ Graceful Shutdown - Waits for running workflows before shutdown
Installation
npm install @betternest/workflows @nestjs/mongoose mongoose
# or
yarn add @betternest/workflows @nestjs/mongoose mongoose
# or
pnpm add @betternest/workflows @nestjs/mongoose mongoose
Quick Start
1. Setup Module
import { Module } from '@nestjs/common';
import { MongooseModule } from '@nestjs/mongoose';
import { BetterWorkflowsModule } from '@betternest/workflows';
@Module({
imports: [
// Setup MongoDB connection for your app
MongooseModule.forRoot('mongodb://localhost:27017/myapp'),
// Setup MongoDB connection for workflows (can be the same or separate)
MongooseModule.forRoot('mongodb://localhost:27017/workflows', {
connectionName: 'workflows',
}),
// Setup Workflows Module
BetterWorkflowsModule.forRoot({
connectionName: 'workflows',
workflowrunsCollectionName: 'workflowRuns',
timezone: 'UTC',
}),
],
})
export class AppModule {}
2. Create Your First Workflow
import { Workflow, Task, TaskResult, WorkflowStateAfter } from '@betternest/workflows';
import { Injectable } from '@nestjs/common';
@Workflow({
name: 'daily-report-workflow',
schedule: '0 9 * * *', // Run daily at 9 AM
firstTask: 'fetchData',
})
export class DailyReportWorkflow {
@Task({ name: 'Fetch data' })
async fetchData() {
const data = await this.fetchFromAPI();
return {
state: { data },
nextTask: 'processData',
} satisfies TaskResult<this>;
}
@Task({ name: 'Process data' })
async processData(state: StateAfter<'fetchData'>) {
const processed = await this.process(state.data);
return {
state: { ...state, processed },
nextTask: 'sendReport',
} satisfies TaskResult<this>;
}
@Task({ name: 'Send report' })
async sendReport(state: StateAfter<'processData'>) {
await this.sendEmail(state.processed);
return {
state,
nextTask: 'FINISHED', // Mark workflow as complete
} satisfies TaskResult<this>;
}
}
type StateAfter<T extends keyof DailyReportWorkflow> = WorkflowStateAfter<
DailyReportWorkflow,
T
>;
3. Register Workflow
Simple approach (if workflow has no dependencies):
@Module({
imports: [
BetterWorkflowsModule.forFeature({
workflows: [DailyReportWorkflow],
}),
],
})
export class AppModule {}
With dependencies (recommended):
@Module({
imports: [
BetterWorkflowsModule.forFeature({
imports: [DataModule, EmailModule], // Modules that workflows depend on
workflows: [DailyReportWorkflow],
}),
],
})
export class FeatureModule {}
That's it! Your workflow will run automatically at 9 AM every day.
Core Concepts
Workflows
A Workflow is a multi-step, stateful process defined with the @Workflow() decorator.
@Workflow({
name: 'my-workflow', // Optional: defaults to class name
schedule: '0 9 * * *', // Optional: cron schedule
firstTask: 'step1', // Starting task method name
maxRetries: 3, // Optional: retry failed workflows (default: 3)
taskTimeout: 30000, // Optional: default timeout for all tasks in ms (default: 20000)
})
export class MyWorkflow {
constructor(readonly myService: MyService) {} // Standard NestJS DI
}
Note: Dependencies (modules) are configured via BetterWorkflowsModule.forFeature(), not in the decorator.
Tasks
A Task is an individual unit of work within a workflow, defined with the @Task() decorator.
@Task({
name: 'My task',
maxRetries: 5, // Optional: override workflow maxRetries
timeout: 60000, // Optional: override workflow taskTimeout (in ms)
})
async myTask(state: WorkflowStateAfter<MyWorkflow, 'previousTask'>) {
// Do work
return {
state: { ...state, newData: 'value' },
nextTask: 'nextTaskName', // or 'FINISHED'
message: 'Optional status message',
progression: { value: 50, total: 100 }, // Optional progress
} satisfies TaskResult<this>;
}
Iterative Tasks: To make a task run multiple times, return its own method name in nextTask. For example, nextTask: 'myTask' will loop back to the same task.
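To make the chaining and looping behaviour concrete, here is a minimal, library-independent sketch of a runner that follows nextTask until it sees 'FINISHED'. It is not the package's orchestrator: `runChainSketch`, `ChainResult`, and the `countdown` task are hypothetical names, and persistence, retries, and timeouts are deliberately left out.

```typescript
// Hypothetical, simplified model of task chaining; not the real orchestrator.
type ChainState = Record<string, unknown>;

interface ChainResult {
  state: ChainState;
  nextTask: string; // another task name, the same task name, or 'FINISHED'
}

type ChainTask = (state: ChainState) => ChainResult;

// Follow nextTask from firstTask until a task returns 'FINISHED'.
function runChainSketch(
  tasks: Record<string, ChainTask>,
  firstTask: string,
  maxSteps = 1000, // guard against accidental infinite loops in this sketch
): { state: ChainState; executed: string[] } {
  let state: ChainState = {};
  let current = firstTask;
  const executed: string[] = [];

  for (let step = 0; step < maxSteps && current !== 'FINISHED'; step++) {
    const task = tasks[current];
    if (!task) throw new Error(`Unknown task: ${current}`);
    const result = task(state);
    executed.push(current);
    state = result.state;
    current = result.nextTask;
  }
  return { state, executed };
}

const demoTasks: Record<string, ChainTask> = {
  init: () => ({ state: { remaining: 3 }, nextTask: 'countdown' }),
  // Returns its own name until the counter reaches zero.
  countdown: (state) => {
    const remaining = (state['remaining'] as number) - 1;
    return { state: { remaining }, nextTask: remaining > 0 ? 'countdown' : 'FINISHED' };
  },
};

const demoRun = runChainSketch(demoTasks, 'init');
console.log(demoRun.executed.join(' -> ')); // init -> countdown -> countdown -> countdown
```

The `countdown` task runs three times because it keeps naming itself as the next task; the loop ends only when a task returns 'FINISHED'.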
State Management
Workflow state is type-safe and automatically inferred:
@Task({ name: 'Step 1' })
async step1() {
return {
state: { userId: 123, userName: 'John' },
nextTask: 'step2',
};
}
@Task({ name: 'Step 2' })
async step2(state: WorkflowStateAfter<MyWorkflow, 'step1'>) {
// TypeScript knows: state has { userId: number, userName: string }
console.log(state.userId); // ✅ Type-safe
console.log(state.unknownProp); // ❌ TypeScript error
return {
state: { ...state, age: 30 },
nextTask: 'step3',
};
}
@Task({ name: 'Step 3' })
async step3(state: WorkflowStateAfter<MyWorkflow, 'step2'>) {
// TypeScript knows: state has { userId, userName, age }
console.log(state.age); // ✅ Type-safe
}
API Reference
@Workflow(options)
Decorator to define a workflow class.
Options:
{
name?: string; // Unique workflow identifier (defaults to class name)
schedule?: string; // Cron expression (optional)
firstTask: string; // Name of the first task method
maxRetries?: number; // Max retry attempts (default: 3)
taskTimeout?: number; // Default timeout for all tasks in ms (default: 20000)
}
Note: The @Workflow() decorator automatically makes the class @Injectable(), so you can use standard NestJS dependency injection in the constructor.
@Task(options)
Decorator to define a task method.
Options:
{
name: string; // Task display name
maxRetries?: number; // Max retry attempts for this task (overrides workflow maxRetries)
timeout?: number; // Timeout in ms for this task (overrides workflow taskTimeout)
}
Note: Tasks can be iterative by returning their own method name in nextTask. This allows a task to run multiple times (e.g., processing items in a loop).
TaskResult<T>
Return type for task methods.
{
state: Record<string, unknown>; // Updated workflow state
nextTask: string; // Next task name or 'FINISHED'
message?: string; // Optional status message
progression?: { value: number; total: number }; // Optional progress
error?: string; // Optional error (marks as failed)
}
WorkflowStateAfter<TaskName>
Type helper to infer state after a specific task.
type State = WorkflowStateAfter<DailyReportWorkflow, 'fetchData'>;
// Automatically infers the state shape returned by 'fetchData' task
WorkflowManager
Service to manage workflows programmatically.
import { Injectable } from '@nestjs/common';
import { WorkflowManager } from '@betternest/workflows';
@Injectable()
export class MyService {
constructor(private readonly workflowManager: WorkflowManager) {}
async triggerWorkflow() {
// Manually trigger a workflow
const workflow = await this.workflowManager.triggerWorkflow('my-workflow');
return workflow._id;
}
async getWorkflowStatus(workflowId: string) {
const workflow = await this.workflowManager.getWorkflowDocument(workflowId);
return workflow;
}
async listWorkflows() {
// List workflow executions with filters
const workflows = await this.workflowManager.listWorkflowDocuments({
workflowName: 'my-workflow',
status: 'finished',
limit: 10,
skip: 0,
});
return workflows;
}
async countWorkflows() {
// Count workflow executions
const count = await this.workflowManager.countWorkflowDocuments({
workflowName: 'my-workflow',
status: 'inProgress',
});
return count;
}
async cleanup() {
// Cleanup old workflows (default: older than 7 days)
const deletedCount = await this.workflowManager.cleanupOldWorkflows(7);
return deletedCount;
}
getRegisteredWorkflows() {
// Get all registered workflow definitions
return this.workflowManager.getRegisteredWorkflows();
}
}
Advanced Features
Iterative Tasks
For tasks that need to run multiple times (e.g., processing a list):
@Workflow({
name: 'process-users',
firstTask: 'getUsers',
})
export class ProcessUsersWorkflow {
@Task({ name: 'Get users' })
async getUsers() {
const users = await this.fetchUsers();
return {
state: { users },
nextTask: 'processUser',
} satisfies TaskResult<this>;
}
@Task({ name: 'Process user' })
async processUser(state: StateAfter<'getUsers'> & Partial<{ currentIndex: number }>) {
const user = state.users[state.currentIndex ?? 0];
if (!user) {
// No more users, finish
return { state, nextTask: 'FINISHED' } satisfies TaskResult<this>;
}
// Process user
await this.process(user);
// Increment index for next iteration
state.currentIndex = (state.currentIndex ?? 0) + 1;
// Continue to next user
return {
state,
nextTask: 'processUser', // Loop back to itself
progression: {
value: state.currentIndex,
total: state.users.length,
},
} satisfies TaskResult<this>;
}
}
type StateAfter<T extends keyof ProcessUsersWorkflow> = WorkflowStateAfter<
ProcessUsersWorkflow,
T
>;
Progress Tracking
Track workflow progress in real-time:
@Task({ name: 'Process items' })
async processItems(state) {
const completed = await this.process();
return {
state: { ...state, completed },
nextTask: 'cleanup',
progression: {
value: completed,
total: state.totalItems,
},
message: `Processed ${completed}/${state.totalItems} items`,
};
}
Error Handling
Tasks can report errors while still succeeding:
@Task({ name: 'Send notifications' })
async sendNotifications(state) {
const errors: string[] = [];
for (const user of state.users) {
try {
await this.sendEmail(user);
} catch (error) {
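// `error` is `unknown` under strict TypeScript settings, so narrow it before reading `.message`
errors.push(`Failed for user ${user.id}: ${error instanceof Error ? error.message : String(error)}`);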
}
}
return {
state: { ...state, errors },
nextTask: 'FINISHED',
error: errors.length > 0 ? errors.join('; ') : undefined,
};
}
Task Timeouts
Configure timeouts to prevent tasks from hanging indefinitely:
@Workflow({
name: 'api-workflow',
firstTask: 'fetchData',
taskTimeout: 30000, // Default 30s timeout for all tasks
})
export class ApiWorkflow {
@Task({ name: 'Fetch data' })
async fetchData() {
// This task will timeout after 30s (workflow default)
const data = await this.fetchFromAPI();
return { state: { data }, nextTask: 'process' };
}
@Task({ name: 'Process', timeout: 60000 })
async process(state) {
// This task will timeout after 60s (task-level override)
const result = await this.heavyProcessing(state.data);
return { state: { ...state, result }, nextTask: 'FINISHED' };
}
}
Timeout Priority:
- Task-level timeout (highest priority)
- Workflow-level taskTimeout
- Default: 20 seconds
Important: taskTimeout is the timeout for EACH individual task, not the entire workflow. A workflow can run for hours if it has many tasks.
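The priority order above can be written as a single fallback chain. The sketch below is only an illustration of that rule; `resolveTaskTimeout` and `DEFAULT_TASK_TIMEOUT_MS` are hypothetical names, not exports of the package.

```typescript
// Illustrates the documented priority: task timeout > workflow taskTimeout > 20 s default.
const DEFAULT_TASK_TIMEOUT_MS = 20_000;

function resolveTaskTimeout(taskTimeout?: number, workflowTaskTimeout?: number): number {
  // `??` falls through only on undefined/null, so an explicit value always wins.
  return taskTimeout ?? workflowTaskTimeout ?? DEFAULT_TASK_TIMEOUT_MS;
}

console.log(resolveTaskTimeout(60_000, 30_000));    // 60000 (task-level override)
console.log(resolveTaskTimeout(undefined, 30_000)); // 30000 (workflow default)
console.log(resolveTaskTimeout());                  // 20000 (built-in default)
```

Because the limit applies per task, a workflow with two tasks resolved to 30 s and 60 s could legitimately run for up to 90 s in total, before counting retries.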
Non-Blocking Retries
Retries are non-blocking and scheduled using timestamps. When a task fails, the workflow is marked for retry after 5 seconds without blocking other workflows:
@Workflow({
name: 'resilient-workflow',
firstTask: 'unreliableTask',
maxRetries: 5, // Retry up to 5 times
})
export class ResilientWorkflow {
@Task({ name: 'Unreliable task', maxRetries: 10 })
async unreliableTask() {
// This task will retry up to 10 times (overrides workflow maxRetries)
const result = await this.callUnreliableAPI();
return { state: { result }, nextTask: 'FINISHED' };
}
}
The 5-second wait matches the default of the retryDelayMs module option (5000). Because the retry is recorded as a timestamp instead of being slept on, the orchestrator keeps processing other workflows during the wait.
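As a rough mental model of timestamp-based retries, the following in-memory sketch marks a failed run with the time of its next attempt and lets each polling pass skip runs that are not yet due. The `RetryRun` shape, its field names, and both functions are assumptions made for illustration; the package's actual document schema is not shown in this README.

```typescript
// Hypothetical in-memory model of timestamp-based retries (field names are assumptions).
interface RetryRun {
  id: string;
  retryCount: number;
  retryAt: number | null; // epoch ms before which the run must not be picked up
  status: 'pending' | 'failed';
}

const RETRY_DELAY_MS = 5_000; // mirrors the documented retryDelayMs default

// On failure, record when the next attempt may start instead of sleeping.
function markForRetry(run: RetryRun, nowMs: number, maxRetries: number): RetryRun {
  if (run.retryCount >= maxRetries) {
    return { ...run, status: 'failed', retryAt: null };
  }
  return { ...run, retryCount: run.retryCount + 1, retryAt: nowMs + RETRY_DELAY_MS };
}

// Each polling pass only selects runs whose retry time has arrived.
function dueRuns(runs: RetryRun[], nowMs: number): RetryRun[] {
  return runs.filter(
    (run) => run.status === 'pending' && (run.retryAt === null || run.retryAt <= nowMs),
  );
}

const t0 = 1_000_000;
const flaky = markForRetry({ id: 'a', retryCount: 0, retryAt: null, status: 'pending' }, t0, 3);
const healthy: RetryRun = { id: 'b', retryCount: 0, retryAt: null, status: 'pending' };

console.log(dueRuns([flaky, healthy], t0 + 1_000).map((run) => run.id)); // [ 'b' ]
console.log(dueRuns([flaky, healthy], t0 + 5_000).map((run) => run.id)); // [ 'a', 'b' ]
```

One second after the failure only the healthy run is eligible; once the delay has elapsed the failed run becomes eligible again, and nothing was blocked in between.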
Graceful Shutdown
The workflow orchestrator supports graceful shutdown. When receiving a shutdown signal (SIGTERM, SIGINT), the system:
- Stops accepting new workflows
- Waits up to 30 seconds for running workflows to finish
- Continues heartbeat updates for running workflows during shutdown
- After timeout, exits and lets other instances recover via stale detection
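The last step relies on stale detection: a run whose heartbeat has not been refreshed within the timeout is treated as abandoned and can be picked up by another instance. A minimal sketch of that check follows, assuming a simple "older than the threshold" comparison; `isStale` is a hypothetical helper and the exact comparison used by the package is not documented here.

```typescript
// Sketch only: how a heartbeat timestamp can be used to spot abandoned runs.
const HEARTBEAT_TIMEOUT_MS = 10_000; // documented heartbeatTimeoutMs default

function isStale(
  lastHeartbeatAtMs: number,
  currentTimeMs: number,
  timeoutMs: number = HEARTBEAT_TIMEOUT_MS,
): boolean {
  // A run is stale when its owner has been silent for longer than the timeout.
  return currentTimeMs - lastHeartbeatAtMs > timeoutMs;
}

const currentTime = 100_000;
console.log(isStale(currentTime - 4_000, currentTime));  // false: heartbeat is recent
console.log(isStale(currentTime - 15_000, currentTime)); // true: owner has gone silent
```

With the default heartbeatIntervalMs of 5000, a live instance refreshes its heartbeat twice within each 10-second window, so a healthy run is not flagged under this model.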
To enable graceful shutdown, add this to your main.ts:
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
async function bootstrap() {
const app = await NestFactory.create(AppModule);
// REQUIRED for graceful shutdown
app.enableShutdownHooks();
await app.listen(3000);
}
bootstrap();
Manual Triggers
Trigger workflows programmatically:
import { Injectable } from '@nestjs/common';
import { WorkflowManager } from '@betternest/workflows';
@Injectable()
export class ReportService {
constructor(private readonly workflowManager: WorkflowManager) {}
async generateReport() {
// Trigger a workflow execution
const workflow = await this.workflowManager.triggerWorkflow(
'generate-report-workflow',
);
return { workflowId: workflow._id };
}
}
Note: The workflow's first task will receive an empty state {}. To pass initial data, add it in the first task by fetching from a service or database.
Cron Scheduling
Schedule workflows with cron expressions:
@Workflow({
name: 'backup-workflow',
schedule: '0 2 * * *', // Daily at 2 AM
firstTask: 'backup',
})
Timezone Support: Configure timezone via module options (defaults to 'UTC').
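For readers less familiar with cron syntax, the sketch below splits an expression such as '0 2 * * *' into its named fields. It assumes the standard five-field form used in this README's examples; `splitCron` is a hypothetical helper, performs no range validation, and says nothing about which cron variants the package accepts.

```typescript
// Splits a standard five-field cron expression into named parts (no range validation).
interface CronFields {
  minute: string;
  hour: string;
  dayOfMonth: string;
  month: string;
  dayOfWeek: string;
}

function splitCron(expression: string): CronFields {
  const parts = expression.trim().split(/\s+/);
  if (parts.length !== 5) {
    throw new Error(`Expected 5 cron fields, got ${parts.length}`);
  }
  const [minute, hour, dayOfMonth, month, dayOfWeek] = parts as [
    string, string, string, string, string,
  ];
  return { minute, hour, dayOfMonth, month, dayOfWeek };
}

const backupSchedule = splitCron('0 2 * * *');
console.log(backupSchedule.minute, backupSchedule.hour); // 0 2
```

Read left to right, '0 2 * * *' means minute 0 of hour 2 on every day of every month, which is why the example is described as "Daily at 2 AM". That time is interpreted in the configured timezone.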
Configuration
Module Options
Important: MongoDB connection is configured separately via MongooseModule.forRoot(). The BetterWorkflowsModule only needs timing and collection configuration.
BetterWorkflowsModule.forRoot({
namespace?: string; // Multi-tenant isolation namespace (default: 'default')
connectionName?: string; // MongoDB connection name (from MongooseModule)
workflowrunsCollectionName?: string; // Collection name for workflows (no default)
timezone?: string; // Timezone for cron scheduling (default: 'UTC')
beatIntervalMs?: number; // Workflow processing interval (default: 2000)
heartbeatIntervalMs?: number; // Heartbeat update interval (default: 5000)
heartbeatTimeoutMs?: number; // Stale detection threshold (default: 10000)
retryDelayMs?: number; // Delay before retry (default: 5000)
shutdownTimeoutMs?: number; // Graceful shutdown timeout (default: 30000)
})
// Or async configuration
BetterWorkflowsModule.forRootAsync({
imports: [...],
inject: [...],
useFactory: (...deps) => ({
connectionName: 'workflows',
workflowrunsCollectionName: 'workflowRuns',
timezone: 'Europe/Paris',
beatIntervalMs: 2000,
// ... other options
}),
})
Feature Module Pattern
Register workflows with their dependencies:
BetterWorkflowsModule.forFeature({
imports: [DataModule, EmailModule], // Modules that workflows depend on
workflows: [MyWorkflow, AnotherWorkflow],
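})
Using with @betternest/config
import { Module } from '@nestjs/common';
import { MongooseModule } from '@nestjs/mongoose';
import { BetterWorkflowsModule } from '@betternest/workflows';
import { ConfigModule } from '@betternest/config';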
import { IsString, IsNumber, IsOptional } from 'class-validator';
export class WorkflowsConfigModel {
@IsString()
@IsOptional()
workflowrunsCollectionName?: string = 'workflowRuns';
@IsString()
@IsOptional()
timezone?: string = 'UTC';
@IsNumber()
@IsOptional()
beatIntervalMs?: number = 2000;
@IsNumber()
@IsOptional()
heartbeatIntervalMs?: number = 5000;
@IsNumber()
@IsOptional()
heartbeatTimeoutMs?: number = 10000;
@IsNumber()
@IsOptional()
retryDelayMs?: number = 5000;
@IsNumber()
@IsOptional()
shutdownTimeoutMs?: number = 30000;
}
@Module({
imports: [
// MongoDB connection
MongooseModule.forRoot('mongodb://localhost:27017/workflows', {
connectionName: 'workflows',
}),
// Config
ConfigModule.forRoot(WorkflowsConfigModel, WorkflowsConfigValues),
// Workflows
BetterWorkflowsModule.forRootAsync({
imports: [ConfigModule],
useFactory: (config: WorkflowsConfigModel) => ({
connectionName: 'workflows',
...config,
}),
inject: [WorkflowsConfigModel],
}),
],
})
export class AppModule {}
Multi-Tenant Setup
If you need to share a MongoDB collection between multiple projects or applications, use the namespace option to isolate workflows:
Basic Namespace Configuration
Project 1:
BetterWorkflowsModule.forRoot({
namespace: 'project1', // Unique namespace for this project
workflowrunsCollectionName: 'shared-workflows',
})
Project 2:
BetterWorkflowsModule.forRoot({
namespace: 'project2', // Different namespace
workflowrunsCollectionName: 'shared-workflows',
})
Each project will only see and execute workflows from its own namespace. This provides complete isolation while using the same MongoDB collection.
With Async Configuration
BetterWorkflowsModule.forRootAsync({
imports: [ConfigModule],
useFactory: (config: ConfigService) => ({
namespace: config.get('WORKFLOW_NAMESPACE'), // e.g., 'project1'
workflowrunsCollectionName: 'shared-workflows',
}),
inject: [ConfigService],
})
Default Namespace
If you don't specify a namespace, it defaults to 'default'. This ensures backward compatibility with existing code.
How Namespace Isolation Works
All workflow operations are automatically scoped to the configured namespace:
- ✅ Orchestration: processWorkflows() only picks up workflows from its namespace
- ✅ Creation: triggerWorkflow() creates workflows in its namespace
- ✅ Queries: getWorkflowDocument(), listWorkflowDocuments() only return workflows from its namespace
- ✅ Cleanup: cleanupOldWorkflows() only deletes workflows from its namespace
- ✅ Scheduling: Cron jobs create workflows in their namespace
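One way to picture this scoping is that every query is combined with the configured namespace before it runs. The sketch below models that on a plain array; `StoredRun` and `scopedFind` are hypothetical, and the real package applies its filtering inside MongoDB queries rather than in memory.

```typescript
// Hypothetical model: every query is combined with the configured namespace.
interface StoredRun {
  namespace: string;
  workflowName: string;
  status: string;
}

function scopedFind(
  runs: StoredRun[],
  namespace: string,
  filter: Partial<StoredRun> = {},
): StoredRun[] {
  // Spread the caller's filter first so the configured namespace always wins.
  const criteria: Partial<StoredRun> = { ...filter, namespace };
  const keys = Object.keys(criteria) as (keyof StoredRun)[];
  return runs.filter((run) => keys.every((key) => run[key] === criteria[key]));
}

const sharedCollection: StoredRun[] = [
  { namespace: 'project1', workflowName: 'report', status: 'finished' },
  { namespace: 'project2', workflowName: 'report', status: 'finished' },
  { namespace: 'project1', workflowName: 'backup', status: 'inProgress' },
];

console.log(scopedFind(sharedCollection, 'project1', { workflowName: 'report' }).length); // 1
console.log(scopedFind(sharedCollection, 'project2').length); // 1
```

Even though both projects store a 'report' workflow in the same collection, each lookup only ever sees the rows of its own namespace.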
Migrating Existing Workflows
If you're upgrading from a version without namespace support, existing workflows will need migration:
import { NestFactory } from '@nestjs/core';
import { Model } from 'mongoose';
import { WorkflowRun, countWorkflowsWithoutNamespace, migrateWorkflowsToNamespace } from '@betternest/workflows';
import { AppModule } from './app.module';
async function bootstrap() {
const app = await NestFactory.create(AppModule);
// Get the WorkflowRun model
const workflowRunModel = app.get<Model<WorkflowRun<string>>>('WORKFLOW_RUN_MODEL');
// Check if migration is needed
const count = await countWorkflowsWithoutNamespace(workflowRunModel);
if (count > 0) {
console.log(`Migrating ${count} workflows to default namespace...`);
const migratedCount = await migrateWorkflowsToNamespace(workflowRunModel);
console.log(`Migrated ${migratedCount} workflows successfully`);
}
await app.listen(3000);
}
bootstrap();
Note: Migration is only needed once when upgrading. New installations automatically use the 'default' namespace.
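Conceptually, the migration is a one-off backfill: runs that have no namespace receive 'default', and runs that already have one are left alone. The sketch below models that on plain objects to show why a second pass has nothing left to do; `LegacyRun` and `backfillNamespace` are hypothetical and are not the package's migration helpers.

```typescript
// Conceptual model of a one-off namespace backfill on plain objects (not the real helpers).
interface LegacyRun {
  id: string;
  namespace?: string;
}

function backfillNamespace(
  runs: LegacyRun[],
  namespace = 'default',
): { runs: LegacyRun[]; migrated: number } {
  let migrated = 0;
  const updated = runs.map((run) => {
    if (run.namespace !== undefined) return run; // already namespaced: leave untouched
    migrated++;
    return { ...run, namespace };
  });
  return { runs: updated, migrated };
}

const beforeUpgrade: LegacyRun[] = [{ id: '1' }, { id: '2', namespace: 'project1' }];
const firstPass = backfillNamespace(beforeUpgrade);
const secondPass = backfillNamespace(firstPass.runs);

console.log(firstPass.migrated, secondPass.migrated); // 1 0
```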
Comparison with BullMQ
| Feature | @betternest/workflows | BullMQ |
|---------|----------------------|---------|
| Database | MongoDB | Redis |
| Type-Safety | ✅ Full | ⚠️ Manual |
| State Management | ✅ Built-in | ❌ Manual |
| Cron Scheduling | ✅ Built-in | ✅ Via Bull |
| Progress Tracking | ✅ Built-in | ✅ Built-in |
| Retry Logic | ✅ Built-in | ✅ Built-in |
| UI Dashboard | ❌ Not yet | ✅ Bull Board |
| Maturity | 🆕 New | ✅ Mature |
When to use @betternest/workflows:
- ✅ You already use MongoDB
- ✅ You want type-safe state management
- ✅ You prefer decorator-based workflows
- ✅ You need stateful multi-step workflows
When to use BullMQ:
- ✅ You already use Redis
- ✅ You need extreme performance (millions of jobs)
- ✅ You want a mature dashboard UI
- ✅ You need job priorities and delays
Best Practices
1. Keep Tasks Idempotent
Tasks should be safe to retry:
// ✅ Good: Idempotent
@Task({ name: 'Update user' })
async updateUser(state) {
await this.db.users.updateOne(
{ id: state.userId },
{ $set: { processed: true } }
);
}
// ❌ Bad: Not idempotent
@Task({ name: 'Increment counter' })
async incrementCounter(state) {
await this.db.counters.updateOne(
{ id: 'main' },
{ $inc: { count: 1 } } // Will increment multiple times on retry!
);
}
2. Use Small, Focused Tasks
// ✅ Good: Small, focused tasks
@Task({ name: 'Fetch data' })
async fetchData() { ... }
@Task({ name: 'Transform data' })
async transformData(state) { ... }
@Task({ name: 'Save data' })
async saveData(state) { ... }
// ❌ Bad: One giant task
@Task({ name: 'Do everything' })
async doEverything() {
const data = await this.fetch();
const transformed = this.transform(data);
await this.save(transformed);
}
3. Always Use satisfies TaskResult<this>
Get type-safety and auto-completion:
@Task({ name: 'My task' })
async myTask(state) {
return {
state: { userId: 123 },
nextTask: 'nextTask',
} satisfies TaskResult<this>; // ✅ Type-safe
}
4. Handle Errors Gracefully
@Task({ name: 'Send emails' })
async sendEmails(state) {
const errors: string[] = [];
for (const email of state.emails) {
try {
await this.send(email);
} catch (error) {
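// `error` is `unknown` under strict TypeScript settings, so narrow it before reading `.message`
errors.push(error instanceof Error ? error.message : String(error));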
}
}
return {
state: { ...state, errors },
nextTask: 'FINISHED',
error: errors.length > 0 ? errors.join('; ') : undefined,
};
}
5. ⚠️ Never Use Instance Variables
TypeScript now prevents you from adding instance variables via the WorkflowConstructor type. If you try to add a property like private counter = 0;, you'll get a compilation error.
// ❌ TypeScript ERROR: Instance variables are forbidden
@Workflow({ name: 'bad-workflow', firstTask: 'task1' })
export class BadWorkflow {
private counter = 0; // ❌ TS Error: Incompatible with WorkflowConstructor
private static cache = new Map(); // ❌ TS Error
private results: any[] = []; // ❌ TS Error
@Task({ name: 'Task 1' })
async task1() {
this.counter++; // Won't compile!
return { state: {}, nextTask: 'FINISHED' };
}
}
// ✅ CORRECT: Use state instead
@Workflow({ name: 'good-workflow', firstTask: 'task1' })
export class GoodWorkflow {
@Task({ name: 'Task 1' })
async task1() {
return {
state: { counter: 1 }, // ✅ Isolated per execution
nextTask: 'task2',
} satisfies TaskResult<this>;
}
@Task({ name: 'Task 2' })
async task2(state: StateAfter<'task1'>) {
return {
state: { ...state, counter: state.counter + 1 },
nextTask: 'FINISHED',
} satisfies TaskResult<this>;
}
}
type StateAfter<T extends keyof GoodWorkflow> = WorkflowStateAfter<
GoodWorkflow,
T
>;
Why this matters:
- Workflow class is a singleton (instantiated once at startup)
- Instance variables create race conditions in concurrent executions
- Leads to bugs that are extremely difficult to debug
- Not thread-safe
What you CAN use:
- ✅ Constructor dependency injection (NestJS services)
- ✅ Read-only constants
- ❌ Mutable instance variables
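The race described above is easy to reproduce without the library. In the sketch below, a single shared instance stands in for the singleton workflow class; `SharedCounterSketch` and `runWithState` are hypothetical names used only for this illustration.

```typescript
// Sketch: why mutable fields on a singleton leak between executions.
class SharedCounterSketch {
  private counter = 0; // lives on the single instance, shared by every execution

  run(): number {
    this.counter++;
    return this.counter;
  }
}

// State-passing alternative: each execution carries its own counter.
function runWithState(state: { counter: number }): { counter: number } {
  return { counter: state.counter + 1 };
}

const singleton = new SharedCounterSketch();
const executionA = singleton.run();
const executionB = singleton.run(); // sees execution A's increment

console.log(executionA, executionB); // 1 2

const isolatedA = runWithState({ counter: 0 }).counter;
const isolatedB = runWithState({ counter: 0 }).counter;
console.log(isolatedA, isolatedB); // 1 1
```

The second execution on the shared instance observes a value it never set, whereas the state-passing version gives each execution an independent result.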
Documentation
📚 Core Documentation
- TYPESCRIPT-PATTERNS.md - ⚠️ CRITICAL - TypeScript patterns for type-safe workflows
- TESTING-STRATEGY.md - Testing strategy & validation approach
- DISTRIBUTED-VALIDATION.md - Manual validation guide for distributed behaviors
🚀 Getting Started
- examples/README.md - Quick start guide for examples & validation
- examples/basic-workflow/ - Complete workflow example with dependencies
⚠️ Before You Code
READ THIS FIRST: TYPESCRIPT-PATTERNS.md
This document explains the critical TypeScript patterns you must follow to ensure type-safety:
- ✅ How to correctly type task parameters
- ✅ When to use satisfies (and why never as)
- ✅ How to handle self-referencing tasks
- ❌ Common mistakes that break type-safety
Not following these patterns will lead to runtime errors!
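As a quick illustration of the `satisfies` versus `as` point, the sketch below uses a hypothetical `ResultSketch` shape rather than the package's real TaskResult type. It also includes `StateOf`, a small helper written in the spirit of WorkflowStateAfter; that helper is an assumption about how such inference can work and not the library's definition. The `satisfies` operator requires TypeScript 4.9 or later.

```typescript
// Hypothetical result shape; the package's real TaskResult type may differ.
interface ResultSketch {
  state: Record<string, unknown>;
  nextTask: string;
}

// `satisfies` checks the shape but keeps the inferred, specific type of `state`.
const checked = { state: { userId: 123 }, nextTask: 'step2' } satisfies ResultSketch;
const userId: number = checked.state.userId; // still known to be a number

// `as` replaces the inferred type, so the specific keys are lost.
const asserted = { state: { userId: 123 }, nextTask: 'step2' } as ResultSketch;
const looseId: unknown = asserted.state['userId']; // only `unknown` now

// Extracts the `state` type from a task function's return value.
type StateOf<F> = F extends (...args: never[]) => { state: infer S } ? S : never;

const step1 = () =>
  ({ state: { userId: 123, userName: 'John' }, nextTask: 'step2' } satisfies ResultSketch);

// The next task's parameter type can be derived from step1 without restating it.
const inferred: StateOf<typeof step1> = step1().state;

console.log(userId, typeof looseId, inferred.userName); // 123 number John
```

The values are identical at runtime; the difference is purely what the compiler still knows, which is what later tasks depend on when they type their `state` parameter.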
Examples
See the examples directory for complete working examples:
- Basic Workflow - Simple 3-step workflow
- Iterative Workflow - Process a list of items
- Scheduled Workflow - Cron-based execution
- Error Handling - Graceful error management
Troubleshooting
Workflow not starting
- Check that the workflow is registered via BetterWorkflowsModule.forFeature({ workflows: [...] })
- Verify MongoDB connection
- Check cron expression syntax
Tasks not executing
- Verify task names match exactly
- Check that firstTask points to an existing task
- Review logs for errors
State not persisting
- Ensure you return updated state: { ...state, newData }
- Check that nextTask is set correctly
- Verify MongoDB write permissions
Contributing
Contributions are welcome! Please see CONTRIBUTING.md.
License
MIT © Mathieu Colmon
Related Packages
- @betternest/config - Type-safe configuration
- @betternest/health - Auto-discovery health checks
Part of the BetterNest ecosystem - Production-proven patterns for NestJS applications.
