typed-pipeline
v2.3.0
Type-safe TypeScript pipelines with inferred steps, saveAs snapshots, retries, parallel fan-out, and async timeouts.
typed-pipeline
Type-safe, composable pipelines for TypeScript with inferred step parameters, saved intermediate results, concurrency helpers, and async execution.
Install
npm install typed-pipeline
Quick Start
A complete, runnable example covering Pipeline, saveAs(), parallel(), and AsyncPipeline:
import { AsyncPipeline, Pipeline } from 'typed-pipeline'
async function main() {
  const pipeline = new Pipeline<number>()
    .pipe(n => n * 2)
    .saveAs('doubled')
    .pipe((current, saved) => current + saved.doubled)
    .parallel(
      total => total,
      total => `total=${total}`,
      async total => ({ total, even: total % 2 === 0 }),
    )
  const result = await pipeline.run(5)
  const doubled = pipeline.getResult('doubled')
  console.log(result)
  console.log(doubled)
  const asyncResult = await new AsyncPipeline('typed-pipeline')
    .pipe(async text => text.toUpperCase())
    .pipe(text => `${text}!`)
    .timeout(1000)
    .value()
  console.log(asyncResult)
}
void main()
Output:
[20, 'total=20', { total: 20, even: true }]
10
'TYPED-PIPELINE!'
Core Concepts
Plain step
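.pipe(n => n * 2)
$$-aware step
The first parameter receives the previous step's output. Any extra parameters must have default values so that the step can still be called as a unary function at runtime.
.pipe(($$, multiplier = 3) => $$ * multiplier)
Saved-aware step
The second parameter receives the object holding the results saved by saveAs().
new Pipeline<number>()
  .pipe(n => n * 2).saveAs('doubled')
  .pipe(n => n + 1).saveAs('incremented')
  .pipe((current, saved) => current + saved.doubled + saved.incremented)
Runtime dispatch is based on fn.length: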
| Step kind | fn.length | Runtime call |
|-----------|-------------|--------------|
| Plain step | 1 | fn(current) |
| $$-aware step | 1 | fn(current) |
| Saved-aware step | 2 | fn(current, savedMap) |
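The dispatch table relies on how JavaScript computes Function.length: parameters from the first default value onward are not counted. A minimal standalone sketch of that rule follows. The dispatch helper and the Step and Saved types are hypothetical illustrations, not the library's internals, and the lengths shown assume the code runs as ES2015 or later (down-leveling default parameters to ES5 changes fn.length).

```typescript
// Hypothetical types and dispatcher for illustration; not typed-pipeline internals.
type Saved = Record<string, unknown>
type Step = (...args: any[]) => unknown

function dispatch(step: Step, current: unknown, saved: Saved): unknown {
  // Function.length counts only the parameters before the first default value.
  return step.length >= 2 ? step(current, saved) : step(current)
}

const plain = (n: number) => n * 2
const withDefault = ($$: number, multiplier = 3) => $$ * multiplier
const savedAware = (current: number, saved: { doubled: number }) => current + saved.doubled

console.log(plain.length, withDefault.length, savedAware.length) // 1 1 2

const saved = { doubled: 10 }
console.log(dispatch(plain, 5, saved)) // 10
console.log(dispatch(withDefault, 5, saved)) // 15 (multiplier falls back to 3)
console.log(dispatch(savedAware, 5, saved)) // 15 (5 + saved.doubled)
```

This is why a $$-aware step is called the same way as a plain step: its defaulted parameters are invisible to fn.length.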
API Reference
Pipeline
class Pipeline<
  TInput,
  TOutput = TInput,
  TSaved extends Record<string, unknown> = Record<never, never>,
> {
  constructor(
    jobs?: Array<Job<any, any>>,
    results?: Map<string, unknown>,
    extraResults?: Map<string, unknown>[],
  )
  static zip<TInput, const TPipelines extends readonly Pipeline<TInput, any, any>[]>(
    ...pipelines: TPipelines
  ): Pipeline<
    TInput,
    { [K in keyof TPipelines]: TPipelines[K] extends Pipeline<any, infer TOut, any> ? TOut : never },
    UnionToIntersection<
      TPipelines[number] extends Pipeline<any, any, infer TSaved> ? TSaved : never
    > & Record<string, unknown>
  >
  pipe<TNext>(step: (input: TOutput) => TNext | Promise<TNext>): Pipeline<TInput, Awaited<TNext>, TSaved>
  pipe<TNext>(step: ($$: Prev<TOutput>, ...rest: [first?: unknown, ...rest: (unknown | undefined)[]]) => TNext | Promise<TNext>): Pipeline<TInput, Awaited<TNext>, TSaved>
  pipe<TNext>(step: (current: TOutput, saved: TSaved) => TNext | Promise<TNext>): Pipeline<TInput, Awaited<TNext>, TSaved>
  parallel<TSteps extends readonly ((input: TOutput) => any)[]>(
    ...steps: TSteps
  ): Pipeline<TInput, { [K in keyof TSteps]: Awaited<ReturnType<TSteps[K]>> }, TSaved>
  fork<TNext>(
    count: number,
    step: (value: TOutput, index: number) => TNext | Promise<TNext>,
  ): Pipeline<TInput, Awaited<TNext>[], TSaved>
  bypass(step: (value: TOutput) => unknown | Promise<unknown>): Pipeline<TInput, TOutput, TSaved>
  tap(step: (value: TOutput) => unknown | Promise<unknown>): Pipeline<TInput, TOutput, TSaved>
  saveAs<TKey extends string>(
    key: TKey,
    options?: { clone?: boolean },
  ): Pipeline<TInput, TOutput, TSaved & Record<TKey, TOutput>>
  getResult<TKey extends keyof TSaved>(key: TKey): TSaved[TKey] | undefined
  waited(): Pipeline<TInput, Awaited<TOutput>, TSaved>
  inject<TKey extends keyof TSaved, TNext>(
    key: TKey,
    fn: (current: TOutput, saved: TSaved[TKey]) => TNext | Promise<TNext>,
  ): Pipeline<TInput, Awaited<TNext>, TSaved>
  withSaved<TNext>(
    fn: (current: TOutput, saved: TSaved) => TNext | Promise<TNext>,
  ): Pipeline<TInput, Awaited<TNext>, TSaved>
  concat<TNext, TOtherSaved extends Record<string, unknown>>(
    other: Pipeline<TOutput, TNext, TOtherSaved>,
  ): Pipeline<TInput, TNext, TSaved & TOtherSaved>
  retry<TNext>(
    attempts: number,
    step: (input: TOutput) => TNext | Promise<TNext>,
  ): Pipeline<TInput, Awaited<TNext>, TSaved>
  retry<TNext>(
    attempts: number,
    step: ($$: Prev<TOutput>, ...rest: [first?: unknown, ...rest: (unknown | undefined)[]]) => TNext | Promise<TNext>,
  ): Pipeline<TInput, Awaited<TNext>, TSaved>
  retry<TNext>(
    attempts: number,
    step: (current: TOutput, saved: TSaved) => TNext | Promise<TNext>,
  ): Pipeline<TInput, Awaited<TNext>, TSaved>
  run(input: TInput): Promise<TOutput>
  [Symbol.dispose](): void
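}
Method summary:
| Method | Purpose |
|--------|---------|
| pipe() | Adds a synchronous or asynchronous transformation step |
| parallel() | Runs multiple steps concurrently and outputs a strongly typed tuple |
| fork() | Copies the current value to n branches and collects the results into an array |
| bypass() / tap() | Runs a side effect without changing the current value |
| saveAs() | Saves the current value so that later steps can use it or it can be read after the run |
| getResult() | Reads a value saved by saveAs() |
| waited() | Flattens Promise<Promise<T>> into Promise<T> |
| inject() | Injects a single saved value |
| withSaved() | Injects all saved values |
| concat() | Joins two compatible Pipelines |
| retry() | Retries a single step |
| run() | Executes the pipeline |
| Pipeline.zip() | Runs multiple pipelines on the same input and combines their results |
| [Symbol.dispose]() | Clears saved results; supports the TS 5.2 using declaration |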
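The parallel() signature maps a tuple of step functions to a tuple of their awaited return types. The sketch below shows that mapped-tuple technique in isolation on top of Promise.all. fanOut and Outputs are hypothetical names introduced for illustration and are not part of typed-pipeline; parameter types are annotated explicitly here to keep inference simple.

```typescript
// Hypothetical helper: maps a tuple of step functions to a tuple of awaited results.
type Outputs<TSteps extends readonly ((input: any) => unknown)[]> = {
  [K in keyof TSteps]: Awaited<ReturnType<TSteps[K]>>
}

async function fanOut<TIn, TSteps extends readonly ((input: TIn) => unknown)[]>(
  input: TIn,
  ...steps: TSteps
): Promise<Outputs<TSteps>> {
  // Every step receives the same input; sync and async steps are awaited alike.
  const results = await Promise.all(steps.map(step => step(input)))
  return results as unknown as Outputs<TSteps>
}

async function demo() {
  const result = await fanOut(
    20,
    (total: number) => total,
    (total: number) => `total=${total}`,
    async (total: number) => ({ total, even: total % 2 === 0 }),
  )
  // result is typed as [number, string, { total: number; even: boolean }]
  console.log(result) // [ 20, 'total=20', { total: 20, even: true } ]
}

demo()
```

Because the steps arrive as rest arguments, TypeScript infers a tuple rather than an array, which is what keeps each position's type distinct.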
AsyncPipeline
class AsyncPipeline<T> {
  constructor(input: T, steps?: Array<(input: any) => any>, timeoutMs?: number)
  pipe<U>(fn: (input: T) => U | Promise<U>): AsyncPipeline<U>
  parallel<Fns extends readonly ((input: T) => any)[]>(
    fns: [...Fns],
  ): AsyncPipeline<{ [K in keyof Fns]: Awaited<ReturnType<Fns[K]>> }>
  timeout(ms: number): AsyncPipeline<T>
  value(): Promise<T>
}
Utility Types
type Prev<T> = T & { readonly __fpipe_prev__: unique symbol }
type PipelineInput<P> = P extends Pipeline<infer In, any, any> ? In : never
type PipelineOutput<P> = P extends Pipeline<any, infer Out, any> ? Out : never
type PipelineSaved<P> = P extends Pipeline<any, any, infer Saved> ? Saved : never
Functional API
declare function fpipe<
  const Steps extends readonly AnyStep[],
  Seed extends StepInput<Steps[0]>,
>(
  ...steps: Steps
): (input: NoInfer<Seed>) => Promise<PipelineOutput<Steps, Seed>>
Example:
import { fpipe } from 'typed-pipeline'
const transform = fpipe(
  (n: number) => n * 2,
  n => `value:${n}`,
)
const result = await transform(5)
Examples
Pipeline.fork(n, fn)
fork() copies the current value to multiple branches and collects each branch's result into an array. The second argument to fn is the branch index.
import { Pipeline } from 'typed-pipeline'
const pipeline = new Pipeline<number>()
  .pipe(n => n * 10)
  .fork(4, async (value, index) => ({
    index,
    result: value + index,
  }))
const result = await pipeline.run(3)
// [
//   { index: 0, result: 30 },
//   { index: 1, result: 31 },
//   { index: 2, result: 32 },
//   { index: 3, result: 33 },
// ]
Pipeline.zip(...pipelines)
Pipeline.zip() runs multiple pipelines on the same input value. The output is a tuple, and each pipeline's saveAs() results are preserved.
import { Pipeline } from 'typed-pipeline'
const numeric = new Pipeline<number>()
  .pipe(n => n * 2)
  .saveAs('doubled')
const textual = new Pipeline<number>()
  .pipe(n => `n=${n}`)
  .saveAs('label')
const zipped = Pipeline.zip(numeric, textual)
const result = await zipped.run(7)
const doubled = zipped.getResult('doubled')
const label = zipped.getResult('label')
console.log(result) // [14, 'n=7']
console.log(doubled) // 14
console.log(label) // 'n=7'
Pipeline.retry(n, fn)
In retry(attempts, fn), attempts is the number of additional retries, not the total number of calls. retry(2, fn) runs the step at most 3 times.
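The counting rule can be made concrete with a small library-independent sketch. retryStep is a hypothetical helper written only to illustrate "attempts means extra retries"; rethrowing the last error once retries run out is an assumption here, not documented typed-pipeline behavior.

```typescript
// Hypothetical helper: `attempts` counts retries after the first call,
// so the step runs at most attempts + 1 times.
async function retryStep<TIn, TOut>(
  attempts: number,
  step: (input: TIn) => TOut | Promise<TOut>,
  input: TIn,
): Promise<Awaited<TOut>> {
  let lastError: unknown
  for (let attempt = 0; attempt <= attempts; attempt++) {
    try {
      return await step(input)
    } catch (error) {
      lastError = error // remember the failure and try again if retries remain
    }
  }
  throw lastError // assumption: surface the last failure once retries are exhausted
}

async function demo() {
  let calls = 0
  const flaky = async (value: number) => {
    calls += 1
    if (calls < 3) throw new Error('transient')
    return value * 100
  }
  console.log(await retryStep(2, flaky, 5)) // 500
  console.log(calls) // 3 (one initial call plus two retries)
}

demo()
```

The library's own retry() example follows.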
import { Pipeline } from 'typed-pipeline'
let calls = 0
const pipeline = new Pipeline<number>()
  .retry(2, async value => {
    calls += 1
    if (calls < 3) {
      throw new Error('transient')
    }
    return value * 100
  })
const result = await pipeline.run(5)
console.log(result) // 500
console.log(calls) // 3
Saved-aware steps are also supported:
const pipeline = new Pipeline<number>()
  .pipe(n => n + 1)
  .saveAs('base')
  .retry(1, (current, saved) => current + saved.base)
AsyncPipeline.timeout(ms)
timeout(ms) sets a timeout for the entire AsyncPipeline.value() run, not an independent timeout for each step.
import { AsyncPipeline } from 'typed-pipeline'
const fast = new AsyncPipeline(2)
  .pipe(async value => {
    await new Promise(resolve => setTimeout(resolve, 20))
    return value * 3
  })
  .timeout(100)
console.log(await fast.value()) // 6
const slow = new AsyncPipeline(2)
  .pipe(async value => {
    await new Promise(resolve => setTimeout(resolve, 50))
    return value * 3
  })
  .timeout(10)
await slow.value()
// throws Error('Pipeline timed out after 10ms')
Pipeline.parallel(...steps)
Pipeline.parallel() takes rest arguments, not an array. It sends the same current value to every step and returns the results as a tuple.
import { Pipeline } from 'typed-pipeline'
const pipeline = new Pipeline<number>()
  .pipe(n => n + 1)
  .parallel(
    n => n * 2,
    async n => `value=${n}`,
    n => n % 2 === 0,
  )
const result = await pipeline.run(4)
// [10, 'value=5', false]
To pass an array instead, use AsyncPipeline.parallel([...]):
const result = await new AsyncPipeline(4)
  .parallel([
    n => n + 1,
    async n => n * 2,
  ])
  .value()
// [5, 8]
saveAs() Semantics
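saveAs() subscribes only to the output of the most recent step and stores that output in the pipeline's internal result table.
Key semantics:
| Behavior | Details |
|----------|---------|
| Save point | .pipe(...).saveAs('x') saves the output of the preceding step, not the output of any later step |
| Read timing | During a run, available to later pipe((current, saved) => ...), inject(), and withSaved() steps; after the run, readable through getResult() |
| Default storage | Stores the same reference by default, which suits primitives and immutable data |
| Clone mode | saveAs('x', { clone: true }) uses structuredClone(); if the environment does not support it, falls back to JSON.parse(JSON.stringify(...)) |
| Scope | concat() and Pipeline.zip() merge the saved results of multiple pipelines into the getResult() lookup scope |
| Prerequisite | At least one step must come first; calling saveAs() directly on an empty pipeline throws |
| Disposal | Calling [Symbol.dispose]() clears the pipeline's own result table |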
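The difference between default storage and clone mode comes down to storing a reference versus storing a deep copy. A minimal sketch of the documented fallback order, independent of the library: cloneValue is a hypothetical helper, and the JSON fallback is lossy (for example, it drops undefined properties and turns Date objects into strings).

```typescript
// Hypothetical helper mirroring the documented order: structuredClone first, JSON round-trip otherwise.
function cloneValue<T>(value: T): T {
  const nativeClone = (globalThis as any).structuredClone as (<U>(v: U) => U) | undefined
  if (typeof nativeClone === 'function') {
    return nativeClone(value)
  }
  // Lossy fallback: only JSON-serializable data survives the round trip.
  return JSON.parse(JSON.stringify(value)) as T
}

const live = [1]
const byReference = live // default storage: same array object
const snapshot = cloneValue(live) // clone mode: independent deep copy

live.push(2) // a later step mutates the value in place

console.log(byReference) // [ 1, 2 ]
console.log(snapshot) // [ 1 ]
```

Cloning costs time and memory on every save, which is presumably why reference storage is the default.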
Reference semantics example:
const pipeline = new Pipeline<number[]>()
  .pipe(items => [...items, 1])
  .saveAs('snapshot')
  .pipe(items => {
    items.push(2)
    return items
  })
await pipeline.run([])
console.log(pipeline.getResult('snapshot'))
// By default this is the same array reference, so the result is [1, 2]
Snapshot semantics example:
const pipeline = new Pipeline<number[]>()
  .pipe(items => [...items, 1])
  .saveAs('snapshot', { clone: true })
  .pipe(items => {
    items.push(2)
    return items
  })
await pipeline.run([])
console.log(pipeline.getResult('snapshot'))
// [1]
Comparison
| Capability | typed-pipeline | RxJS | Promise.all |
|------------|------------------|------|---------------|
| Core model | Value-by-value typed transformation chain | Push-based stream abstraction | One-shot concurrency primitive |
| Type inference across steps | Strong and direct | Usually good, but depends on operator chains | N/A across sequential transforms |
| Saved intermediate values | Built-in via saveAs() | Usually external state or custom operators | No |
| Per-step composition | pipe(), concat(), inject(), withSaved() | Large operator ecosystem | Manual nesting |
| Fan-out concurrency | parallel(), fork(), zip() | forkJoin, combineLatest, mergeMap, etc. | Native strength |
| Retry single step | Built-in retry() | Via operators such as retry() | Manual wrapper logic |
| Timeout whole async run | Built-in AsyncPipeline.timeout() | Via operators and schedulers | Manual Promise.race() |
| Learning curve | Low | Higher | Very low |
| Best fit | Typed business/data pipelines in app code | Event streams, reactive UI, complex async flows | Parallel execution of known promises |
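For comparison, the "Manual Promise.race()" entry refers to the usual hand-written pattern for placing one deadline on a whole async run. A rough sketch of that pattern follows; withTimeout, sleep, and twoSteps are hypothetical names, the error message is this sketch's own, and nothing here claims to show how AsyncPipeline.timeout() is implemented.

```typescript
// Hypothetical helper: one deadline for the whole chain, not one per step.
async function withTimeout<T>(work: Promise<T>, ms: number): Promise<Awaited<T>> {
  let timer: ReturnType<typeof setTimeout> | undefined
  const deadline = new Promise<never>((_resolve, reject) => {
    timer = setTimeout(() => reject(new Error(`Timed out after ${ms}ms`)), ms)
  })
  try {
    return await Promise.race([work, deadline])
  } finally {
    clearTimeout(timer) // avoid a stray timer once the race is settled
  }
}

const sleep = (ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms))

async function twoSteps(value: number): Promise<number> {
  await sleep(20) // step 1
  const tripled = value * 3
  await sleep(20) // step 2
  return tripled + 1
}

async function demo() {
  console.log(await withTimeout(twoSteps(2), 200)) // 7
  try {
    // Each 20ms step fits within 30ms on its own, but the whole run does not.
    await withTimeout(twoSteps(2), 30)
  } catch (error) {
    console.log((error as Error).message) // Timed out after 30ms
  }
}

demo()
```

Note that the race only stops waiting; the underlying work keeps running unless it is cancelled separately.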
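Rule of thumb:
| Use this when... | Choose |
|------------------|--------|
| You need step-by-step type-safe transformations and want to keep intermediate results | typed-pipeline |
| You are handling continuous event streams, cancellation, backpressure, or hot and cold observables | RxJS |
| You only need to await a few promises concurrently | Promise.all |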
License
MIT
