
Pipeline Core


A TypeScript library for managing complex data processing pipelines with advanced error handling, retry mechanisms, and parallel processing capabilities.

Features

  • 🔄 Asynchronous step processing
  • 🔁 Advanced retry mechanisms with exponential backoff
  • ⚡ Parallel processing with concurrency control
  • 🛡️ Comprehensive error handling with custom strategies
  • 🔍 Step-by-step execution tracking
  • 📊 Event-based monitoring
  • 🎯 Type-safe with TypeScript
  • 🔒 Infinite loop prevention
  • 👥 Worker thread support for heavy processing
  • 📈 Detailed performance monitoring
  • 🔄 Per-step customizable retry strategies
  • 🛑 Graceful shutdown with timeout control
  • 📡 System signal integration (SIGTERM/SIGINT)
  • 🔄 Controlled resource cleanup

Installation

npm install pipeline-core

Basic Usage

import {
  PipelineService,
  PipelineConfig,
  ErrorActionType,
} from "pipeline-core";

// Define pipeline steps
type PipelineSteps = "step1" | "step2" | "step3";

// Configure the pipeline
const config: PipelineConfig<PipelineSteps> = {
  steps: [
    {
      name: "step1",
      handler: async (data) => {
        // Process data
        return { ...data, processed: true };
      },
      errorHandlers: {
        onError: async (error, context) => ({
          type: ErrorActionType.RETRY,
          maxRetries: 3,
        }),
      },
    },
    // Add more steps...
  ],
  options: {
    maxConcurrentWorkers: 5,
    retryStrategy: {
      maxRetries: 3,
      backoffMs: 1000,
    },
  },
};

// Create pipeline instance
const pipeline = new PipelineService<PipelineSteps, YourDataType>(config);

// Execute pipeline
const result = await pipeline.execute({
  data: yourData,
  currentStep: "step1",
});

Advanced Features

Error Handling

The library provides sophisticated error handling capabilities:

{
  errorHandlers: {
    onError: async (error, context) => ({
      type: ErrorActionType.RETRY, // or CONTINUE, STOP, CUSTOM
      maxRetries: 3
    }),
    onRetry: async (context) => {
      // Custom retry logic
    },
    onContinue: async (context) => {
      // Custom continue logic
    },
    onStop: async (context) => {
      // Custom stop logic
    }
  }
}
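
For context, here's how those handlers attach to a complete step definition, following the StepConfig shape documented under Configuration below. This is a sketch: it assumes StepConfig is exported alongside PipelineConfig, and fetchRemoteData is a placeholder for your own logic.

import { StepConfig, ErrorActionType } from "pipeline-core";

const step: StepConfig<"fetchData"> = {
  name: "fetchData",
  handler: async (data) => fetchRemoteData(data), // placeholder step logic
  errorHandlers: {
    onError: async (error, context) => {
      // Retry transient network failures; halt on anything else
      if (error.message.includes("ECONNRESET")) {
        return { type: ErrorActionType.RETRY, maxRetries: 3 };
      }
      return { type: ErrorActionType.STOP };
    },
    onRetry: async (context) => {
      console.warn("Retrying step fetchData...");
    },
  },
};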

Event Monitoring

Monitor pipeline execution with event listeners:

pipeline.onEvent((event) => {
  switch (event.type) {
    case "ERROR":
      console.error(`Error in step ${event.step}:`, event.error);
      break;
    // Handle other event types...
  }
});

Parallel Processing

Process multiple items in parallel with controlled concurrency:

const results = await pipeline.execute([
  { data: item1, currentStep: "step1" },
  { data: item2, currentStep: "step1" },
]);
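
Because execute accepts an array, batching a whole collection is just a map over your data, and concurrency within each step stays capped by maxConcurrentWorkers. In this sketch, loadItems is a placeholder for your own data source:

const items = await loadItems(); // placeholder data source

// One execution input per item; the pipeline fans these out across
// workers, bounded by options.maxConcurrentWorkers.
const inputs = items.map((item) => ({ data: item, currentStep: "step1" }));
const results = await pipeline.execute(inputs);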

Worker Threads

Execute heavy processing in separate threads:

const config: PipelineConfig<"step1"> = {
  steps: [
    {
      name: "step1",
      handler: "./worker.js", // Path to worker file
    },
  ],
};
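
The contents of the worker file aren't shown here. One plausible shape, assuming the worker module default-exports the step's handler (verify the exact contract against the library's docs), is:

// worker.js: hypothetical worker module for "step1"
module.exports = async (data) => {
  // CPU-heavy work runs here, off the main thread
  const transformed = expensiveTransform(data); // placeholder helper
  return { ...data, transformed };
};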

Handling External Dependencies in Workers

When using function handlers in your pipeline steps, note that the handler is serialized into a temporary file before it runs inside a worker_threads worker. Any external dependencies therefore need to be explicitly required within the handler function:

const config: PipelineConfig<"step1"> = {
  steps: [
    {
      name: "step1",
      handler: async (data) => {
        // External dependencies must be required inside the handler
        const axios = require("axios");

        const response = await axios.get("https://api.example.com/data");
        return { ...data, apiData: response.data };
      },
    },
  ],
};

This approach is necessary because:

  1. The worker thread runs in an isolated context
  2. The handler function is serialized into a temporary file
  3. Dependencies need to be explicitly loaded within the worker's context
  4. The worker needs to have access to all required modules at runtime

Handling Internal Module Imports

When importing internal modules (files from your project), you need to handle default exports correctly. Here's an example:

// requester.ts
import axios from "axios";

const requester = async () => {
  const response = await axios.get("https://api.example.com/data");
  return response;
};

export default requester;

// pipeline.ts
const config: PipelineConfig<"step1"> = {
  steps: [
    {
      name: "step1",
      handler: async (data) => {
        // Access the default export using .default
        const requester = require("./requester").default;
        const response = await requester();
        return { ...data, apiData: response.data };
      },
    },
  ],
};

Note that when using require() on a module compiled from ES module syntax, the default export lives on the .default property, which you need to access to get the exported function or value.
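
One way to sidestep the .default indirection is to author the internal module in CommonJS style, which require() resolves directly:

// requester.js (CommonJS variant of the module above)
const axios = require("axios");

module.exports = async () => {
  const response = await axios.get("https://api.example.com/data");
  return response;
};

// In the handler, no .default is needed:
const requester = require("./requester");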

Graceful Shutdown

The pipeline supports graceful shutdown with timeout control and system signal integration:

// Basic shutdown with default timeout (30 seconds)
await pipeline.shutdown();

// Shutdown with custom timeout
await pipeline.shutdown(10000); // 10 seconds

// Shutdown with callbacks
await pipeline.shutdown({
  timeout: 15000,
  onShutdownStart: () => console.log("🔄 Shutdown started..."),
  onShutdownComplete: () => console.log("✅ Shutdown completed"),
  onTimeout: () => console.log("⏰ Timeout reached, forcing shutdown"),
});

// Check pipeline state
if (pipeline.isShuttingDown()) {
  console.log("Pipeline is shutting down...");
}

// Wait for all executions to complete
await pipeline.waitForCompletion();
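
A common drain pattern combines the two calls. This sketch assumes your application stops submitting new executions before the drain begins:

// Stop accepting new work at the application level, then drain
await pipeline.waitForCompletion(); // let in-flight executions finish
await pipeline.shutdown(15000); // then release workers and resources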

System Signal Integration

// Handle SIGTERM (graceful shutdown)
process.on("SIGTERM", async () => {
  console.log("📡 Received SIGTERM, initiating graceful shutdown...");
  try {
    await pipeline.shutdown(10000);
    process.exit(0);
  } catch (error) {
    console.error("Shutdown failed:", error.message);
    process.exit(1);
  }
});

// Handle SIGINT (Ctrl+C)
process.on("SIGINT", async () => {
  console.log("📡 Received SIGINT, initiating graceful shutdown...");
  try {
    await pipeline.shutdown(5000);
    process.exit(0);
  } catch (error) {
    console.error("Shutdown failed:", error.message);
    process.exit(1);
  }
});

Shutdown Events

pipeline.onEvent((event) => {
  switch (event.type) {
    case "SHUTDOWN_START":
      console.log("🔄 Shutdown initiated");
      break;
    case "SHUTDOWN_COMPLETE":
      console.log("✅ Shutdown completed successfully");
      break;
    case "SHUTDOWN_TIMEOUT":
      console.log("⏰ Shutdown timeout reached");
      break;
    case "SHUTDOWN_ERROR":
      console.log("❌ Shutdown error:", event.error?.message);
      break;
  }
});

For detailed information about graceful shutdown, see GRACEFUL_SHUTDOWN.md.

Configuration

PipelineConfig

interface PipelineConfig<TStep extends string> {
  steps: StepConfig<TStep>[];
  options?: StepOptions;
}

StepConfig

interface StepConfig<TStep extends string> {
  name: TStep;
  handler: StepHandler;
  errorHandlers?: {
    onError?: (error: Error, context: ErrorContext) => Promise<ErrorAction>;
    onRetry?: (context: ErrorContext) => Promise<void>;
    onContinue?: (context: ErrorContext) => Promise<void>;
    onStop?: (context: ErrorContext) => Promise<void>;
  };
  options?: StepOptions;
}

StepOptions

interface StepOptions {
  maxConcurrentWorkers?: number; // Default: 10
  retryStrategy?: {
    maxRetries: number; // Default: 3
    backoffMs: number; // Default: 1000
  };
  workerTimeout?: number; // Timeout for worker execution
  transpileAlways?: boolean; // Always transpile worker handlers before execution
}
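
Both the pipeline and individual steps accept StepOptions. A natural reading is that step-level options override pipeline-level defaults, though you should verify the precedence against the docs; parseHandler and uploadHandler below are placeholders:

const config: PipelineConfig<"parse" | "upload"> = {
  steps: [
    { name: "parse", handler: parseHandler },
    {
      name: "upload",
      handler: uploadHandler,
      // Step-level override: uploads run with tighter concurrency
      options: { maxConcurrentWorkers: 2 },
    },
  ],
  // Pipeline-level defaults applied to every step
  options: {
    maxConcurrentWorkers: 10,
    retryStrategy: { maxRetries: 3, backoffMs: 1000 },
  },
};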

Error Actions

The library supports multiple error handling strategies:

  • RETRY: Retry the current step with configurable attempts
  • CONTINUE: Skip to a specified next step
  • STOP: Halt pipeline execution
  • CUSTOM: Implement custom error handling logic
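
A single onError handler can choose between these actions based on the failure. The sketch below assumes CONTINUE takes a nextStep field naming the step to jump to, and isTransient / isSkippable are placeholder predicates; check the ErrorAction type for the exact shape:

onError: async (error, context) => {
  if (isTransient(error)) {
    // Transient failure: retry with backoff
    return { type: ErrorActionType.RETRY, maxRetries: 5 };
  }
  if (isSkippable(error)) {
    // Recoverable: skip ahead (nextStep is an assumed field)
    return { type: ErrorActionType.CONTINUE, nextStep: "step3" };
  }
  // Fatal: halt the pipeline
  return { type: ErrorActionType.STOP };
},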

How It Works Under the Hood

The pipeline is designed to process data through a series of steps, where each step can be executed independently and concurrently. Here's a breakdown of how it works:

  • Independent Worker Services: Each step in the pipeline has its own WorkerService instance. This allows each step to manage its own concurrency and execution independently.

  • Concurrent Processing: Within each step, multiple workers can operate concurrently, controlled by a semaphore that limits the number of active workers (a minimal sketch of this pattern follows this list). This ensures that heavy processing tasks do not overwhelm the system.

  • Pipelined Step Execution: While each step's workers operate concurrently, items also flow through the steps in pipelined fashion: as soon as an item completes one step, it can begin the next, so different items may occupy different steps at the same time, keeping data moving efficiently through the pipeline.

  • Event Propagation: The pipeline uses an event-based monitoring system to track the execution of each step. Events are emitted at the start and end of each step, allowing for detailed monitoring and logging of the pipeline's execution.

  • Error Handling and Retries: Each step can be configured with custom error handling strategies, including retries with exponential backoff. This ensures that transient errors do not halt the pipeline's execution.

  • Type Safety: The entire pipeline is built with TypeScript, providing type safety and ensuring that the data flowing through the pipeline is correctly typed and validated.

This architecture allows for a highly scalable and efficient data processing pipeline, capable of handling complex workflows with ease.
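
To make the semaphore idea concrete, here is a minimal counting semaphore of the kind described above. This is purely illustrative of the concurrency-limiting pattern, not the library's actual internals:

class Semaphore {
  private queue: Array<() => void> = [];
  constructor(private permits: number) {}

  async acquire(): Promise<void> {
    if (this.permits > 0) {
      this.permits--;
      return;
    }
    // No permit free: park until release() hands one over
    await new Promise<void>((resolve) => this.queue.push(resolve));
  }

  release(): void {
    const next = this.queue.shift();
    if (next) next(); // pass the permit straight to a waiter
    else this.permits++;
  }
}

// Cap active workers for a step, e.g. at maxConcurrentWorkers = 5
const semaphore = new Semaphore(5);
async function runWorker(task: () => Promise<void>): Promise<void> {
  await semaphore.acquire();
  try {
    await task();
  } finally {
    semaphore.release();
  }
}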

Contributing

Contributions are welcome! Please read our Contributing Guide before submitting a pull request.

License

MIT

Support

For support, please open an issue in the GitHub repository or contact the maintainers.