# @wpkernel/pipeline
A type-safe, dependency-aware workflow engine for orchestrating complex generation tasks.
## Overview
@wpkernel/pipeline is a generic orchestration engine that turns sets of decoupled "helpers" into deterministic, topologically sorted execution plans.
While it powers WPKernel's code generation (assembling fragments into artifacts), the core is completely agnostic. You can use it to build:
- ETL Pipelines: Extract, Transform, and Load stages with shared state.
- Build Systems: Compile, Bundle, and Minify steps with precise ordering.
- Code Generators: The standard "Fragment → Builder" pattern.
It guarantees:
- Deterministic Ordering: Topologically sorts helpers based on `dependsOn`.
- Cycle Detection: Fails fast (halts execution) if dependencies form a loop.
- Robust Rollbacks: Extensions and helpers can provide best-effort rollback hooks that run LIFO, attempting all cleanup steps and reporting any rollback failures.
- Type Safety: Full TypeScript support for custom contexts, options, and artifacts.
## Architecture Note

The package exports a single entry point, `@wpkernel/pipeline`, which provides the "Standard Pipeline" (Fragments & Builders). This is the recommended API for most consumers.
Under the hood, the package is split into:
- Standard Pipeline (`src/standard-pipeline`): The opinionated implementation used by WPKernel.
- Core Runner (`src/core/runner`): A purely agnostic DAG execution engine.
Subpath imports (e.g., `@wpkernel/pipeline/core`) are available if you need to build a completely custom pipeline architecture using the runner primitives directly.
## Installation

```bash
pnpm add @wpkernel/pipeline
```

The package ships pure TypeScript and has no runtime dependencies.
## Usage

### 1. Create a Pipeline

#### Standard Pipeline (Recommended)

Use `createPipeline` for the standard Fragment → Builder workflow used by WPKernel.
```ts
import { createPipeline } from '@wpkernel/pipeline';

const pipeline = createPipeline({
	// Configuration
	createContext: (ops) => ({ db: ops.db }),
	createBuildOptions: () => ({}),
	createFragmentState: () => ({}),

	// Argument resolvers
	createFragmentArgs: ({ context }) => ({ db: context.db }),
	createBuilderArgs: ({ artifact }) => ({ artifact }),
});
```

#### Custom Pipeline (Advanced)
For completely custom architectures (ETL, migrations, etc.), use `makePipeline` to define your own stages.
```ts
import { makePipeline } from '@wpkernel/pipeline';

const pipeline = makePipeline({
	// Define the "Stages" of your pipeline
	helperKinds: ['extract', 'transform', 'load'] as const,
	createStages: (deps) => [
		deps.makeLifecycleStage('extract'),
		deps.makeLifecycleStage('transform'),
		deps.makeLifecycleStage('load'),
		deps.commitStage,
		deps.finalizeResult,
	],
	createContext: (ops) => ({ db: ops.db }),
	// ... logic for resolving args for your helpers ...
});
```

### 2. Register Helpers
Helpers are the atomic units of work. They can be anything - functions, objects, or complex services.
// "Extract" helper
pipeline.use({
kind: 'extract',
key: 'users',
apply: async ({ context }) => {
return context.db.query('SELECT * FROM users');
},
});
// "Transform" helper (depends on generic extract logic)
pipeline.use({
kind: 'transform',
key: 'clean-users',
dependsOn: ['users'],
apply: ({ input }) => {
return input.map((u) => ({ ...u, name: u.name.trim() }));
},
});3. Run It
The pipeline resolves the dependency graph, executes the helpers, and manages the lifecycle.
```ts
const result = await pipeline.run({ db: myDatabase });
```

## Concepts
### Agnostic Helper Kinds

You are not limited to fixed roles. Define any kind of helper (e.g., 'validator', 'compiler', 'notifier') and map them to execution stages.
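As an illustration, a custom pipeline could reuse the `makePipeline` shape from the Usage section to declare such kinds and map each one to its own lifecycle stage (the kind names and context fields here are purely illustrative):

```ts
import { makePipeline } from '@wpkernel/pipeline';

// Hypothetical kinds mapped one-to-one onto lifecycle stages.
const buildPipeline = makePipeline({
	helperKinds: ['validator', 'compiler', 'notifier'] as const,
	createStages: (deps) => [
		deps.makeLifecycleStage('validator'),
		deps.makeLifecycleStage('compiler'),
		deps.makeLifecycleStage('notifier'),
		deps.commitStage,
		deps.finalizeResult,
	],
	createContext: (ops) => ({ logger: ops.logger }),
	// ... argument resolvers for each kind ...
});
```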
### Dependency Graph

The pipeline creates a dependency graph for each kind of helper. If Helper B depends on Helper A, the runner ensures A executes before B (and passes A's output to B if configured).
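A minimal sketch of the cycle guarantee, assuming the failure surfaces as a rejected `run()` promise (the README only states that execution halts; the exact error surface is an assumption):

```ts
// Two hypothetical helpers whose dependencies form a loop.
pipeline.use({ kind: 'transform', key: 'a', dependsOn: ['b'], apply: () => null });
pipeline.use({ kind: 'transform', key: 'b', dependsOn: ['a'], apply: () => null });

try {
	await pipeline.run({ db: myDatabase });
} catch (error) {
	// Fails fast: the cycle a → b → a is detected before any helper executes.
	console.error(error);
}
```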
### Extensions & Lifecycles
Extensions wrap execution with hooks at specific lifecycle stages.
Standard Pipeline Lifecycles:
prepare → before-fragments → after-fragments → before-builders → after-builders → finalize
Note: Custom pipelines (using `makePipeline`) can define arbitrary lifecycle stages. Extensions can hook into any stage, standard or custom, as long as it exists in the pipeline's execution plan.
Validation: The pipeline validates extension registrations. If an extension attempts to hook into an unscheduled lifecycle, the pipeline will log a warning instead of silently ignoring it.
Extension Registration (Sync & Async): `extensions.use()` returns `MaybePromise<unknown>`. It returns a Promise only if the extension's `register` method is asynchronous.
```ts
// Sync registration (e.g. simple helper bundles)
extensions.use(mySyncExtension);

// Async registration (e.g. database connections)
await extensions.use(myAsyncExtension);
```

Recommendation: We recommend awaiting registration when possible for consistency, but you may omit it if you are certain the extension initializes synchronously. `pipeline.run()` will automatically wait for any pending async registrations.
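A purely hypothetical extension sketch: this README only documents that extensions have a `register` method and hook into named lifecycle stages, so the hook-subscription API below (`hooks.on(...)`) and the bare `extensions` handle are assumptions, not the package's confirmed interface.

```ts
// Hypothetical timing extension; treat every identifier below as an assumption.
const timingExtension = {
	name: 'timing',
	register(hooks: { on: (stage: string, fn: () => void) => void }) {
		let startedAt = 0;

		// Assumed hook API: subscribe to lifecycle stages by name.
		hooks.on('before-fragments', () => {
			startedAt = Date.now();
		});
		hooks.on('after-fragments', () => {
			console.log(`fragments took ${Date.now() - startedAt}ms`);
		});
	},
};

await extensions.use(timingExtension);
```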
### Rollbacks
The pipeline supports robust rollback for both helper application and extension lifecycle commit phases:
- Extensions: Can provide transactional behavior via the `commit` phase. If a failure occurs, `rollback` hooks are triggered.
- Helpers: Can return a `rollback` function in their result. These are executed LIFO if a later failure occurs (see the sketch after this list).
- Robustness: The rollback stack continues execution even if individual rollback actions fail (errors are collected and reported).
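A minimal sketch of a helper-level rollback, assuming the helper result is an object that carries the `rollback` callback alongside its output (the exact result shape and the `db` methods are assumptions):

```ts
pipeline.use({
	kind: 'load',
	key: 'write-users',
	dependsOn: ['clean-users'],
	apply: async ({ context, input }) => {
		// Hypothetical db call on the user-supplied context.
		const insertedIds = await context.db.insert('users_clean', input);

		return {
			value: insertedIds,
			// Runs LIFO with other rollbacks if a later helper or the commit phase fails.
			rollback: async () => {
				await context.db.delete('users_clean', insertedIds);
			},
		};
	},
});
```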
### Re-run Semantics

Diagnostics are per-run. Calling `pipeline.run()` automatically clears any previous runtime diagnostics to ensure a fresh state. Static diagnostics (e.g., registration conflicts) are preserved and re-emitted for each run.
## Documentation
- Architecture Guide: Deep dive into the runner's internals and DAG resolution.
- API Reference: Generated TSDoc for all interfaces.
## License
EUPL-1.2 © The Geekist
