@brandboostinggmbh/observable-workflows
v0.22.1
Published
My awesome typescript library
Keywords
Readme
Observable Workflows 
A powerful TypeScript library for creating observable, durable workflows with step-by-step tracking, comprehensive logging, and database persistence. Built specifically for Cloudflare Workers and D1 database environments.
Features
- 🔍 Observable Execution - Track every step of your workflow with detailed logging
- 💾 Durable Persistence - Automatic state persistence using Cloudflare D1 database
- 🔄 Retry Mechanisms - Built-in workflow retry with step result reuse
- 📊 Multi-tenancy Support - Isolate workflows by tenant for SaaS applications
- ⚡ Queue Integration - Support for queue-based workflow processing
- 🏷️ Property Management - Attach and query custom properties on workflows
- 🔎 Advanced Filtering - Filter workflows by status, properties, date ranges
- 📝 Comprehensive Logging - Structured logging with multiple levels (info, warn, error)
- 🔧 TypeScript First - Full type safety and excellent developer experience
Installation
npm install @brandboostinggmbh/observable-workflowsQuick Start
1. Define a Workflow
import { defineWorkflow } from '@brandboostinggmbh/observable-workflows'
interface EmailInput {
to: string
subject: string
body: string
}
const sendEmailWorkflow = defineWorkflow<EmailInput>(
'send-email',
async (input, { step, console }) => {
// Step 1: Validate email
await step('validate-email', async () => {
console.log('Validating email address:', input.to)
if (!input.to.includes('@')) {
throw new Error('Invalid email address')
}
return { valid: true }
})
// Step 2: Send email
const result = await step('send-email', async () => {
console.log('Sending email to:', input.to)
// Your email sending logic here - simulate async operation
await new Promise((resolve) => setTimeout(resolve, 100))
return { messageId: 'msg_123', status: 'sent' }
})
// Step 3: Log result
await step('log-result', async () => {
console.log('Email sent successfully:', result.messageId)
return { logged: true }
})
return result
},
)2. Create Workflow Context and Execute
import { createWorkflowContext } from '@brandboostinggmbh/observable-workflows'
// Create workflow context with D1 database
const workflowContext = createWorkflowContext({
D1: env.D1, // Cloudflare D1 database binding
})
// Execute the workflow
const result = await workflowContext.call({
workflow: sendEmailWorkflow,
input: {
to: '[email protected]',
subject: 'Welcome!',
body: 'Welcome to our service!',
},
workflowName: 'Welcome Email',
tenantId: 'tenant-123',
})3. Access Workflow Logs and Data
import { createLogAccessor } from '@brandboostinggmbh/observable-workflows'
const logAccessor = createLogAccessor({
D1: env.D1,
tenantId: 'tenant-123',
})
// List recent workflows
const workflows = await logAccessor.listWorkflows(10, 0)
// Get specific workflow with steps and logs
const workflow = await logAccessor.getWorkflow(instanceId)
console.log('Workflow status:', workflow.workflowStatus)
console.log('Steps:', workflow.steps)
console.log('Logs:', workflow.logs)Core Concepts
Workflows
A workflow is a series of steps that execute in sequence. Each workflow has:
- Type: A unique identifier for the workflow type
- Instance: A specific execution of a workflow
- Steps: Individual units of work within the workflow
- Properties: Custom metadata that can be attached and queried
Steps
Steps are the individual units of work within a workflow. They provide:
- Automatic retry - Failed steps can be retried
- Result caching - Successful step results are cached for retry scenarios
- Isolated logging - Each step has its own log context
- Error handling - Failed steps capture error details
Tenancy
All workflows are isolated by tenant ID, enabling multi-tenant applications:
- Each workflow execution requires a
tenantId - Queries are automatically scoped to the tenant
- Complete data isolation between tenants
Advanced Usage
Workflow with Metadata and Properties
const processOrderWorkflow = defineWorkflow(
{
workflowType: 'process-order',
metadata: { version: '1.0', category: 'ecommerce' },
},
async (input, { step, console, setWorkflowProperty, setWorkflowName }) => {
// Set dynamic workflow name
await setWorkflowName(`Order ${input.orderId}`)
// Set searchable properties
await setWorkflowProperty('orderId', input.orderId)
await setWorkflowProperty('customerId', input.customerId)
await setWorkflowProperty('amount', input.amount)
const result = await step('process-payment', async () => {
// Payment processing logic - simulate async operation
await new Promise((resolve) => setTimeout(resolve, 100))
return { transactionId: 'txn_123' }
})
await step('update-inventory', async () => {
// Inventory update logic - simulate async operation
await new Promise((resolve) => setTimeout(resolve, 100))
return { updated: true }
})
return result
},
)Retry Failed Workflows
// Retry a failed workflow, reusing successful step results
await workflowContext.retry(processOrderWorkflow, failedWorkflowId, {
reuseSuccessfulSteps: true, // Default: true
})Queue-based Workflow Processing
import { createQueueWorkflowContext } from '@brandboostinggmbh/observable-workflows'
const queueContext = createQueueWorkflowContext({
workflowContext,
queue: env.WORKFLOW_QUEUE, // Cloudflare Queue binding
})
// Queue a workflow for processing
await queueContext.queueWorkflow({
workflow: sendEmailWorkflow,
input: emailData,
workflowName: 'Queued Email',
tenantId: 'tenant-123',
})Advanced Filtering and Querying
// Filter workflows by properties and status
const filteredWorkflows = await logAccessor.listWorkflows(50, 0, {
workflowStatus: ['completed', 'failed'],
properties: {
amount: { gt: 100 }, // Amount greater than 100
customerId: { equals: 'customer-456' },
},
startTime: {
gte: Date.now() - 24 * 60 * 60 * 1000, // Last 24 hours
},
workflowType: { contains: 'order' },
})
// Get workflow properties
const properties = await logAccessor.getWorkflowProperties(instanceId)API Reference
Core Functions
defineWorkflow<T>(type, callback)
Defines a new workflow type.
Parameters:
type(string | object): Workflow type or configuration objectcallback(function): Workflow implementation function
Returns: WorkflowFunction<T>
createWorkflowContext(options)
Creates a workflow execution context.
Parameters:
options.D1(D1Database): Cloudflare D1 database instanceoptions.serializer?(Serializer): Custom serialization (optional)options.idFactory?(function): Custom ID generation (optional)
Returns: WorkflowContextInstance
createLogAccessor(options)
Creates an accessor for querying workflow data.
Parameters:
options.D1(D1Database): Cloudflare D1 database instanceoptions.tenantId(string): Tenant identifieroptions.serializer?(Serializer): Custom serialization (optional)
Returns: Log accessor with query methods
Workflow Context Methods
call(params)
Executes a workflow.
Parameters (object):
workflow(WorkflowFunction): The workflow to executeinput(T): Input data for the workflowworkflowName(string): Human-readable workflow nametenantId(string): Tenant identifierparentInstanceId?(string): Parent workflow ID (for retries)reuseSuccessfulSteps?(boolean): Whether to reuse successful steps in retries
retry(workflow, retryInstanceId, retryOptions?)
Retries a failed workflow.
Parameters:
workflow(WorkflowFunction): The workflow definitionretryInstanceId(string): ID of the workflow to retryretryOptions?(RetryWorkflowOptions): Retry configuration
Step Context
Within a workflow, the step function provides:
await step('step-name', async ({ console }) => {
console.log('Step is executing')
console.error('Something went wrong')
console.warn('Warning message')
// Simulate async operation
await new Promise((resolve) => setTimeout(resolve, 100))
// Return step result
return { success: true }
})Workflow Context
The workflow callback receives a context with:
step(function): Execute a workflow stepconsole(ConsoleWrapper): Logging interfacesetWorkflowName(function): Update workflow namesetWorkflowProperty(function): Set workflow property
Configuration
Custom Serialization
import { createWorkflowContext } from '@brandboostinggmbh/observable-workflows'
const customSerializer = {
serialize: (obj: any) => JSON.stringify(obj),
deserialize: (str: string) => JSON.parse(str),
}
const context = createWorkflowContext({
D1: env.D1,
serializer: customSerializer,
})Custom ID Generation
const customIdFactory = () => `custom_${Date.now()}_${Math.random()}`
const context = createWorkflowContext({
D1: env.D1,
idFactory: customIdFactory,
})Database Schema
The library automatically creates the following tables in your D1 database:
- WorkflowTable: Stores workflow instances
- StepTable: Stores individual step executions
- LogTable: Stores all log entries
- WorkflowProperties: Stores custom workflow properties
Tables are created automatically when first accessing the database.
Error Handling
The library provides comprehensive error handling:
- Step Failures: Captured with full error details and stack traces
- Workflow Failures: Marked with appropriate status and error information
- Retry Logic: Failed workflows can be retried with step result reuse
- Validation: Input validation and type checking
- Automatic D1 Retry: Built-in retry logic with exponential backoff for transient D1 network errors
D1 Network Error Retry
The library automatically retries D1 database operations that fail due to transient network errors (e.g., "D1_ERROR: Network connection lost"). This makes workflows more resilient to temporary network issues.
Default Retry Configuration:
- Maximum retry attempts: 3
- Initial delay: 100ms
- Maximum delay: 5000ms
- Exponential backoff multiplier: 2
- Jitter: Enabled
Custom Retry Configuration:
You can customize the retry behavior when creating the workflow context:
const workflowContext = createWorkflowContext({
D1: env.D1,
retryConfig: {
maxAttempts: 5, // Retry up to 5 times
initialDelayMs: 200, // Start with 200ms delay
maxDelayMs: 10000, // Cap delay at 10 seconds
backoffMultiplier: 3, // Triple the delay each time
useJitter: true, // Add randomization to prevent thundering herd
},
})Manual Retry for Custom Operations:
You can also use the retry utility directly for custom D1 operations:
import { retryD1Operation } from '@brandboostinggmbh/observable-workflows'
const result = await retryD1Operation(
() => db.prepare("SELECT * FROM custom_table").first(),
{ maxAttempts: 3, initialDelayMs: 100 }
)Best Practices
- Use Descriptive Step Names: Make step names descriptive for better observability
- Keep Steps Atomic: Each step should represent a single unit of work
- Handle Errors Gracefully: Use try-catch within steps for custom error handling
- Use Properties for Searching: Add relevant properties to make workflows searchable
- Monitor Performance: Use the logging data to monitor workflow performance
- Tenant Isolation: Always use consistent tenant IDs for proper data isolation
TypeScript Support
This library is built with TypeScript and provides full type safety:
interface MyInput {
userId: string
data: Record<string, any>
}
const typedWorkflow = defineWorkflow<MyInput>(
'my-workflow',
async (input, ctx) => {
// input is fully typed as MyInput
// ctx provides typed workflow context
},
)Changelog
See CHANGELOG.md for a detailed history of changes to this project.
Contributing
We welcome contributions! Please see our Contributing Guidelines for details.
License
MIT License © 2025 Tim Stepanov
