derrops-lib

v1.2.1

A type-safe library for building sequential pipelines with data enrichment, conditional execution, and analytics

@derrops-lib

A type-safe TypeScript library for building sequential data-enrichment pipelines. Each step receives everything accumulated so far, adds its own output, and passes the merged result to the next step. Checks can be attached to any step to assert business-logic conditions — and each step's ContinuePolicy controls what happens when something goes wrong.

Features

  • Type-safe enrichment: each step's output is merged into the accumulated type; TypeScript tracks the shape automatically so every subsequent step and check is fully typed
  • Chainable checks: attach multiple named checks to any step via .check(name, fn); checks run after execute and produce a rich PASS | FAIL | ERROR | NONE | TERMINAL status
  • Terminal checks: a check can return terminal: true to force pipeline failure and skip all remaining checks, regardless of success criteria
  • ContinuePolicy per step: declare how the pipeline should handle each failure mode (error, failure, timeout) directly on the step; all default to STOP
  • Conditional execution: shouldRun predicates skip a step; its checks are recorded as NONE
  • Full retry policy: configurable per step with attempt count, backoff strategies, per-mode retry conditions, delay caps, and pipeline restart support
  • Pipeline restarts: retry.restartFromStep rewinds the pipeline to an earlier step when a step exhausts its retries
  • Success criteria: override the default "all steps must succeed" verdict with minStepsSuccessful, maxStepsUnsuccessful, or minSuccessRate
  • Lifecycle callbacks: onSuccess / onFailure / onRetry hooks per step
  • Timing records: every step and the pipeline as a whole record startedAt, finishedAt, and duration
  • Analytics: pluggable observer for step and pipeline events
  • Immutable builder: .step() and .check() return a new pipeline instance; the original is unchanged

Install

npm install @derrops-lib
# or
yarn add @derrops-lib
# or
pnpm add @derrops-lib

Quick Start

import { createPipeline } from '@derrops-lib'

type UserInput = { userId: string }

const pipeline = createPipeline<UserInput>({ name: 'User Onboarding' })
  .step({
    name: 'Fetch User',
    execute: async (ctx) => ({ userName: 'Alice', email: 'alice@example.com' }),
  })
  .step({
    name: 'Load Preferences',
    execute: async (ctx) => ({ theme: 'dark' as const }),
  })

const result = await pipeline.execute({ userId: 'u-1' })

if (result.success) {
  console.log(result.data.userName) // 'Alice'
  console.log(result.data.theme) // 'dark'
}

API

createPipeline<TInitial>(config)

Creates a new pipeline. TInitial is the shape passed to .execute().

const pipeline = createPipeline<{ userId: string }>({
  name: 'My Pipeline',
  analytics: myAnalytics, // optional — see Analytics section

  // Optional: override the default "all steps must succeed" verdict
  successCriteria: {
    minStepsSuccessful: 2, // at least N non-skipped steps must succeed
    maxStepsUnsuccessful: 1, // at most N non-skipped steps may fail
    minSuccessRate: 0.8, // at least 80% of non-skipped steps must succeed
  },
})

All successCriteria fields are optional and can be combined. Skipped steps are excluded from all counts. TERMINAL checks always force failure regardless of criteria.
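
As a rough illustration of how the three thresholds could combine, the sketch below evaluates them over a list of step outcomes. It is not the library's implementation: `meetsCriteria` and `StepOutcome` are hypothetical names, and it assumes that the configured criteria are AND-ed together and that a run with no non-skipped steps counts as a success rate of 1.

```typescript
// Hypothetical shapes for illustration; only the three criteria names come from the README.
type SuccessCriteria = {
  minStepsSuccessful?: number
  maxStepsUnsuccessful?: number
  minSuccessRate?: number
}

type StepOutcome = { skipped: boolean; succeeded: boolean }

function meetsCriteria(
  steps: StepOutcome[],
  criteria?: SuccessCriteria,
  terminated = false,
): boolean {
  // A TERMINAL check forces failure regardless of criteria.
  if (terminated) return false

  // Skipped steps are excluded from all counts.
  const ran = steps.filter((s) => !s.skipped)
  const ok = ran.filter((s) => s.succeeded).length
  const bad = ran.length - ok

  // Default verdict: every non-skipped step must succeed.
  if (!criteria) return bad === 0

  const { minStepsSuccessful, maxStepsUnsuccessful, minSuccessRate } = criteria
  if (minStepsSuccessful !== undefined && ok < minStepsSuccessful) return false
  if (maxStepsUnsuccessful !== undefined && bad > maxStepsUnsuccessful) return false
  if (minSuccessRate !== undefined) {
    // Assumption: an empty run is treated as a 100% success rate.
    const rate = ran.length === 0 ? 1 : ok / ran.length
    if (rate < minSuccessRate) return false
  }
  return true
}
```

With two successes, one failure and one skipped step, this sketch rejects the run by default, accepts it under `{ minStepsSuccessful: 2, maxStepsUnsuccessful: 1 }`, and rejects it under `{ minSuccessRate: 0.8 }` because 2 of 3 is below 80%.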


.step(config | fn)

Appends a step. Returns a new pipeline whose accumulated type includes this step's output.

pipeline.step({
  name: 'Step Name', // optional — defaults to 'Step 0', 'Step 1', …

  // Receives all accumulated data plus a read-only snapshot of previous step records
  execute: async (ctx, steps) => ({ result: 42 }),

  // Guard — return false to skip this step
  shouldRun: (ctx) => ctx.data.lintErrors === 0,

  onSuccess: (output, accumulated) => {
    /* called after execute succeeds, before checks */
  },
  onFailure: (error, accumulated) => {
    /* called after the last failed attempt */
  },
  onRetry: (error, attempt, delay, accumulated) => {
    /* called after each failed attempt, before the backoff delay and next attempt */
  },

  // Full retry policy — use instead of the deprecated `retries` shorthand
  retry: {
    maxAttempts: 4, // total attempts including the first
    backoff: { type: 'exponential', initialDelay: 250 }, // see Backoff Strategies below
    on: { onError: true, onTimeout: false }, // which failure modes are retried
    maxDelay: 10_000, // cap per-attempt delay
    maxTotalDelay: 30_000, // abort retry loop if cumulative delay would exceed this
    restartFromStep: 'Authenticate', // rewind pipeline to this step on final failure
    maxRestarts: 2, // max pipeline restarts (default: 1)
  },

  timeout: 5000, // ms limit per attempt

  // What to do when this step fails — all default to 'STOP'
  policy: {
    error: 'STOP', // execute threw a non-timeout exception
    failure: 'CONTINUE', // a check returned success: false
    timeout: 'STOP', // execute exceeded the timeout
  },
})

Bare function shorthand (equivalent to { execute: fn }):

pipeline.step(async (ctx) => ({ processed: true }))

Backoff Strategies

| Type | Behaviour |
| --- | --- |
| none | Retry immediately with no delay (default) |
| fixed | Same delay ms before every attempt |
| exponential | initialDelay * multiplier^N where N is the 0-based failure index; multiplier defaults to 2 |
| steps | Explicit per-attempt delays; last value repeats once exhausted |

// Fixed delay
backoff: { type: 'fixed', delay: 500 }

// Exponential — 250 ms, 500 ms, 1000 ms, …
backoff: { type: 'exponential', initialDelay: 250 }

// Explicit cadence: 1 s, 5 s, 15 s, then 15 s for any further retries
backoff: { type: 'steps', delays: [1000, 5000, 15000] }
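
To make the table concrete, here is a minimal sketch of how a delay could be derived from each strategy and then capped by maxDelay. The `Backoff` union mirrors the option shapes shown above, but `baseDelay` and `delayFor` are hypothetical helpers written for this illustration, not functions exported by the library.

```typescript
// Option shapes copied from the README examples; helper names are hypothetical.
type Backoff =
  | { type: 'none' }
  | { type: 'fixed'; delay: number }
  | { type: 'exponential'; initialDelay: number; multiplier?: number }
  | { type: 'steps'; delays: number[] }

// failureIndex is 0 for the delay after the first failed attempt, 1 after the second, and so on.
function baseDelay(backoff: Backoff, failureIndex: number): number {
  switch (backoff.type) {
    case 'none':
      return 0
    case 'fixed':
      return backoff.delay
    case 'exponential':
      return backoff.initialDelay * (backoff.multiplier ?? 2) ** failureIndex
    case 'steps':
      // The last value repeats once the list is exhausted; an empty list yields 0.
      return backoff.delays[Math.min(failureIndex, backoff.delays.length - 1)] ?? 0
  }
}

// Apply the per-attempt cap described by retry.maxDelay.
function delayFor(backoff: Backoff, failureIndex: number, maxDelay = Infinity): number {
  return Math.min(baseDelay(backoff, failureIndex), maxDelay)
}

const exponential: Backoff = { type: 'exponential', initialDelay: 250 }
const schedule = [0, 1, 2].map((i) => delayFor(exponential, i)) // [250, 500, 1000]
```

Under these assumptions, `delayFor({ type: 'steps', delays: [1000, 5000, 15000] }, 5)` stays at 15000, and an exponential delay that would exceed `maxDelay` is clamped to it.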

.check(fn) / .check(name, fn)

Attaches a check to the most recently added step. Multiple .check() calls stack onto the same step and run in order after execute succeeds.

pipeline
  .step({
    name: 'Validate',
    execute: async (ctx) => ({ score: computeScore(ctx) }),
    policy: { failure: 'STOP' }, // stop on any check failure (this is the default)
  })
  .check('Score positive', (ctx) => ({
    success: ctx.score > 0,
    message: `Score was ${ctx.score}`,
  }))
  .check('Score below limit', (ctx) => ({
    success: ctx.score < 1000,
    message: `Score ${ctx.score} exceeds limit`,
  }))

The check function receives two arguments:

  1. ctx: the fully enriched data object (initial input + all previous steps + this step's output)
  2. steps: a read-only array of StepRecord for every step that completed before the current one

.check('Only if fetched', (ctx, steps) => ({
  success: steps.find(s => s.name === 'Fetch User')?.skipped
    ? true
    : ctx.user != null,
}))

CheckFnResult fields:

| Field | Type | Description |
| --- | --- | --- |
| success | boolean | Whether the assertion passed |
| message | string? | Human-readable reason; used as the pipeline error message on halt |
| terminal | true? | When set, stops all remaining checks and forces pipeline failure regardless of criteria |

All checks on a step always run to completion before the pipeline evaluates whether to halt — unless a check returns terminal: true, which short-circuits the remaining checks immediately.


.execute(initialInput)

Runs the pipeline and returns a PipelineResult.

const result = await pipeline.execute({ userId: 'u-1' })

// `data` is always present — even on failure
console.log(result.data)
console.log(result.timing.duration) // total ms

if (!result.success) {
  console.error(result.error.message)
  console.log(result.terminated) // true if a terminal check forced failure
}

console.log(result.restarts) // number of pipeline restarts triggered by restartFromStep

PipelineResult<TData>:

type PipelineResult<TData> =
  | {
      success: true
      data: TData
      steps: StepRecord[]
      timing: Timing
      restarts: number
    }
  | {
      success: false
      data: TData
      error: Error
      steps: StepRecord[]
      timing: Timing
      terminated: boolean // true when a TERMINAL check forced failure
      restarts: number
    }

data is present in both branches so callers can inspect the fully enriched context even when the pipeline fails — important for access-control and audit pipelines that need to log what was learned before denying a request.


Check Status

Every check produces a CheckResult with a status field:

| Status | Meaning |
| --- | --- |
| PASS | Check ran and returned success: true |
| FAIL | Check ran and returned success: false |
| ERROR | Check function threw an unexpected error |
| NONE | Check did not run: either the step was skipped via shouldRun, or an earlier check was TERMINAL |
| TERMINAL | Check returned terminal: true; this forces pipeline failure, and remaining checks on this step become NONE |

Whether a FAIL or ERROR halts the pipeline is controlled by policy.failure and policy.error on the step — not by the check result itself. A TERMINAL check always halts the pipeline regardless of policy.
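
The status rules can be sketched as a small loop over a step's checks. This is an illustrative model only: `runChecks`, `NamedCheck` and `CheckOutcome` are hypothetical names, the checks are assumed to be synchronous, and the halting policy is deliberately left out.

```typescript
type CheckStatus = 'PASS' | 'FAIL' | 'ERROR' | 'NONE' | 'TERMINAL'
type CheckFnResult = { success: boolean; message?: string; terminal?: true }

// Hypothetical helper types for this sketch.
type NamedCheck<T> = { name: string; fn: (ctx: T) => CheckFnResult }
type CheckOutcome = { name: string; status: CheckStatus }

function runChecks<T>(ctx: T, checks: NamedCheck<T>[]): CheckOutcome[] {
  const outcomes: CheckOutcome[] = []
  let terminated = false

  for (const { name, fn } of checks) {
    // Once a check is TERMINAL, the remaining checks are recorded as NONE.
    if (terminated) {
      outcomes.push({ name, status: 'NONE' })
      continue
    }
    try {
      const result = fn(ctx)
      if (result.terminal) {
        terminated = true
        outcomes.push({ name, status: 'TERMINAL' })
      } else {
        outcomes.push({ name, status: result.success ? 'PASS' : 'FAIL' })
      }
    } catch {
      // A throwing check is recorded as ERROR and later checks still run.
      outcomes.push({ name, status: 'ERROR' })
    }
  }
  return outcomes
}
```

Note that a FAIL or ERROR does not stop the loop here; only a terminal result does, which matches the rule that halting on FAIL or ERROR is decided by the step's policy rather than by the check.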

Inspect check outcomes via result.steps:

for (const step of result.steps) {
  console.log(step.name, step.skipped, step.executeFailed, step.succeeded)
  console.log(`duration: ${step.timing.duration}ms`)
  for (const attempt of step.attempts) {
    console.log(`attempt ${attempt.attempt}: timedOut=${attempt.timedOut}`)
  }
  for (const check of step.checks) {
    console.log(check.name, check.result.status, check.result.message)
  }
}

Step and Pipeline Records

StepRecord

Every step visited during a pipeline run produces a StepRecord in result.steps.

| Field | Type | Description |
| --- | --- | --- |
| name | string | Display name of the step |
| skipped | boolean | true when shouldRun returned false |
| executeFailed | boolean | true when execute threw (all retries exhausted) |
| succeeded | boolean | true when the step ran, execute succeeded, and all checks passed |
| attempts | AttemptRecord[] | One record per execute call made; empty for skipped steps |
| checks | CheckRecord[] | Ordered list of check outcomes |
| timing | Timing | Wall-clock metrics for the entire step (all attempts combined) |

AttemptRecord

Steps with a retry policy may make multiple execute calls. Each call produces one AttemptRecord.

| Field | Type | Description |
| --- | --- | --- |
| attempt | number | 1-based attempt number |
| error | Error? | The error thrown by this attempt; absent on success |
| timedOut | boolean | true when this attempt was aborted by timeout |
| timing | Timing | Wall-clock metrics for this single attempt |

Timing

| Field | Type | Description |
| --- | --- | --- |
| startedAt | number | Unix timestamp (ms) when execution began |
| finishedAt | number | Unix timestamp (ms) when execution completed |
| duration | number | Elapsed milliseconds |


Pipeline Success Rules

result.success is true only when the configured success criteria pass. By default (no successCriteria):

  • every execute completed without throwing, and
  • every check on every step returned PASS

With successCriteria, the threshold is relaxed — see createPipeline above. A TERMINAL check always forces success: false regardless of criteria.

A single FAIL or ERROR check anywhere sets success: false (unless overridden by successCriteria). The steps array always contains a complete record of what happened.


Data Enrichment

Each step receives the full accumulated object and adds new fields:

Initial:      { userId: 'u-1' }
              ↓ Fetch User
Accumulated:  { userId: 'u-1', userName: 'Alice', email: 'alice@example.com' }
              ↓ Load Preferences
Accumulated:  { userId: 'u-1', userName: 'Alice', email: 'alice@example.com', theme: 'dark' }

TypeScript tracks this at compile time — every step and check has full autocomplete and type checking.
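
For readers curious how this kind of compile-time tracking can work, the following is a stripped-down sketch of an immutable builder whose accumulated type grows by intersection with each step's output. It is a teaching model, not the library's source: `MiniPipeline`, `build` and `createMini` are hypothetical names, and checks, retries and policies are omitted.

```typescript
type StepFn<TAcc, TOut> = (ctx: TAcc) => TOut | Promise<TOut>

// Hypothetical miniature of the builder: TAcc is the data shape accumulated so far.
interface MiniPipeline<TInitial extends object, TAcc extends object> {
  step<TOut extends object>(fn: StepFn<TAcc, TOut>): MiniPipeline<TInitial, TAcc & TOut>
  execute(initial: TInitial): Promise<TAcc>
}

function build<TInitial extends object, TAcc extends object>(
  fns: ReadonlyArray<StepFn<any, object>>,
): MiniPipeline<TInitial, TAcc> {
  return {
    // Returns a new pipeline; the existing step list is never mutated.
    step<TOut extends object>(fn: StepFn<TAcc, TOut>) {
      return build<TInitial, TAcc & TOut>([...fns, fn])
    },
    async execute(initial: TInitial) {
      let acc: object = { ...initial }
      // Each step sees everything accumulated so far and its output is merged in.
      for (const fn of fns) acc = { ...acc, ...(await fn(acc)) }
      return acc as TAcc
    },
  }
}

function createMini<T extends object>(): MiniPipeline<T, T> {
  return build<T, T>([])
}

const base = createMini<{ userId: string }>()
const demo = base
  .step(async (ctx) => ({ userName: `user-${ctx.userId}` }))
  .step((ctx) => ({ greeting: `Hello, ${ctx.userName}` })) // ctx.userName is typed here
```

Calling `demo.execute({ userId: 'u-1' })` resolves to an object containing `userId`, `userName` and `greeting`, while `base` still has no steps because every `.step()` call produced a new instance.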


Access Control Example

policy: { failure: 'CONTINUE' } lets the pipeline keep running after a check fails, collecting as much context as possible before a final denial decision:

import { createPipeline } from '@derrops-lib'

type AuthCtx = { authContext: { domain: string; ipAddress: string } }

const pipeline = createPipeline<AuthCtx>({ name: 'Auth Pipeline' })
  .step({
    name: 'IP Lookup',
    execute: async (ctx) => ({ ipCheck: await lookupIp(ctx.authContext.ipAddress) }),
    policy: { failure: 'CONTINUE' }, // keep going even if IP check fails
  })
  .check('IP not malicious', (ctx) => ({
    success: !ctx.ipCheck.isMalicious,
    message: `IP ${ctx.authContext.ipAddress} flagged as malicious`,
  }))

  .step({
    name: 'Resolve Tenant',
    execute: async (ctx) => ({ tenant: await findTenant(ctx.authContext.domain) }),
    policy: { failure: 'CONTINUE' }, // keep going even if a tenant check fails
  })
  .check('Tenant exists', (ctx) => ({
    success: ctx.tenant !== undefined,
  }))
  .check('IP whitelisted', (ctx) => ({
    success: ctx.tenant?.allowedIps.includes(ctx.authContext.ipAddress) ?? false,
  }))

const result = await pipeline.execute({
  authContext: { domain: 'example.com', ipAddress: '1.2.3.4' },
})

// result.success → false if any check failed
// result.data    → fully enriched, always available for logging
// result.steps   → per-step check records with PASS/FAIL/ERROR/NONE status

Conditional Steps (shouldRun)

pipeline
  .step({
    name: 'Lint',
    execute: async (ctx) => ({ lintErrors: runLint(ctx.sourceDir) }),
  })
  .step({
    name: 'Compile',
    execute: async (ctx) => ({ compiledFiles: compile(ctx.sourceDir) }),
    shouldRun: (ctx) => ctx.data.lintErrors === 0, // skip if lint failed
  })

shouldRun receives a StepContext with a data field containing the current accumulated data. When it returns false, all checks attached to that step are recorded with status NONE.


Retries & Timeouts

Use the retry policy for full control over retry behaviour:

pipeline.step({
  name: 'Flaky API Call',
  execute: async (ctx) => fetchWithRetry(ctx.url),
  retry: {
    maxAttempts: 4, // 1 initial + 3 retries
    backoff: { type: 'exponential', initialDelay: 200 }, // 200 ms, 400 ms, 800 ms, …
    maxDelay: 5000, // cap each delay at 5 s
    on: { onError: true, onTimeout: true }, // retry both errors and timeouts
  },
  timeout: 2000, // each attempt must complete within 2 s
  policy: { timeout: 'CONTINUE' }, // keep going if it still times out after all retries
  onRetry: (err, attempt, delay, ctx) => logger.warn('Retrying', { attempt, delay }),
  onFailure: (err, ctx) => logger.error('All retries failed', { err, ctx }),
})

Timeouts throw a StepTimeoutError (a subclass of Error), which lets the policy distinguish between a timeout and a regular execute error.
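
One common way to implement a per-attempt timeout is to race the work against a timer, as sketched below. The `StepTimeoutError` class here is a local stand-in that only borrows the name from the README, and `withTimeout` and `classify` are hypothetical helpers; how the library does this internally is not stated in the source.

```typescript
// Local stand-in: a dedicated Error subclass lets callers tell timeouts from other errors.
class StepTimeoutError extends Error {
  constructor(ms: number) {
    super(`Step timed out after ${ms} ms`)
    this.name = 'StepTimeoutError'
  }
}

async function withTimeout<T>(work: () => Promise<T>, ms: number): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new StepTimeoutError(ms)), ms)
  })
  try {
    // Whichever settles first wins; the losing work is not cancelled, only ignored.
    return await Promise.race([work(), timeout])
  } finally {
    clearTimeout(timer)
  }
}

// Map a caught error to the failure mode a policy would consult.
function classify(error: unknown): 'timeout' | 'error' {
  return error instanceof StepTimeoutError ? 'timeout' : 'error'
}
```

Because the race does not abort the underlying operation, long-running work may still complete in the background; real cancellation would need something like an AbortSignal passed into the work itself.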

Pipeline Restarts (restartFromStep)

When a step exhausts its retries, retry.restartFromStep rewinds the pipeline to an earlier step and re-runs from there — useful when a downstream failure means earlier work (e.g. token refresh) needs to be repeated.

import { createPipeline } from '@derrops-lib'

const pipeline = createPipeline<{ url: string }>({ name: 'Auth + Call' })
  .step({
    name: 'Authenticate',
    execute: async (ctx) => ({ token: await getToken() }),
  })
  .step({
    name: 'Call API',
    execute: async (ctx) => ({
      response: await fetch(ctx.url, { headers: { Authorization: ctx.token } }),
    }),
    retry: {
      maxAttempts: 3,
      backoff: { type: 'fixed', delay: 500 },
      restartFromStep: 'Authenticate', // rewind and re-authenticate on final failure
      maxRestarts: 2, // allow up to 2 full pipeline restarts
    },
    policy: { error: 'STOP' },
  })

restartFromStep accepts a step name (string) or a 0-based index (number). The target must be an earlier step. maxRestarts defaults to 1.
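
The control flow of a restart can be modelled with a simple index loop. The sketch below is a simulation under stated assumptions, not the library's algorithm: `SimStep` and `runWithRestarts` are hypothetical, `run` returning false stands for "all retries exhausted", and a single restart counter is assumed to be shared across the whole run.

```typescript
type SimStep = {
  name: string
  run: () => boolean // true = succeeded, false = retries exhausted
  restartFromStep?: string | number
  maxRestarts?: number
}

function runWithRestarts(steps: SimStep[]) {
  const visited: string[] = []
  let restarts = 0
  let i = 0

  while (i < steps.length) {
    const step = steps[i]
    if (!step) break
    visited.push(step.name)

    if (step.run()) {
      i += 1
      continue
    }

    // Resolve the restart target by index or by name; -1 means no usable target.
    const target =
      typeof step.restartFromStep === 'number'
        ? step.restartFromStep
        : steps.findIndex((s) => s.name === step.restartFromStep)

    // The target must be an earlier step, and maxRestarts defaults to 1.
    const canRestart = target >= 0 && target < i && restarts < (step.maxRestarts ?? 1)
    if (!canRestart) return { success: false, restarts, visited }

    restarts += 1
    i = target
  }
  return { success: true, restarts, visited }
}

let calls = 0
const outcome = runWithRestarts([
  { name: 'Authenticate', run: () => true },
  { name: 'Call API', run: () => ++calls > 2, restartFromStep: 'Authenticate', maxRestarts: 2 },
])
```

In this run the API step fails twice and succeeds on the third visit, so `outcome.restarts` is 2 and 'Authenticate' appears three times in `outcome.visited`.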


Analytics

import { createPipeline } from '@derrops-lib'
import type { AnalyticsCollector } from '@derrops-lib'

const analytics: AnalyticsCollector = {
  onStepStart: (name, input) => trace.start(name),
  onStepAttempt: (name, attempt, error, delay) =>
    logger.warn(`${name} attempt ${attempt} failed, retrying in ${delay}ms`),
  onStepComplete: (name, result, ms) => metrics.record(name, ms, result.success),
  onStepSkipped: (name, reason) => logger.info(`${name} skipped: ${reason}`),
  onPipelineRestart: (pipelineName, fromStep, restartNumber) =>
    logger.info(`Restarting from "${fromStep}" (restart #${restartNumber})`),
  onPipelineComplete: (name, totalMs) => metrics.pipelineDuration(name, totalMs),
  onPipelineError: (name, error) => logger.error(`${name} crashed`, error),
}

const pipeline = createPipeline<Input>({ name: 'My Pipeline', analytics })

Events:

| Event | When fired |
| --- | --- |
| onStepStart | Immediately before a step's first execute call |
| onStepAttempt | After each failed attempt, before the backoff delay (not on the last) |
| onStepComplete | After execute completes (success or final failure) and after all checks |
| onStepSkipped | When shouldRun returned false |
| onPipelineRestart | When restartFromStep rewinds the pipeline to an earlier step |
| onPipelineComplete | After all steps complete, before returning the result |
| onPipelineError | When an unexpected error escapes the pipeline's own error handling |

License

MIT