iterativeflow
v3.1.0
Durable, iterative workflows on your own Postgres. Steps, sleeps, hooks, loops, cron — runs inside your Node app on graphile-worker + drizzle-orm.
Durable, iterative flows on your own Postgres.
Inspired by Trigger.dev and Temporal — same idea (write a flow as code, suspend for hours or days, survive crashes), but it runs inside your Node app on graphile-worker + drizzle-orm. No separate service to host.
Schemas use Standard Schema — any compliant validator works (zod, valibot, arktype, …).
const onboard = flow("onboard")
  .input(z.object({ userId: z.string() }))
  .step("create-account", async ({ input, signal }) => createAccount(input.userId, { signal }))
  .sleep("3d")
  .signal("survey", { schema: z.object({ score: z.number() }) })
  .output(({ input }) => ({ score: input.score }))
  .build();
const handle = engine.register(onboard);
const { runId } = await handle.start({ userId: "u_1" });
// 3 days later, from a webhook:
const result = await engine.signal(runId, "survey", { score: 9 });
switch (result.kind) {
  case "delivered": // the run was awaiting; now resumes
  case "buffered": // signal arrived first; consumed on arm
  case "duplicate": // already accepted; idempotent
  case "expired": // signal's timeout fired; reject the webhook
}
const out = await handle.result(runId); // resolves when terminal
That run lives in Postgres for three days. Workers can crash, deploys can roll, the process can be killed and restarted; when the timer fires, the flow resumes from where it left off.
- Steps with retries, backoff, per-step timeouts, and AbortSignal in the step args
- Sleeps and external signals lasting days or weeks (ctx.signal(name))
- ctx.invoke(child, input) for child flows / fan-out
- handle.result(runId) blocks until terminal (via Postgres LISTEN/NOTIFY)
- engine.listRuns({ tag, status, since }) for ops dashboards
- Versioned flows: edit a flow's shape and you get a loud error, not silent breakage. Loop bodies are checked for rename/kind drift too.
- At-least-once via a transactional outbox; a reconciler picks up anything stranded
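The four result kinds returned by engine.signal in the example above map fairly directly onto a webhook response. The following is a minimal sketch of one such mapping. Only the four kind strings come from this README; the helper name statusForSignal, the WebhookReply type, and the HTTP status choices are hypothetical and should be adapted to your own API conventions.

```typescript
// The four kinds are taken from the README example; everything else here
// (type names, helper name, status codes) is a hypothetical illustration.
type SignalResultKind = "delivered" | "buffered" | "duplicate" | "expired";

interface WebhookReply {
  status: number;
  body: string;
}

function statusForSignal(kind: SignalResultKind): WebhookReply {
  switch (kind) {
    case "delivered": // run was waiting and has resumed
    case "buffered": // run will consume the payload when it arms the signal
      return { status: 202, body: "accepted" };
    case "duplicate": // same signal already accepted; safe to acknowledge again
      return { status: 200, body: "already accepted" };
    case "expired": // the signal's timeout fired; tell the sender to stop retrying
      return { status: 410, body: "signal window closed" };
    default: {
      // Compile-time exhaustiveness check: fails to type-check if a kind is added.
      const unreachable: never = kind;
      throw new Error(`unknown kind: ${String(unreachable)}`);
    }
  }
}

console.log(statusForSignal("expired"));
```

Treating "duplicate" as a success keeps webhook senders that retry on non-2xx responses from looping; whether "expired" should be a 410 or something else depends on the sender.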
Install
npm install iterativeflow drizzle-orm graphile-worker pg
Peers: drizzle-orm, graphile-worker, pg.
Setup
1. Generate the schema file in your project
npx iterativeflow generate-schema
# wrote ./iterativeflow-schema.ts
This emits a drizzle schema file at the project root (override with --out). The file is typed against your installed drizzle-orm, so db.select().from(flowTables.runs) and drizzle-kit migration generation work regardless of which drizzle version iterativeflow itself was built against. Re-run the command after upgrading iterativeflow.
2. Add it to your drizzle.config.ts
// drizzle.config.ts
import { defineConfig } from "drizzle-kit";
export default defineConfig({
  dialect: "postgresql",
  schema: ["./db/your-schema.ts", "./iterativeflow-schema.ts"],
  out: "./drizzle",
  dbCredentials: { url: process.env.DATABASE_URL! },
});
3. Customize (optional)
You own the generated file. Rename tables, switch pgSchema names, add columns, add indexes. When you customize, pass your flowTables to createEngine({ tables: flowTables }) so the engine knows about the renames; otherwise the engine queries the default workflow.* schema and fails against your customized tables. The default createEngine({ db, pool }) works with the unmodified generated file.
4. Install both schemas
# install graphile-worker's schema
npx graphile-worker --schema-only -c "$DATABASE_URL"
# install iterativeflow's workflow.* schema
npx drizzle-kit generate && npx drizzle-kit migrate
Or apply iterativeflow's bundled SQL directly: psql -f node_modules/iterativeflow/migrations/0000_init.sql.
Hello flow
import { Pool } from "pg";
import { drizzle } from "drizzle-orm/node-postgres";
import { createEngine, flow } from "iterativeflow";
import { z } from "zod";
const pool = new Pool({ connectionString: process.env.DATABASE_URL });
const db = drizzle(pool);
const engine = createEngine({ db, pool });
const onboard = flow("onboard")
  .version(1)
  .input(z.object({ userId: z.string() }))
  .step("account", ({ input }) => createAccount(input.userId))
  .sleep("3d")
  .signal("survey", { schema: z.object({ score: z.number() }) })
  .output(({ input }) => ({ score: input.score }))
  .build();
const handle = engine.register(onboard);
await engine.listen();
const { runId } = await handle.start({ userId: "u_1" });
await engine.signal(runId, "survey", { score: 9 }); // from a webhook later
const out = await handle.result(runId); // resolves when terminal
Defaults you should know
A few load-bearing semantics that surprise people. Read once.
- Steps are memoized forever. Once a step result is stored, the body never re-runs for that runId. If a step body changes between deploys, resumed runs still use the OLD result. Bump the flow's .version(N) to get the new code.
- The top of the flow body re-runs on every resume. Memoized steps short-circuit; signals/sleeps short-circuit. Don't put side effects at the top level; wrap them in ctx.step.
- Date.now() / Math.random() at the top level is non-deterministic. Wrap in ctx.step("now", () => Date.now()) to memoize.
- AbortSignal must be honored. A step that ignores signal keeps running after a timeout/cancel: the engine throws on time but the work continues. Pass signal to fetch, pg, undici, OpenAI SDKs.
- Error codes are stable across patches; error messages are not. Alert on code, log the message.
- engine.signal(runId, name, payload) is single-consumer. Not pub/sub. Each call delivers to one armed ctx.signal (or buffers for the first arm).
- Idempotency keys are scoped to (name, version, key). Cross-version dedup is intentionally not performed. Bumping .version(N) lets the same key start a fresh run.
- Defaults that may not be what you assume:
  - concurrency default 5: bump for high throughput
  - defaultStepTimeoutMs default undefined: a step can hang forever unless you set it
  - limits.*Bytes default undefined: no payload size cap unless you set it
  - retention default undefined: events and runs tables grow forever unless configured
  - cron timezone default UTC: set timezone: "America/Los_Angeles" etc. if you need local time
- The pool is yours. engine.stop() does NOT call pool.end(). Call it yourself in your shutdown sequence.
- maxRunAttempts default 100. Poison-pill runs die after that with RUN_ATTEMPTS_EXHAUSTED.
- ctx.invoke has tree caps. limits.maxInvokeDepth default 10 (root counts as 1); limits.maxChildrenPerRun default 1000. Exceeding either throws INVOKE_DEPTH_EXCEEDED / INVOKE_FANOUT_EXCEEDED non-retryably. This stops accidental infinite recursion or runaway fan-out from filling the runs table.
Full reference: docs/replay-semantics.md, docs/signals.md.
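The first three points in the list above (memoized steps, a flow body that re-runs on every resume, and non-deterministic top-level calls) are easier to see in a toy model. The sketch below is not iterativeflow's implementation: it uses an in-memory Map as a stand-in for the step store, synchronous steps for brevity, and hypothetical names (ToyCtx, makeCtx, flowBody). It only illustrates why top-level code runs once per replay while a step body runs once per run.

```typescript
// Toy model only: an in-memory stand-in for the engine's step store.
// None of these names are iterativeflow APIs.
type StepStore = Map<string, unknown>;

interface ToyCtx {
  step<T>(stepName: string, fn: () => T): T;
}

let topLevelRuns = 0;
let stepBodyRuns = 0;

function makeCtx(store: StepStore): ToyCtx {
  return {
    step<T>(stepName: string, fn: () => T): T {
      // A stored result short-circuits: the body is not called again.
      if (store.has(stepName)) return store.get(stepName) as T;
      const result = fn();
      store.set(stepName, result);
      return result;
    },
  };
}

function flowBody(ctx: ToyCtx): number {
  topLevelRuns += 1; // top-level code: executes on every replay
  return ctx.step("now", () => {
    stepBodyRuns += 1; // step body: executes only when no result is stored
    return Date.now();
  });
}

const memoStore: StepStore = new Map();
const firstPass = flowBody(makeCtx(memoStore)); // initial execution
const secondPass = flowBody(makeCtx(memoStore)); // simulated resume after a suspend
console.log({ topLevelRuns, stepBodyRuns, sameTimestamp: firstPass === secondPass });
```

In this model the counter incremented at the top level reaches 2 while the step body runs once and both passes see the same timestamp, which is the behavior the list describes for wrapping Date.now() in a step.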
The model
flowchart LR
  i(("input I")) -->|"I"| a["step a<br/>fn returns A"]
  a -->|"A"| s["sleep 3d<br/>transparent"]
  s -->|"A"| h["signal survey<br/>delivers payload P"]
  h -->|"P"| o(("output O"))
A flow is a linear chain (with optional loops). Each .step() fn is memoized by (runId, cursor_key) and re-runs only if no result is stored. sleep and signal suspend the run durably; the engine resumes it later from its snapshot.
Inside a step / flow body:
async (ctx) => {
  const x = await ctx.step("fetch", async ({ signal }) => fetch(url, { signal }));
  await ctx.sleep("1h");
  const survey = await ctx.signal<{ score: number }>("survey", {
    timeout: "7d",
  });
  const summary = await ctx.invoke(childHandle, { x, survey }); // child flow
  return summary;
};
Production
const engine = createEngine({
  db,
  pool, // caller-owned; ≥ concurrency + headroom
  logger: consoleLogger(), // or your own Logger
  concurrency: 10,
  maxRunAttempts: 100, // hard ceiling; stops poison-pill loops
  defaultStepTimeoutMs: 30 * 60_000, // 30m fallback per step
  retention: {
    eventsOlderThan: "30d",
    runsOlderThan: "90d",
    schedule: "0 * * * *", // hourly
  },
  limits: {
    maxInputBytes: 256 * 1024,
    maxStepResultBytes: 256 * 1024,
    maxSignalPayloadBytes: 64 * 1024,
  },
  metrics: {
    runStarted: ({ name }) => counters.runs_started.inc({ name }),
    runCompleted: ({ name, durationMs }) => histograms.run_duration.observe({ name }, durationMs),
    stepFinished: ({ status, durationMs }) => histograms.step.observe({ status }, durationMs),
    signalDelivered: ({ kind }) => counters.signals.inc({ kind }),
  },
});
const detach = engine.attachShutdownSignals();
await engine.listen();
AbortSignal in steps. Every step fn receives { input, signal, attempt }. Pass signal to fetch, undici, pg, and OpenAI SDKs; engine.cancel(runId) propagates an abort. With defaultStepTimeoutMs set, a hung step gets a step "name" exceeded timeoutMs=... error AND the abort fires.
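Libraries such as fetch accept the signal directly, but a step that does its own looping has to check the signal itself. The sketch below shows one way to do that. It is self-contained and uses a plain AbortController in place of the engine; processBatch and the per-item handler are hypothetical names, and in a real step the signal would come from the step args described above.

```typescript
// Hypothetical helper: processes items one by one and stops as soon as the
// signal is aborted. `handleItem` stands in for whatever per-item work you do.
function processBatch<T>(
  items: readonly T[],
  signal: AbortSignal,
  handleItem: (item: T) => void,
): number {
  let done = 0;
  for (const item of items) {
    // Check before each unit of work so a timeout or cancel stops the loop.
    if (signal.aborted) {
      throw new Error(`aborted after ${done} of ${items.length} items`);
    }
    handleItem(item);
    done += 1;
  }
  return done;
}

// Simulate a cancel arriving while the second item is being handled.
const controller = new AbortController();
const seen: number[] = [];
let abortMessage = "";
try {
  processBatch([1, 2, 3, 4], controller.signal, (n) => {
    seen.push(n);
    if (n === 2) controller.abort();
  });
} catch (err) {
  abortMessage = (err as Error).message;
}
console.log(seen, abortMessage);
```

Here items 3 and 4 are never handled because the loop notices the abort before starting them. Without the check, the loop would finish all four items even though the engine had already reported the timeout or cancel.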
Multi-tenant idempotency. The unique constraint is (name, version, idempotencyKey). For multi-tenant deployments, prefix the key yourself: idempotencyKey: `${tenantId}:${requestId}`.
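A small helper keeps the prefixing consistent across call sites. This is a sketch under assumptions: tenantScopedKey is a hypothetical name, and the rule rejecting a colon inside tenantId is an extra safeguard added here (so that "a:b" + "c" and "a" + "b:c" cannot collide), not something the library requires.

```typescript
// Hypothetical helper; the README only prescribes the `${tenantId}:${requestId}` shape.
function tenantScopedKey(tenantId: string, requestId: string): string {
  // Assumption: forbid ':' in tenantId so two different (tenant, request)
  // pairs can never produce the same joined key.
  if (tenantId.length === 0 || tenantId.includes(":")) {
    throw new Error("tenantId must be non-empty and must not contain ':'");
  }
  return `${tenantId}:${requestId}`;
}

const keyA = tenantScopedKey("acme", "req_42");
const keyB = tenantScopedKey("globex", "req_42");
console.log(keyA, keyB, keyA === keyB);
```

The same requestId from two tenants yields two distinct keys, so neither tenant's request is deduplicated against the other's.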
Pool ownership. createEngine doesn't own the pg.Pool. Call engine.stop() then pool.end() in your shutdown path.
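The ordering can be captured in one shutdown function. The sketch below uses structural stand-ins so it runs without the real packages: Stoppable and Endable are hypothetical interfaces, and the assumption that both stop() and end() take no arguments and return a promise should be checked against your installed versions.

```typescript
// Structural stand-ins so the sketch runs without the real packages.
// The README states that engine.stop() and pool.end() exist; the exact
// signatures used here (no arguments, returning a promise) are assumptions.
interface Stoppable {
  stop(): Promise<void>;
}
interface Endable {
  end(): Promise<void>;
}

async function shutdown(engine: Stoppable, pool: Endable): Promise<void> {
  try {
    await engine.stop(); // first: the engine stops using the pool
  } finally {
    await pool.end(); // then: close connections, even if stop() threw
  }
}

// Fakes that record the call order.
const calls: string[] = [];
const fakeEngine: Stoppable = {
  stop: async () => {
    calls.push("engine.stop");
  },
};
const fakePool: Endable = {
  end: async () => {
    calls.push("pool.end");
  },
};

const shutdownDone = shutdown(fakeEngine, fakePool).then(() => {
  console.log(calls.join(" -> "));
});
```

Ending the pool in a finally block is a design choice in this sketch: it avoids leaking connections when stop() fails, at the cost of possibly masking the original error if end() also throws.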
Versioning. .version(N) enforces positive integers and forbids regression. Changes to a flow's shape between versions are caught by the replay-compat check — including renames inside loop bodies (occurrence count inside a loop is dynamic, but base names are still verified).
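To make "shape drift" concrete, here is a toy comparison between the shape a run was recorded with and the shape of the edited flow. It is an illustration only, not iterativeflow's replay-compat check: ShapeNode, NodeKind, and shapeDrift are hypothetical names, loops are ignored, and the toy treats nodes appended after the recorded ones as compatible, which is an assumption.

```typescript
// Toy model of a shape check; not iterativeflow's implementation.
type NodeKind = "step" | "sleep" | "signal" | "invoke";

interface ShapeNode {
  name: string;
  kind: NodeKind;
}

// Returns human-readable drift messages; an empty array means compatible.
function shapeDrift(recorded: readonly ShapeNode[], current: readonly ShapeNode[]): string[] {
  const problems: string[] = [];
  recorded.forEach((was, i) => {
    const now = current[i];
    if (now === undefined) {
      problems.push(`position ${i}: "${was.name}" was removed`);
    } else if (now.name !== was.name) {
      problems.push(`position ${i}: renamed "${was.name}" -> "${now.name}"`);
    } else if (now.kind !== was.kind) {
      problems.push(`position ${i}: "${was.name}" changed kind ${was.kind} -> ${now.kind}`);
    }
  });
  return problems;
}

const recordedShape: ShapeNode[] = [
  { name: "account", kind: "step" },
  { name: "survey", kind: "signal" },
];
const editedShape: ShapeNode[] = [
  { name: "create-account", kind: "step" }, // renamed without a version bump
  { name: "survey", kind: "signal" },
];
const driftFound = shapeDrift(recordedShape, editedShape);
console.log(driftFound);
```

A rename like this one is exactly the case where bumping .version(N) is the intended way out: old runs keep replaying against the old shape and new runs start on the new one.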
Non-retryable errors. Throw FlowRuntimeError with nonRetryable: true to skip retries on a permanent failure:
import { FlowRuntimeError } from "iterativeflow";
await ctx.step("charge", async () => {
  if (declined) {
    throw new FlowRuntimeError({
      code: "CARD_DECLINED",
      message: "issuer declined",
      nonRetryable: true,
    });
  }
});
Full concepts, versioning, failure modes, and reference: docs/guide.md. Worked use cases (checkout, onboarding, multi-agent AI + human-in-loop, multi-signer, saga, account deletion): docs/examples/.
License
MIT
