norkostrat
v2.2.0
Published
Build Your Own Pipeline
Readme
norkostrat
A generic, pluggable multi-node pipeline system built on event-driven architecture. Define nodes as plain objects, wire them with topics, and run declarative pipelines. No Redis. No Docker. No external message broker.
Getting Started
npm install norkostratimport { PipelineRunner, createBus, createStore, type Pipeline, type Node } from "norkostrat";
// 1. Define your nodes (topic-agnostic)
const uppercaseNode: Node<string, string> = {
name: "uppercase",
async process(input) {
return { patch: { result: input.toUpperCase() } };
},
};
const logNode: Node<Record<string, unknown>, void> = {
name: "log",
async process(content) {
console.log("Result:", (content as any).result);
return {};
},
};
// 2. Define your pipeline — topics are wired here
const pipeline: Pipeline = {
name: "example",
startTopic: "input",
seedFn: (id, input) => ({ id, input }),
steps: [
{ node: uppercaseNode, inputTopic: "input", outputTopic: "uppercase.done" },
{ node: logNode, inputTopic: "uppercase.done", outputTopic: "pipeline.done" },
],
};
// 3. Run it
const bus = createBus();
const store = createStore();
const runner = new PipelineRunner(pipeline, bus, store);
await runner.start();
await runner.submitJob("hello world");How It Works
Each pipeline is a chain of nodes connected by topics. Data flows through the system via an event bus:
input → uppercaseNode → uppercase.done → logNode → pipeline.doneNodes process data, update the shared store, and publish results to the next topic — forming a directed acyclic graph.
Fluent Pipeline Builder
For more complex pipelines, use the fluent builder to wire steps together cleanly:
import { pipeline, createBus, createStore, PipelineRunner } from "norkostrat";
const myPipeline = pipeline("my-pipeline")
.step("input", uppercaseNode, "uppercase.done")
.step("uppercase.done", logNode, "log.done")
.withConfig({ maxRetries: 3 })
.build((id, input) => ({ id, input }));
const runner = new PipelineRunner(myPipeline, createBus(), createStore());
await runner.start();
await runner.submitJob("hello world");Advanced Features
Dynamic Routing
Nodes can dynamically choose the next topic at runtime by returning nextTopic. This overrides the outputTopic defined in the pipeline step.
const decisionNode: Node = {
name: "decision",
async process(input) {
const isReady = input.status === "ready";
return {
nextTopic: isReady ? "process.start" : "process.wait"
};
}
};Multi-Topic Nodes
A node can listen to multiple topics by declaring inputTopics. This is useful for nodes that act as "collectors" or "join" points in a graph.
const collectNode: Node = {
name: "collector",
inputTopics: ["stepA.done", "stepB.done"],
async process(input, ctx) {
console.log(`Received from: ${ctx.incomingTopic}`);
return { patch: { [ctx.incomingTopic]: "received" } };
}
};Resuming Jobs
You can resume a job or bypass the seedFn by passing seedData directly to submitJob.
// Resume a job with existing state
await runner.submitJob("some.topic", {
id: "existing-uuid",
status: "restarting",
previousData: { ... }
});Node Contract
Nodes are plain objects. No inheritance is required.
interface Node<TInput = unknown, TOutput = unknown> {
name: string;
/** Optional: listen to multiple topics */
inputTopics?: string[];
/** Core logic: input is the store record, returns patch/nextTopic */
process(input: TInput, ctx: NodeContext): Promise<NodeResult<TOutput>>;
}
interface NodeContext {
store: KVStore; // Synchronous KV store
bus: EventBus; // Async event bus
jobId: string; // Unique ID for this execution
incomingTopic: string; // Topic that triggered this node
attempt: number; // Current retry attempt (1-indexed)
record: Record<string, unknown>; // Full pre-fetched store record
}
interface NodeResult<T = unknown> {
/** Data to merge into the store (shallow merge) */
patch?: Record<string, unknown>;
/** Optional: override the pipeline's outputTopic */
nextTopic?: string;
/** Optional: internal result data */
data?: T;
}Configuration
Pipeline config is defined per-pipeline. The runner falls back to global defaults when not specified:
const myPipeline = pipeline("my-pipeline")
.withConfig({
maxRetries: 3,
retryBaseMs: 500,
outputDir: "./results",
pipelineTimeoutMs: 30 * 60 * 1000,
})
.build(seedFn);| Property | Default | Description |
|----------|---------|-------------|
| maxRetries | 2 | Max retry attempts per node step |
| retryBaseMs | 800 | Base delay for exponential backoff (ms) |
| outputDir | "./output" | Output directory for results |
| pipelineTimeoutMs | 1200000 | Pipeline timeout (20 min) |
API Reference
| Export | Type | Description |
|--------|------|-------------|
| pipeline(name) | function | Create a fluent PipelineBuilder |
| createBus() | function | Create an in-memory event bus |
| createStore() | function | Create an in-memory key-value store |
| PipelineRunner | class | Wire a pipeline definition to bus/store |
| Pipeline | interface | Declarative pipeline definition |
| Node | interface | Node contract (name, process) |
| NodeContext | interface | Context passed to node.process() |
| NodeResult | interface | Return value from node.process() |
| config | object | Global fallback defaults |
License
MIT
