@cook-step/stepflow-graph
v0.3.2
Published
StepFlow graph for ASL-like state machine workflows with scope validation
Maintainers
Readme
@cook-step/stepflow-graph
Graph implementation for ASL-like state machine workflows (AWS Step Functions style).
Overview
@cook-step/stepflow-graph extends @cook-step/graph with state machine semantics:
- Cycles allowed - State machines can loop
- Rules-based validation - Based on ASL state types
- startAt concept - Entry point for execution
- Branches as sub-graphs - Parallel/Map branches are separate instances
┌─────────────────────────────────────────────────────────┐
│ StepFlowGraph │
├─────────────────────────────────────────────────────────┤
│ RuleValidator │ FlowQueries │ EdgeFinder │
├─────────────────────────────────────────────────────────┤
│ @cook-step/graph │
└─────────────────────────────────────────────────────────┘Installation
pnpm add @cook-step/stepflow-graph @cook-step/graphQuick Start
import { StepFlowGraph, SocketId } from "@cook-step/stepflow-graph";
const graph = new StepFlowGraph();
// Add nodes
graph.addNode({
id: "ProcessOrder",
type: "Task",
position: { x: 0, y: 0 },
data: { resource: "arn:aws:lambda:process-order" },
});
graph.addNode({
id: "Done",
type: "Succeed",
position: { x: 200, y: 0 },
data: {},
});
// Add edge
graph.addEdge({
id: "e1",
source: { nodeId: "ProcessOrder", socketId: "next" as SocketId },
target: { nodeId: "Done" },
type: "next",
});
// Set start node
graph.setStartAt("ProcessOrder");
// Validate
const result = graph.validate();
console.log(result.valid); // trueState Types
| Type | Description | Max Outputs |
|------|-------------|-------------|
| Task | Execute a resource | 1 (next) + N (catch) |
| Pass | Pass input to output | 1 (next) |
| Wait | Wait for time/timestamp | 1 (next) |
| Choice | Conditional branching | N (choices) + 1 (default) |
| Parallel | Execute branches in parallel | 1 (next) + N (catch) |
| Map | Iterate over array | 1 (next) + N (catch) |
| Succeed | Terminal success | 0 |
| Fail | Terminal failure | 0 |
API Reference
Constructor
const graph = new StepFlowGraph(config?: {
metadata?: Record<string, unknown>;
eventEmitter?: IEventEmitter<StepFlowGraphEvents>;
});Start At
graph.setStartAt("nodeId");
graph.startAt; // → "nodeId" | nullMetadata
// Get/set metadata (for editor state like startPosition)
graph.getMetadata(); // → Record<string, unknown>
graph.setMetadata({ startPosition: { x: 50, y: 20 } });Node Operations
// Add node
graph.addNode({
id: "MyTask",
type: "Task",
position: { x: 0, y: 0 },
data: { resource: "arn:aws:..." },
sockets?: [...], // Auto-generated if not provided
retry?: [...],
catch?: [...],
isEnd?: boolean,
});
// Other operations
graph.removeNode(nodeId): boolean;
graph.updateNode(nodeId, updates): boolean;
graph.getNode(nodeId): StepFlowNode | undefined;
graph.getNodes(): StepFlowNode[];
graph.hasNode(nodeId): boolean;Edge Operations
// Add edge
graph.addEdge({
id: "e1",
source: { nodeId: "A", socketId: "next" },
target: { nodeId: "B" },
type: "next", // "next" | "choice" | "default" | "catch"
});
// Other operations
graph.removeEdge(edgeId): boolean;
graph.getEdge(edgeId): StepFlowEdge | undefined;
graph.getEdges(): StepFlowEdge[];
graph.hasEdge(edgeId): boolean;
graph.getNodeInputEdges(nodeId): StepFlowEdge[];
graph.getNodeOutputEdges(nodeId): StepFlowEdge[];Validation
// Validate entire graph (includes branches/iterators in cascade)
const result = graph.validate();
// → {
// valid: boolean, // true if no errors
// errors: [...], // Structural problems
// warnings: [...], // Development issues
// isProductionReady: boolean // true if no errors AND no warnings
// }
// Validate specific edge
graph.validateEdge(edgeId): EdgeValidationResult;
// Get invalid edges
graph.getInvalidEdges(): EdgeId[];
// Check production readiness
if (result.isProductionReady) {
// Safe to deploy
} else if (result.valid) {
// Can execute but has warnings
} else {
// Cannot execute - has errors
}Validation checks:
- startAt exists and points to valid node
- Edge types allowed for source node type
- Terminal nodes have no outputs
- Min/max edge counts per node type
- No self-references
- Socket without edge (warning)
- Orphan nodes (warning)
- Missing default in Choice (warning)
- Cascade validation: All branches and iterators are validated recursively
Cascade validation errors are prefixed with the path:
// Error in branch:
// id: "ParallelNode/branches[0]/graph"
// message: "[ParallelNode/branches[0]] Graph must have a startAt node"
// Error in nested iterator:
// id: "ParallelNode/branches[0]/MapNode/iterator/graph"
// message: "[ParallelNode/branches[0]][MapNode/iterator] ..."Edge Finding (UI)
// Find valid targets when dragging from a socket
const targets = graph.findValidTargets(sourceNodeId, sourceSocketId);
// → [{ nodeId, valid: boolean, reason?: string }, ...]
// Find valid sources when dragging to a node
const sources = graph.findValidSources(targetNodeId);
// → [{ nodeId, socketId, valid: boolean, reason?: string }, ...]
// Check specific connection
graph.canConnect(sourceNodeId, sourceSocketId, targetNodeId): boolean;Flow Queries
// Reachability from startAt
graph.getReachableNodes(): Set<NodeId>;
graph.getUnreachableNodes(): NodeId[];
// Terminal analysis
graph.getTerminalNodes(): StepFlowNode[];
graph.getDeadEndNodes(): StepFlowNode[]; // No output but not terminal
// By type
graph.getNodesByStateType("Task"): StepFlowNode[];
// Path analysis
graph.hasPath(sourceId, targetId): boolean;Socket Management
// Add socket to node (for Choice conditions)
graph.addSocket(nodeId, {
id: "choice:2",
type: "choice",
conditionIndex: 2,
});
// Remove socket
graph.removeSocket(nodeId, socketId): boolean;Branches (Parallel/Map)
Parallel and Map nodes can have sub-graphs (branches/iterators) that are separate StepFlowGraph instances.
// === PARALLEL BRANCHES ===
// Create branches
const branch1 = new StepFlowGraph();
branch1.addNode({ id: "Task1", type: "Task", ... });
branch1.setStartAt("Task1");
const branch2 = new StepFlowGraph();
branch2.addNode({ id: "Task2", type: "Task", ... });
branch2.setStartAt("Task2");
// Set all branches at once
graph.setBranches("ParallelNode", [branch1, branch2]);
// Or add one at a time
graph.addBranch("ParallelNode", branch3);
// Get branches
const branches = graph.getBranches("ParallelNode");
// → StepFlowGraph[]
// Remove branch by index
graph.removeBranch("ParallelNode", 1);
// === MAP ITERATOR ===
// Create iterator
const iterator = new StepFlowGraph();
iterator.addNode({ id: "ProcessItem", type: "Task", ... });
iterator.setStartAt("ProcessItem");
// Set iterator
graph.setIterator("MapNode", iterator);
// Get iterator
const iter = graph.getIterator("MapNode");
// → StepFlowGraph | undefined
// Remove iterator
graph.removeIterator("MapNode");
// === UTILITIES ===
// Check if node has branches or iterator
graph.hasBranchData("ParallelNode"); // → booleanSerialization
// Serialize (includes branches/iterators recursively)
const data = graph.serialize();
// → { startAt, nodes, edges, metadata }
// Nodes with branches: { ..., branches: [...] }
// Nodes with iterator: { ..., iterator: {...} }
// Load (reconstructs branches/iterators)
graph.load(data);
// Import (static)
const newGraph = StepFlowGraph.import(data, config?);
// Clone
const clone = graph.clone();
// Clear (also clears branches/iterators)
graph.clear();Events
graph.on("stepflow:validation:changed", (edgeId, result) => { ... });
graph.on("stepflow:validation:completed", (result) => { ... });
graph.on("stepflow:startAt:changed", (nodeId) => { ... });
graph.on("stepflow:socket:added", (nodeId, socket) => { ... });
graph.on("stepflow:socket:removed", (nodeId, socketId) => { ... });
// Also inherits all Graph events:
graph.on("node:added", (node) => { ... });
graph.on("node:removed", (nodeId, node) => { ... });
graph.on("edge:added", (edge) => { ... });
graph.on("edge:removed", (edgeId, edge) => { ... });Statistics
graph.getStats();
// → {
// totalNodes, totalEdges,
// reachableNodes, unreachableNodes,
// terminalNodes, deadEndNodes,
// choiceNodes, branchingNodes,
// validation: { total, valid, invalid },
// startAt
// }Examples
Choice State
graph.addNode({
id: "CheckStatus",
type: "Choice",
position: { x: 100, y: 0 },
data: {
conditions: [
{ variable: "$.status", operator: "stringEquals", value: "approved" },
{ variable: "$.status", operator: "stringEquals", value: "rejected" },
],
},
sockets: [
{ id: "choice:0" as SocketId, type: "choice", conditionIndex: 0 },
{ id: "choice:1" as SocketId, type: "choice", conditionIndex: 1 },
{ id: "default" as SocketId, type: "default" },
],
});
// Connect each branch
graph.addEdge({
id: "e1",
source: { nodeId: "CheckStatus", socketId: "choice:0" as SocketId },
target: { nodeId: "HandleApproved" },
type: "choice",
conditionIndex: 0,
});Task with Catch
graph.addNode({
id: "ProcessData",
type: "Task",
position: { x: 0, y: 0 },
data: { resource: "arn:aws:lambda:process" },
sockets: [
{ id: "next" as SocketId, type: "next" },
{ id: "catch:0" as SocketId, type: "catch" },
],
catch: [
{ errorEquals: ["States.TaskFailed"], resultPath: "$.error" },
],
});
// Normal flow
graph.addEdge({
id: "e1",
source: { nodeId: "ProcessData", socketId: "next" as SocketId },
target: { nodeId: "Success" },
type: "next",
});
// Error handler
graph.addEdge({
id: "e2",
source: { nodeId: "ProcessData", socketId: "catch:0" as SocketId },
target: { nodeId: "HandleError" },
type: "catch",
catchIndex: 0,
});Cycle (Loop)
// Cycles are allowed in StepFlowGraph!
graph.addNode({ id: "Process", type: "Task", ... });
graph.addNode({ id: "Check", type: "Choice", ... });
// Process → Check
graph.addEdge({
source: { nodeId: "Process", socketId: "next" },
target: { nodeId: "Check" },
type: "next",
});
// Check → Process (loop back)
graph.addEdge({
source: { nodeId: "Check", socketId: "choice:0" },
target: { nodeId: "Process" }, // Loop!
type: "choice",
});Parallel with Branches
// Main graph
const graph = new StepFlowGraph();
graph.addNode({
id: "ProcessInParallel",
type: "Parallel",
position: { x: 0, y: 0 },
data: { resultPath: "$.results" },
sockets: [{ id: "next" as SocketId, type: "next" }],
});
graph.addNode({
id: "Done",
type: "Succeed",
position: { x: 200, y: 0 },
data: {},
});
graph.addEdge({
id: "e1",
source: { nodeId: "ProcessInParallel", socketId: "next" as SocketId },
target: { nodeId: "Done" },
type: "next",
});
graph.setStartAt("ProcessInParallel");
// Branch 1: Send Email
const branch1 = new StepFlowGraph();
branch1.addNode({
id: "SendEmail",
type: "Task",
position: { x: 0, y: 0 },
data: { resource: "arn:aws:lambda:send-email" },
sockets: [],
isEnd: true,
});
branch1.setStartAt("SendEmail");
// Branch 2: Send SMS
const branch2 = new StepFlowGraph();
branch2.addNode({
id: "SendSMS",
type: "Task",
position: { x: 0, y: 0 },
data: { resource: "arn:aws:lambda:send-sms" },
sockets: [],
isEnd: true,
});
branch2.setStartAt("SendSMS");
// Attach branches
graph.setBranches("ProcessInParallel", [branch1, branch2]);Map with Iterator
// Main graph
const graph = new StepFlowGraph();
graph.addNode({
id: "ProcessItems",
type: "Map",
position: { x: 0, y: 0 },
data: {
itemsPath: "$.orders",
maxConcurrency: 10,
resultPath: "$.processedOrders",
},
sockets: [{ id: "next" as SocketId, type: "next" }],
});
graph.addNode({
id: "Done",
type: "Succeed",
position: { x: 200, y: 0 },
data: {},
});
graph.addEdge({
id: "e1",
source: { nodeId: "ProcessItems", socketId: "next" as SocketId },
target: { nodeId: "Done" },
type: "next",
});
graph.setStartAt("ProcessItems");
// Iterator: Process each order
const iterator = new StepFlowGraph();
iterator.addNode({
id: "ValidateOrder",
type: "Task",
position: { x: 0, y: 0 },
data: { resource: "arn:aws:lambda:validate-order" },
sockets: [{ id: "next" as SocketId, type: "next" }],
});
iterator.addNode({
id: "ProcessOrder",
type: "Task",
position: { x: 200, y: 0 },
data: { resource: "arn:aws:lambda:process-order" },
sockets: [],
isEnd: true,
});
iterator.addEdge({
id: "e1",
source: { nodeId: "ValidateOrder", socketId: "next" as SocketId },
target: { nodeId: "ProcessOrder" },
type: "next",
});
iterator.setStartAt("ValidateOrder");
// Attach iterator
graph.setIterator("ProcessItems", iterator);Normalizers (ASL ↔ Graph)
Convert between ASL-like definitions and StepFlowGraph.
fromDefinition - ASL → Graph
import { fromDefinition, StepFlowDefinition } from "@cook-step/stepflow-graph";
const definition: StepFlowDefinition = {
startAt: "ProcessOrder",
states: {
ProcessOrder: {
type: "Task",
resource: "arn:aws:lambda:process",
next: "Done",
},
Done: {
type: "Succeed",
},
},
};
const graph = fromDefinition(definition, {
xSpacing: 200, // Auto-layout spacing
ySpacing: 100,
});
// Branches are automatically converted to StepFlowGraph instances
const definitionWithBranches: StepFlowDefinition = {
startAt: "Parallel",
states: {
Parallel: {
type: "Parallel",
branches: [
{ startAt: "Task1", states: { Task1: { type: "Task", resource: "...", end: true } } },
{ startAt: "Task2", states: { Task2: { type: "Pass", end: true } } },
],
end: true,
},
},
};
const graphWithBranches = fromDefinition(definitionWithBranches);
const branches = graphWithBranches.getBranches("Parallel"); // → StepFlowGraph[]toDefinition - Graph → ASL
import { toDefinition } from "@cook-step/stepflow-graph";
const definition = toDefinition(graph, {
includeEditorMetadata: true, // Save node positions
schema: "https://...", // Optional $schema
});
// Result:
// {
// startAt: "ProcessOrder",
// states: { ... },
// _editor: {
// nodePositions: { ... },
// startPosition: { x: 50, y: 20 } // Visual "START" indicator position
// }
// }
// Branches are automatically converted back to ASL format
const defWithBranches = toDefinition(graphWithBranches);
// Result:
// {
// startAt: "Parallel",
// states: {
// Parallel: {
// type: "Parallel",
// branches: [
// { startAt: "Task1", states: { Task1: { ... } } },
// { startAt: "Task2", states: { Task2: { ... } } },
// ],
// end: true
// }
// }
// }Roundtrip
// ASL → Graph → ASL
const graph = fromDefinition(originalDefinition);
// ... modify graph ...
const newDefinition = toDefinition(graph);Editor Metadata (_editor)
The _editor field stores visual editor metadata (not part of ASL spec):
| Field | Type | Description |
|-------|------|-------------|
| nodePositions | Record<string, {x, y}> | Position of each node |
| startPosition | {x, y} | Position of the visual "START" indicator |
| viewport | {x, y, zoom} | Canvas viewport state |
The "START" indicator is a visual element in editors that shows where execution begins. It's not a real node - just points to startAt. The startPosition allows editors to save its position when users drag it.
// Set startPosition via graph metadata
const metadata = graph.getMetadata();
graph.setMetadata({ ...metadata, startPosition: { x: 50, y: 20 } });
// It will be saved in _editor when exporting
const def = toDefinition(graph);
// def._editor.startPosition → { x: 50, y: 20 }Validation Rules
Rules by StateType
| Rule | State Types | minOutEdges | maxOutEdges | allowedEdgeTypes | canHaveEnd |
|------|-------------|-------------|-------------|------------------|------------|
| rule:terminal | Succeed, Fail | 0 | 0 | - | No |
| rule:linear | Task, Pass, Wait | 0 | 1 | next, catch | Yes |
| rule:choice | Choice | 2 | ∞ | choice, default | No |
| rule:parallel | Parallel, Map | 0 | 1 | next, catch | Yes |
Edge Validation (validateEdge)
| Check | Error | |-------|-------| | Edge not found | "Edge not found" | | Source node missing | "Source node not found" | | Target node missing | "Target node not found" | | Self-reference | "Self-reference not allowed" | | Terminal with output | "Terminal state X cannot have outgoing edges" | | Invalid edge type | "Edge type X not allowed for Y" |
Connection Validation (canConnect)
| Check | Error | |-------|-------| | Self-connection | "Cannot connect to self" | | Source node missing | "Source node not found" | | Target node missing | "Target node not found" | | Terminal source | "X cannot have outgoing connections" | | Socket not found | "Socket X not found" | | Socket already connected | "Socket already has a connection" |
Graph Validation (validate())
Errors
| Code | Type | Description |
|------|------|-------------|
| MISSING_START_AT | graph | No startAt defined |
| INVALID_START_AT | graph | startAt points to non-existent node |
| INVALID_EDGE_TYPE | edge | Edge type not allowed for source node |
| MISSING_REQUIRED_EDGES | node | Node has fewer edges than minimum |
| TOO_MANY_EDGES | node | Node has more edges than maximum |
| SELF_REFERENCE | edge | Edge connects node to itself |
Warnings
| Code | Type | Description |
|------|------|-------------|
| SOCKET_WITHOUT_EDGE | socket | Socket has no outgoing edge (includes socketId) |
| MISSING_DEFAULT | node | Choice without default branch |
| ORPHAN_NODE | node | Node has no incoming edges (not startAt) |
Validation Flow
addEdge(edge)
│
├─► canConnect() ← Before creating
│ • Self-connection?
│ • Nodes exist?
│ • Terminal source?
│ • Socket exists?
│ • Socket available?
│
├─► graph.addEdge() ← Create edge
│
└─► validateEdge() ← After creating
• Edge exists?
• Nodes exist?
• Self-reference?
• Terminal source?
• EdgeType allowed?
validate() ← Full graph validation
├── startAt valid?
├── All edges valid?
├── Node edge counts OK?
├── Choice has default?
└── Orphan nodes?Visual Feedback (React Flow Integration)
StepFlowGraph provides computed flags for visual editor integration. These flags are not serialized.
Node: hasSocketWarnings
Indicates if any socket on the node has no outgoing edge.
// After validate(), nodes have hasSocketWarnings
graph.validate();
const node = graph.getNode("MyTask");
if (node?.hasSocketWarnings) {
// Show warning icon on node card
}Edge: useless
Indicates if the edge's source socket no longer exists.
const edge = graph.getEdge("e1");
if (edge?.useless) {
// Show edge with low opacity / dashed stroke
}Warning: SOCKET_WITHOUT_EDGE
Includes socketId for pinpointing which socket needs attention.
const result = graph.validate();
// Find warnings for a specific node
const socketWarnings = result.warnings.filter(
w => w.code === "SOCKET_WITHOUT_EDGE" && w.id === "MyTask"
);
for (const warning of socketWarnings) {
console.log(`Socket ${warning.socketId} has no edge`);
// Paint this socket handle red
}React Flow Example
// Node component
function CustomNode({ data }) {
const node = data.node;
const warnings = data.validationResult?.warnings || [];
const socketWarnings = warnings
.filter(w => w.code === "SOCKET_WITHOUT_EDGE" && w.id === node.id)
.map(w => w.socketId);
return (
<div className={node.hasSocketWarnings ? "node-warning" : ""}>
{node.sockets.map(socket => (
<Handle
key={socket.id}
id={socket.id}
className={socketWarnings.includes(socket.id) ? "socket-warning" : ""}
/>
))}
</div>
);
}
// Edge component
function CustomEdge({ data }) {
const edge = data.edge;
return (
<path className={edge.useless ? "edge-useless" : ""} />
);
}.node-warning {
border-color: #f59e0b;
}
.socket-warning {
background-color: #ef4444 !important;
}
.edge-useless {
opacity: 0.4;
stroke-dasharray: 5, 5;
}Serialization Note
Computed flags (hasSocketWarnings, useless) are automatically removed during serialization and recalculated when needed:
const serialized = graph.serialize();
// → No hasSocketWarnings or useless in output
const newGraph = StepFlowGraph.import(serialized);
newGraph.validate(); // Recalculates hasSocketWarnings
newGraph.getEdges(); // Recalculates uselessDifferences from DAGGraph
| Feature | DAGGraph | StepFlowGraph | |---------|----------|---------------| | Cycles | Prohibited | Allowed | | Type validation | JSON Schema compatibility | None | | Execution order | topologicalSort | startAt + transitions | | Validation | Type + Business rules | Rules only | | Registry | OverlayRegistry | Fixed rules | | Multiplicity | Per socket (multiple flag) | 1 edge per socket |
Architecture
See docs/ARCHITECTURE.md for detailed architecture documentation.
License
MIT
