@durable-effect/workflow

v0.0.1-next.25

Write workflows that survive server restarts, network failures, and deployments. Your code picks up exactly where it left off.

Downloads

2,386

const orderWorkflow = Workflow.make((orderId: string) =>
  Effect.gen(function* () {
    const order = yield* Workflow.step("Fetch", fetchOrder(orderId));
    yield* Workflow.sleep("24 hours");  // Yes, actually sleep for a day
    yield* Workflow.step("Charge", chargeCard(order));
  })
);

This library brings Effect's composable, type-safe programming model to durable execution. Built by Matthew Sessions at Backpine Labs as an experiment in generalizing effectful code on a durable runtime.

Status: Experimental. API may have breaking changes. Currently only supports Cloudflare Durable Objects as the execution engine.


Installation

pnpm add @durable-effect/workflow effect

High-Level Usage

Building a Basic Workflow

Workflows are built using Workflow.make(). A workflow is a function that takes an input and returns an Effect containing your workflow logic.

import { Effect } from "effect";
import { Workflow } from "@durable-effect/workflow";

const myWorkflow = Workflow.make((orderId: string) =>
  Effect.gen(function* () {
    // Fetch order data
    const order = yield* Workflow.step("Fetch order", fetchOrder(orderId));

    // Wait before processing
    yield* Workflow.sleep("5 seconds");

    // Process the order
    yield* Workflow.step("Process order", processOrder(order));

    // Send confirmation
    yield* Workflow.step("Send confirmation", sendEmail(order.email));
  })
);

Steps

Steps are the core building blocks of a workflow. Each step:

  • Has a unique name within the workflow
  • Automatically caches its result in Durable Object storage
  • Replays the cached result on workflow resume (skipping re-execution)
  • Must return a JSON-serializable value

// Define your business logic as a regular Effect
const processData = (input: string) =>
  Effect.gen(function* () {
    // Process data, call services, access databases, etc.
    yield* Effect.promise(() => new Promise(resolve => setTimeout(resolve, 3000)));

    return { id: input, status: "complete" };
  });

// Use it in a step - the result gets cached automatically
const result = yield* Workflow.step("Process data", processData(orderId));

// Same pattern for any effect
const user = yield* Workflow.step("Fetch user", fetchUser(userId));

// Step with non-serializable result - use Effect.asVoid to discard
yield* Workflow.step("Update database",
  updateRecord(id).pipe(Effect.asVoid)
);

// Step with complex result - extract serializable fields
yield* Workflow.step("Create order",
  createOrder(data).pipe(
    Effect.map((order) => ({ id: order.id, status: order.status }))
  )
);

Important: Step results must be serializable. If your effect returns a complex object (ORM result, class instance, etc.), map it to a plain object or use Effect.asVoid to discard it.
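
To make the caching and replay behavior described above concrete, here is a minimal, dependency-free sketch. It is not the library's implementation: `StepCache` and `runStep` are hypothetical names, and a plain `Map` stands in for Durable Object storage. It only illustrates the idea that a named step runs once and later calls replay the stored JSON value.

```typescript
// Hypothetical in-memory stand-in for Durable Object storage (step name -> JSON).
type StepCache = Map<string, string>;

// Run `compute` once per step name; later calls replay the cached value.
function runStep<T>(cache: StepCache, stepName: string, compute: () => T): T {
  const cached = cache.get(stepName);
  if (cached !== undefined) {
    // Replay path: the step body is skipped entirely.
    return (JSON.parse(cached) as { value: T }).value;
  }
  const value = compute();
  // Results go through JSON, which is why they must be JSON-serializable.
  cache.set(stepName, JSON.stringify({ value }));
  return value;
}

const orderCache: StepCache = new Map();
let executions = 0;
const loadOrder = () => {
  executions += 1;
  return { id: "order-123", status: "complete" };
};

const firstRun = runStep(orderCache, "Fetch order", loadOrder);
const replayed = runStep(orderCache, "Fetch order", loadOrder); // served from cache
```

After both calls, `loadOrder` has executed only once. Because the cached value round-trips through JSON, class instances and functions would not survive a replay, which is the reason for mapping results to plain objects.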

Sleep

Sleeps are fully durable. Your workflow can sleep for a few seconds or a few months - it all depends on your business use case. The workflow will resume exactly where it left off, even across deployments and server restarts.

// Short delays for rate limiting
yield* Workflow.sleep("30 seconds");

// Wait a day before sending a follow-up
yield* Workflow.sleep("24 hours");

// Subscription renewal in 30 days
yield* Workflow.sleep("30 days");

// Using milliseconds
yield* Workflow.sleep(5000);
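
One way to picture a durable sleep is as a persisted wake-up time rather than an in-memory timer. The sketch below is an assumption about the general technique, not the library's code: `durableSleep`, `SleepRecords`, and the explicit sleep key are hypothetical, and a `Map` again stands in for durable storage.

```typescript
type SleepOutcome =
  | { _tag: "Paused"; resumeAt: number }
  | { _tag: "Elapsed" };

// Hypothetical persisted record: sleep key -> absolute wake-up time (ms since epoch).
type SleepRecords = Map<string, number>;

function durableSleep(
  records: SleepRecords,
  key: string,
  durationMs: number,
  now: number
): SleepOutcome {
  let resumeAt = records.get(key);
  if (resumeAt === undefined) {
    // First encounter: persist the deadline so a restart cannot reset it.
    resumeAt = now + durationMs;
    records.set(key, resumeAt);
  }
  return now >= resumeAt ? { _tag: "Elapsed" } : { _tag: "Paused", resumeAt };
}

const sleepRecords: SleepRecords = new Map();
const beforeRestart = durableSleep(sleepRecords, "cool-down", 5000, 1000);
// Simulated restart: the same call later still sees the original deadline.
const afterRestart = durableSleep(sleepRecords, "cool-down", 5000, 3000);
const afterDeadline = durableSleep(sleepRecords, "cool-down", 5000, 6000);
```

Because the deadline is stored as an absolute time, re-running the workflow after a restart neither restarts the wait nor skips it.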

Exporting the Workflow Class

To use your workflows with Cloudflare Workers, you need to:

  1. Define your workflows in a registry object and create the Durable Object class and client using createDurableWorkflows()
  2. Export the Workflows class from your worker entry point
  3. Add the Durable Object binding to your Wrangler configuration

Step 1: Define and Export Workflows

Create a file (e.g., workflows.ts) that defines and exports your workflows:

import { Effect } from "effect";
import { Workflow, Backoff, createDurableWorkflows } from "@durable-effect/workflow";

// Define your workflow (name comes from registry key)
const processOrderWorkflow = Workflow.make((orderId: string) =>
  Effect.gen(function* () {
    const order = yield* Workflow.step("Fetch order", fetchOrder(orderId));
    yield* Workflow.sleep("3 seconds");

    yield* Workflow.step("Process payment",
      processPayment(order).pipe(
        Workflow.retry({
          maxAttempts: 5,
          delay: Backoff.exponential({ base: "1 second", max: "60 seconds" }),
        })
      )
    );

    yield* Workflow.step("Send confirmation", sendEmail(order.email));
  })
);

// Create a registry of all workflows
// The key becomes the workflow name
const workflows = {
  processOrder: processOrderWorkflow,
} as const;

// Create and export the Durable Object class and client
export const { Workflows, WorkflowClient } = createDurableWorkflows(workflows);

Step 2: Export from Worker Entry Point

In your main worker file (e.g., index.ts), export the Workflows class:

import { Workflows } from "./workflows";

// Export the Durable Object class
export { Workflows };

export default {
  async fetch(request: Request, env: Env): Promise<Response> {
    // Your fetch handler; it must return a Response
    return new Response("OK");
  },
};

Step 3: Configure Wrangler

Add the Durable Object binding to your wrangler.jsonc:

{
  "$schema": "node_modules/wrangler/config-schema.json",
  "name": "my-worker",
  "main": "src/index.ts",
  "compatibility_date": "2025-11-28",

  "durable_objects": {
    "bindings": [
      {
        "name": "WORKFLOWS",
        "class_name": "Workflows"
      }
    ]
  },

  "migrations": [
    {
      "tag": "v1",
      "new_classes": ["Workflows"]
    }
  ]
}

Using the Workflow Client

The WorkflowClient provides a type-safe, Effect-based interface for invoking and managing workflows. All methods are yieldable.

Creating a Client

Create a client from your Durable Object binding:

import { Effect } from "effect";
import { WorkflowClient } from "./workflows";

export const startWorkflow = (request: Request, env: Env) =>
  Effect.gen(function* () {
    const client = WorkflowClient.fromBinding(env.WORKFLOWS);

    // Start a workflow - yields an Effect
    const { id } = yield* client.runAsync({
      workflow: "processOrder",
      input: "order-123",
      execution: { id: "order-123" }, // Optional: custom execution ID
    });

    return Response.json({ workflowId: id });
  });

Client Methods

All methods return Effects, making them yieldable:

const client = WorkflowClient.fromBinding(env.WORKFLOWS);

// Start a workflow asynchronously (returns immediately)
const { id: workflowId } = yield* client.runAsync({
  workflow: "processOrder",
  input: orderId,
  execution: { id: orderId }, // Optional custom ID
});
// workflowId = "processOrder:order-123" (namespaced) when orderId is "order-123"

// Start a workflow synchronously (waits for completion/pause/failure)
const { id: syncWorkflowId } = yield* client.run({
  workflow: "processOrder",
  input: orderId,
});

// Get workflow status
const status = yield* client.status(workflowId);
// Returns: { _tag: "Running" } | { _tag: "Completed", completedAt: number } | ...

// Get completed steps
const steps = yield* client.completedSteps(workflowId);
// Returns: ["Fetch order", "Process payment"]

// Get workflow metadata
const meta = yield* client.meta<MyMetaType>(workflowId, "myKey");

// Cancel a workflow
yield* client.cancel(workflowId, { reason: "User requested cancellation" });

Using with Effect.runPromise

If you need to use the client outside of an Effect context:

const client = WorkflowClient.fromBinding(env.WORKFLOWS);

const { id } = await Effect.runPromise(
  client.runAsync({
    workflow: "processOrder",
    input: orderId,
    execution: { id: orderId },
  })
);

Service Pattern with Effect Tag

The client factory includes an Effect Tag for use with the service pattern:

const client = WorkflowClient.fromBinding(env.WORKFLOWS);

// Use the Tag for dependency injection
const program = Effect.gen(function* () {
  const client = yield* WorkflowClient.Tag;
  yield* client.runAsync({ workflow: "processOrder", input: "order-123" });
});

// Provide the client
Effect.runPromise(
  program.pipe(Effect.provideService(WorkflowClient.Tag, client))
);

Workflow Status Types

type WorkflowStatus =
  | { _tag: "Pending" }
  | { _tag: "Queued"; queuedAt: number }
  | { _tag: "Running" }
  | { _tag: "Paused"; reason: string; resumeAt: number }
  | { _tag: "Completed"; completedAt: number }
  | { _tag: "Failed"; error: unknown; failedAt: number }
  | { _tag: "Cancelled"; cancelledAt: number; reason?: string };
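
Since the status is a discriminated union on `_tag`, it can be handled with an exhaustive `switch`. The helpers below are a small sketch: `isTerminal` and `describeStatus` are hypothetical names, not library exports, and the type is repeated so the block stands alone.

```typescript
type WorkflowStatus =
  | { _tag: "Pending" }
  | { _tag: "Queued"; queuedAt: number }
  | { _tag: "Running" }
  | { _tag: "Paused"; reason: string; resumeAt: number }
  | { _tag: "Completed"; completedAt: number }
  | { _tag: "Failed"; error: unknown; failedAt: number }
  | { _tag: "Cancelled"; cancelledAt: number; reason?: string };

// True once the workflow can no longer make progress.
function isTerminal(status: WorkflowStatus): boolean {
  switch (status._tag) {
    case "Completed":
    case "Failed":
    case "Cancelled":
      return true;
    case "Pending":
    case "Queued":
    case "Running":
    case "Paused":
      return false;
  }
}

// Human-readable summary; the compiler flags any unhandled `_tag`.
function describeStatus(status: WorkflowStatus): string {
  switch (status._tag) {
    case "Pending":
      return "pending";
    case "Queued":
      return `queued at ${status.queuedAt}`;
    case "Running":
      return "running";
    case "Paused":
      return `paused (${status.reason}) until ${status.resumeAt}`;
    case "Completed":
      return `completed at ${status.completedAt}`;
    case "Failed":
      return `failed at ${status.failedAt}`;
    case "Cancelled":
      return `cancelled at ${status.cancelledAt}`;
  }
}
```

A caller polling the status could stop as soon as `isTerminal` returns true.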

Event Tracking

Configure a tracker endpoint to monitor workflow execution and receive events.

Configuration

export const { Workflows, WorkflowClient } = createDurableWorkflows(workflows, {
  tracker: {
    // Required
    endpoint: "https://events.example.com/ingest",
    env: "production",
    serviceKey: "my-service",

    // Optional
    batchSize: 10,           // Events per batch (default: 10)
    flushIntervalMs: 5000,   // Auto-flush interval (default: 5000)
    retry: {
      maxAttempts: 3,        // Retry failed sends (default: 3)
    },
  },
});
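
To illustrate what `batchSize` implies, here is a rough, dependency-free sketch of a size-triggered batcher. It is not the tracker's actual code: `createBatcher`, `TrackerEvent`, and the `send` callback are hypothetical, and the timer behind `flushIntervalMs` is left out (it would simply call `flush` on an interval).

```typescript
// Hypothetical event shape for illustration only.
interface TrackerEvent {
  type: string;
  timestamp: number;
}

function createBatcher(
  batchSize: number,
  send: (batch: TrackerEvent[]) => void
) {
  let buffer: TrackerEvent[] = [];

  // Send whatever is buffered, if anything.
  const flush = (): void => {
    if (buffer.length === 0) return;
    const batch = buffer;
    buffer = [];
    send(batch);
  };

  // Buffer an event and flush once the batch is full.
  const push = (event: TrackerEvent): void => {
    buffer.push(event);
    if (buffer.length >= batchSize) flush();
  };

  return { push, flush };
}

const sentBatches: TrackerEvent[][] = [];
const batcher = createBatcher(2, (batch) => sentBatches.push(batch));

batcher.push({ type: "workflow.started", timestamp: 1 });
batcher.push({ type: "step.started", timestamp: 2 }); // triggers a send of 2 events
batcher.push({ type: "step.completed", timestamp: 3 }); // stays buffered
batcher.flush(); // what an interval-based flush would do
```

Batching trades a short delivery delay for fewer requests to the endpoint; the interval flush bounds that delay when traffic is low.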

Event Types

The tracker emits the following events:

Workflow Events:

  • workflow.started - Workflow execution began
  • workflow.completed - Workflow finished successfully
  • workflow.failed - Workflow failed with an error
  • workflow.paused - Workflow paused (sleep/retry)
  • workflow.resumed - Workflow resumed from pause
  • workflow.cancelled - Workflow was cancelled
  • workflow.queued - Workflow queued for async execution

Step Events:

  • step.started - Step execution began
  • step.completed - Step finished successfully
  • step.failed - Step failed with an error

Retry Events:

  • retry.scheduled - Retry attempt scheduled
  • retry.exhausted - All retries exhausted

Sleep Events:

  • sleep.started - Sleep began
  • sleep.completed - Sleep completed

Timeout Events:

  • timeout.set - Timeout deadline set
  • timeout.exceeded - Timeout fired

Disabling Tracking

If no tracker is configured, events are not emitted:

// No tracker - events disabled
export const { Workflows, WorkflowClient } = createDurableWorkflows(workflows);

Retry Features

The Workflow.retry() operator provides durable retries that persist across workflow restarts. Retries are applied inside a step:

yield* Workflow.step("External API call",
  callExternalAPI().pipe(
    Workflow.retry({ maxAttempts: 3, delay: "5 seconds" })
  )
);

Basic Retry Configuration

interface RetryOptions {
  maxAttempts: number;              // Number of retries (not including initial attempt)
  delay?: DelayConfig;              // Delay between retries
  maxDuration?: string | number;    // Total time budget for all attempts
  jitter?: boolean;                 // Add randomness to delays (default: true)
}

Examples:

// Fixed delay
Workflow.retry({ maxAttempts: 3, delay: "5 seconds" })

// No delay (immediate retry)
Workflow.retry({ maxAttempts: 3 })

// Custom delay function
Workflow.retry({
  maxAttempts: 5,
  delay: (attempt) => 1000 * Math.pow(2, attempt)
})
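
The `maxAttempts` comment above says the count excludes the initial attempt. Under that reading, the total number of executions is `maxAttempts + 1`. The following plain-TypeScript sketch shows the counting only; `retrySync` is a hypothetical helper with no delays or durability, not the library's operator.

```typescript
// Run `attempt` once, then retry up to `maxAttempts` more times on failure.
function retrySync<T>(attempt: () => T, maxAttempts: number): T {
  let lastError: unknown;
  for (let i = 0; i <= maxAttempts; i++) {
    try {
      return attempt();
    } catch (error) {
      lastError = error; // remember the failure and try again
    }
  }
  throw lastError;
}

let failingCalls = 0;
let exhausted = false;
try {
  retrySync(() => {
    failingCalls += 1;
    throw new Error("always fails");
  }, 3);
} catch {
  exhausted = true; // 1 initial attempt + 3 retries were used up
}

let flakyCalls = 0;
const flakyResult = retrySync(() => {
  flakyCalls += 1;
  if (flakyCalls < 3) throw new Error("transient");
  return "ok";
}, 3);
```

Here the always-failing function runs four times before the error is rethrown, while the flaky one succeeds on its third execution.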

Backoff Strategies

Import the Backoff namespace for advanced retry strategies:

import { Backoff } from "@durable-effect/workflow";

Exponential Backoff

Delay grows exponentially: base * factor^attempt

Workflow.retry({
  maxAttempts: 5,
  delay: Backoff.exponential({
    base: "1 second",       // Starting delay
    factor: 2,              // Multiplier (default: 2)
    max: "30 seconds",      // Maximum delay cap
  })
})
// Delays: 1s -> 2s -> 4s -> 8s -> 16s (capped at 30s)
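
The formula `base * factor^attempt` with a cap can be checked with a few lines of arithmetic. This sketch assumes `attempt` is zero-based (which matches the delays listed above) and ignores jitter; `exponentialDelays` is a hypothetical helper, not part of the Backoff namespace.

```typescript
// Delay before each retry: base * factor^attempt, capped at maxMs.
function exponentialDelays(
  baseMs: number,
  factor: number,
  maxMs: number,
  retries: number
): number[] {
  return Array.from({ length: retries }, (_, attempt) =>
    Math.min(baseMs * Math.pow(factor, attempt), maxMs)
  );
}

const fiveRetries = exponentialDelays(1000, 2, 30000, 5);
// [1000, 2000, 4000, 8000, 16000]

const sevenRetries = exponentialDelays(1000, 2, 30000, 7);
// [1000, 2000, 4000, 8000, 16000, 30000, 30000] (32s and 64s hit the cap)
```

The cap only starts to matter from the sixth retry onward with these settings.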

Linear Backoff

Delay grows linearly: initial + (attempt * increment)

Workflow.retry({
  maxAttempts: 5,
  delay: Backoff.linear({
    initial: "1 second",
    increment: "2 seconds",
    max: "10 seconds",
  })
})
// Delays: 1s -> 3s -> 5s -> 7s -> 9s (capped at 10s)
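
The linear formula `initial + (attempt * increment)` can be verified the same way. As before, this assumes a zero-based `attempt` and no jitter, and `linearDelays` is a hypothetical helper used only for the arithmetic.

```typescript
// Delay before each retry: initial + attempt * increment, capped at maxMs.
function linearDelays(
  initialMs: number,
  incrementMs: number,
  maxMs: number,
  retries: number
): number[] {
  return Array.from({ length: retries }, (_, attempt) =>
    Math.min(initialMs + attempt * incrementMs, maxMs)
  );
}

const linearFive = linearDelays(1000, 2000, 10000, 5);
// [1000, 3000, 5000, 7000, 9000]

const linearSix = linearDelays(1000, 2000, 10000, 6);
// [1000, 3000, 5000, 7000, 9000, 10000] (11s is capped to 10s)
```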

Constant Backoff

Fixed delay between retries:

Workflow.retry({
  maxAttempts: 3,
  delay: Backoff.constant("5 seconds")
})

Jitter

Jitter adds randomness to delays to prevent the "thundering herd" problem when many clients retry simultaneously. Jitter is enabled by default.

// Disable jitter
Workflow.retry({
  maxAttempts: 3,
  delay: "5 seconds",
  jitter: false,
})
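
This README does not say which jitter strategy the library uses. As an illustration of the general idea only, the sketch below implements one common variant, often called "full jitter", where the actual delay is drawn uniformly from zero up to the nominal delay. `withFullJitter` is a hypothetical helper, and the random source is injectable so the behavior can be tested deterministically.

```typescript
// "Full jitter": pick a delay uniformly in [0, delayMs).
function withFullJitter(
  delayMs: number,
  random: () => number = Math.random
): number {
  return Math.floor(random() * delayMs);
}

// Deterministic random sources make the effect easy to see.
const midpoint = withFullJitter(5000, () => 0.5); // 2500
const lowest = withFullJitter(5000, () => 0); // 0

// With the default source, the result always stays below the nominal delay.
const sampled = withFullJitter(5000);
```

Spreading retries across the interval means that clients which failed at the same moment are unlikely to retry at the same moment.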

Presets

Use built-in presets for common scenarios:

// Standard: 1s -> 2s -> 4s -> 8s -> 16s (max 30s)
Backoff.presets.standard()

// Aggressive: 100ms -> 200ms -> 400ms -> 800ms (max 5s)
// For internal services with low latency
Backoff.presets.aggressive()

// Patient: 5s -> 10s -> 20s -> 40s (max 2min)
// For rate-limited APIs
Backoff.presets.patient()

// Simple: 1s constant
// For polling scenarios
Backoff.presets.simple()

Usage:

yield* Workflow.step("Call rate-limited API",
  callAPI().pipe(
    Workflow.retry({
      maxAttempts: 10,
      delay: Backoff.presets.patient(),
    })
  )
);

Max Duration

Set a total time budget for all retry attempts:

Workflow.retry({
  maxAttempts: 100,
  delay: Backoff.exponential({ base: "1 second" }),
  maxDuration: "5 minutes",  // Stop retrying after 5 minutes total
})
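
To see how a time budget interacts with a large `maxAttempts`, the sketch below counts how many retry delays fit into the budget. It is a simplification under stated assumptions: it counts only the waiting time between attempts (not the time each attempt takes), ignores jitter, and uses an uncapped exponential delay. `retriesWithinBudget` is a hypothetical helper, not a library function.

```typescript
// Count how many retries can be scheduled before cumulative delay exceeds the budget.
function retriesWithinBudget(
  delayFor: (attempt: number) => number,
  maxAttempts: number,
  budgetMs: number
): number {
  let elapsed = 0;
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    elapsed += delayFor(attempt);
    if (elapsed > budgetMs) return attempt; // this retry would overshoot
  }
  return maxAttempts;
}

const exponentialDelay = (attempt: number) => 1000 * Math.pow(2, attempt);
const fiveMinutesMs = 5 * 60 * 1000;

const allowedRetries = retriesWithinBudget(exponentialDelay, 100, fiveMinutesMs);
// 8: the cumulative delay after 8 retries is 255s; the 9th would bring it to 511s
```

With these numbers the budget, not `maxAttempts: 100`, is what ends the retrying.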

Selective Retry with Effect Error Handling

One of the most powerful features of using Effect with a durable runtime is fine-grained error control. You can use Effect's error handling to decide which errors should trigger retries and which should fail immediately.

Using catchTag to Skip Retries

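import { Effect, Data } from "effect";
import { Workflow, Backoff } from "@durable-effect/workflow";

// Define typed errors
class ValidationError extends Data.TaggedError("ValidationError")<{
  readonly message: string;
}> {}

class NetworkError extends Data.TaggedError("NetworkError")<{
  readonly message: string;
}> {}

// Error raised when a payment cannot be processed
class PaymentFailed extends Data.TaggedError("PaymentFailed")<{
  readonly reason: string;
}> {}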

// Workflow with selective retry
const processPaymentWorkflow = Workflow.make((paymentId: string) =>
  Effect.gen(function* () {
    yield* Workflow.step("Process payment",
      processPayment(paymentId).pipe(
        // Catch validation errors - don't retry, fail immediately
        Effect.catchTag("ValidationError", (err) =>
          Effect.fail(new PaymentFailed({ reason: err.message }))
        ),
        // Network errors bubble up for retry
        Workflow.retry({
          maxAttempts: 5,
          delay: Backoff.presets.standard(),
        })
      )
    );
  })
);
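
The underlying idea, retrying only certain tagged errors and failing fast on the rest, can be shown without Effect or this library. The sketch below is a plain-TypeScript analogy, not the library's mechanism: `retryOnlyTags` and the error shapes are hypothetical, and there are no delays or persistence.

```typescript
type AppError =
  | { _tag: "ValidationError"; message: string }
  | { _tag: "NetworkError"; message: string };

// Read the `_tag` of a thrown value, if it has one.
function tagOf(error: unknown): string | undefined {
  if (typeof error === "object" && error !== null && "_tag" in error) {
    return String((error as { _tag: unknown })._tag);
  }
  return undefined;
}

// Retry only when the thrown error's tag is in `retryableTags`.
function retryOnlyTags<T>(
  attempt: () => T,
  retryableTags: readonly string[],
  maxAttempts: number
): T {
  for (let i = 0; ; i++) {
    try {
      return attempt();
    } catch (error) {
      const tag = tagOf(error);
      const retryable = tag !== undefined && retryableTags.indexOf(tag) !== -1;
      if (!retryable || i >= maxAttempts) throw error; // fail fast or give up
    }
  }
}

let networkCalls = 0;
const networkResult = retryOnlyTags(() => {
  networkCalls += 1;
  if (networkCalls < 3) {
    const error: AppError = { _tag: "NetworkError", message: "timeout" };
    throw error;
  }
  return "charged";
}, ["NetworkError"], 5);

let validationCalls = 0;
let validationTag: string | undefined;
try {
  retryOnlyTags(() => {
    validationCalls += 1;
    const error: AppError = { _tag: "ValidationError", message: "bad card" };
    throw error;
  }, ["NetworkError"], 5);
} catch (error) {
  validationTag = tagOf(error); // surfaced after a single attempt
}
```

The network failure is retried until it succeeds, while the validation failure is surfaced after a single attempt.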

Timeouts

The Workflow.timeout() operator sets a deadline for step execution. The deadline persists across workflow restarts.

yield* Workflow.step("External API",
  callExternalAPI().pipe(
    Workflow.timeout("30 seconds")
  )
);

Timeout with Retry

When combining timeout and retry, the timeout applies to each attempt individually:

yield* Workflow.step("API call",
  callAPI().pipe(
    Workflow.timeout("30 seconds"),  // Each attempt has 30 seconds
    Workflow.retry({ maxAttempts: 3 })
  )
);
// Worst case, given that maxAttempts excludes the initial attempt:
// (1 initial attempt + 3 retries) * 30 seconds = 120 seconds (plus delays)

Duration Formats

Both timeout and sleep accept string or number formats:

Workflow.timeout("30 seconds")
Workflow.timeout("5 minutes")
Workflow.timeout("2 hours")
Workflow.timeout(5000)  // milliseconds
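
For readers who want to see how such duration values map to milliseconds, here is a small converter covering only the forms that appear in this README ("N seconds", "N minutes", "N hours", "N days", and plain numbers). `toMillis` is a hypothetical helper; the library's own parsing may accept more formats or behave differently.

```typescript
const UNIT_MS: Record<string, number> = {
  second: 1000,
  minute: 60 * 1000,
  hour: 60 * 60 * 1000,
  day: 24 * 60 * 60 * 1000,
};

// Convert "30 seconds" / "1 hour" / 5000 into milliseconds.
function toMillis(duration: string | number): number {
  if (typeof duration === "number") return duration; // already milliseconds
  const match = /^(\d+(?:\.\d+)?)\s+(second|minute|hour|day)s?$/.exec(
    duration.trim()
  );
  if (match === null) {
    throw new Error(`Unsupported duration: "${duration}"`);
  }
  const [, amount = "0", unit = "second"] = match;
  const factor = UNIT_MS[unit];
  if (factor === undefined) {
    throw new Error(`Unsupported unit: "${unit}"`);
  }
  return Number(amount) * factor;
}

const thirtySeconds = toMillis("30 seconds"); // 30000
const oneDay = toMillis("24 hours"); // 86400000
const thirtyDays = toMillis("30 days"); // 2592000000
const rawMillis = toMillis(5000); // 5000
```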

Providing Services

Workflows support Effect's service pattern for dependency injection. Provide services at the end of your workflow using .pipe().

Basic Service Provision

import { Effect, Context, Layer } from "effect";

// Define a service
class EmailService extends Context.Tag("EmailService")<
  EmailService,
  {
    readonly send: (to: string, body: string) => Effect.Effect<void>;
  }
>() {}

// Create a layer
const EmailServiceLive = Layer.succeed(EmailService, {
  send: (to, body) => Effect.promise(() => sendEmailViaAPI(to, body)),
});

// Workflow using the service
const notificationWorkflow = Workflow.make((userId: string) =>
  Effect.gen(function* () {
    const user = yield* Workflow.step("Fetch user", fetchUser(userId));

    const emailService = yield* EmailService;
    yield* Workflow.step("Send email",
      emailService.send(user.email, "Hello!").pipe(Effect.asVoid)
    );
  }).pipe(
    Effect.provide(EmailServiceLive)
  )
);

Multiple Services

const MyServices = Layer.mergeAll(
  EmailServiceLive,
  DatabaseServiceLive,
  LoggingServiceLive
);

const complexWorkflow = Workflow.make((input: Input) =>
  Effect.gen(function* () {
    // ... workflow logic using services
  }).pipe(
    Effect.provide(MyServices)
  )
);

Error Types

The library exports typed errors for proper error handling:

import {
  WorkflowClientError,      // Client operation failed
  StepCancelledError,       // Step was cancelled
  RetryExhaustedError,      // All retries exhausted
  WorkflowTimeoutError,     // Step exceeded timeout
  StorageError,             // Durable Object storage error
  OrchestratorError,        // Orchestration error
  WorkflowScopeError,       // Operation used outside workflow
  StepScopeError,           // Sleep/sleepUntil used inside step
} from "@durable-effect/workflow";

Recovery

Workflows automatically recover from infrastructure failures. If a workflow is in the "Running" state when the Durable Object restarts, recovery is scheduled automatically.

Configuration

export const { Workflows, WorkflowClient } = createDurableWorkflows(workflows, {
  recovery: {
    staleThresholdMs: 30000,     // Consider stale after 30s (default)
    maxRecoveryAttempts: 3,       // Max recovery attempts (default: 3)
    recoveryDelayMs: 1000,        // Delay before recovery (default: 1000)
  },
});

Automatic Data Purging

By default, workflow data (state, step results, metadata) persists in Durable Object storage indefinitely. For high-volume workflows, this can lead to storage bloat. Enable automatic purging to delete workflow data after completion.

Configuration

export const { Workflows, WorkflowClient } = createDurableWorkflows(workflows, {
  purge: {
    delay: "5 minutes",  // Delete data 5 minutes after terminal state
  },
});

When enabled, workflow data is automatically purged after the workflow reaches a terminal state (completed, failed, or cancelled). The delay gives you time to query final status before cleanup.
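
The timing can be sketched as simple arithmetic on the terminal timestamp. This example assumes the status timestamps are milliseconds since the epoch and that the delay is counted from the moment the terminal state was reached; `terminalTimestamp` and `isPurgeDue` are hypothetical helpers, not library functions.

```typescript
type TerminalStatus =
  | { _tag: "Completed"; completedAt: number }
  | { _tag: "Failed"; error: unknown; failedAt: number }
  | { _tag: "Cancelled"; cancelledAt: number; reason?: string };

// When the workflow reached its terminal state.
function terminalTimestamp(status: TerminalStatus): number {
  switch (status._tag) {
    case "Completed":
      return status.completedAt;
    case "Failed":
      return status.failedAt;
    case "Cancelled":
      return status.cancelledAt;
  }
}

// True once the configured delay has passed since the terminal state.
function isPurgeDue(
  status: TerminalStatus,
  delayMs: number,
  now: number
): boolean {
  return now >= terminalTimestamp(status) + delayMs;
}

const finished: TerminalStatus = { _tag: "Completed", completedAt: 1000 };
const fiveMinutes = 5 * 60 * 1000;

const tooEarly = isPurgeDue(finished, fiveMinutes, 200000); // false
const due = isPurgeDue(finished, fiveMinutes, 301000); // true
```

Any status query made inside that window still sees the final state; after it, the instance's data may be gone.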

Delay Formats

The delay option accepts Effect duration strings or milliseconds:

// String formats
purge: { delay: "30 seconds" }
purge: { delay: "5 minutes" }
purge: { delay: "1 hour" }
purge: { delay: "1 day" }

// Milliseconds
purge: { delay: 60000 }

What Gets Purged

When purge executes, all Durable Object storage for that workflow instance is deleted:

  • Workflow state and status
  • Step results and metadata
  • Recovery tracking data
  • Any custom metadata stored via getMeta()

Disabling Purge

Omit the purge option to retain data indefinitely (default behavior):

// No purge - data retained forever
export const { Workflows, WorkflowClient } = createDurableWorkflows(workflows);

Logs

When a purge executes, it logs:

[Workflow] Purged data for {instanceId} ({reason})

Where reason is the terminal state that triggered the purge (completed, failed, or cancelled).


License

MIT


Built by Matthew Sessions at Backpine Labs