@verydia/flow-dsl
Declarative TypeScript DSL for composing Verydia agents, LLM calls, and MCP tools into readable, telemetry-aware workflows.
Stage 14 adds optional guard- and tone-aware behavior at the flow and step level, without changing existing call sites.
Basics
```ts
import { flow } from "@verydia/flow-dsl";

const myFlow = flow<{ text: string }, { message: string }>({
  id: "example-flow",
  initialState(input) {
    return { input, text: input.text };
  },
  selectOutput(state) {
    return state.agentOutput as { message: string };
  },
})
  .transform({...})
  .agent({...})
  .llm({...})
  .tool({...})
  .build();
```

Runtime deps (injected by your app):

- `metadata`
- `memoryStore`
- `toolsClient` (MCP)
- `llmRegistry` + `defaultModelRef`
- `telemetry` (`VerydiaTelemetry`)
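As a rough illustration, the dependency object your app injects might be assembled like this. Only the keys come from the list above; every value is a placeholder, and the exact injection point is defined by your application setup:

```ts
// Sketch only: the keys mirror the runtime deps listed above; the concrete
// values and the way the built flow consumes them are up to your app.
const runtimeDeps = {
  metadata: { appId: "my-app" },     // arbitrary app metadata (shape up to you)
  memoryStore,                       // your MemoryStore implementation
  toolsClient,                       // MCP tools client
  llmRegistry,                       // registry of available LLMs
  defaultModelRef: "custom:model-1", // model used when a step does not set modelRef
  telemetry,                         // VerydiaTelemetry instance
};
```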
Flow-level guard and tone
You can declare default guard and tone behavior on the flow:
```ts
const guardedFlow = flow<{ text: string }, { message: any }>({
  id: "guarded-flow",
  guard: {
    policyPackIds: ["hipaa", "kids-safe"],
  },
  tone: {
    defaultProfileId: "clinicalFormal",
  },
  initialState(input) {
    return { input, text: input.text };
  },
  selectOutput(state) {
    return state.agentOutput as { message: any };
  },
})
  .agent({
    id: "run-agent",
    agent: myAgent,
    selectInput(state) {
      return { text: state.text as string };
    },
    mergeOutput(state, output) {
      return { ...state, agentOutput: output };
    },
  })
  .build();
```

`guard.policyPackIds` can include logical packs like `"hipaa"`, `"gdpr-lite"`, and `"kids-safe"`. `tone.defaultProfileId` is any `ToneProfileId` from `@verydia/tone`, e.g. `clinicalFormal`, `legalFormal`, `kidsFriendly`.
Step-level overrides
Agent and LLM steps can override the flow defaults:
```ts
flow({ ..., tone: { defaultProfileId: "clinicalFormal" } })
  .llm({
    id: "default-clinical",
    modelRef: "custom:model-1",
    buildMessages(state) { ... },
    mergeResult(state, result) { ... },
  })
  .llm({
    id: "kids-step",
    modelRef: "custom:model-2",
    toneProfileId: "kidsFriendly", // overrides flow default
    buildMessages(state) { ... },
    mergeResult(state, result) { ... },
  })
  .build();
```

Guard overrides work similarly via `guardPolicyPackIds` on the step, as sketched below.
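For example, an agent step can pin its own policy packs while the rest of the flow keeps the flow-level default. This sketch reuses the agent step shape from the earlier example; the pack id is just one of the logical packs mentioned above:

```ts
.agent({
  id: "run-agent-gdpr",
  agent: myAgent,
  guardPolicyPackIds: ["gdpr-lite"], // overrides flow-level guard.policyPackIds for this step
  selectInput(state) {
    return { text: state.text as string };
  },
  mergeOutput(state, output) {
    return { ...state, agentOutput: output };
  },
})
```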
Guard behavior
For agent and LLM steps, when guard is configured:
- Determine the policy pack ids: step-level `guardPolicyPackIds` if present, else flow-level `guard.policyPackIds`.
- Build a `PolicyEngine` from the known packs (e.g. HIPAA → `piiBasicPolicy`, kids-safe → `kidsSafeLanguagePolicy`).
- Run a pre-execution policy check on the selected input text.
- Run the underlying agent/LLM.
- Run a post-execution policy check on the step output text.
- If any policy result is blocking, the step output is replaced with a blocked payload (for LLM steps, the text is replaced with a JSON-encoded blocked structure). A simplified sketch of this wrapper follows the list.
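The sketch below shows the shape of that wrapper with simplified local types. The real `PolicyEngine` construction and the exact blocked-payload format live inside the package, so treat those details as assumptions; only the pre/post check order mirrors the steps above.

```ts
// Simplified stand-ins for the real types.
type PolicyResult = { blocking: boolean; reason?: string };
type PolicyEngine = { evaluate(text: string): PolicyResult[] };

async function runGuardedStep(
  inputText: string,
  runStep: () => Promise<string>,
  engine: PolicyEngine,
): Promise<string> {
  // Pre-execution check on the selected input text.
  if (engine.evaluate(inputText).some((r) => r.blocking)) {
    return JSON.stringify({ blocked: true, stage: "pre" });
  }

  // Run the underlying agent/LLM step.
  const output = await runStep();

  // Post-execution check on the step output text.
  if (engine.evaluate(output).some((r) => r.blocking)) {
    return JSON.stringify({ blocked: true, stage: "post" });
  }
  return output;
}
```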
Telemetry:

```ts
{
  type: "policy.evaluate",
  timestamp,
  data: { policyPackIds, stepId, flowId, stage: "pre" | "post" }
}
```

Tone behavior
For agent and LLM steps, when tone is configured:
- Determine the tone profile id: step-level `toneProfileId` if present, else flow-level `tone.defaultProfileId`.
- Create a `ToneEngine` via `defaultToneEngineForProfile` and apply it to the step output text.
- If the text changed, the output is rewritten (plain strings and `{ message: string }` objects are updated). A simplified sketch of this output handling follows the list.
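A sketch of that output handling, using a minimal stand-in for the `ToneEngine` (the real engine comes from `defaultToneEngineForProfile`; the `rewrite` method name here is an assumption):

```ts
// Minimal stand-in type; only the "rewrite strings and { message } objects"
// behavior mirrors the description above.
type ToneEngine = { rewrite(text: string): string };

function applyToneToOutput(output: unknown, engine: ToneEngine): unknown {
  if (typeof output === "string") {
    const next = engine.rewrite(output);
    return next === output ? output : next;
  }
  if (
    output !== null &&
    typeof output === "object" &&
    typeof (output as { message?: unknown }).message === "string"
  ) {
    const current = (output as { message: string }).message;
    const next = engine.rewrite(current);
    return next === current ? output : { ...(output as object), message: next };
  }
  // Other output shapes are left untouched.
  return output;
}
```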
Telemetry:

```ts
{
  type: "tone.rewrite",
  timestamp,
  data: { profileId, stepId, flowId }
}
```

This keeps guard/tone integration as a thin wrapper around existing Verydia primitives, without changing your agents, LLMs, or tools.
Graph flows & branches
In addition to linear flows, you can define a graph of nodes with branch points.
```ts
const triageFlow = flow<{ text: string; severity: "high" | "medium" }, { result: string }>({
  id: "triage-graph",
  initialState(input) {
    return { input, text: input.text, severity: input.severity };
  },
  selectOutput(state) {
    return { result: state.result as string };
  },
})
  .transform({
    id: "normalize",
    run(state) {
      return { ...state, text: String(state.text ?? "").trim() };
    },
  })
  .branch({
    id: "route-severity",
    routes: [
      { when: (s) => s.severity === "high", next: "call-doctor" },
      { when: (s) => s.severity === "medium", next: "send-advice" },
    ],
    defaultNext: "fallback",
  })
  .agent({ id: "call-doctor", ... })
  .llm({ id: "send-advice", ... })
  .tool({ id: "fallback", toolId: "slack.postMessage", ... })
  .build();
```

When a branch node runs, it evaluates its routes in order. The first route whose `when(state)` predicate returns true determines the next node id. If none match, the flow falls back to `defaultNext` (if provided); otherwise the flow ends. A sketch of this routing rule is shown below.
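A minimal sketch of that routing rule; the `Route` type and helper are simplified stand-ins for illustration, not the package's exports:

```ts
type Route<S> = { when: (state: S) => boolean; next: string };

// Returns the id of the next node, or null to end the flow.
function selectNextNode<S>(state: S, routes: Route<S>[], defaultNext?: string): string | null {
  for (const route of routes) {
    if (route.when(state)) return route.next; // first matching predicate wins
  }
  return defaultNext ?? null; // no match: fall back, or end the flow
}
```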
Checkpoints & resume
Graph flows write checkpoints to the MemoryStore before and after each node:
- `checkpoint.start`: includes `flowId`, `nodeId`, and a `stateSnapshot` before execution.
- `checkpoint.end`: includes `flowId`, `nodeId`, and the `stateSnapshot` after execution.
Shape (stored as a `MemoryRecord` with `kind: "event"` and `axes.task.planId = flowId`):

```ts
memoryStore?.write({
  id: `${flowId}:${nodeId}:${event}:${timestamp}`,
  kind: "event",
  text: "flow checkpoint",
  data: {
    flowId,
    nodeId,
    stateSnapshot: state,
    event: "checkpoint.start" | "checkpoint.end",
    timestamp,
  },
  axes: {
    temporal: { createdAt: new Date(timestamp) },
    task: { planId: flowId, stepId: nodeId, status: event === "checkpoint.start" ? "in-progress" : "done" },
  },
});
```

To resume a long-running flow, construct a `FlowResumePointer` from the latest `checkpoint.end` record for a node and call `resume`:
```ts
import type { FlowResumePointer } from "@verydia/flow-dsl";

const records = await memoryStore.query({
  kind: "event",
  axes: { task: { planId: "triage-graph" } },
});
const endRecords = records.filter((r) => (r.data as any).event === "checkpoint.end");
const last = endRecords[endRecords.length - 1];

const pointer: FlowResumePointer = {
  flowId: "triage-graph",
  lastNodeId: (last.data as any).nodeId,
  timestamp: (last.data as any).timestamp,
};

const resumed = await triageFlow.resume?.(pointer, { memoryStore, telemetry });
```

The graph executor restores the `stateSnapshot` from the checkpoint and continues from the node after `lastNodeId`, so already-completed nodes are not re-run.
Telemetry in graph mode
Graph flows emit additional telemetry events alongside existing ones:
- `workflow.node` — per-node execution timing; `data: { flowId, nodeId, kind, startedAt, finishedAt }`
- `workflow.checkpoint` — checkpoint lifecycle; `data: { flowId, nodeId, event: "start" | "end" }`
- `workflow.route` — branch routing decisions; `data: { flowId, nodeId, branchTaken: nextNodeId | null }`
These are emitted in addition to:
- `workflow.step` — preserved for backward compatibility
- `mcp.call` — MCP tool invocations
- `llm.invoke` — LLM calls via `invokeLlmWithRegistry`
- `policy.evaluate` — guardrail evaluations
- `tone.rewrite` — tone rewriting applications
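As an illustration of consuming these events, the handler below logs node timings and routing decisions. The record shape (`{ type, timestamp, data }`) follows the examples in this README; how you subscribe to events depends on your `VerydiaTelemetry` setup, so the wiring itself is an assumption:

```ts
type FlowTelemetryEvent = { type: string; timestamp: number; data: Record<string, unknown> };

function logGraphEvent(event: FlowTelemetryEvent): void {
  if (event.type === "workflow.node") {
    const { flowId, nodeId, startedAt, finishedAt } = event.data as {
      flowId: string;
      nodeId: string;
      startedAt: number;
      finishedAt: number;
    };
    console.log(`[${flowId}] node ${nodeId} took ${finishedAt - startedAt}ms`);
  } else if (event.type === "workflow.route") {
    const { flowId, nodeId, branchTaken } = event.data as {
      flowId: string;
      nodeId: string;
      branchTaken: string | null;
    };
    console.log(`[${flowId}] ${nodeId} routed to ${branchTaken ?? "(end of flow)"}`);
  }
}
```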
Parallel and map nodes (v2 with concurrency)
Graph flows can use parallel and map nodes for basic fan-out/fan-in patterns.
Parallel nodes
Run multiple child nodes in parallel and merge their resulting states:
```ts
const parallelFlow = flow<{ value: number }, { value: number }>({
  id: "parallel-flow",
  initialState(input) {
    return { input, value: input.value };
  },
  selectOutput(state) {
    return { value: state.value as number };
  },
})
  .transform({
    id: "normalize",
    run(state) {
      return { ...state, value: Number(state.value ?? 0) };
    },
  })
  .parallel({
    id: "do-parallel",
    children: ["add-1", "add-2"],
    mergeResults(state, childStates) {
      const base = state.value as number;
      const deltas = childStates.map((cs) => (cs.value as number) - base);
      return { ...state, value: base + deltas.reduce((a, b) => a + b, 0) };
    },
  })
  .transform({ id: "add-1", run: (s) => ({ ...s, value: (s.value as number) + 1 }) })
  .transform({ id: "add-2", run: (s) => ({ ...s, value: (s.value as number) + 2 }) })
  .build();
```

In v2, children must still reference non-branch, non-parallel, non-map nodes (single-step children). Parallel nodes also support an optional `concurrency` field:
```ts
.parallel({
  id: "fanout-api",
  children: ["callA", "callB", "callC"],
  concurrency: 2, // at most 2 children in flight
  mergeResults(state, childStates) { ... },
})
```

Semantics:

- `concurrency` is an upper bound on the number of children in flight.
- If `concurrency` is `undefined` or `<= 0`, behavior matches v1: all children are started in parallel (subject to the event loop and the underlying async work).
- The executor internally queues child executions using a small `runWithConcurrencyLimit` helper (sketched below), but node/step/checkpoint telemetry and guard/tone/cost behavior are unchanged.
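The helper is internal, but a limiter with these semantics can look roughly like this sketch (an illustration, not the package's actual implementation):

```ts
// Runs the given task factories with at most `limit` tasks in flight.
// A missing or non-positive limit falls back to full parallelism, matching
// the v1 behavior described above.
async function runWithConcurrencyLimit<T>(
  tasks: Array<() => Promise<T>>,
  limit?: number,
): Promise<T[]> {
  const results: T[] = new Array(tasks.length);
  let nextIndex = 0;

  async function worker(): Promise<void> {
    while (nextIndex < tasks.length) {
      const index = nextIndex++;
      results[index] = await tasks[index]();
    }
  }

  const effectiveLimit = limit && limit > 0 ? Math.min(limit, tasks.length) : tasks.length;
  await Promise.all(Array.from({ length: effectiveLimit }, () => worker()));
  return results;
}
```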
Map nodes
Map nodes fan out over an array of items, run a child node per item (in parallel), and then fan back in via `mergeItemResult`:
```ts
const mapFlow = flow<{ base: number }, { results: number[] }>({
  id: "map-flow",
  initialState(input) {
    return { input, base: input.base };
  },
  selectOutput(state) {
    return { results: (state.results as number[]) ?? [] };
  },
})
  .map({
    id: "map-items",
    selectItems() {
      return [1, 2, 3];
    },
    child: "per-item",
    mergeItemResult(state, _item, itemState) {
      const results = (state.results as number[] | undefined) ?? [];
      return { ...state, results: [...results, itemState.value as number] };
    },
  })
  .transform({
    id: "per-item",
    run(state) {
      const base = state.base as number;
      const item = state.currentItem as number;
      return { ...state, value: base * item };
    },
  })
  .build();
```

In v2:

- `child` must reference a non-branch, non-parallel, non-map node (single-step child).
- The current item and index are exposed on state as `currentItem` and `currentIndex` for convenience, but you can also encode item context however you like in `mergeItemResult`.
- Map nodes support an optional `concurrency` field that bounds how many items are processed at once:
```ts
.map({
  id: "map-items-constrained",
  selectItems(state) {
    return state.items as number[];
  },
  child: "per-item",
  concurrency: 1, // process items one at a time
  mergeItemResult(state, _item, itemState) {
    const results = (state.results as number[] | undefined) ?? [];
    return { ...state, results: [...results, itemState.value as number] };
  },
})
```

Semantics are analogous to parallel nodes:

- `concurrency` is an upper bound on the number of items in flight.
- If `concurrency` is `undefined` or `<= 0`, behavior matches v1: all items are started in parallel.
Parallel and map nodes reuse the existing execution helpers, so guard, tone, LLM, MCP, and cost guard behavior all apply identically when concurrency limits are set.
mapEach sugar
For the common pattern "map an array field on state into an output field", the builder exposes a `mapEach(options)` helper:
```ts
const squareFlow = flow<{ items: number[] }, { results: number[] }>({
  id: "square-items",
  initialState(input) {
    return { input, items: input.items, results: [] as number[] };
  },
  selectOutput(state) {
    return { results: state.results as number[] };
  },
})
  .transform({
    id: "square",
    run(state) {
      const item = state.currentItem as number;
      const squared = item * item;
      return { ...state, value: squared, results: [squared] };
    },
  })
  .mapEach({
    itemsField: "items",
    resultField: "results",
    childId: "square",
    // optional concurrency: 2,
  })
  .build();
```

`mapEach` is intentionally opinionated but simple:
- `itemsField` is read from the state (non-arrays are treated as `[]`).
- The child node specified by `childId` is run once per item with `currentItem` and `currentIndex` populated.
- After each item, `mergeItemResult` appends either `itemState[resultField]`, `itemState.value`, or the full `itemState` into `resultField`.
You can always drop down to the lower-level `.map({ ... })` API when you need full control over per-item state and aggregation; a rough equivalent of the example above is sketched below.
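For reference, the `squareFlow` above can be written against the lower-level API roughly like this. It is an illustration of the equivalence, not the literal expansion `mapEach` performs internally:

```ts
const squareFlowExplicit = flow<{ items: number[] }, { results: number[] }>({
  id: "square-items-explicit",
  initialState(input) {
    return { input, items: input.items, results: [] as number[] };
  },
  selectOutput(state) {
    return { results: state.results as number[] };
  },
})
  .map({
    id: "square-map",
    selectItems(state) {
      const items = state.items;
      return Array.isArray(items) ? (items as number[]) : []; // non-arrays treated as []
    },
    child: "square",
    mergeItemResult(state, _item, itemState) {
      const results = (state.results as number[] | undefined) ?? [];
      // mapEach would consider itemState[resultField] first; using
      // itemState.value keeps this sketch simple.
      return { ...state, results: [...results, itemState.value as number] };
    },
  })
  .transform({
    id: "square",
    run(state) {
      const item = state.currentItem as number;
      return { ...state, value: item * item };
    },
  })
  .build();
```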
