npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@backendkit-labs/pipeline

v0.3.2

Published

Type-safe async pipeline — Chain of Responsibility with stop-on-first / collect-all modes, conditional steps, observability hooks, and optional NestJS integration

Downloads

839

Readme

@backendkit-labs/pipeline

npm version CI License Node Docs

Type-safe async pipeline for Node.js — Chain of Responsibility pattern with stop-on-first / collect-all modes, conditional steps, observability hooks, and optional NestJS integration.

Each step in the pipeline receives the current context, transforms it, and returns a typed result. If a step fails, the pipeline can stop immediately or continue collecting all errors — your choice per pipeline.


Minimal Example

Self-contained runnable example — no NestJS, one file, realistic scenario.

git clone https://github.com/BackendKit-labs/backendkit-monorepo.git
cd backendkit-monorepo/examples/minimal-pipeline
npm install && npm start

Shows a 3-step order pipeline (validate → charge → ship) in stop-on-first mode. Runs three scenarios: happy path, validation failure (payment never called), and flaky payment. → full source


Installation

npm install @backendkit-labs/pipeline

NestJS peer dependencies (only for the /nestjs subpath):

npm install @nestjs/common @nestjs/core rxjs

TypeScript Configuration

Subpath exports (/nestjs)

This package uses the exports field in package.json to expose the /nestjs subpath. TypeScript's ability to resolve it depends on the moduleResolution setting in your tsconfig.json.

Modern resolution (recommended) — no extra config needed:

// tsconfig.json
{
  "compilerOptions": {
    "moduleResolution": "bundler"
  }
}

"bundler", "node16", and "nodenext" all understand the exports field natively. This is the recommended setting for any project using a bundler (Webpack, esbuild, Vite) or for NestJS projects on TypeScript ≥ 5.

Legacy resolution ("node") — add paths aliases:

NestJS projects generated before ~2024 default to "moduleResolution": "node", which ignores the exports field entirely. TypeScript won't find the types for @backendkit-labs/pipeline/nestjs unless you add explicit path aliases:

// tsconfig.json
{
  "compilerOptions": {
    "moduleResolution": "node",
    "paths": {
      "@backendkit-labs/pipeline/nestjs": [
        "./node_modules/@backendkit-labs/pipeline/dist/nestjs/index"
      ]
    }
  }
}

Why does this happen? The "node" resolver was designed before subpath exports existed. It only knows how to find main and types at the root of a package — it does not read the exports map. The paths alias manually points TypeScript to the right .d.ts file for the subpath.

The splitting: true tsup option (which this package uses) and this paths configuration solve completely different problems. splitting fixes a runtime class identity issue — ensuring there is only one copy of a class in memory across both bundles. The paths alias fixes a compile-time issue — helping TypeScript find the types. Both may be needed in a legacy project.


NestJS decorator support

NestJS requires two compiler options to be enabled:

// tsconfig.json
{
  "compilerOptions": {
    "experimentalDecorators": true,
    "emitDecoratorMetadata": true
  }
}

And reflect-metadata must be imported once at application startup, before any NestJS module is loaded:

// main.ts
import 'reflect-metadata';
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';

async function bootstrap() {
  const app = await NestFactory.create(AppModule);
  await app.listen(3000);
}
bootstrap();

NestJS CLI scaffolds both of these automatically. You only need to check this if you are setting up a project manually or if decorator-related DI errors appear at runtime.


Quick Start — Framework-agnostic

import { pipeline, Ok, Err } from '@backendkit-labs/pipeline';
import type { PipelineStep, StepResult } from '@backendkit-labs/pipeline';

interface OrderCtx {
  productId: string;
  quantity:  number;
  stock:     number;
  price:     number;
  total:     number;
}

interface OrderError {
  code:    string;
  message: string;
}

class StockStep implements PipelineStep<OrderCtx, OrderError> {
  async handle(ctx: OrderCtx): Promise<StepResult<OrderCtx, OrderError>> {
    if (ctx.stock < ctx.quantity) {
      return Err({ code: 'INSUFFICIENT_STOCK', message: 'Not enough stock' });
    }
    return Ok(ctx);
  }
}

