@vitkuz/pipeline-manager
v0.1.0
Functional TypeScript pipeline manager with execution control, retry logic, and cost tracking
Features
- Pipeline Execution - Run tasks in stages (sequential stages, parallel tasks within stage)
- HTTP & CODE Tasks - Support for HTTP requests and custom code handlers
- Retry Logic - Configurable retries with exponential backoff
- Cost Tracking - Track execution costs and savings on resume
- Callbacks - onSuccess and onError callbacks for notifications
- Resume Support - Skip successful tasks on re-run
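The retry settings used later in this README (maxRetries, backoffMs, backoffMultiplier) suggest a delay that grows geometrically between attempts. The following is a minimal sketch of that idea, not the package's actual implementation. It assumes the delay before retry n (0-based) is backoffMs * backoffMultiplier^n. The names RetryPolicy, backoffDelayMs, and withRetry are hypothetical and are not exported by the package.

```typescript
// Hypothetical shape mirroring the retry fields shown in this README.
interface RetryPolicy {
  maxRetries: number;
  backoffMs: number;
  backoffMultiplier: number;
}

// Assumption: the delay before retry n (0-based) is backoffMs * backoffMultiplier^n.
function backoffDelayMs(retryIndex: number, policy: RetryPolicy): number {
  return policy.backoffMs * Math.pow(policy.backoffMultiplier, retryIndex);
}

const sleep = (ms: number): Promise<void> =>
  new Promise((resolve) => setTimeout(resolve, ms));

// Runs `task` once, then retries up to maxRetries times, waiting between attempts.
async function withRetry<T>(
  task: () => Promise<T>,
  policy: RetryPolicy,
  wait: (ms: number) => Promise<void> = sleep,
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt <= policy.maxRetries; attempt++) {
    try {
      return await task();
    } catch (error) {
      lastError = error;
      // Only wait if another attempt is still allowed.
      if (attempt < policy.maxRetries) {
        await wait(backoffDelayMs(attempt, policy));
      }
    }
  }
  throw lastError;
}

const policy: RetryPolicy = { maxRetries: 3, backoffMs: 1000, backoffMultiplier: 2 };
const delays = [0, 1, 2].map((i) => backoffDelayMs(i, policy));
console.log(delays); // [ 1000, 2000, 4000 ]
```

Under these assumptions, a task with maxRetries: 3 would wait about 1 s, 2 s, and 4 s before its three retries.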
Installation
npm install @vitkuz/pipeline-manager
Quick Start
import { createPipelineManager, TaskType, HandlersMap, PipelineCallback } from '@vitkuz/pipeline-manager';

// Create manager
const manager = createPipelineManager({
  filename: 'data/pipelines',
  collection: 'pipelines',
  saveOnPush: true,
  humanReadable: true,
});

// Create pipeline
const pipeline = await manager.createOne({
  description: 'My pipeline',
  stages: [
    // Stage 1: Parallel HTTP tasks
    [
      {
        id: 'fetch-data',
        description: 'Fetch data',
        type: TaskType.HTTP,
        config: {
          url: 'https://api.example.com/data',
          method: 'GET',
        },
        retry: {
          maxRetries: 3,
          backoffMs: 1000,
          backoffMultiplier: 2,
        },
        timeoutMs: 10000,
        estimatedCost: 0.01,
      },
    ],
    // Stage 2: CODE task
    [
      {
        id: 'process-data',
        description: 'Process fetched data',
        type: TaskType.CODE,
        config: {
          handlerName: 'process-data',
        },
        estimatedCost: 0.02,
      },
    ],
  ],
});

// Define handlers for CODE tasks
const handlers: HandlersMap = {
  'process-data': async (instance) => {
    const fetchTask = instance.getTaskById('fetch-data');
    return { processed: true, data: fetchTask?.result };
  },
};

// Define callbacks
const onSuccess: PipelineCallback = async (pipeline) => {
  console.log(`Pipeline ${pipeline.id} completed!`);
  // Post to webhook, websocket, etc.
};
const onError: PipelineCallback = async (pipeline) => {
  console.error(`Pipeline ${pipeline.id} failed!`);
};

// Execute pipeline
const result = await pipeline.start({ handlers, onSuccess, onError });
console.log(`Status: ${result.data.status}`);
console.log(`Total Cost: $${result.data.totalCost}`);
API
createPipelineManager(settings)
Creates a pipeline manager instance.
Settings:
- filename - Base path for JSON database file
- collection - Collection name
- saveOnPush - Auto-save on changes (default: true)
- humanReadable - Pretty print JSON (default: true)
- separator - Path separator (default: '/')
- logger - Optional logger with debug/info/warn/error methods
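The logger setting is the only one not shown in Quick Start, so here is a minimal sketch of an object with the four methods listed above. The method signature (a message plus optional extra arguments) is an assumption, since this README does not specify it. Logger, LogMethod, and createPrefixedLogger are hypothetical names.

```typescript
// Assumption: each logger method accepts a message plus optional extra arguments.
type LogMethod = (message: string, ...args: unknown[]) => void;

interface Logger {
  debug: LogMethod;
  info: LogMethod;
  warn: LogMethod;
  error: LogMethod;
}

// Builds a logger that prefixes every line and writes it to `sink`.
function createPrefixedLogger(
  prefix: string,
  sink: (line: string) => void = console.log,
): Logger {
  const write = (level: string): LogMethod => (message, ...args) => {
    const extra = args.length > 0 ? ' ' + JSON.stringify(args) : '';
    sink(`[${prefix}] ${level.toUpperCase()} ${message}${extra}`);
  };
  return {
    debug: write('debug'),
    info: write('info'),
    warn: write('warn'),
    error: write('error'),
  };
}

// Collect lines in memory instead of printing them.
const lines: string[] = [];
const logger = createPrefixedLogger('pipeline', (line) => lines.push(line));
logger.info('started', { id: 'p1' });
logger.error('failed');
```

If the assumed signature matches, such an object could be passed as the logger field of the settings given to createPipelineManager.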
Returns: PipelineManager with methods:
- list(options?) - List all pipelines
- getOne(id) - Get pipeline by ID
- createOne(params) - Create new pipeline
- updateOne(id, params) - Update pipeline
- deleteOne(id) - Delete pipeline
PipelineInstance
Returned by getOne() and createOne().
Properties:
data- Pipeline data object
Methods:
- start(options?) - Start execution
- restart(options?) - Reset and start
- stop() - Stop execution
- resume(options?) - Resume from last state
- getTaskById(id) - Get task by ID
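Because tasks are nested inside stages, a lookup by ID has to search every stage. The sketch below illustrates one way getTaskById could behave. It is not the package's code, and it assumes stages are stored as an array of task arrays, as in Quick Start. TaskLike and findTaskById are hypothetical names.

```typescript
// Hypothetical minimal task shape; the real task type has more fields.
interface TaskLike {
  id: string;
  result?: unknown;
}

// Assumption: stages are an array of task arrays, as in Quick Start.
// Returns the first task with a matching id, or undefined if none exists.
function findTaskById<T extends TaskLike>(stages: T[][], id: string): T | undefined {
  for (const stage of stages) {
    const match = stage.find((task) => task.id === id);
    if (match) return match;
  }
  return undefined;
}

const exampleStages: TaskLike[][] = [
  [{ id: 'fetch-data', result: { items: [1, 2] } }],
  [{ id: 'process-data' }],
];

const found = findTaskById(exampleStages, 'fetch-data');
const missing = findTaskById(exampleStages, 'does-not-exist');
```

A lookup that can come back empty is consistent with the optional chaining (fetchTask?.result) in the Quick Start handler.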
ExecutionOptions
{
  handlers?: HandlersMap; // CODE task handlers
  onSuccess?: PipelineCallback; // Called on success
  onError?: PipelineCallback; // Called on failure
}
Task Types
HTTP Task
{
  id: 'task-id',
  type: TaskType.HTTP,
  config: {
    url: 'https://api.example.com',
    method: 'GET' | 'POST' | 'PUT' | 'DELETE' | 'PATCH',
    headers?: Record<string, string>,
    body?: unknown,
  },
  retry?: {
    maxRetries: 3,
    backoffMs: 1000,
    backoffMultiplier: 2,
    retryableErrors?: string[],
  },
  timeoutMs?: 10000,
  estimatedCost?: 0.01,
}
CODE Task
{
  id: 'task-id',
  type: TaskType.CODE,
  config: {
    handlerName: 'my-handler',
    params?: unknown,
  },
  estimatedCost?: 0.02,
}
Resume Behavior
When resume() is called, the manager:
- Skips stages where all tasks are successful
- Re-runs failed or pending tasks
- Tracks saved cost from skipped tasks
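The rules above can be sketched as a small planning function. This is an illustration under stated assumptions, not the package's implementation: the status values, the TaskState shape, and the idea that saved cost is the sum of estimatedCost over skipped tasks are all assumptions, and planResume is a hypothetical name.

```typescript
// Hypothetical statuses and task shape; the package's real types may differ.
type TaskStatus = 'pending' | 'success' | 'failed';

interface TaskState {
  id: string;
  status: TaskStatus;
  estimatedCost?: number;
}

interface ResumePlan {
  toRun: string[][]; // task ids still to execute, grouped by stage
  skipped: string[]; // task ids that already succeeded
  savedCost: number; // assumption: sum of estimatedCost over skipped tasks
}

function planResume(stages: TaskState[][]): ResumePlan {
  const plan: ResumePlan = { toRun: [], skipped: [], savedCost: 0 };
  for (const stage of stages) {
    const remaining: string[] = [];
    for (const task of stage) {
      if (task.status === 'success') {
        plan.skipped.push(task.id);
        plan.savedCost += task.estimatedCost ?? 0;
      } else {
        // Failed and pending tasks are scheduled again.
        remaining.push(task.id);
      }
    }
    // A stage whose tasks all succeeded contributes nothing to run.
    if (remaining.length > 0) plan.toRun.push(remaining);
  }
  return plan;
}

const resumePlan = planResume([
  [{ id: 'fetch-data', status: 'success', estimatedCost: 0.01 }],
  [{ id: 'process-data', status: 'failed', estimatedCost: 0.02 }],
]);
console.log(resumePlan.toRun, resumePlan.skipped, resumePlan.savedCost);
```

In this example only process-data would run again, and the saved cost would be the 0.01 estimated for fetch-data. With the package itself, the equivalent flow looks like this: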
// First run
await pipeline.start({ handlers });
// Add more tasks...
// ...
// Resume - skips already successful tasks
const result = await pipeline.resume({ handlers });
console.log(`Saved: $${result.data.savedCost}`);
License
MIT
