@brandboostinggmbh/observable-workflows

v0.22.1

My awesome typescript library

Observable Workflows

A powerful TypeScript library for creating observable, durable workflows with step-by-step tracking, comprehensive logging, and database persistence. Built specifically for Cloudflare Workers and D1 database environments.

Features

  • 🔍 Observable Execution - Track every step of your workflow with detailed logging
  • 💾 Durable Persistence - Automatic state persistence using Cloudflare D1 database
  • 🔄 Retry Mechanisms - Built-in workflow retry with step result reuse
  • 📊 Multi-tenancy Support - Isolate workflows by tenant for SaaS applications
  • Queue Integration - Support for queue-based workflow processing
  • 🏷️ Property Management - Attach and query custom properties on workflows
  • 🔎 Advanced Filtering - Filter workflows by status, properties, date ranges
  • 📝 Comprehensive Logging - Structured logging with multiple levels (info, warn, error)
  • 🔧 TypeScript First - Full type safety and excellent developer experience

Installation

npm install @brandboostinggmbh/observable-workflows

Quick Start

1. Define a Workflow

import { defineWorkflow } from '@brandboostinggmbh/observable-workflows'

interface EmailInput {
  to: string
  subject: string
  body: string
}

const sendEmailWorkflow = defineWorkflow<EmailInput>(
  'send-email',
  async (input, { step, console }) => {
    // Step 1: Validate email
    await step('validate-email', async () => {
      console.log('Validating email address:', input.to)
      if (!input.to.includes('@')) {
        throw new Error('Invalid email address')
      }
      return { valid: true }
    })

    // Step 2: Send email
    const result = await step('send-email', async () => {
      console.log('Sending email to:', input.to)
      // Your email sending logic here - simulate async operation
      await new Promise((resolve) => setTimeout(resolve, 100))
      return { messageId: 'msg_123', status: 'sent' }
    })

    // Step 3: Log result
    await step('log-result', async () => {
      console.log('Email sent successfully:', result.messageId)
      return { logged: true }
    })

    return result
  },
)

2. Create Workflow Context and Execute

import { createWorkflowContext } from '@brandboostinggmbh/observable-workflows'

// Create workflow context with D1 database
const workflowContext = createWorkflowContext({
  D1: env.D1, // Cloudflare D1 database binding
})

// Execute the workflow
const result = await workflowContext.call({
  workflow: sendEmailWorkflow,
  input: {
    to: 'user@example.com',
    subject: 'Welcome!',
    body: 'Welcome to our service!',
  },
  workflowName: 'Welcome Email',
  tenantId: 'tenant-123',
})

3. Access Workflow Logs and Data

import { createLogAccessor } from '@brandboostinggmbh/observable-workflows'

const logAccessor = createLogAccessor({
  D1: env.D1,
  tenantId: 'tenant-123',
})

// List recent workflows
const workflows = await logAccessor.listWorkflows(10, 0)

// Get a specific workflow with its steps and logs
// (instanceId is the ID of a previously executed workflow instance)
const workflow = await logAccessor.getWorkflow(instanceId)
console.log('Workflow status:', workflow.workflowStatus)
console.log('Steps:', workflow.steps)
console.log('Logs:', workflow.logs)

Core Concepts

Workflows

A workflow is a series of steps that execute in sequence. Each workflow has:

  • Type: A unique identifier for the workflow type
  • Instance: A specific execution of a workflow
  • Steps: Individual units of work within the workflow
  • Properties: Custom metadata that can be attached and queried

Steps

Steps are the individual units of work within a workflow. They provide:

  • Automatic retry - Failed steps can be retried
  • Result caching - Successful step results are cached for retry scenarios
  • Isolated logging - Each step has its own log context
  • Error handling - Failed steps capture error details
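
To make result caching concrete, here is a minimal in-memory sketch of the idea. It is not the library's implementation: `createStepRunner` and the `Map`-based cache are hypothetical stand-ins for the persisted step results, and the step names are made up for the demo.

```typescript
// Hypothetical sketch: cache successful step results by step name so that a
// retry can skip work that already succeeded. Not the library's actual code.
type StepFn<T> = () => Promise<T>

function createStepRunner(cache: Map<string, unknown>) {
  return async function step<T>(name: string, fn: StepFn<T>): Promise<T> {
    if (cache.has(name)) {
      // A previous attempt already completed this step: reuse its result.
      return cache.get(name) as T
    }
    const result = await fn() // a thrown error propagates and nothing is cached
    cache.set(name, result)
    return result
  }
}

// Demo: the first attempt fails at step 2; the retry reuses step 1's result.
async function demoRetry(): Promise<{ chargeCalls: number; shipped: boolean }> {
  const cache = new Map<string, unknown>()
  let chargeCalls = 0
  let failOnce = true

  const runWorkflow = async () => {
    const step = createStepRunner(cache)
    await step('charge-card', async () => {
      chargeCalls += 1
      return { transactionId: 'txn_123' }
    })
    return step('ship-order', async () => {
      if (failOnce) {
        failOnce = false
        throw new Error('carrier unavailable')
      }
      return { shipped: true }
    })
  }

  try {
    await runWorkflow()
  } catch (err) {
    // First attempt failed at 'ship-order'; fall through to the retry.
  }
  const result = await runWorkflow()
  return { chargeCalls, shipped: result.shipped }
}
```

In this sketch only successful results are cached, so the failed step runs again on retry while the side effect in `charge-card` happens exactly once.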

Tenancy

All workflows are isolated by tenant ID, enabling multi-tenant applications:

  • Each workflow execution requires a tenantId
  • Queries are automatically scoped to the tenant
  • Complete data isolation between tenants
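
One way to picture automatic tenant scoping is an accessor that captures the tenant ID once and applies it to every query. The sketch below works on an in-memory array and is only an illustration: `createTenantScopedAccessor` and the `WorkflowRecord` shape are hypothetical, and reading the two numeric arguments of `listWorkflows` as (limit, offset) is an assumption.

```typescript
// Hypothetical record shape; the field names mirror those used in this README.
interface WorkflowRecord {
  instanceId: string
  tenantId: string
  workflowStatus: string
}

// The tenant ID is fixed when the accessor is created, so individual queries
// cannot forget the tenant filter.
function createTenantScopedAccessor(rows: readonly WorkflowRecord[], tenantId: string) {
  return {
    // Assumption: the two numeric arguments are (limit, offset).
    listWorkflows(limit: number, offset: number): WorkflowRecord[] {
      return rows
        .filter((row) => row.tenantId === tenantId)
        .slice(offset, offset + limit)
    },
    getWorkflow(instanceId: string): WorkflowRecord | undefined {
      // Lookups by ID are scoped too: another tenant's ID yields undefined.
      return rows
        .filter((row) => row.tenantId === tenantId && row.instanceId === instanceId)
        .shift()
    },
  }
}

const allRows: WorkflowRecord[] = [
  { instanceId: 'wf-1', tenantId: 'tenant-a', workflowStatus: 'completed' },
  { instanceId: 'wf-2', tenantId: 'tenant-a', workflowStatus: 'failed' },
  { instanceId: 'wf-3', tenantId: 'tenant-b', workflowStatus: 'completed' },
]

const tenantA = createTenantScopedAccessor(allRows, 'tenant-a')
const visibleToA = tenantA.listWorkflows(10, 0).map((row) => row.instanceId)
```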

Advanced Usage

Workflow with Metadata and Properties

const processOrderWorkflow = defineWorkflow(
  {
    workflowType: 'process-order',
    metadata: { version: '1.0', category: 'ecommerce' },
  },
  async (input, { step, console, setWorkflowProperty, setWorkflowName }) => {
    // Set dynamic workflow name
    await setWorkflowName(`Order ${input.orderId}`)

    // Set searchable properties
    await setWorkflowProperty('orderId', input.orderId)
    await setWorkflowProperty('customerId', input.customerId)
    await setWorkflowProperty('amount', input.amount)

    const result = await step('process-payment', async () => {
      // Payment processing logic - simulate async operation
      await new Promise((resolve) => setTimeout(resolve, 100))
      return { transactionId: 'txn_123' }
    })

    await step('update-inventory', async () => {
      // Inventory update logic - simulate async operation
      await new Promise((resolve) => setTimeout(resolve, 100))
      return { updated: true }
    })

    return result
  },
)

Retry Failed Workflows

// Retry a failed workflow, reusing successful step results
await workflowContext.retry(processOrderWorkflow, failedWorkflowId, {
  reuseSuccessfulSteps: true, // Default: true
})

Queue-based Workflow Processing

import { createQueueWorkflowContext } from '@brandboostinggmbh/observable-workflows'

const queueContext = createQueueWorkflowContext({
  workflowContext,
  queue: env.WORKFLOW_QUEUE, // Cloudflare Queue binding
})

// Queue a workflow for processing
await queueContext.queueWorkflow({
  workflow: sendEmailWorkflow,
  input: emailData,
  workflowName: 'Queued Email',
  tenantId: 'tenant-123',
})

Advanced Filtering and Querying

// Filter workflows by properties and status
const filteredWorkflows = await logAccessor.listWorkflows(50, 0, {
  workflowStatus: ['completed', 'failed'],
  properties: {
    amount: { gt: 100 }, // Amount greater than 100
    customerId: { equals: 'customer-456' },
  },
  startTime: {
    gte: Date.now() - 24 * 60 * 60 * 1000, // Last 24 hours
  },
  workflowType: { contains: 'order' },
})

// Get workflow properties
const properties = await logAccessor.getWorkflowProperties(instanceId)
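
The filter object above combines several operators (`gt`, `gte`, `equals`, `contains`). The following sketch spells out one plausible reading of their semantics by evaluating the same kind of filter against in-memory rows. It is an illustration only: `matchesCondition`, `matchesFilter`, and the row shape are hypothetical, and treating all conditions as a logical AND is an assumption rather than documented behavior.

```typescript
// Operators that appear in the filter example above.
interface Condition {
  equals?: string | number
  contains?: string
  gt?: number
  gte?: number
}

type PropertyValue = string | number

// Every operator present on the condition must hold (assumed AND semantics).
function matchesCondition(value: PropertyValue | undefined, cond: Condition): boolean {
  if (value === undefined) return false
  if (cond.equals !== undefined && value !== cond.equals) return false
  if (cond.contains !== undefined && !(typeof value === 'string' && value.indexOf(cond.contains) !== -1)) return false
  if (cond.gt !== undefined && !(typeof value === 'number' && value > cond.gt)) return false
  if (cond.gte !== undefined && !(typeof value === 'number' && value >= cond.gte)) return false
  return true
}

// Hypothetical row and filter shapes for the demo.
interface WorkflowRow {
  workflowType: string
  workflowStatus: string
  startTime: number
  properties: Record<string, PropertyValue>
}

interface WorkflowFilter {
  workflowStatus?: string[]
  workflowType?: Condition
  startTime?: Condition
  properties?: Record<string, Condition>
}

function matchesFilter(row: WorkflowRow, filter: WorkflowFilter): boolean {
  if (filter.workflowStatus && filter.workflowStatus.indexOf(row.workflowStatus) === -1) return false
  if (filter.workflowType && !matchesCondition(row.workflowType, filter.workflowType)) return false
  if (filter.startTime && !matchesCondition(row.startTime, filter.startTime)) return false
  const props = filter.properties || {}
  return Object.keys(props).every((key) => {
    const cond = props[key]
    return cond === undefined || matchesCondition(row.properties[key], cond)
  })
}

const sampleRows: WorkflowRow[] = [
  { workflowType: 'process-order', workflowStatus: 'completed', startTime: 2000, properties: { amount: 250, customerId: 'customer-456' } },
  { workflowType: 'process-order', workflowStatus: 'completed', startTime: 2000, properties: { amount: 50, customerId: 'customer-456' } },
  { workflowType: 'send-email', workflowStatus: 'failed', startTime: 2000, properties: {} },
]

const sampleFilter: WorkflowFilter = {
  workflowStatus: ['completed', 'failed'],
  properties: { amount: { gt: 100 }, customerId: { equals: 'customer-456' } },
  startTime: { gte: 1000 },
  workflowType: { contains: 'order' },
}

const matched = sampleRows.filter((row) => matchesFilter(row, sampleFilter))
```

Only the first row passes here: the second fails `amount: { gt: 100 }`, and the third fails `workflowType: { contains: 'order' }`.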

API Reference

Core Functions

defineWorkflow<T>(type, callback)

Defines a new workflow type.

Parameters:

  • type (string | object): Workflow type or configuration object
  • callback (function): Workflow implementation function

Returns: WorkflowFunction<T>

createWorkflowContext(options)

Creates a workflow execution context.

Parameters:

  • options.D1 (D1Database): Cloudflare D1 database instance
  • options.serializer? (Serializer): Custom serialization (optional)
  • options.idFactory? (function): Custom ID generation (optional)

Returns: WorkflowContextInstance

createLogAccessor(options)

Creates an accessor for querying workflow data.

Parameters:

  • options.D1 (D1Database): Cloudflare D1 database instance
  • options.tenantId (string): Tenant identifier
  • options.serializer? (Serializer): Custom serialization (optional)

Returns: Log accessor with query methods

Workflow Context Methods

call(params)

Executes a workflow.

Parameters (object):

  • workflow (WorkflowFunction): The workflow to execute
  • input (T): Input data for the workflow
  • workflowName (string): Human-readable workflow name
  • tenantId (string): Tenant identifier
  • parentInstanceId? (string): Parent workflow ID (for retries)
  • reuseSuccessfulSteps? (boolean): Whether to reuse successful steps in retries

retry(workflow, retryInstanceId, retryOptions?)

Retries a failed workflow.

Parameters:

  • workflow (WorkflowFunction): The workflow definition
  • retryInstanceId (string): ID of the workflow to retry
  • retryOptions? (RetryWorkflowOptions): Retry configuration

Step Context

Within a workflow, the step function provides:

await step('step-name', async ({ console }) => {
  console.log('Step is executing')
  console.error('Something went wrong')
  console.warn('Warning message')
  // Simulate async operation
  await new Promise((resolve) => setTimeout(resolve, 100))
  // Return step result
  return { success: true }
})

Workflow Context

The workflow callback receives a context with:

  • step (function): Execute a workflow step
  • console (ConsoleWrapper): Logging interface
  • setWorkflowName (function): Update workflow name
  • setWorkflowProperty (function): Set workflow property

Configuration

Custom Serialization

import { createWorkflowContext } from '@brandboostinggmbh/observable-workflows'

const customSerializer = {
  serialize: (obj: any) => JSON.stringify(obj),
  deserialize: (str: string) => JSON.parse(str),
}

const context = createWorkflowContext({
  D1: env.D1,
  serializer: customSerializer,
})
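
A custom serializer is mostly useful when step results contain values that plain JSON loses. As a hedged illustration, the sketch below round-trips `Date` objects using the same `serialize`/`deserialize` shape shown above. The `__type` tag and the name `dateAwareSerializer` are hypothetical choices for this example, not part of the library.

```typescript
// Hypothetical serializer that preserves Date values across a JSON round trip.
const dateAwareSerializer = {
  serialize: (obj: unknown): string =>
    // JSON.stringify calls Date.prototype.toJSON before the replacer runs, so
    // `value` is already a string; read the original from the holder (`this`).
    JSON.stringify(obj, function (this: any, key: string, value: unknown) {
      const raw = this[key]
      return raw instanceof Date ? { __type: 'Date', iso: raw.toISOString() } : value
    }),
  deserialize: (str: string): unknown =>
    JSON.parse(str, (_key: string, value: any) =>
      // Caveat: any plain object carrying __type === 'Date' is revived as a Date.
      value !== null && typeof value === 'object' && value.__type === 'Date'
        ? new Date(value.iso)
        : value,
    ),
}

const roundTripped = dateAwareSerializer.deserialize(
  dateAwareSerializer.serialize({ createdAt: new Date(0), attempts: 2 }),
) as { createdAt: Date; attempts: number }
```

An object like this could be passed as the `serializer` option in place of `customSerializer`, assuming the library only requires these two methods.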

Custom ID Generation

const customIdFactory = () => `custom_${Date.now()}_${Math.random()}`

const context = createWorkflowContext({
  D1: env.D1,
  idFactory: customIdFactory,
})
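
Note that `Math.random()` stringifies as a decimal such as `0.123`, so the IDs above contain a dot. If that matters for your use case, a factory along the following lines avoids it and adds a counter so that two IDs created in the same millisecond still differ. This is a sketch: `createIdFactory` and the ID layout are hypothetical, and it assumes the `idFactory` option only needs a function returning a string, as in the example above.

```typescript
// Hypothetical factory: prefix + timestamp + counter + random suffix, all in
// base 36 so the ID contains only [a-z0-9] and underscores.
function createIdFactory(prefix: string): () => string {
  let counter = 0 // resets whenever the runtime instance restarts
  return () => {
    counter += 1
    const random = Math.random().toString(36).slice(2, 10)
    return `${prefix}_${Date.now().toString(36)}_${counter.toString(36)}_${random}`
  }
}

const nextWorkflowId = createIdFactory('wf')
const generatedIds: string[] = []
for (let i = 0; i < 1000; i += 1) {
  generatedIds.push(nextWorkflowId())
}
```

The counter guarantees uniqueness only within one running instance; the random suffix merely makes collisions across instances unlikely.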

Database Schema

The library automatically creates the following tables in your D1 database:

  • WorkflowTable: Stores workflow instances
  • StepTable: Stores individual step executions
  • LogTable: Stores all log entries
  • WorkflowProperties: Stores custom workflow properties

The tables are created the first time the library accesses the database, so no manual migration step is needed.

Error Handling

The library provides comprehensive error handling:

  • Step Failures: Captured with full error details and stack traces
  • Workflow Failures: Marked with appropriate status and error information
  • Retry Logic: Failed workflows can be retried with step result reuse
  • Validation: Input validation and type checking
  • Automatic D1 Retry: Built-in retry logic with exponential backoff for transient D1 network errors

D1 Network Error Retry

The library automatically retries D1 database operations that fail due to transient network errors (e.g., "D1_ERROR: Network connection lost"). This makes workflows more resilient to temporary network issues.

Default Retry Configuration:

  • Maximum retry attempts: 3
  • Initial delay: 100ms
  • Maximum delay: 5000ms
  • Exponential backoff multiplier: 2
  • Jitter: Enabled
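
To see what these settings imply, the sketch below computes the delay before each retry from a configuration object. The field names follow the `retryConfig` keys used in this README, but `backoffDelayMs` is a hypothetical helper, and the jitter scheme (a uniform value between 0 and the capped delay) is an assumption; the library may randomize differently.

```typescript
interface RetryConfig {
  maxAttempts: number
  initialDelayMs: number
  maxDelayMs: number
  backoffMultiplier: number
  useJitter: boolean
}

const defaultRetryConfig: RetryConfig = {
  maxAttempts: 3,
  initialDelayMs: 100,
  maxDelayMs: 5000,
  backoffMultiplier: 2,
  useJitter: true,
}

// Delay before retry number `retryIndex` (0-based).
// `random` is injectable so the function can be tested deterministically.
function backoffDelayMs(
  retryIndex: number,
  cfg: RetryConfig,
  random: () => number = Math.random,
): number {
  const uncapped = cfg.initialDelayMs * Math.pow(cfg.backoffMultiplier, retryIndex)
  const capped = Math.min(uncapped, cfg.maxDelayMs)
  // Assumption: "full jitter", i.e. a uniform value in [0, capped).
  return cfg.useJitter ? Math.floor(random() * capped) : capped
}

// Upper bounds per retry with jitter disabled, for the default settings.
const withoutJitter: RetryConfig = { ...defaultRetryConfig, useJitter: false }
const defaultDelays = [0, 1, 2].map((i) => backoffDelayMs(i, withoutJitter))
```

With jitter disabled, the default settings give delays of 100 ms, 200 ms, and 400 ms, so the 5000 ms cap only comes into play with more attempts or a larger multiplier.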

Custom Retry Configuration:

You can customize the retry behavior when creating the workflow context:

const workflowContext = createWorkflowContext({
  D1: env.D1,
  retryConfig: {
    maxAttempts: 5,        // Retry up to 5 times
    initialDelayMs: 200,   // Start with 200ms delay
    maxDelayMs: 10000,     // Cap delay at 10 seconds
    backoffMultiplier: 3,  // Triple the delay each time
    useJitter: true,       // Add randomization to prevent thundering herd
  },
})

Manual Retry for Custom Operations:

You can also use the retry utility directly for custom D1 operations:

import { retryD1Operation } from '@brandboostinggmbh/observable-workflows'

const result = await retryD1Operation(
  () => env.D1.prepare("SELECT * FROM custom_table").first(),
  { maxAttempts: 3, initialDelayMs: 100 }
)

Best Practices

  1. Use Descriptive Step Names: Make step names descriptive for better observability
  2. Keep Steps Atomic: Each step should represent a single unit of work
  3. Handle Errors Gracefully: Use try-catch within steps for custom error handling
  4. Use Properties for Searching: Add relevant properties to make workflows searchable
  5. Monitor Performance: Use the logging data to monitor workflow performance
  6. Tenant Isolation: Always use consistent tenant IDs for proper data isolation

TypeScript Support

This library is built with TypeScript and provides full type safety:

interface MyInput {
  userId: string
  data: Record<string, any>
}

const typedWorkflow = defineWorkflow<MyInput>(
  'my-workflow',
  async (input, ctx) => {
    // input is fully typed as MyInput
    // ctx provides typed workflow context
  },
)

Changelog

See CHANGELOG.md for a detailed history of changes to this project.

Contributing

We welcome contributions! Please see our Contributing Guidelines for details.

License

MIT License © 2025 Tim Stepanov