class PricingStep implements PipelineStep<OrderCtx, OrderError> {
  async handle(ctx: OrderCtx): Promise<StepResult<OrderCtx, OrderError>> {
    return Ok({ ...ctx, total: ctx.price * ctx.quantity });
  }
}

// Build and run
const result = await pipeline<OrderCtx, OrderError>()
  .pipe(new StockStep())
  .pipe(new PricingStep())
  .run({ productId: 'p1', quantity: 2, stock: 10, price: 50, total: 0 });

if (result.ok) {
  console.log(result.value.total);        // 100
  console.log(result.executedSteps);      // ['StockStep', 'PricingStep']
} else {
  console.log(result.error.failedStep);   // 'StockStep'
  console.log(result.error.cause);        // { code: 'INSUFFICIENT_STOCK', ... }
}

Quick Start — NestJS

// order.pipeline.ts
import { definePipeline } from '@backendkit-labs/pipeline';
import type { OrderCtx, OrderError } from './order.types';

export const ORDER_PIPELINE = definePipeline<OrderCtx, OrderError>('order');
// app.module.ts
import { Module } from '@nestjs/common';
import { PipelineModule } from '@backendkit-labs/pipeline/nestjs';
import { ORDER_PIPELINE } from './order.pipeline';
import { StockStep, PricingStep, NotifyStep } from './steps';

@Module({
  imports: [
    PipelineModule.forRoot({
      pipelines: [
        {
          token:   ORDER_PIPELINE,
          steps:   [StockStep, PricingStep, NotifyStep],
          options: {
            onError: (step, err) => logger.error(`Pipeline failed at ${step}`, err),
          },
        },
      ],
    }),
  ],
})
export class AppModule {}
// order.service.ts
import { Injectable } from '@nestjs/common';
import { InjectPipeline } from '@backendkit-labs/pipeline/nestjs';
import { Pipeline } from '@backendkit-labs/pipeline';
import { ORDER_PIPELINE } from './order.pipeline';
import type { OrderCtx, OrderError } from './order.types';

@Injectable()
export class OrderService {
  constructor(
    @InjectPipeline(ORDER_PIPELINE)
    private readonly pipeline: Pipeline<OrderCtx, OrderError>,
  ) {}

  async processOrder(ctx: OrderCtx) {
    return this.pipeline.run(ctx);
  }
}

API

pipeline(options?)

Creates a new pipeline builder. Throws TypeError at run() time if mode is not 'stop-on-first' or 'collect-all'.

const p = pipeline<TContext, TError>(options?);

Options

pipeline<Ctx, Err>({
  // 'stop-on-first' — stop and return on the first failure (default)
  // 'collect-all'   — run all steps, accumulate every failure
  mode: 'stop-on-first',

  onStep(stepName, ctx) {
    logger.debug(`[pipeline] → ${stepName}`);
  },

  onStepComplete(stepName, ctx, durationMs) {
    metrics.timing(`step.${stepName}`, durationMs);
  },

  onError(stepName, error) {
    logger.error(`[pipeline] ✗ ${stepName}`, error);
  },

  onComplete(ctx, durationMs, metadata) {
    metrics.timing('pipeline.total', durationMs);
    // metadata: { executedSteps: string[], failures: PipelineStepFailure<Err>[] }
  },
});

.pipe(step)

Adds a step that always runs.

p.pipe(new StockStep())
 .pipe(new PricingStep());

.pipeIf(condition, step)

Adds a step that runs only when condition(ctx) returns true. The condition receives the context after all previous steps have transformed it. If the condition throws, it is treated as a step failure (same behaviour as the step itself failing).

p.pipe(new BaseStep())
 .pipeIf(ctx => ctx.hasDiscount, new DiscountStep())
 .pipe(new FinalStep());

.run(ctx)

Executes the pipeline and returns a PipelineRunResult.

const result = await p.run(initialCtx);

// Success
result.ok            // true
result.value         // final context
result.executedSteps // ['StockStep', 'PricingStep']
result.durationMs    // total duration

