@nullplatform/workflow-engine
v0.0.6
Published
SDK for building workflow execution engines. Provides the contracts, utilities, and testing tools needed to integrate any durable execution backend (Temporal, Restate, etc.) into the nullplatform workflow system.
Readme
@nullplatform/workflow-engine
SDK for building workflow execution engines. Provides the contracts, utilities, and testing tools needed to integrate any durable execution backend (Temporal, Restate, etc.) into the nullplatform workflow system.
Architecture
An engine bridges the nullplatform API with a durable execution backend. It has two sides:
API layer Engine Backend
┌─────────────┐ WorkflowEngine ┌──────────────┐ ┌──────────┐
│ REST API │◄────────────────────►│ YourEngine │─────────────►│ Temporal │
│ │ start, signal, │ │ native SDK │ Restate │
│ │ resolveHumanTask │ │ │ etc. │
└─────────────┘ └──────┬───────┘ └──────────┘
│
DurableRuntime
PolicyAdapter
│
┌──────▼───────┐
│ Workflow code │
│ (interpreter │
│ or code-first)│
└──────────────┘You implement three interfaces:
| Interface | Purpose | Where it runs |
|---|---|---|
| WorkflowEngine | API-facing: start executions, send signals, resolve human tasks | API process |
| DurableRuntime | Execution-facing: the 4 primitives that workflow code runs on | Worker/sandbox |
| PolicyAdapter | Maps retry/timeout policies to your backend's native options | Worker/sandbox |
WorkflowEngine
The API layer calls this interface to manage workflow lifecycle.
import type { WorkflowEngine, StartExecutionParams } from '@nullplatform/workflow-engine';
export class MyEngine implements WorkflowEngine {
readonly type = 'my-engine';
async init(): Promise<void> {
// Connect to your backend (TCP, HTTP, gRPC...)
}
async startExecution(params: StartExecutionParams): Promise<{ refs: Record<string, unknown> }> {
// params contains:
// executionId — unique execution ID
// workflowId — workflow definition ID
// graph — SerializableWorkflowGraph (the full DAG)
// input — workflow input data
// callbackBaseUrl — URL for ExecutionObserver to report back
// Start the workflow in your backend and return tracking refs
const result = await this.client.start(params);
return { refs: { runId: result.runId } };
}
async waitForCompletion(executionId: string, timeoutMs: number): Promise<{
completed: boolean;
result?: unknown;
}> {
// Block until the execution finishes or timeout elapses
}
async sendSignal(executionId: string, signalName: string, ...args: unknown[]): Promise<void> {
// Deliver a named signal (cancel, pause, resume) to a running execution
}
async resolveHumanTask(executionId: string, nodeKey: string, resolution: {
status: string;
output?: unknown;
error?: unknown;
}): Promise<void> {
// Unblock a node waiting for human input
}
async close(): Promise<void> {
// Disconnect from backend
}
}Optional: Scheduler
If your backend supports cron-like scheduling, implement createScheduler:
createScheduler(workflowId: string, workflowVersionId: string): Scheduler {
return {
async create(options: ScheduleOptions): Promise<ScheduleHandle> { ... },
getHandle(scheduleId: string): ScheduleHandle { ... },
};
}DurableRuntime
The 4 primitives that workflow code executes on. Every operation must survive process restarts.
import type { DurableRuntime, DurableRunOptions, DurablePromiseHandle } from '@nullplatform/workflow-engine';
export class MyDurableRuntime implements DurableRuntime {
readonly executionId: string;
constructor(executionId: string) {
this.executionId = executionId;
}
async run<T>(key: string, fn: () => T | Promise<T>, options?: DurableRunOptions): Promise<T> {
// Execute fn durably — memoized on replay.
// The key uniquely identifies this invocation within the execution.
// On replay, return the previously recorded result without re-executing fn.
// options.engineOptions contains your backend's native run options
// (set by your PolicyAdapter).
}
async sleep(ms: number): Promise<void> {
// Durable timer — must survive process restarts.
// A 24-hour sleep should resume after 24 hours even if the process
// was restarted 10 times in between.
}
promise<T>(name: string): DurablePromiseHandle<T> {
// Return a handle for a durable promise identified by name.
// Used for human-in-the-loop patterns and external signals.
return {
get: async (timeoutMs?: number): Promise<T> => {
// Block until the promise is resolved, or throw on timeout.
},
resolve: async (value: T): Promise<void> => {
// Resolve the promise, unblocking any pending get() call.
},
};
}
}PolicyAdapter
Converts the platform's resolved retry/timeout policies into your backend's native options format.
import type { PolicyAdapter, ResolvedPolicies, DurableRunOptions } from '@nullplatform/workflow-engine';
export const myPolicyAdapter: PolicyAdapter = {
toRunOptions(policies: ResolvedPolicies): DurableRunOptions {
// policies.retry — RetryPolicy (maxAttempts, strategy, intervals, etc.)
// policies.timeout — TimeoutPolicy (perAttemptSeconds, totalSeconds, etc.)
return {
engineOptions: {
// Your backend's native retry/timeout config
maxRetries: policies.retry.maxAttempts,
timeout: `${policies.timeout.perAttemptSeconds}s`,
},
};
},
};The SDK provides a 4-level policy cascade via resolvePolicies():
Platform defaults → Spec defaults → Workflow defaults → Node overridesimport { resolvePolicies } from '@nullplatform/workflow-engine';
const resolved = resolvePolicies({
specDefaultRetryPolicy: spec.definition.defaultRetryPolicy,
specDefaultTimeoutPolicy: spec.definition.defaultTimeoutPolicy,
workflowDefaultRetryPolicy: workflow.defaultRetryPolicy,
workflowDefaultTimeoutPolicy: workflow.defaultTimeoutPolicy,
nodeRetryPolicy: node.retryPolicy,
nodeTimeoutPolicy: node.timeoutPolicy,
});
// resolved.retry — merged RetryPolicy
// resolved.timeout — merged TimeoutPolicyUtilities
SpecRegistry
Holds node specifications keyed by name. Engines use this to look up the spec for each graph node.
import { SpecRegistry } from '@nullplatform/workflow-engine';
const registry = new SpecRegistry();
registry.register(mySpec); // registers under spec.definition.name
const spec = registry.get('set'); // throws if not found
const exists = registry.has('set'); // boolean checkExpression Resolution
Evaluates {{ }} templates in node input using upstream outputs.
import { resolveExpressions, buildExpressionContext } from '@nullplatform/workflow-engine';
const context = buildExpressionContext(
completedNodeOutputs, // Map<string, unknown>
workflowInput,
{ id: executionId, error: executionError },
itemIndex, // for-each loop index
);
const resolved = resolveExpressions(rawNodeInput, context);
// {{ $node("fetch").output.data }} → actual value
// {{ $input.name }} → workflow input fieldExecutionContext Factory
Builds the context object passed to spec.execute(). Includes logger, HTTP client, abort signal, and sentinel waitForHuman().
import { createExecutionContext, WaitForHumanError } from '@nullplatform/workflow-engine';
const ctx = createExecutionContext({
executionId: 'exec-123',
nodeKey: 'fetch-data',
workflowId: 'wf-1',
});
// Pass to spec execution:
const result = await spec.execute(input, ctx);Specs that need human input call ctx.waitForHuman(), which throws WaitForHumanError. Your interpreter catches this and blocks on a durable promise.
ExecutionObserver
Reports execution lifecycle back to the API server (node started, completed, failed).
import { HttpExecutionObserver } from '@nullplatform/workflow-engine';
// Remote mode — engine runs as a separate process
const callback = new HttpExecutionObserver('http://localhost:3000');
await callback.createNodeExecution(executionId, {
nodeKey: 'fetch-data',
specName: 'http-request',
status: 'running',
startedAt: new Date().toISOString(),
});Error Utilities
import { WorkflowNodeError, extractNodeError } from '@nullplatform/workflow-engine';
// Wrap a typed error
throw new WorkflowNodeError({ code: 'TIMEOUT', message: 'Node timed out' });
// Extract the deepest structured error from a cause chain
const nodeError = extractNodeError(caughtError);Testing
The SDK provides InMemoryRuntime and testPolicyAdapter for testing workflows without any infrastructure.
import { InMemoryRuntime, testPolicyAdapter } from '@nullplatform/workflow-engine';
const runtime = new InMemoryRuntime('test-exec-1');
// run() memoizes — second call with same key returns cached result
const result = await runtime.run('step-1', () => computeValue());
// sleep() is instant (no-op)
await runtime.sleep(60_000); // returns immediately
// promise() works with real Promises
const handle = runtime.promise<string>('human:approval');
// Resolve from test code (simulates human task completion)
runtime.resolvePromise('human:approval', { status: 'completed', output: { approved: true } });
const value = await handle.get();
// testPolicyAdapter returns empty options (no engine-specific config)
const options = testPolicyAdapter.toRunOptions(resolvedPolicies); // {}Use with WorkflowContext (workflow-sdk)
import { InMemoryRuntime, testPolicyAdapter, SpecRegistry } from '@nullplatform/workflow-engine';
import { WorkflowContext, executeWorkflow } from '@nullplatform/workflow';
const runtime = new InMemoryRuntime('test-1');
const registry = new SpecRegistry();
registry.register(mySpec);
const result = await executeWorkflow(myWorkflow, {
runtime,
policyAdapter: testPolicyAdapter,
registry,
}, { name: 'test' });Putting It All Together
A minimal engine implementation has this structure:
packages/engine-mine/
├── src/
│ ├── mine-engine.ts # WorkflowEngine implementation
│ ├── mine-runtime.ts # DurableRuntime implementation
│ ├── mine-policies.ts # PolicyAdapter + native options mapping
│ ├── interpreter.ts # DAG interpreter (uses SpecRegistry, expressions, etc.)
│ └── index.ts # Public exports
├── package.json
└── tsconfig.jsonYour package.json depends on:
{
"dependencies": {
"@nullplatform/workflow-engine": "1.0.0",
"your-backend-sdk": "..."
}
}The interpreter (graph executor) typically:
- Receives a
SerializableWorkflowGraphwith nodes and edges - Topologically walks the graph, dispatching nodes when dependencies are satisfied
- For each node: resolves expressions via
resolveExpressions(), loads the spec fromSpecRegistry, resolves policies viaresolvePolicies(), builds anExecutionContext, and callsspec.execute(input, ctx) - Handles special node types:
durableSleepnodes callruntime.sleep(),humanTasknodes block onruntime.promise() - Reports progress via
ExecutionObserver(createNodeExecution, updateNodeExecution, updateExecution) - Returns an
InterpreterResultwith output, status, and optional error
