Pipeline Core
A TypeScript library for managing complex data processing pipelines with advanced error handling, retry mechanisms, and parallel processing capabilities.
Features
- 🔄 Asynchronous step processing
- 🔁 Advanced retry mechanisms with exponential backoff
- ⚡ Parallel processing with concurrency control
- 🛡️ Comprehensive error handling with custom strategies
- 🔍 Step-by-step execution tracking
- 📊 Event-based monitoring
- 🎯 Type-safe with TypeScript
- 🔒 Infinite loop prevention
- 👥 Worker thread support for heavy processing
- 📈 Detailed performance monitoring
- 🔄 Per-step customizable retry strategies
- 🛑 Graceful shutdown with timeout control
- 📡 System signal integration (SIGTERM/SIGINT)
- 🔄 Controlled resource cleanup
Installation
npm install pipeline-core
Basic Usage
import {
PipelineService,
PipelineConfig,
ErrorActionType,
} from "pipeline-core";
// Define pipeline steps
type PipelineSteps = "step1" | "step2" | "step3";
// Configure the pipeline
const config: PipelineConfig<PipelineSteps> = {
steps: [
{
name: "step1",
handler: async (data) => {
// Process data
return { ...data, processed: true };
},
errorHandlers: {
onError: async (error, context) => ({
type: ErrorActionType.RETRY,
maxRetries: 3,
}),
},
},
// Add more steps...
],
options: {
maxConcurrentWorkers: 5,
retryStrategy: {
maxRetries: 3,
backoffMs: 1000,
},
},
};
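// Illustrative only: YourDataType below is a placeholder for whatever shape
// your pipeline data has; define it to match your own payload, for example:
interface YourDataType {
  processed?: boolean;
  [key: string]: unknown;
}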
// Create pipeline instance
const pipeline = new PipelineService<PipelineSteps, YourDataType>(config);
// Execute pipeline
const result = await pipeline.execute({
data: yourData,
currentStep: "step1",
});
Advanced Features
Error Handling
The library provides sophisticated error handling capabilities:
{
errorHandlers: {
onError: async (error, context) => ({
type: ErrorActionType.RETRY, // or CONTINUE, STOP, CUSTOM
maxRetries: 3
}),
onRetry: async (context) => {
// Custom retry logic
},
onContinue: async (context) => {
// Custom continue logic
},
onStop: async (context) => {
// Custom stop logic
}
}
}
Event Monitoring
Monitor pipeline execution with event listeners:
pipeline.onEvent((event) => {
switch (event.type) {
case "ERROR":
console.error(`Error in step ${event.step}:`, event.error);
break;
// Handle other event types...
}
});
Parallel Processing
Process multiple items in parallel with controlled concurrency:
const results = await pipeline.execute([
{ data: item1, currentStep: "step1" },
{ data: item2, currentStep: "step1" },
});
Worker Threads
Execute heavy processing in separate threads:
const config: PipelineConfig<"step1"> = {
steps: [
{
name: "step1",
handler: "./worker.js", // Path to worker file
},
],
};
Handling External Dependencies in Workers
When using function handlers in your pipeline steps, keep in mind that running them via worker_threads requires the handler to be serialized into a file before execution. Any external dependencies therefore need to be required explicitly inside the handler function:
const config: PipelineConfig<"step1"> = {
steps: [
{
name: "step1",
handler: async (data) => {
// External dependencies must be required inside the handler
const axios = require("axios");
const response = await axios.get("https://api.example.com/data");
return { ...data, apiData: response.data };
},
},
],
};
This approach is necessary because:
- The worker thread runs in an isolated context
- The handler function is serialized into a temporary file
- Dependencies need to be explicitly loaded within the worker's context
- The worker needs to have access to all required modules at runtime
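For contrast, here is a sketch of the failure mode this avoids. A handler that closes over a top-level import will typically break once it is serialized into a worker file, because the captured binding is not carried along (illustrative code, not part of the library's API):
import { PipelineConfig } from "pipeline-core";
import axios from "axios"; // imported at module scope

const brokenConfig: PipelineConfig<"step1"> = {
  steps: [
    {
      name: "step1",
      // This handler references axios from the surrounding module scope.
      // Once the handler is serialized into a temporary worker file, that
      // binding no longer exists, so the request fails at runtime.
      handler: async (data) => {
        const response = await axios.get("https://api.example.com/data");
        return { ...data, apiData: response.data };
      },
    },
  ],
};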
Handling Internal Module Imports
When importing internal modules (files from your own project), you need to handle default exports correctly. Here's an example:
// requester.ts
import axios from "axios";
const requester = async () => {
const response = await axios.get("https://api.example.com/data");
return response;
};
export default requester;
// pipeline.ts
const config: PipelineConfig<"step1"> = {
steps: [
{
name: "step1",
handler: async (data) => {
// Access the default export using .default
const requester = require("./requester").default;
const response = await requester();
return { ...data, apiData: response.data };
},
},
],
};
Note that when using require() with ES modules that have default exports, you need to access the default property to get the exported function or value.
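As a point of comparison (this is general Node.js/TypeScript interop rather than library-specific behavior, and the file names are hypothetical): a plain CommonJS export needs no default access, while a default export compiled from an ES module does:
// requester-cjs.js uses module.exports = requester, so no .default is needed:
const requesterCjs = require("./requester-cjs");

// requester.ts uses "export default requester" and is compiled to CommonJS,
// so the exported function lives on the default property:
const requesterEsm = require("./requester").default;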
Graceful Shutdown
The pipeline supports graceful shutdown with timeout control and system signal integration:
// Basic shutdown with default timeout (30 seconds)
await pipeline.shutdown();
// Shutdown with custom timeout
await pipeline.shutdown(10000); // 10 seconds
// Shutdown with callbacks
await pipeline.shutdown({
timeout: 15000,
onShutdownStart: () => console.log("🔄 Shutdown started..."),
onShutdownComplete: () => console.log("✅ Shutdown completed"),
onTimeout: () => console.log("⏰ Timeout reached, forcing shutdown"),
});
// Check pipeline state
if (pipeline.isShuttingDown()) {
console.log("Pipeline is shutting down...");
}
// Wait for all executions to complete
await pipeline.waitForCompletion();
System Signal Integration
// Handle SIGTERM (graceful shutdown)
process.on("SIGTERM", async () => {
console.log("📡 Received SIGTERM, initiating graceful shutdown...");
try {
await pipeline.shutdown(10000);
process.exit(0);
} catch (error) {
console.error("Shutdown failed:", error.message);
process.exit(1);
}
});
// Handle SIGINT (Ctrl+C)
process.on("SIGINT", async () => {
console.log("📡 Received SIGINT, initiating graceful shutdown...");
try {
await pipeline.shutdown(5000);
process.exit(0);
} catch (error) {
console.error("Shutdown failed:", error.message);
process.exit(1);
}
});
Shutdown Events
pipeline.onEvent((event) => {
switch (event.type) {
case "SHUTDOWN_START":
console.log("🔄 Shutdown initiated");
break;
case "SHUTDOWN_COMPLETE":
console.log("✅ Shutdown completed successfully");
break;
case "SHUTDOWN_TIMEOUT":
console.log("⏰ Shutdown timeout reached");
break;
case "SHUTDOWN_ERROR":
console.log("❌ Shutdown error:", event.error?.message);
break;
}
});
For detailed information about graceful shutdown, see GRACEFUL_SHUTDOWN.md.
Configuration
PipelineConfig
interface PipelineConfig<TStep extends string> {
steps: StepConfig<TStep>[];
options?: StepOptions;
}
StepConfig
interface StepConfig<TStep extends string> {
name: TStep;
handler: StepHandler;
errorHandlers?: {
onError?: (error: Error, context: ErrorContext) => Promise<ErrorAction>;
onRetry?: (context: ErrorContext) => Promise<void>;
onContinue?: (context: ErrorContext) => Promise<void>;
onStop?: (context: ErrorContext) => Promise<void>;
};
options?: StepOptions;
}
StepOptions
interface StepOptions {
maxConcurrentWorkers?: number; // Default: 10
retryStrategy?: {
maxRetries: number; // Default: 3
backoffMs: number; // Default: 1000
};
workerTimeout?: number; // Worker timeout
transpileAlways?: boolean; // Always transpile workers
}
Error Actions
The library supports multiple error handling strategies:
- RETRY: Retry the current step with configurable attempts
- CONTINUE: Skip to a specified next step
- STOP: Halt pipeline execution
- CUSTOM: Implement custom error handling logic
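As a minimal sketch of how these strategies combine, the handler below retries transient errors and stops otherwise. It uses only the fields shown in this README; the exact ErrorAction shape for CONTINUE and CUSTOM is not documented here, so those branches are omitted:
import { ErrorActionType } from "pipeline-core";

const errorHandlers = {
  // Retry transient failures with the configured backoff; stop on anything else.
  onError: async (error: Error) => {
    const transient = /ECONNRESET|ETIMEDOUT|429/.test(error.message);
    if (transient) {
      return { type: ErrorActionType.RETRY, maxRetries: 5 };
    }
    return { type: ErrorActionType.STOP };
  },
  onStop: async () => {
    console.error("Pipeline halted after a non-retryable error.");
  },
};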
How It Works Under the Hood
The pipeline is designed to process data through a series of steps, where each step can be executed independently and concurrently. Here's a breakdown of how it works:
Independent Worker Services: Each step in the pipeline has its own WorkerService instance. This allows each step to manage its own concurrency and execution independently.
Concurrent Processing: Within each step, multiple workers can operate concurrently, controlled by a semaphore that limits the number of active workers (see the sketch at the end of this section). This ensures that heavy processing tasks do not overwhelm the system.
Parallel Step Execution: While each step's workers operate concurrently, the steps themselves also run in parallel: as soon as an item completes one step, the next step can begin processing it, keeping data flowing smoothly through the pipeline.
Event Propagation: The pipeline uses an event-based monitoring system to track the execution of each step. Events are emitted at the start and end of each step, allowing for detailed monitoring and logging of the pipeline's execution.
Error Handling and Retries: Each step can be configured with custom error handling strategies, including retries with exponential backoff. This ensures that transient errors do not halt the pipeline's execution.
Type Safety: The entire pipeline is built with TypeScript, providing type safety and ensuring that the data flowing through the pipeline is correctly typed and validated.
This architecture allows for a highly scalable and efficient data processing pipeline, capable of handling complex workflows with ease.
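To make the concurrency control concrete, here is an illustrative counting semaphore in the spirit of what each step's worker service does. This is a sketch for intuition, not the library's actual implementation:
// Illustrative only: a counting semaphore that caps concurrent work,
// similar in spirit to how each step limits its active workers.
class Semaphore {
  private readonly queue: Array<() => void> = [];
  private active = 0;

  constructor(private readonly limit: number) {}

  async acquire(): Promise<void> {
    if (this.active < this.limit) {
      this.active++;
      return;
    }
    // Wait until a running task releases its slot.
    await new Promise<void>((resolve) => this.queue.push(resolve));
  }

  release(): void {
    const next = this.queue.shift();
    if (next) {
      // Hand the slot directly to the next waiter; the active count is unchanged.
      next();
    } else {
      this.active--;
    }
  }
}

// Run many tasks, but at most `limit` at a time.
async function runLimited<T>(tasks: Array<() => Promise<T>>, limit: number) {
  const semaphore = new Semaphore(limit);
  return Promise.all(
    tasks.map(async (task) => {
      await semaphore.acquire();
      try {
        return await task();
      } finally {
        semaphore.release();
      }
    })
  );
}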
Contributing
Contributions are welcome! Please read our Contributing Guide before submitting a pull request.
License
MIT
Support
For support, please open an issue in the GitHub repository or contact the maintainers.
