type-pipelines
v1.2.0
Published
A lightweight TypeScript library for creating complex pipelines and workflows
Readme
PipelineTS
PipelineTS is a lightweight TypeScript library designed to facilitate the creation of complex pipelines and workflows. It offers a flexible and extensible framework to define, manage, and execute multi-step processes with features such as conditional branching, parallel execution, and shared data storage.
Features
- Multi-Step Pipeline Creation: Define pipelines comprising multiple sequential steps, each performing a specific action.
- Conditional Branching: Implement decision-making within pipelines, allowing different execution paths based on specified conditions.
- Parallel Execution: Execute multiple steps or entire pipelines concurrently to enhance performance and efficiency.
- Shared Data Store: Maintain a centralized data store to facilitate seamless communication and data sharing between steps.
- Error Handling and Retry Mechanism: Incorporate robust error handling with configurable retry options to manage failures gracefully.
- Extensible Logging System: Utilize a customizable logging system to monitor pipeline execution and debug processes effectively.
Installation
Install the package using npm:
npm install type-pipelinesUsage Example
import { createPipeline, createStep, ConsoleLogger, StepInput } from 'type-pipelines';
import { createConditionalStep } from 'type-pipelines/conditional';
async function runExample() {
const logger = new ConsoleLogger();
const pipeline = createPipeline({ name: 'Example Pipeline', logger });
pipeline
.addStep({
name: 'Fetch Data',
action: async () => {
logger.info('Fetching data...');
return { data: 'Some fetched data' };
}
})
.addStep({
name: 'Process Data',
action: async (input) => {
logger.info('Processing data...');
return { ...input, processed: true };
}
});
const conditionalStep = createConditionalStep({
condition: (input) => input.processed,
trueStep: createStep({
name: 'Save Processed Data',
action: async (input) => {
logger.info('Saving processed data...');
return { ...input, saved: true };
}
}),
falseStep: createStep({
name: 'Handle Unprocessed Data',
action: async (input) => {
logger.info('Handling unprocessed data...');
return { ...input, handled: true };
}
})
});
pipeline.addStep(conditionalStep as StepInput);
pipeline.addStep({
name: 'Unreliable Operation',
action: async (input) => {
if (Math.random() < 0.7) {
throw new Error('Random failure');
}
return { ...input, unreliableResult: 'Success' };
},
retryOptions: {
maxRetries: 3,
delay: 1000
}
});
try {
const result = await pipeline.execute({});
logger.info('Pipeline execution completed:', result);
// Example of parallel execution
const parallelInputs = [{ id: 1 }, { id: 2 }, { id: 3 }];
const parallelResults = await pipeline.executeParallel(parallelInputs);
logger.info('Parallel execution completed:', parallelResults);
} catch (error) {
logger.error('Pipeline execution failed:', error as Error);
}
}
runExample();Detailed Feature Overview
1. Multi-Step Pipeline Creation
PipelineTS allows you to define a series of steps that execute sequentially. Each step performs a specific action, and the output of one step can serve as the input to the next, enabling streamlined data processing workflows.
Example:
pipeline
.addStep({
name: 'Step 1',
action: async () => {
// Perform action for Step 1
return { result: 'Output of Step 1' };
}
})
.addStep({
name: 'Step 2',
action: async (input) => {
// Use input from Step 1
return { ...input, additionalData: 'Processed in Step 2' };
}
});2. Conditional Branching
In scenarios where decisions are required based on certain conditions, PipelineTS supports conditional branching. This feature enables the pipeline to choose different execution paths, enhancing flexibility in workflow management.
Example:
const conditionalStep = createConditionalStep({
condition: (input) => input.shouldProcess,
trueStep: createStep({
name: 'Process Data',
action: async (input) => {
// Process the data
return { ...input, processed: true };
}
}),
falseStep: createStep({
name: 'Skip Processing',
action: async (input) => {
// Skip processing
return { ...input, processed: false };
}
})
});
pipeline.addStep(conditionalStep as StepInput);3. Parallel Execution
PipelineTS enhances performance by allowing the concurrent execution of multiple steps or entire pipelines. This is particularly beneficial when dealing with independent tasks that can be processed simultaneously.
Example:
const parallelInputs = [{ id: 1 }, { id: 2 }, { id: 3 }];
const parallelResults = await pipeline.executeParallel(parallelInputs);4. Shared Data Store
A centralized data store facilitates seamless data sharing and communication between different steps in the pipeline, ensuring consistency and reducing complexity in data management.
Example:
pipeline
.addStep({
name: 'Initialize Data',
action: async () => {
return { sharedData: 'Initial Value' };
}
})
.addStep({
name: 'Modify Data',
action: async (input) => {
// Access and modify shared data
return { ...input, sharedData: 'Modified Value' };
}
});5. Error Handling and Retry Mechanism
To ensure robustness, PipelineTS includes error handling capabilities with configurable retry mechanisms. This allows the pipeline to gracefully handle failures and attempt retries based on defined parameters.
Example:
pipeline.addStep({
name: 'Unreliable Operation',
action: async (input) => {
if (Math.random() < 0.7) {
throw new Error('Random failure');
}
return { ...input, result: 'Success' };
},
retryOptions: {
maxRetries: 3,
delay: 1000
}
});6. Extensible Logging System
Monitoring and debugging are facilitated by an extensible logging system, allowing developers to integrate custom logging mechanisms to track pipeline execution and diagnose issues effectively.
Example:
const logger = new CustomLogger();
const pipeline = createPipeline({ name: 'Example Pipeline', logger });Feedback
We value your feedback! If you have any questions, suggestions, or issues, feel free to open an issue on our GitHub repository or contribute to the project.
Try out PipelineTS today and simplify your workflow automation!
