@egintegrations/workflow
v0.1.0
Published
Workflow orchestration library with state machines, queue services, and ETL framework for building complex workflows and background job processing.
Features
- State Machine: Generic state machine with guards, actions, and history tracking
- Queue Service: Background job queue with priorities, retries, and concurrency control
- ETL Framework: Extract-Transform-Load pipelines with batch processing and progress tracking
Installation
npm install @egintegrations/workflow
Usage
State Machine
Create workflows with state transitions, guards, and actions:
import { StateMachine } from '@egintegrations/workflow';

enum OrderState {
  DRAFT = 'DRAFT',
  SUBMITTED = 'SUBMITTED',
  APPROVED = 'APPROVED',
  REJECTED = 'REJECTED',
}

enum OrderEvent {
  SUBMIT = 'SUBMIT',
  APPROVE = 'APPROVE',
  REJECT = 'REJECT',
}

const orderMachine = new StateMachine({
  initialState: OrderState.DRAFT,
  transitions: [
    { from: OrderState.DRAFT, to: OrderState.SUBMITTED, event: OrderEvent.SUBMIT },
    {
      from: OrderState.SUBMITTED,
      to: OrderState.APPROVED,
      event: OrderEvent.APPROVE,
      guard: (ctx) => ctx.amount < 10000, // Only auto-approve small orders
      action: (from, to, event, ctx) => {
        console.log(`Order ${ctx.orderId} approved`);
      },
    },
    { from: OrderState.SUBMITTED, to: OrderState.REJECTED, event: OrderEvent.REJECT },
  ],
  context: { orderId: '123', amount: 5000 },
  enableHistory: true,
  onTransition: (from, to, event, ctx) => {
    console.log(`Transitioned from ${from} to ${to}`);
  },
});

// Check if transition is possible
if (orderMachine.can(OrderEvent.SUBMIT)) {
  await orderMachine.transition(OrderEvent.SUBMIT);
}

console.log(orderMachine.getCurrentState()); // SUBMITTED
console.log(orderMachine.getHistory()); // Array of state transitions
State Machine Features
- Generic Types: Type-safe states, events, and context
- Guards: Conditional transitions based on context
- Actions: Execute code during transitions
- History: Track all state transitions with timestamps
- Async Support: Guards and actions can be async
- Context Management: Store and update workflow data
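To make the guard behaviour concrete, here is a minimal standalone sketch of how a guarded transition could be resolved. It does not use or reproduce the library's internals: `TransitionSketch` and `resolveTarget` are hypothetical names, and guards are kept synchronous here for brevity even though the library also allows async guards.

```typescript
// Hypothetical shape for illustration; not the library's actual Transition type.
interface TransitionSketch<S, E, C> {
  from: S;
  to: S;
  event: E;
  guard?: (ctx: C) => boolean; // synchronous only, to keep the sketch small
}

// Return the target state for `event`, or undefined when no transition applies.
function resolveTarget<S, E, C>(
  transitions: TransitionSketch<S, E, C>[],
  current: S,
  event: E,
  ctx: C,
): S | undefined {
  // A transition applies when state and event match and its guard (if any) passes.
  const match = transitions.find(
    (t) => t.from === current && t.event === event && (t.guard ? t.guard(ctx) : true),
  );
  return match ? match.to : undefined;
}

type OrderCtx = { amount: number };

const table: TransitionSketch<string, string, OrderCtx>[] = [
  { from: "SUBMITTED", to: "APPROVED", event: "APPROVE", guard: (c) => c.amount < 10000 },
  { from: "SUBMITTED", to: "REJECTED", event: "REJECT" },
];

const smallOrder = resolveTarget(table, "SUBMITTED", "APPROVE", { amount: 5000 });
const largeOrder = resolveTarget(table, "SUBMITTED", "APPROVE", { amount: 50000 });

console.log(smallOrder); // APPROVED
console.log(largeOrder); // undefined (guard blocks the transition)
```

In this sketch a blocked guard simply yields no target state; how the library itself reports a blocked transition is not stated in this README.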
Queue Service
Process background jobs with priorities, retries, and concurrency control:
import { InMemoryQueue } from '@egintegrations/workflow';

const queue = new InMemoryQueue({
  pollInterval: 1000, // Check for jobs every second
  concurrency: 5, // Process up to 5 jobs concurrently
  enableDeadLetter: true, // Move failed jobs to dead letter queue
});

// Register job handler
// (sendEmail is your own mail-sending function, not part of this package)
queue.process<{ email: string; subject: string }>('send-email', async (job) => {
  console.log(`Sending email to ${job.data.email}`);
  await sendEmail(job.data.email, job.data.subject);
});

// Enqueue jobs
const jobId = await queue.enqueue(
  'send-email',
  {
    email: 'user@example.com',
    subject: 'Welcome!',
  },
  {
    priority: 10, // Higher priority jobs run first
    maxRetries: 3, // Retry up to 3 times on failure
  }
);

// Check job status
const job = await queue.getStatus(jobId);
console.log(job?.status); // PENDING, PROCESSING, COMPLETED, FAILED, CANCELLED

// Manage jobs
await queue.cancel(jobId); // Cancel a pending job
await queue.retry(failedJobId); // Retry a failed job (failedJobId: ID of a job in FAILED status)
await queue.clearCompleted(); // Remove completed jobs
await queue.stop(); // Stop processing jobs
Queue Features
- Priority Queuing: Higher priority jobs run first
- Automatic Retries: Configurable retry attempts with exponential backoff
- Concurrency Control: Limit concurrent job execution
- Job Management: Cancel, retry, and query jobs
- Status Tracking: Track job lifecycle (PENDING → PROCESSING → COMPLETED/FAILED)
- Dead Letter Queue: Failed jobs moved to separate queue
- Type Safety: Typed job data and handlers
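The priority and retry behaviour listed above can be illustrated with a small standalone sketch. This is not the library's implementation: the README does not state the backoff formula or how ties between equal priorities are broken, so the doubling delay, the cap, the tie-break by enqueue time, and all names (`backoffDelayMs`, `pickNext`, `PendingJobSketch`) are assumptions made for illustration.

```typescript
// Assumed formula: delay doubles with each retry attempt and is capped.
// baseDelayMs and maxDelayMs are illustrative parameters, not library options.
function backoffDelayMs(attempt: number, baseDelayMs = 1000, maxDelayMs = 30000): number {
  // attempt is 1 for the first retry, 2 for the second, and so on
  const delay = baseDelayMs * Math.pow(2, attempt - 1);
  return Math.min(delay, maxDelayMs);
}

// Hypothetical job shape for illustration only.
interface PendingJobSketch {
  id: string;
  priority: number;
  enqueuedAt: number; // e.g. a timestamp in milliseconds
}

// Pick the job to run next: highest priority first,
// ties broken by earliest enqueue time (an assumption).
function pickNext(jobs: PendingJobSketch[]): PendingJobSketch | undefined {
  const sorted = jobs
    .slice() // copy so the caller's array is not reordered
    .sort((a, b) => b.priority - a.priority || a.enqueuedAt - b.enqueuedAt);
  return sorted[0];
}

const delays = [1, 2, 3].map((attempt) => backoffDelayMs(attempt));
console.log(delays); // [ 1000, 2000, 4000 ]

const next = pickNext([
  { id: "a", priority: 1, enqueuedAt: 1 },
  { id: "b", priority: 10, enqueuedAt: 2 },
  { id: "c", priority: 10, enqueuedAt: 3 },
]);
console.log(next ? next.id : "none"); // b
```

With `maxRetries: 3`, a schedule like this would wait roughly 1 s, 2 s, and 4 s between attempts before the job is treated as failed.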
ETL Pipeline
Build data extraction, transformation, and loading pipelines:
import {
  ETLPipeline,
  ArrayExtractor,
  FunctionTransformer,
  ArrayLoader,
} from '@egintegrations/workflow';

interface User {
  id: number;
  name: string;
  email: string;
  age: number;
}

interface TransformedUser {
  userId: number;
  displayName: string;
  contact: string;
}

const sourceData: User[] = [
  { id: 1, name: 'Alice', email: 'alice@example.com', age: 30 },
  { id: 2, name: 'Bob', email: 'bob@example.com', age: 25 },
];

const pipeline = new ETLPipeline({
  name: 'user-import',
  extractor: new ArrayExtractor(sourceData),
  transformer: new FunctionTransformer<User, TransformedUser>((user) => ({
    userId: user.id,
    displayName: user.name.toUpperCase(),
    contact: user.email,
  })),
  loader: new ArrayLoader<TransformedUser>(),
  batchSize: 100, // Process in batches of 100
  onProgress: (stats) => {
    console.log(`Loaded ${stats.loaded}/${stats.extracted} records`);
  },
  onError: (error, record) => {
    console.error(`Failed to process record:`, error);
  },
});

const result = await pipeline.run();
console.log(result);
// {
//   pipelineName: 'user-import',
//   status: 'completed',
//   extracted: 2,
//   transformed: 2,
//   filtered: 0,
//   loaded: 2,
//   failed: 0,
//   duration: 15,
//   startTime: Date,
//   endTime: Date
// }
Built-in Extractors
- ArrayExtractor: Extract from in-memory array
- JSONFileExtractor: Extract from JSON file
- FunctionExtractor: Extract using custom function
Built-in Transformers
- IdentityTransformer: Pass data through unchanged
- FunctionTransformer: Transform using custom function
- FilterTransformer: Filter records based on predicate
- MapTransformer: Map fields from input to output
- ComposeTransformer: Chain multiple transformers
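As an illustration of what chaining a filter and a mapping step means, the following standalone sketch composes two plain functions. It does not use the library's `FilterTransformer` or `ComposeTransformer` classes, whose constructor signatures are not shown in this README. `Step`, `compose`, and `UserSketch` are hypothetical names, and representing a filtered-out record as `null` is an assumption of the sketch.

```typescript
// A step maps an input to an output, or returns null to drop the record.
// Using null for "filtered out" is an assumption of this sketch.
type Step<A, B> = (input: A) => B | null;

// Chain two steps: if the first drops the record, the second never runs.
function compose<A, B, C>(first: Step<A, B>, second: Step<B, C>): Step<A, C> {
  return (input) => {
    const mid = first(input);
    return mid === null ? null : second(mid);
  };
}

interface UserSketch {
  name: string;
  age: number;
}

// Filter step: keep adults only.
const adultsOnly: Step<UserSketch, UserSketch> = (u) => (u.age >= 18 ? u : null);
// Map step: derive an upper-case display name.
const toDisplayName: Step<UserSketch, string> = (u) => u.name.toUpperCase();

const filterThenMap = compose(adultsOnly, toDisplayName);

const sample: UserSketch[] = [
  { name: "Alice", age: 30 },
  { name: "Carol", age: 12 },
];
const displayNames = sample.map((u) => filterThenMap(u));
console.log(displayNames); // [ 'ALICE', null ]
```

A pipeline built this way could count the `null` results to report how many records were filtered, similar in spirit to the `filtered` field in the result shown earlier.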
Built-in Loaders
- ArrayLoader: Load into in-memory array
- JSONFileLoader: Load into JSON file
- ConsoleLoader: Log records to console (debugging)
- FunctionLoader: Load using custom function
- BatchCallbackLoader: Execute callback for each batch
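The `batchSize` option suggests that records reach the loader in chunks. The sketch below shows one way such batching could work. It is not the library's code: `chunk`, `loadInBatches`, and `LoaderSketch` are hypothetical names, and the loader shape (a `load` method that returns the number of records written) mirrors the custom loader example later in this README.

```typescript
// Split records into consecutive batches of at most batchSize items.
function chunk<T>(records: T[], batchSize: number): T[][] {
  if (batchSize <= 0) {
    throw new Error("batchSize must be positive");
  }
  const batches: T[][] = [];
  for (let i = 0; i < records.length; i += batchSize) {
    batches.push(records.slice(i, i + batchSize));
  }
  return batches;
}

// Hypothetical loader shape: load a batch, resolve to the number of records written.
interface LoaderSketch<T> {
  load(records: T[]): Promise<number>;
}

// Feed the loader one batch at a time and total the records it reports as loaded.
async function loadInBatches<T>(
  records: T[],
  loader: LoaderSketch<T>,
  batchSize: number,
): Promise<number> {
  let loaded = 0;
  for (const batch of chunk(records, batchSize)) {
    loaded += await loader.load(batch);
  }
  return loaded;
}

const batchSizes = chunk([1, 2, 3, 4, 5], 2).map((b) => b.length);
console.log(batchSizes); // [ 2, 2, 1 ]
```

Loading batches sequentially, as here, keeps memory use bounded and makes progress reporting straightforward; whether the library loads batches sequentially or in parallel is not stated.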
Custom ETL Components
Create custom extractors, transformers, and loaders:
import { Extractor, Transformer, Loader } from '@egintegrations/workflow';

// `User` is the interface from the pipeline example above;
// `db` stands in for your own database client.

// Custom extractor
class DatabaseExtractor implements Extractor<User> {
  async extract(): Promise<User[]> {
    return db.query('SELECT * FROM users');
  }
}

// Custom transformer
class UppercaseNameTransformer implements Transformer<User, User> {
  async transform(user: User): Promise<User> {
    return { ...user, name: user.name.toUpperCase() };
  }
}

// Custom loader
class DatabaseLoader implements Loader<User> {
  async load(users: User[]): Promise<number> {
    await db.insertMany(users);
    return users.length;
  }
}
API Documentation
StateMachine<S, E, C>
Constructor
new StateMachine({
  initialState: S,
  transitions: Transition<S, E, C>[],
  context?: C,
  onTransition?: (from: S, to: S, event: E, context: C) => void | Promise<void>,
  enableHistory?: boolean,
})
Methods
- getCurrentState(): S - Get current state
- getContext(): C | undefined - Get context
- setContext(context: C): void - Update context
- can(event: E): boolean - Check if transition is possible
- getPossibleEvents(): E[] - Get all possible events from current state
- transition(event: E): Promise<S> - Execute state transition
- getHistory(): StateHistoryEntry<S, E>[] - Get transition history
- clearHistory(): void - Clear history
- reset(initialState?: S): void - Reset to initial state
InMemoryQueue
Constructor
new InMemoryQueue({
  pollInterval?: number, // Default: 1000ms
  concurrency?: number, // Default: 5
  enableDeadLetter?: boolean, // Default: false
})
Methods
- enqueue<T>(type: string, data: T, options?: EnqueueOptions): Promise<string> - Add job to queue
- process<T>(type: string, handler: JobHandler<T>): void - Register job handler
- getStatus(jobId: string): Promise<Job | null> - Get job status
- cancel(jobId: string): Promise<boolean> - Cancel pending job
- retry(jobId: string): Promise<boolean> - Retry failed job
- getJobsByStatus(status: JobStatus): Promise<Job[]> - Get jobs by status
- clearCompleted(olderThan?: Date): Promise<number> - Clear completed jobs
- stop(): Promise<void> - Stop processing
ETLPipeline<TExtract, TTransform>
Constructor
new ETLPipeline({
  name: string,
  extractor: Extractor<TExtract>,
  transformer: Transformer<TExtract, TTransform>,
  loader: Loader<TTransform>,
  batchSize?: number, // Default: 100
  onProgress?: (stats: ETLProgress) => void,
  onError?: (error: Error, record?: any) => void,
})
Methods
- run(): Promise<ETLResult> - Execute the ETL pipeline
Source Project
Extracted from FTA:
- State Machine: backend/app/services/state_machine.py - Generic state machine pattern
- Queue Service: backend/app/services/queue.py - Background job queue pattern
- ETL Framework: backend/app/services/etl.py - Extract-Transform-Load pattern
Refactored from Python to TypeScript with:
- Generic type support
- Pluggable extractors, transformers, and loaders
- Modern async/await patterns
- Comprehensive test coverage
Related Packages
- @egintegrations/core-utils - Retry logic, error handling, health checks
License
MIT
Contributing
See the main egi-comp-library repository for contribution guidelines.
