@iota-big3/sdk-workflows
v2.0.0
Workflow automation and orchestration for IOTA Big3 SDK
Enhanced workflow automation and orchestration for IOTA Big3 SDK
Features
- 🔄 DAG Support: Define complex workflows with dependencies
- ⚡ Parallel Execution: Run multiple steps concurrently
- 🌐 Conditional Logic: Branch workflows based on conditions
- 🔁 Retry Policies: Automatic retry with exponential backoff
- 📊 Event Emission: Monitor workflow lifecycle events
- 🛡️ Error Handling: Graceful error recovery and custom handlers
- ⏱️ Timeouts: Step and workflow-level timeout support
- 📦 Type Safety: Full TypeScript support with comprehensive types
Installation
npm install @iota-big3/sdk-workflows
Quick Start
import {
  WorkflowEngine,
  WorkflowConfig,
  StepType,
} from "@iota-big3/sdk-workflows";
// Create workflow engine
const engine = new WorkflowEngine({
  name: "My Workflow Engine",
  timeout: 30000,
});
// Define workflow
const workflow: WorkflowConfig = {
  id: "data-pipeline",
  name: "Data Processing Pipeline",
  steps: [
    {
      id: "fetch",
      name: "Fetch Data",
      type: StepType.TASK,
      execute: async () => {
        const data = await fetchData();
        return { data };
      },
    },
    {
      id: "process",
      name: "Process Data",
      type: StepType.TASK,
      dependencies: ["fetch"],
      execute: async (context) => {
        const processed = await processData(context.data);
        return { processed };
      },
    },
  ],
};
// Register and execute
await engine.registerWorkflow(workflow);
const result = await engine.executeWorkflow("data-pipeline");
Step Types
Task Step
Basic unit of work in a workflow.
{
  id: 'my-task',
  name: 'My Task',
  type: StepType.TASK,
  execute: async (context) => {
    // Your task logic
    return { result: 'done' };
  }
}
Parallel Step
Execute multiple steps concurrently.
{
  id: 'parallel-work',
  name: 'Parallel Processing',
  type: StepType.PARALLEL,
  steps: [
    { id: 'task1', type: StepType.TASK, execute: async () => {...} },
    { id: 'task2', type: StepType.TASK, execute: async () => {...} },
    { id: 'task3', type: StepType.TASK, execute: async () => {...} }
  ]
}
Conditional Step
Branch execution based on conditions.
{
  id: 'decision',
  name: 'Make Decision',
  type: StepType.CONDITIONAL,
  condition: (context) => context.value > 100,
  thenStep: {
    id: 'high-value',
    type: StepType.TASK,
    execute: async () => ({ path: 'high' })
  },
  elseStep: {
    id: 'low-value',
    type: StepType.TASK,
    execute: async () => ({ path: 'low' })
  }
}
Dependencies
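A step names the steps it waits for in its `dependencies` array, as in the example later in this section. As a rough, standalone illustration of how such arrays can be turned into an execution order, the sketch below groups steps into "waves" with a layered variant of Kahn's topological sort. It is not the engine's actual scheduler, which this README does not describe; `StepLike` and `executionWaves` are hypothetical names introduced only for this sketch.

```typescript
// Hypothetical minimal shape: only the two fields this sketch needs.
interface StepLike {
  id: string;
  dependencies?: string[];
}

// Group step ids into waves: every step in a wave has all of its
// dependencies satisfied by earlier waves, so a wave could run concurrently.
function executionWaves(steps: StepLike[]): string[][] {
  // pending[id] = dependencies of that step that have not completed yet
  const pending: Record<string, string[]> = {};
  for (const step of steps) {
    pending[step.id] = (step.dependencies || []).slice();
  }

  // Reject references to steps that were never declared.
  for (const id of Object.keys(pending)) {
    for (const dep of pending[id]) {
      if (!Object.prototype.hasOwnProperty.call(pending, dep)) {
        throw new Error(`Step "${id}" depends on unknown step "${dep}"`);
      }
    }
  }

  const waves: string[][] = [];
  let ids = Object.keys(pending);
  while (ids.length > 0) {
    // Steps with no unfinished dependencies are ready to run.
    const ready = ids.filter((id) => pending[id].length === 0);
    if (ready.length === 0) {
      throw new Error("Dependency cycle among: " + ids.join(", "));
    }
    for (const id of ready) {
      delete pending[id];
    }
    ids = Object.keys(pending);
    // Mark the ready steps as completed for everything still waiting.
    for (const id of ids) {
      pending[id] = pending[id].filter((dep) => ready.indexOf(dep) === -1);
    }
    waves.push(ready);
  }
  return waves;
}

const waves = executionWaves([
  { id: "step1" },
  { id: "step2" },
  { id: "step3", dependencies: ["step1", "step2"] },
]);
console.log(JSON.stringify(waves)); // [["step1","step2"],["step3"]]
```

In this sketch, `step1` and `step2` land in the same wave because neither waits on anything, and `step3` runs only after both have finished.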
Define execution order with dependencies:
{
  id: 'step3',
  name: 'Final Step',
  type: StepType.TASK,
  dependencies: ['step1', 'step2'], // Waits for both
  execute: async () => ({ completed: true })
}
Retry Policies
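The retry policy fields used in this section (`maxAttempts`, `backoffMs`, `backoffMultiplier`, `maxBackoffMs`) suggest a capped exponential schedule. The standalone sketch below shows what the waits would look like under two assumptions that this README does not confirm: that `maxAttempts` counts the first try, and that the delay before retry n is `backoffMs * backoffMultiplier^n`, capped at `maxBackoffMs`. `RetryPolicyLike` and `retryDelays` are hypothetical names, not part of the package.

```typescript
// Field names copied from the retryPolicy example in this README;
// the formula itself is an assumption made for illustration.
interface RetryPolicyLike {
  maxAttempts: number;
  backoffMs: number;
  backoffMultiplier: number;
  maxBackoffMs: number;
}

// Compute the wait (in ms) before each retry under the assumed formula.
function retryDelays(policy: RetryPolicyLike): number[] {
  const delays: number[] = [];
  // Assumption: maxAttempts includes the initial attempt,
  // so there are maxAttempts - 1 waits between attempts.
  for (let retry = 0; retry < policy.maxAttempts - 1; retry++) {
    const raw = policy.backoffMs * Math.pow(policy.backoffMultiplier, retry);
    delays.push(Math.min(raw, policy.maxBackoffMs));
  }
  return delays;
}

const delays = retryDelays({
  maxAttempts: 3,
  backoffMs: 1000,
  backoffMultiplier: 2,
  maxBackoffMs: 10000,
});
console.log(JSON.stringify(delays)); // [1000,2000]
```

Under these assumptions, raising `maxAttempts` to 6 would give waits of 1000, 2000, 4000, 8000, and 10000 ms, with the last one clipped by `maxBackoffMs`.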
Configure automatic retry for unreliable operations:
{
  id: 'api-call',
  name: 'Call External API',
  type: StepType.TASK,
  retryPolicy: {
    maxAttempts: 3,
    backoffMs: 1000,
    backoffMultiplier: 2,
    maxBackoffMs: 10000
  },
  execute: async () => {
    const response = await callAPI();
    return response;
  }
}
Event Monitoring
Monitor workflow execution with events:
// Assumes EventType is exported from the package alongside StepType
import { EventType } from "@iota-big3/sdk-workflows";
engine.on(EventType.WORKFLOW_STARTED, (data) => {
  console.log("Workflow started:", data.workflowId);
});
engine.on(EventType.STEP_COMPLETED, (data) => {
  console.log("Step completed:", data.stepId);
});
engine.on(EventType.WORKFLOW_FAILED, (data) => {
  console.log("Workflow failed:", data.error);
});
Error Handling
Handle errors gracefully with custom handlers:
{
  id: 'risky-operation',
  name: 'Risky Operation',
  type: StepType.TASK,
  onError: (error) => {
    // Custom error handling
    console.error('Operation failed:', error);
    return { fallback: true, error: error.message };
  },
  execute: async () => {
    // May throw error
    return riskyOperation();
  }
}
Persistence
The SDK now includes workflow persistence capabilities for saving, loading, and versioning workflows.
Serialization
import { WorkflowSerializer } from "@iota-big3/sdk-workflows";
// Serialize a workflow
const serialized = WorkflowSerializer.serialize(workflow);
// Export as YAML
const yaml = WorkflowSerializer.toYAML(workflow);
// Export as self-contained module
const moduleCode = WorkflowSerializer.exportAsModule(workflow);
Storage
import {
  InMemoryWorkflowStorage,
  FileSystemWorkflowStorage,
} from "@iota-big3/sdk-workflows";
// serializedWorkflow: a serialized workflow, e.g. the value returned by
// WorkflowSerializer.serialize(workflow) in the Serialization example
// In-memory storage
const memoryStorage = new InMemoryWorkflowStorage();
await memoryStorage.save(serializedWorkflow);
const loaded = await memoryStorage.load("workflow-id");
// File system storage
const fsStorage = new FileSystemWorkflowStorage("./workflows");
await fsStorage.save(serializedWorkflow);
const workflows = await fsStorage.list();
CI/CD Integration
The package includes a validation script for CI/CD pipelines:
# Run CI validation
npm run test:ci
# Run validation directly
node scripts/validate-ci.js
The CI validation tests:
- Basic engine functionality
- Workflow registration and execution
- DAG dependency resolution
- Parallel execution performance
- Error handling and retry mechanisms
- Event system
- Memory usage
Testing
Jest Configuration
The package uses Jest for testing. Due to TypeScript enum handling in ts-jest, we recommend:
- Using the provided tsconfig.test.json for test-specific configuration
- Running the CI validation script for comprehensive testing
- Using CommonJS imports in tests if you encounter import issues
// If you encounter import issues in tests, use:
const { WorkflowEngine, StepType } = require("@iota-big3/sdk-workflows");
Running Tests
# Unit tests
npm test
# CI validation
npm run test:ci
# Build and validate
npm run build && npm run validate
Examples
The package includes comprehensive examples in the examples/ directory:
- advanced-workflows.ts - DAG, parallel, conditional, and retry examples
- persistence-demo.ts - Workflow serialization and storage examples
- More examples demonstrating all features
Run examples:
# TypeScript examples
npx ts-node examples/advanced-workflows.ts
npx ts-node examples/persistence-demo.ts
API Reference
WorkflowEngine
class WorkflowEngine extends EventEmitter {
  constructor(config: WorkflowEngineConfig);
  // Workflow management
  async registerWorkflow(workflow: WorkflowConfig): Promise<void>;
  async getWorkflow(id: string): Promise<WorkflowConfig | undefined>;
  async removeWorkflow(id: string): Promise<boolean>;
  // Execution
  async executeWorkflow(
    workflowId: string,
    input?: any
  ): Promise<ExecutionResult>;
}
Types
interface WorkflowConfig {
  id: string;
  name: string;
  version?: string;
  description?: string;
  timeout?: number;
  steps: WorkflowStep[];
}
interface WorkflowStep {
  id: string;
  name: string;
  type: StepType;
  dependencies?: string[];
  execute?: (context: any) => Promise<any>;
  onError?: (error: Error) => any;
  retryPolicy?: RetryPolicy;
  timeout?: number;
  // Additional properties for specific step types
}
interface ExecutionResult {
  executionId: string;
  workflowId: string;
  status: WorkflowStatus;
  success: boolean;
  startTime: Date;
  endTime?: Date;
  duration?: number;
  output?: any;
  error?: string;
}
Documentation
See https://docs.iota-big3.com for full documentation.
License
MIT