// Failure
result.ok                    // false
result.error.failedStep      // 'StockStep'
result.error.cause           // original typed error
result.error.executedSteps   // steps that ran before the failure
result.error.durationMs      // total duration
result.error.failures        // all failures — one entry for stop-on-first, N for collect-all
result.error.mode            // 'stop-on-first' | 'collect-all'

Ok(value) / Err(error)

Helpers for returning step results.

import { Ok, Err } from '@backendkit-labs/pipeline';

async handle(ctx): Promise<StepResult<Ctx, Err>> {
  if (!valid) return Err({ code: 'INVALID' });
  return Ok({ ...ctx, validated: true });
}

PipelineStep<TContext, TError>

Interface your step classes implement.

import type { PipelineStep, StepResult } from '@backendkit-labs/pipeline';

class MyStep implements PipelineStep<Ctx, MyError> {
  // Optional — overrides constructor.name in error reports and hook calls
  readonly stepName = 'MyStep';

  async handle(ctx: Ctx): Promise<StepResult<Ctx, MyError>> {
    // ...
  }
}

Error Modes

stop-on-first (default)

Stops at the first failure. Use when later steps depend on earlier ones being successful.

pipeline({ mode: 'stop-on-first' })
  .pipe(new AuthStep())      // if this fails → stop, PaymentStep never runs
  .pipe(new PaymentStep())
  .run(ctx);

collect-all

Runs every step regardless of failures. Use when steps are independent and you want to report all errors at once — e.g., form validation.

pipeline({ mode: 'collect-all' })
  .pipe(new ValidateNameStep())
  .pipe(new ValidateEmailStep())
  .pipe(new ValidatePhoneStep())
  .run(formData);

// result.error.failures → [{ step: 'ValidateEmailStep', cause: ... }, { step: 'ValidatePhoneStep', cause: ... }]

NestJS Integration

definePipeline<TContext, TError>(name)

Creates a typed injection token. Define it once and share across module and service.

export const ORDER_PIPELINE = definePipeline<OrderCtx, OrderError>('order');
// PipelineToken<OrderCtx, OrderError>

PipelineModule.forRoot(options)

Registers pipelines globally. Each step class is resolved via NestJS DI, so steps can inject other services.

PipelineModule.forRoot({
  pipelines: [
    {
      token:   ORDER_PIPELINE,
      steps:   [StockStep, PricingStep, NotifyStep],  // resolved via DI
      options: { mode: 'stop-on-first', onError: ... },
    },
  ],
})

@InjectPipeline(token)

Parameter decorator for injecting a pipeline into a service.

constructor(
  @InjectPipeline(ORDER_PIPELINE)
  private readonly orderPipeline: Pipeline<OrderCtx, OrderError>,
) {}

Use Cases

| Scenario | Mode | |---|---| | Order processing (stock → payment → notify) | stop-on-first | | Form / DTO validation (collect all field errors) | collect-all | | User onboarding (KYC → plan → welcome email) | stop-on-first | | File processing (validate → scan → compress → upload) | stop-on-first | | Webhook processing (verify signature → parse → deduplicate → route) | stop-on-first | | Pricing pipeline (base → volume discount → tax → currency) | stop-on-first |


Design Notes

Context is immutable by convention

Each step returns a new context object rather than mutating the existing one. This makes each step's input/output explicit and easy to trace in logs.

// Do this
return Ok({ ...ctx, total: ctx.price * ctx.quantity });

// Not this
ctx.total = ctx.price * ctx.quantity;
return Ok(ctx);

Steps are plain classes

Steps don't extend a base class or require special decorators. They just implement PipelineStep<TContext, TError>. This makes them easy to test in isolation:

const result = await new StockStep().handle({ stock: 0, quantity: 5, ... });
expect(result.ok).toBe(false);

NestJS DI class identity

PipelineModule.forRoot() resolves step classes via NestJS DI and wires them into the pipeline at startup. All steps share the same DI context — no class identity issues.


License

Apache-2.0 — BackendKit Labs