@durable-effect/workflow
v0.0.1-next.25
Write workflows that survive server restarts, network failures, and deployments. Your code picks up exactly where it left off.
const orderWorkflow = Workflow.make((orderId: string) =>
Effect.gen(function* () {
const order = yield* Workflow.step("Fetch", fetchOrder(orderId));
yield* Workflow.sleep("24 hours"); // Yes, actually sleep for a day
yield* Workflow.step("Charge", chargeCard(order));
})
);
This library brings Effect's composable, type-safe programming model to durable execution. Built by Matthew Sessions at Backpine Labs as an experiment in generalizing effectful code on a durable runtime.
Status: Experimental. API may have breaking changes. Currently only supports Cloudflare Durable Objects as the execution engine.
Table of Contents
- Installation
- High-Level Usage
- Exporting the Workflow Class
- Using the Workflow Client
- Event Tracking
- Retry Features
- Timeouts
- Providing Services
- Recovery
- Automatic Data Purging
Installation
pnpm add @durable-effect/workflow effect
High-Level Usage
Building a Basic Workflow
Workflows are built using Workflow.make(). A workflow is a function that takes an input and returns an Effect containing your workflow logic.
import { Effect } from "effect";
import { Workflow } from "@durable-effect/workflow";
const myWorkflow = Workflow.make((orderId: string) =>
Effect.gen(function* () {
// Fetch order data
const order = yield* Workflow.step("Fetch order", fetchOrder(orderId));
// Wait before processing
yield* Workflow.sleep("5 seconds");
// Process the order
yield* Workflow.step("Process order", processOrder(order));
// Send confirmation
yield* Workflow.step("Send confirmation", sendEmail(order.email));
})
);
Steps
Steps are the core building blocks of a workflow. Each step:
- Has a unique name within the workflow
- Automatically caches its result in Durable Object storage
- Replays the cached result on workflow resume (skipping re-execution)
- Must return a JSON-serializable value
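Conceptually, replay behaves like a persisted memo table keyed by step name. The following is a toy, in-memory sketch of that semantics only; it is not the library's implementation, which persists results in Durable Object storage and works with Effects rather than plain functions:

```typescript
// Toy sketch of step replay semantics (NOT the library's implementation).
const runStep = <A>(
  cache: Map<string, unknown>, // stands in for durable storage
  name: string,                // the step's unique name
  run: () => A,                // the step's body
): A => {
  if (cache.has(name)) return cache.get(name) as A; // resume: replay cached result
  const result = run();                             // first run: execute the body
  cache.set(name, JSON.parse(JSON.stringify(result))); // result must survive JSON
  return result;
};
```

On a resume, a step with a cached result returns immediately and its body never runs again, which is why step bodies should be the unit of side effects.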
// Define your business logic as a regular Effect
const processData = (input: string) =>
Effect.gen(function* () {
// Process data, call services, access databases, etc.
yield* Effect.promise(() => new Promise(resolve => setTimeout(resolve, 3000)));
return { id: input, status: "complete" };
});
// Use it in a step - the result gets cached automatically
const result = yield* Workflow.step("Process data", processData(orderId));
// Same pattern for any effect
const user = yield* Workflow.step("Fetch user", fetchUser(userId));
// Step with non-serializable result - use Effect.asVoid to discard
yield* Workflow.step("Update database",
updateRecord(id).pipe(Effect.asVoid)
);
// Step with complex result - extract serializable fields
yield* Workflow.step("Create order",
createOrder(data).pipe(
Effect.map((order) => ({ id: order.id, status: order.status }))
)
);
Important: Step results must be serializable. If your effect returns a complex object (ORM result, class instance, etc.), map it to a plain object or use Effect.asVoid to discard it.
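Since step results are persisted as JSON, a quick sanity check is whether a value survives a JSON round trip. This helper is our own for illustration; it is not exported by the library:

```typescript
// Hypothetical helper (not part of the library): checks that a value can
// be JSON round-tripped without throwing and without changing shape.
// Note: it catches values that can't be stringified at all (circular
// references, functions); lossy-but-stable conversions (e.g. a Map
// becoming {}) still pass, which is why mapping to a plain object is
// the safer habit.
const isStepSerializable = (value: unknown): boolean => {
  try {
    const roundTripped = JSON.parse(JSON.stringify(value));
    return JSON.stringify(roundTripped) === JSON.stringify(value);
  } catch {
    return false;
  }
};
```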
Sleep
Sleeps are fully durable. Your workflow can sleep for a few seconds or a few months, depending on your business use case. The workflow will resume exactly where it left off, even across deployments and server restarts.
// Short delays for rate limiting
yield* Workflow.sleep("30 seconds");
// Wait a day before sending a follow-up
yield* Workflow.sleep("24 hours");
// Subscription renewal in 30 days
yield* Workflow.sleep("30 days");
// Using milliseconds
yield* Workflow.sleep(5000);
Exporting the Workflow Class
To use your workflows with Cloudflare Workers, you need to:
- Define your workflows as a registry object
- Create the Durable Object class and client using createDurableWorkflows()
- Export the Workflows class from your worker entry point
Step 1: Define and Export Workflows
Create a file (e.g., workflows.ts) that defines and exports your workflows:
import { Effect } from "effect";
import { Workflow, Backoff, createDurableWorkflows } from "@durable-effect/workflow";
// Define your workflow (name comes from registry key)
const processOrderWorkflow = Workflow.make((orderId: string) =>
Effect.gen(function* () {
const order = yield* Workflow.step("Fetch order", fetchOrder(orderId));
yield* Workflow.sleep("3 seconds");
yield* Workflow.step("Process payment",
processPayment(order).pipe(
Workflow.retry({
maxAttempts: 5,
delay: Backoff.exponential({ base: "1 second", max: "60 seconds" }),
})
)
);
yield* Workflow.step("Send confirmation", sendEmail(order.email));
})
);
// Create a registry of all workflows
// The key becomes the workflow name
const workflows = {
processOrder: processOrderWorkflow,
} as const;
// Create and export the Durable Object class and client
export const { Workflows, WorkflowClient } = createDurableWorkflows(workflows);
Step 2: Export from Worker Entry Point
In your main worker file (e.g., index.ts), export the Workflows class:
import { Workflows } from "./workflows";
// Export the Durable Object class
export { Workflows };
export default {
async fetch(request: Request, env: Env): Promise<Response> {
// Your fetch handler
},
};
Step 3: Configure Wrangler
Add the Durable Object binding to your wrangler.jsonc:
{
"$schema": "node_modules/wrangler/config-schema.json",
"name": "my-worker",
"main": "src/index.ts",
"compatibility_date": "2025-11-28",
"durable_objects": {
"bindings": [
{
"name": "WORKFLOWS",
"class_name": "Workflows"
}
]
},
"migrations": [
{
"tag": "v1",
"new_classes": ["Workflows"]
}
]
}
Using the Workflow Client
The WorkflowClient provides a type-safe, Effect-based interface for invoking and managing workflows. All methods are yieldable.
Creating a Client
Create a client from your Durable Object binding:
import { Effect } from "effect";
import { WorkflowClient } from "./workflows";
export const startWorkflow = (request: Request, env: Env) =>
Effect.gen(function* () {
const client = WorkflowClient.fromBinding(env.WORKFLOWS);
// Start a workflow - yields an Effect
const { id } = yield* client.runAsync({
workflow: "processOrder",
input: "order-123",
execution: { id: "order-123" }, // Optional: custom execution ID
});
return Response.json({ workflowId: id });
});
Client Methods
All methods return Effects, making them yieldable:
const client = WorkflowClient.fromBinding(env.WORKFLOWS);
// Start a workflow asynchronously (returns immediately)
const { id } = yield* client.runAsync({
workflow: "processOrder",
input: orderId,
execution: { id: orderId }, // Optional custom ID
});
// id = "processOrder:order-123" (namespaced)
// Start a workflow synchronously (waits for completion/pause/failure)
const { id } = yield* client.run({
workflow: "processOrder",
input: orderId,
});
// Get workflow status
const status = yield* client.status(workflowId);
// Returns: { _tag: "Running" } | { _tag: "Completed", completedAt: number } | ...
// Get completed steps
const steps = yield* client.completedSteps(workflowId);
// Returns: ["Fetch order", "Process payment"]
// Get workflow metadata
const meta = yield* client.meta<MyMetaType>(workflowId, "myKey");
// Cancel a workflow
yield* client.cancel(workflowId, { reason: "User requested cancellation" });
Using with Effect.runPromise
If you need to use the client outside of an Effect context:
const client = WorkflowClient.fromBinding(env.WORKFLOWS);
const { id } = await Effect.runPromise(
client.runAsync({
workflow: "processOrder",
input: orderId,
execution: { id: orderId },
})
);
Service Pattern with Effect Tag
The client factory includes an Effect Tag for use with the service pattern:
const client = WorkflowClient.fromBinding(env.WORKFLOWS);
// Use the Tag for dependency injection
const program = Effect.gen(function* () {
const client = yield* WorkflowClient.Tag;
yield* client.runAsync({ workflow: "processOrder", input: "order-123" });
});
// Provide the client
Effect.runPromise(
program.pipe(Effect.provideService(WorkflowClient.Tag, client))
);
Workflow Status Types
type WorkflowStatus =
| { _tag: "Pending" }
| { _tag: "Queued"; queuedAt: number }
| { _tag: "Running" }
| { _tag: "Paused"; reason: string; resumeAt: number }
| { _tag: "Completed"; completedAt: number }
| { _tag: "Failed"; error: unknown; failedAt: number }
  | { _tag: "Cancelled"; cancelledAt: number; reason?: string };
Event Tracking
Configure a tracker endpoint to monitor workflow execution and receive events.
Configuration
export const { Workflows, WorkflowClient } = createDurableWorkflows(workflows, {
tracker: {
// Required
endpoint: "https://events.example.com/ingest",
env: "production",
serviceKey: "my-service",
// Optional
batchSize: 10, // Events per batch (default: 10)
flushIntervalMs: 5000, // Auto-flush interval (default: 5000)
retry: {
maxAttempts: 3, // Retry failed sends (default: 3)
},
},
});
Event Types
The tracker emits the following events:
Workflow Events:
- workflow.started - Workflow execution began
- workflow.completed - Workflow finished successfully
- workflow.failed - Workflow failed with an error
- workflow.paused - Workflow paused (sleep/retry)
- workflow.resumed - Workflow resumed from pause
- workflow.cancelled - Workflow was cancelled
- workflow.queued - Workflow queued for async execution
Step Events:
- step.started - Step execution began
- step.completed - Step finished successfully
- step.failed - Step failed with an error
Retry Events:
- retry.scheduled - Retry attempt scheduled
- retry.exhausted - All retries exhausted
Sleep Events:
- sleep.started - Sleep began
- sleep.completed - Sleep completed
Timeout Events:
- timeout.set - Timeout deadline set
- timeout.exceeded - Timeout fired
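Only the event names above come from the library; the payload schema is not documented here. A receiving endpoint might still route events by name prefix, treating the payload as opaque:

```typescript
// Event names taken from the lists above. Payload shape is intentionally
// left as `unknown` since the schema is an assumption, not documented here.
type WorkflowEventName =
  | "workflow.started" | "workflow.completed" | "workflow.failed"
  | "workflow.paused" | "workflow.resumed" | "workflow.cancelled"
  | "workflow.queued"
  | "step.started" | "step.completed" | "step.failed"
  | "retry.scheduled" | "retry.exhausted"
  | "sleep.started" | "sleep.completed"
  | "timeout.set" | "timeout.exceeded";

// Route an event to a handler bucket by its name prefix:
// "workflow" | "step" | "retry" | "sleep" | "timeout".
const eventCategory = (name: WorkflowEventName): string => name.split(".")[0];
```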
Disabling Tracking
If no tracker is configured, events are not emitted:
// No tracker - events disabled
export const { Workflows, WorkflowClient } = createDurableWorkflows(workflows);
Retry Features
The Workflow.retry() operator provides durable retries that persist across workflow restarts. Retries are applied inside a step:
yield* Workflow.step("External API call",
callExternalAPI().pipe(
Workflow.retry({ maxAttempts: 3, delay: "5 seconds" })
)
);
Basic Retry Configuration
interface RetryOptions {
maxAttempts: number; // Number of retries (not including initial attempt)
delay?: DelayConfig; // Delay between retries
maxDuration?: string | number; // Total time budget for all attempts
jitter?: boolean; // Add randomness to delays (default: true)
}
Examples:
// Fixed delay
Workflow.retry({ maxAttempts: 3, delay: "5 seconds" })
// No delay (immediate retry)
Workflow.retry({ maxAttempts: 3 })
// Custom delay function
Workflow.retry({
maxAttempts: 5,
delay: (attempt) => 1000 * Math.pow(2, attempt)
})
Backoff Strategies
Import the Backoff namespace for advanced retry strategies:
import { Backoff } from "@durable-effect/workflow";
Exponential Backoff
Delay grows exponentially: base * factor^attempt
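Ignoring jitter, that formula can be checked with a direct re-implementation. This is purely illustrative arithmetic, not the library's internals:

```typescript
// Plain re-implementation of capped exponential backoff (ignores jitter;
// not the library's internals). Computes the delay for each retry attempt.
const exponentialDelaysMs = (opts: {
  baseMs: number;
  attempts: number;
  factor?: number; // multiplier, default 2
  maxMs?: number;  // cap on any single delay
}): number[] =>
  Array.from({ length: opts.attempts }, (_, attempt) =>
    Math.min(
      opts.baseMs * Math.pow(opts.factor ?? 2, attempt),
      opts.maxMs ?? Infinity,
    ),
  );
```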
Workflow.retry({
maxAttempts: 5,
delay: Backoff.exponential({
base: "1 second", // Starting delay
factor: 2, // Multiplier (default: 2)
max: "30 seconds", // Maximum delay cap
})
})
// Delays: 1s -> 2s -> 4s -> 8s -> 16s (capped at 30s)
Linear Backoff
Delay grows linearly: initial + (attempt * increment)
Workflow.retry({
maxAttempts: 5,
delay: Backoff.linear({
initial: "1 second",
increment: "2 seconds",
max: "10 seconds",
})
})
// Delays: 1s -> 3s -> 5s -> 7s -> 9s (capped at 10s)
Constant Backoff
Fixed delay between retries:
Workflow.retry({
maxAttempts: 3,
delay: Backoff.constant("5 seconds")
})
Jitter
Jitter adds randomness to delays to prevent the "thundering herd" problem when many clients retry simultaneously. Jitter is enabled by default.
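The exact jitter formula the library applies isn't documented here, so treat the following as a representative scheme rather than the implementation. "Full jitter" replaces each computed delay with a uniform random value up to that delay:

```typescript
// "Full jitter": pick a uniform random delay in [0, computedDelayMs).
// Representative scheme only; the library's actual jitter formula is
// not documented here. The `random` parameter exists so tests can make
// the result deterministic.
const fullJitter = (
  computedDelayMs: number,
  random: () => number = Math.random,
): number => random() * computedDelayMs;
```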
// Disable jitter
Workflow.retry({
maxAttempts: 3,
delay: "5 seconds",
jitter: false,
})
Presets
Use built-in presets for common scenarios:
// Standard: 1s -> 2s -> 4s -> 8s -> 16s (max 30s)
Backoff.presets.standard()
// Aggressive: 100ms -> 200ms -> 400ms -> 800ms (max 5s)
// For internal services with low latency
Backoff.presets.aggressive()
// Patient: 5s -> 10s -> 20s -> 40s (max 2min)
// For rate-limited APIs
Backoff.presets.patient()
// Simple: 1s constant
// For polling scenarios
Backoff.presets.simple()
Usage:
yield* Workflow.step("Call rate-limited API",
callAPI().pipe(
Workflow.retry({
maxAttempts: 10,
delay: Backoff.presets.patient(),
})
)
);
Max Duration
Set a total time budget for all retry attempts:
Workflow.retry({
maxAttempts: 100,
delay: Backoff.exponential({ base: "1 second" }),
maxDuration: "5 minutes", // Stop retrying after 5 minutes total
})
Selective Retry with Effect Error Handling
One of the most powerful features of using Effect with a durable runtime is fine-grained error control. You can use Effect's error handling to decide which errors should trigger retries and which should fail immediately.
Using catchTag to Skip Retries
import { Effect, Data } from "effect";
// Define typed errors
class ValidationError extends Data.TaggedError("ValidationError")<{
readonly message: string;
}> {}
class NetworkError extends Data.TaggedError("NetworkError")<{
  readonly message: string;
}> {}
class PaymentFailed extends Data.TaggedError("PaymentFailed")<{
  readonly reason: string;
}> {}
// Workflow with selective retry
const processPaymentWorkflow = Workflow.make((paymentId: string) =>
Effect.gen(function* () {
yield* Workflow.step("Process payment",
processPayment(paymentId).pipe(
// Catch validation errors - don't retry, fail immediately
Effect.catchTag("ValidationError", (err) =>
Effect.fail(new PaymentFailed({ reason: err.message }))
),
// Network errors bubble up for retry
Workflow.retry({
maxAttempts: 5,
delay: Backoff.presets.standard(),
})
)
);
})
);
Timeouts
The Workflow.timeout() operator sets a deadline for step execution. The deadline persists across workflow restarts.
yield* Workflow.step("External API",
callExternalAPI().pipe(
Workflow.timeout("30 seconds")
)
);
Timeout with Retry
When combining timeout and retry, the timeout applies to each attempt individually:
yield* Workflow.step("API call",
callAPI().pipe(
Workflow.timeout("30 seconds"), // Each attempt has 30 seconds
Workflow.retry({ maxAttempts: 3 })
)
);
// Total max time: 3 attempts * 30 seconds = 90 seconds (plus delays)
Duration Formats
Both timeout and sleep accept string or number formats:
Workflow.timeout("30 seconds")
Workflow.timeout("5 minutes")
Workflow.timeout("2 hours")
Workflow.timeout(5000) // milliseconds
Providing Services
Workflows support Effect's service pattern for dependency injection. Provide services at the end of your workflow using .pipe().
Basic Service Provision
import { Effect, Context, Layer } from "effect";
// Define a service
class EmailService extends Context.Tag("EmailService")<
EmailService,
{
readonly send: (to: string, body: string) => Effect.Effect<void>;
}
>() {}
// Create a layer
const EmailServiceLive = Layer.succeed(EmailService, {
send: (to, body) => Effect.promise(() => sendEmailViaAPI(to, body)),
});
// Workflow using the service
const notificationWorkflow = Workflow.make((userId: string) =>
Effect.gen(function* () {
const user = yield* Workflow.step("Fetch user", fetchUser(userId));
const emailService = yield* EmailService;
yield* Workflow.step("Send email",
emailService.send(user.email, "Hello!").pipe(Effect.asVoid)
);
}).pipe(
Effect.provide(EmailServiceLive)
)
);
Multiple Services
const MyServices = Layer.mergeAll(
EmailServiceLive,
DatabaseServiceLive,
LoggingServiceLive
);
const complexWorkflow = Workflow.make((input: Input) =>
Effect.gen(function* () {
// ... workflow logic using services
}).pipe(
Effect.provide(MyServices)
)
);
Error Types
The library exports typed errors for proper error handling:
import {
WorkflowClientError, // Client operation failed
StepCancelledError, // Step was cancelled
RetryExhaustedError, // All retries exhausted
WorkflowTimeoutError, // Step exceeded timeout
StorageError, // Durable Object storage error
OrchestratorError, // Orchestration error
WorkflowScopeError, // Operation used outside workflow
StepScopeError, // Sleep/sleepUntil used inside step
} from "@durable-effect/workflow";
Recovery
Workflows automatically recover from infrastructure failures. If a workflow is in "Running" state when the Durable Object restarts, it will automatically schedule recovery.
Configuration
export const { Workflows, WorkflowClient } = createDurableWorkflows(workflows, {
recovery: {
staleThresholdMs: 30000, // Consider stale after 30s (default)
maxRecoveryAttempts: 3, // Max recovery attempts (default: 3)
recoveryDelayMs: 1000, // Delay before recovery (default: 1000)
},
});
Automatic Data Purging
By default, workflow data (state, step results, metadata) persists in Durable Object storage indefinitely. For high-volume workflows, this can lead to storage bloat. Enable automatic purging to delete workflow data after completion.
Configuration
export const { Workflows, WorkflowClient } = createDurableWorkflows(workflows, {
purge: {
delay: "5 minutes", // Delete data 5 minutes after terminal state
},
});
When enabled, workflow data is automatically purged after the workflow reaches a terminal state (completed, failed, or cancelled). The delay gives you time to query final status before cleanup.
Delay Formats
The delay option accepts Effect duration strings or milliseconds:
// String formats
purge: { delay: "30 seconds" }
purge: { delay: "5 minutes" }
purge: { delay: "1 hour" }
purge: { delay: "1 day" }
// Milliseconds
purge: { delay: 60000 }
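For illustration, the string forms above map to milliseconds as follows. This is a toy converter, not the library's parser (the library handles these via Effect's duration support):

```typescript
// Toy converter from the duration strings shown above to milliseconds.
// Illustrative only; the library parses durations via Effect, not this code.
const UNIT_MS: Record<string, number> = {
  second: 1_000, seconds: 1_000,
  minute: 60_000, minutes: 60_000,
  hour: 3_600_000, hours: 3_600_000,
  day: 86_400_000, days: 86_400_000,
};

const toMillis = (delay: string | number): number => {
  if (typeof delay === "number") return delay; // already milliseconds
  const [amount, unit] = delay.split(" ");     // e.g. "5 minutes"
  return Number(amount) * UNIT_MS[unit];
};
```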
What Gets Purged
When purge executes, all Durable Object storage for that workflow instance is deleted:
- Workflow state and status
- Step results and metadata
- Recovery tracking data
- Any custom metadata stored via getMeta()
Disabling Purge
Omit the purge option to retain data indefinitely (default behavior):
// No purge - data retained forever
export const { Workflows, WorkflowClient } = createDurableWorkflows(workflows);
Logs
When a purge executes, it logs:
[Workflow] Purged data for {instanceId} ({reason})
Here reason is the terminal state that triggered the purge (completed, failed, or cancelled).
License
MIT
Built by Matthew Sessions at Backpine Labs
