typed-pipeline

v2.3.0


Type-safe TypeScript pipelines with inferred steps, saveAs snapshots, retries, parallel fan-out, and async timeouts.

Downloads

683


typed-pipeline


Type-safe, composable pipelines for TypeScript with inferred step parameters, saved intermediate results, concurrency helpers, and async execution.

Install

npm install typed-pipeline

Quick Start

A complete, runnable example that demonstrates Pipeline, saveAs(), parallel(), and AsyncPipeline:

import { AsyncPipeline, Pipeline } from 'typed-pipeline'

async function main() {
  const pipeline = new Pipeline<number>()
    .pipe(n => n * 2)
    .saveAs('doubled')
    .pipe((current, saved) => current + saved.doubled)
    .parallel(
      total => total,
      total => `total=${total}`,
      async total => ({ total, even: total % 2 === 0 }),
    )

  const result = await pipeline.run(5)
  const doubled = pipeline.getResult('doubled')

  console.log(result)
  console.log(doubled)

  const asyncResult = await new AsyncPipeline('typed-pipeline')
    .pipe(async text => text.toUpperCase())
    .pipe(text => `${text}!`)
    .timeout(1000)
    .value()

  console.log(asyncResult)
}

void main()

Output:

[20, 'total=20', { total: 20, even: true }]
10
'TYPED-PIPELINE!'

Core Concepts

Plain step

.pipe(n => n * 2)

$$-aware step

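The first parameter receives the previous step's output. Any additional parameters must have default values, so that the runtime can still call the step as a unary function.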

.pipe(($$, multiplier = 3) => $$ * multiplier)

Saved-aware step

The second parameter receives the object that holds the results saved by saveAs().

new Pipeline<number>()
  .pipe(n => n * 2).saveAs('doubled')
  .pipe(n => n + 1).saveAs('incremented')
  .pipe((current, saved) => current + saved.doubled + saved.incremented)

Runtime dispatch is based on fn.length:

| Step kind | fn.length | Runtime call |
|-----------|-------------|--------------|
| Plain step | 1 | fn(current) |
| $$-aware step | 1 | fn(current) |
| Saved-aware step | 2 | fn(current, savedMap) |
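The dispatch rule above can be sketched in a few lines. This is a minimal illustration and not the library's actual code: callStep, SavedMap, and the sample steps are hypothetical names. It assumes an ES2015+ compile target, because down-leveled default parameters become ordinary parameters and would change fn.length.

```typescript
// Hypothetical dispatcher; not the library's implementation.
type SavedMap = Record<string, unknown>
type AnyStep = (...args: any[]) => unknown

function callStep(fn: AnyStep, current: unknown, saved: SavedMap): unknown {
  // Parameters with default values do not count toward fn.length,
  // so a $$-aware step still reports a length of 1.
  return fn.length >= 2 ? fn(current, saved) : fn(current)
}

const plain = (n: number) => n * 2
const withDefault = ($$: number, multiplier = 3) => $$ * multiplier
const savedAware = (current: number, saved: { doubled: number }) => current + saved.doubled

const savedMap: SavedMap = { doubled: 10 }

console.log(callStep(plain, 5, savedMap))       // 10
console.log(callStep(withDefault, 5, savedMap)) // 15 on ES2015+ targets, where defaults are emitted natively
console.log(callStep(savedAware, 5, savedMap))  // 15
```

This also shows why the extra parameters of a $$-aware step need defaults: a second parameter without a default would raise fn.length to 2, and the step would be treated as saved-aware.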

API Reference

Pipeline

class Pipeline<
  TInput,
  TOutput = TInput,
  TSaved extends Record<string, unknown> = Record<never, never>,
> {
  constructor(
    jobs?: Array<Job<any, any>>,
    results?: Map<string, unknown>,
    extraResults?: Map<string, unknown>[],
  )

  static zip<TInput, const TPipelines extends readonly Pipeline<TInput, any, any>[]>(
    ...pipelines: TPipelines
  ): Pipeline<
    TInput,
    { [K in keyof TPipelines]: TPipelines[K] extends Pipeline<any, infer TOut, any> ? TOut : never },
    UnionToIntersection<
      TPipelines[number] extends Pipeline<any, any, infer TSaved> ? TSaved : never
    > & Record<string, unknown>
  >

  pipe<TNext>(step: (input: TOutput) => TNext | Promise<TNext>): Pipeline<TInput, Awaited<TNext>, TSaved>
  pipe<TNext>(step: ($$: Prev<TOutput>, ...rest: [first?: unknown, ...rest: (unknown | undefined)[]]) => TNext | Promise<TNext>): Pipeline<TInput, Awaited<TNext>, TSaved>
  pipe<TNext>(step: (current: TOutput, saved: TSaved) => TNext | Promise<TNext>): Pipeline<TInput, Awaited<TNext>, TSaved>

  parallel<TSteps extends readonly ((input: TOutput) => any)[]>(
    ...steps: TSteps
  ): Pipeline<TInput, { [K in keyof TSteps]: Awaited<ReturnType<TSteps[K]>> }, TSaved>

  fork<TNext>(
    count: number,
    step: (value: TOutput, index: number) => TNext | Promise<TNext>,
  ): Pipeline<TInput, Awaited<TNext>[], TSaved>

  bypass(step: (value: TOutput) => unknown | Promise<unknown>): Pipeline<TInput, TOutput, TSaved>
  tap(step: (value: TOutput) => unknown | Promise<unknown>): Pipeline<TInput, TOutput, TSaved>

  saveAs<TKey extends string>(
    key: TKey,
    options?: { clone?: boolean },
  ): Pipeline<TInput, TOutput, TSaved & Record<TKey, TOutput>>

  getResult<TKey extends keyof TSaved>(key: TKey): TSaved[TKey] | undefined

  waited(): Pipeline<TInput, Awaited<TOutput>, TSaved>

  inject<TKey extends keyof TSaved, TNext>(
    key: TKey,
    fn: (current: TOutput, saved: TSaved[TKey]) => TNext | Promise<TNext>,
  ): Pipeline<TInput, Awaited<TNext>, TSaved>

  withSaved<TNext>(
    fn: (current: TOutput, saved: TSaved) => TNext | Promise<TNext>,
  ): Pipeline<TInput, Awaited<TNext>, TSaved>

  concat<TNext, TOtherSaved extends Record<string, unknown>>(
    other: Pipeline<TOutput, TNext, TOtherSaved>,
  ): Pipeline<TInput, TNext, TSaved & TOtherSaved>

  retry<TNext>(
    attempts: number,
    step: (input: TOutput) => TNext | Promise<TNext>,
  ): Pipeline<TInput, Awaited<TNext>, TSaved>
  retry<TNext>(
    attempts: number,
    step: ($$: Prev<TOutput>, ...rest: [first?: unknown, ...rest: (unknown | undefined)[]]) => TNext | Promise<TNext>,
  ): Pipeline<TInput, Awaited<TNext>, TSaved>
  retry<TNext>(
    attempts: number,
    step: (current: TOutput, saved: TSaved) => TNext | Promise<TNext>,
  ): Pipeline<TInput, Awaited<TNext>, TSaved>

  run(input: TInput): Promise<TOutput>
  [Symbol.dispose](): void
}

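Method summary:

| Method | Purpose |
|--------|---------|
| pipe() | Adds a synchronous or asynchronous transformation step |
| parallel() | Runs multiple steps concurrently and outputs a strongly typed tuple |
| fork() | Copies the current value to n branches and collects the results into an array |
| bypass() / tap() | Runs a side effect without changing the current value |
| saveAs() | Saves the current value so that later steps can use it, or so it can be read after the run completes |
| getResult() | Reads a value saved by saveAs() |
| waited() | Flattens Promise<Promise<T>> into Promise<T> |
| inject() | Injects a single saved value |
| withSaved() | Injects all saved values |
| concat() | Joins two compatible pipelines |
| retry() | Retries a single step |
| run() | Executes the pipeline |
| Pipeline.zip() | Runs multiple pipelines on the same input and combines their results |
| [Symbol.dispose]() | Clears the saved results; supports TS 5.2 using declarations |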

AsyncPipeline

class AsyncPipeline<T> {
  constructor(input: T, steps?: Array<(input: any) => any>, timeoutMs?: number)

  pipe<U>(fn: (input: T) => U | Promise<U>): AsyncPipeline<U>

  parallel<Fns extends readonly ((input: T) => any)[]>(
    fns: [...Fns],
  ): AsyncPipeline<{ [K in keyof Fns]: Awaited<ReturnType<Fns[K]>> }>

  timeout(ms: number): AsyncPipeline<T>

  value(): Promise<T>
}

Utility Types

type Prev<T> = T & { readonly __fpipe_prev__: unique symbol }
type PipelineInput<P> = P extends Pipeline<infer In, any, any> ? In : never
type PipelineOutput<P> = P extends Pipeline<any, infer Out, any> ? Out : never
type PipelineSaved<P> = P extends Pipeline<any, any, infer Saved> ? Saved : never

Functional API

declare function fpipe<
  const Steps extends readonly AnyStep[],
  Seed extends StepInput<Steps[0]>,
>(
  ...steps: Steps
): (input: NoInfer<Seed>) => Promise<PipelineOutput<Steps, Seed>>

Example:

import { fpipe } from 'typed-pipeline'

const transform = fpipe(
  (n: number) => n * 2,
  n => `value:${n}`,
)

const result = await transform(5)

Examples

Pipeline.fork(n, fn)

fork() copies the current value to multiple branches and collects each branch's result into an array. The second parameter of fn is the branch index.

import { Pipeline } from 'typed-pipeline'

const pipeline = new Pipeline<number>()
  .pipe(n => n * 10)
  .fork(4, async (value, index) => ({
    index,
    result: value + index,
  }))

const result = await pipeline.run(3)
// [
//   { index: 0, result: 30 },
//   { index: 1, result: 31 },
//   { index: 2, result: 32 },
//   { index: 3, result: 33 },
// ]
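As a rough mental model, fork() behaves like calling the same step count times with an index and collecting the results in branch order. The sketch below assumes the branches run concurrently; forkValue is a hypothetical helper, not part of the package.

```typescript
// Hypothetical helper; the library's internals may differ.
async function forkValue<T, R>(
  value: T,
  count: number,
  step: (value: T, index: number) => R | Promise<R>,
): Promise<Awaited<R>[]> {
  // Start every branch before awaiting, so the branches overlap in time.
  const branches = Array.from({ length: count }, (_, index) => step(value, index))
  // Promise.all keeps the results in branch order.
  const results = await Promise.all(branches)
  return results
}

const forked = forkValue(30, 4, async (value, index) => ({
  index,
  result: value + index,
}))

forked.then(results => console.log(results))
// Logs four objects, from { index: 0, result: 30 } to { index: 3, result: 33 }
```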

Pipeline.zip(...pipelines)

Pipeline.zip() runs multiple pipelines against the same input value, outputs a tuple, and keeps the saveAs() results of each pipeline.

import { Pipeline } from 'typed-pipeline'

const numeric = new Pipeline<number>()
  .pipe(n => n * 2)
  .saveAs('doubled')

const textual = new Pipeline<number>()
  .pipe(n => `n=${n}`)
  .saveAs('label')

const zipped = Pipeline.zip(numeric, textual)

const result = await zipped.run(7)
const doubled = zipped.getResult('doubled')
const label = zipped.getResult('label')

console.log(result)   // [14, 'n=7']
console.log(doubled)  // 14
console.log(label)    // 'n=7'

Pipeline.retry(n, fn)

In retry(attempts, fn), attempts is the number of additional retries, not the total number of calls. retry(2, fn) runs the step at most 3 times.

import { Pipeline } from 'typed-pipeline'

let calls = 0

const pipeline = new Pipeline<number>()
  .retry(2, async value => {
    calls += 1
    if (calls < 3) {
      throw new Error('transient')
    }
    return value * 100
  })

const result = await pipeline.run(5)

console.log(result) // 500
console.log(calls)  // 3

Saved-aware steps are also supported:

const pipeline = new Pipeline<number>()
  .pipe(n => n + 1)
  .saveAs('base')
  .retry(1, (current, saved) => current + saved.base)
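To make the "extra retries" counting concrete, here is a minimal standalone sketch of the same semantics. retryStep is a hypothetical helper written for illustration; it does not show how the library implements retry(), and it assumes retries happen immediately with no delay.

```typescript
// Hypothetical helper; `attempts` counts extra retries, as described above.
async function retryStep<T, R>(
  attempts: number,
  step: (input: T) => R | Promise<R>,
  input: T,
): Promise<Awaited<R>> {
  let lastError: unknown
  // One initial call plus `attempts` retries.
  for (let attempt = 0; attempt <= attempts; attempt++) {
    try {
      return await step(input)
    } catch (error) {
      lastError = error
    }
  }
  // Every call failed: surface the most recent error.
  throw lastError
}

let retryCalls = 0

const retried = retryStep(2, (value: number) => {
  retryCalls += 1
  if (retryCalls < 3) {
    throw new Error('transient')
  }
  return value * 100
}, 5)

retried.then(result => console.log(result, retryCalls)) // 500 3
```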

AsyncPipeline.timeout(ms)

timeout(ms) sets a timeout for the entire AsyncPipeline.value() run, not a separate timeout for each step.

import { AsyncPipeline } from 'typed-pipeline'

const fast = new AsyncPipeline(2)
  .pipe(async value => {
    await new Promise(resolve => setTimeout(resolve, 20))
    return value * 3
  })
  .timeout(100)

console.log(await fast.value()) // 6

const slow = new AsyncPipeline(2)
  .pipe(async value => {
    await new Promise(resolve => setTimeout(resolve, 50))
    return value * 3
  })
  .timeout(10)

await slow.value()
// throws Error('Pipeline timed out after 10ms')
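A whole-run timeout of this kind is commonly built on Promise.race(). The sketch below shows that general technique under the assumption that the run is represented by a single promise; withTimeout and sleep are hypothetical helpers, and the library may implement timeout() differently.

```typescript
// Hypothetical helper that reuses the error message shown above.
function withTimeout<T>(run: Promise<T>, ms: number): Promise<Awaited<T>> {
  let timer: ReturnType<typeof setTimeout> | undefined
  const expiry = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error(`Pipeline timed out after ${ms}ms`)), ms)
  })
  // Whichever promise settles first wins; the timer is cleared either way.
  // The losing work is not cancelled, it is only ignored.
  return Promise.race([run, expiry]).finally(() => clearTimeout(timer))
}

const sleep = (ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms))

const fastRun = withTimeout(sleep(20).then(() => 6), 100)
const slowRun = withTimeout(sleep(50).then(() => 6), 10)

fastRun.then(value => console.log(value)) // 6
slowRun.catch(error => console.log((error as Error).message)) // Pipeline timed out after 10ms
```

Because the deadline wraps the combined promise, a run with many quick steps can still time out even though no single step is slow.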

Pipeline.parallel(...steps)

Pipeline.parallel() takes rest arguments, not an array. It sends the same current value to every step and returns the results as a tuple.

import { Pipeline } from 'typed-pipeline'

const pipeline = new Pipeline<number>()
  .pipe(n => n + 1)
  .parallel(
    n => n * 2,
    async n => `value=${n}`,
    n => n % 2 === 0,
  )

const result = await pipeline.run(4)
// [10, 'value=5', false]

To pass an array instead, use AsyncPipeline.parallel([...]):

import { AsyncPipeline } from 'typed-pipeline'

const result = await new AsyncPipeline(4)
  .parallel([
    n => n + 1,
    async n => n * 2,
  ])
  .value()
// [5, 8]
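The tuple typing relies on a TypeScript feature: a rest parameter whose type is a generic constrained to an array is inferred as a tuple, so each position keeps its own return type. The sketch below illustrates that idea on its own. runParallel and StepResults are hypothetical names and are not part of the package.

```typescript
// Hypothetical helper; names and structure are illustrative only.
type StepResults<T, Steps extends readonly ((input: T) => unknown)[]> = {
  [K in keyof Steps]: Steps[K] extends (input: T) => infer R ? Awaited<R> : never
}

async function runParallel<T, Steps extends readonly ((input: T) => unknown)[]>(
  input: T,
  ...steps: Steps
): Promise<StepResults<T, Steps>> {
  // Every step receives the same input; Promise.all preserves argument order.
  const results = await Promise.all(steps.map(step => step(input)))
  // The cast records what the mapped type already guarantees per position.
  return results as unknown as StepResults<T, Steps>
}

const parallelResult = runParallel(
  5,
  (n: number) => n * 2,
  async (n: number) => `value=${n}`,
  (n: number) => n % 2 === 0,
)

// parallelResult is typed as Promise<[number, string, boolean]>
parallelResult.then(result => console.log(result)) // [10, 'value=5', false]
```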

saveAs() Semantics

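saveAs() subscribes only to the output of the most recent step and stores that output in the pipeline's internal result table.

Key semantics:

| Behavior | Details |
|----------|---------|
| Save point | .pipe(...).saveAs('x') saves the output of the preceding step, not the output of any later step |
| Read timing | During a run, available to later pipe((current, saved) => ...), inject(), and withSaved() steps; after the run, readable through getResult() |
| Default storage | Stores the same reference by default, which suits primitives and immutable data |
| Clone mode | saveAs('x', { clone: true }) uses structuredClone(); if the environment does not support it, falls back to JSON.parse(JSON.stringify(...)) |
| Scope | concat() and Pipeline.zip() merge the saved results of multiple pipelines into the getResult() lookup scope |
| Prerequisite | At least one step must exist first; calling saveAs() directly on an empty pipeline throws |
| Disposal | Calling [Symbol.dispose]() clears the current pipeline's own result table |

Reference semantics example: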

const pipeline = new Pipeline<number[]>()
  .pipe(items => [...items, 1])
  .saveAs('snapshot')
  .pipe(items => {
    items.push(2)
    return items
  })

await pipeline.run([])
console.log(pipeline.getResult('snapshot'))
// By default this is the same array reference, so the result is [1, 2]

Snapshot semantics example:

const pipeline = new Pipeline<number[]>()
  .pipe(items => [...items, 1])
  .saveAs('snapshot', { clone: true })
  .pipe(items => {
    items.push(2)
    return items
  })

await pipeline.run([])
console.log(pipeline.getResult('snapshot'))
// [1]
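The difference between the two modes can be reproduced without the library. The sketch below mimics the documented clone option with a hypothetical snapshot() helper; it assumes a runtime that provides structuredClone (for example Node 17 or later) and falls back to JSON otherwise. The JSON fallback drops values that JSON cannot represent, such as undefined properties and functions.

```typescript
// Hypothetical helper that mimics the documented { clone } option.
function snapshot<T>(value: T, options: { clone?: boolean } = {}): T {
  // Default mode: keep the same reference.
  if (!options.clone) return value
  // Clone mode: deep copy, preferring structuredClone when it exists.
  return typeof structuredClone === 'function'
    ? structuredClone(value)
    : (JSON.parse(JSON.stringify(value)) as T)
}

const items = [1]
const byReference = snapshot(items)
const byClone = snapshot(items, { clone: true })

// A later step mutates the array in place.
items.push(2)

console.log(byReference) // [1, 2]
console.log(byClone)     // [1]
```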

Comparison

| Capability | typed-pipeline | RxJS | Promise.all |
|------------|------------------|------|---------------|
| Core model | Value-by-value typed transformation chain | Push-based stream abstraction | One-shot concurrency primitive |
| Type inference across steps | Strong and direct | Usually good, but depends on operator chains | N/A across sequential transforms |
| Saved intermediate values | Built-in via saveAs() | Usually external state or custom operators | No |
| Per-step composition | pipe(), concat(), inject(), withSaved() | Large operator ecosystem | Manual nesting |
| Fan-out concurrency | parallel(), fork(), zip() | forkJoin, combineLatest, mergeMap, etc. | Native strength |
| Retry single step | Built-in retry() | Via operators such as retry() | Manual wrapper logic |
| Timeout whole async run | Built-in AsyncPipeline.timeout() | Via operators and schedulers | Manual Promise.race() |
| Learning curve | Low | Higher | Very low |
| Best fit | Typed business/data pipelines in app code | Event streams, reactive UI, complex async flows | Parallel execution of known promises |

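Rule of thumb:

| Use this when... | Choose |
|------------------|--------|
| You want step-by-step, type-safe transformations and need to keep intermediate results | typed-pipeline |
| You are dealing with continuous event streams, cancellation, backpressure, or hot and cold observables | RxJS |
| You only need to wait for a few promises concurrently | Promise.all |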

License

MIT