@kylebegeman/pulse

v0.5.0

Universal workflow engine — signal-driven automation for multi-tenant applications

Overview

Pulse is a TypeScript library that powers event-driven workflow automation. Your application emits signals (triggers), and the engine matches them to registered workflows, scheduling and executing steps in sequence with full persistence, observability, and replay support.

Signal → Match → Schedule Steps → Evaluate Conditions → Execute Actions → Record Results

Built for multi-tenant SaaS applications. Each signal, workflow, and run is scoped to a tenant. The engine runs in-process alongside your application and uses your existing PostgreSQL database and Redis instance.


Quick Start

import { createEngine } from '@kylebegeman/pulse'
import { Pool } from 'pg'
import Redis from 'ioredis'

// 1. Create the engine
const engine = createEngine({
  db: new Pool({ connectionString: process.env.DATABASE_URL }),
  redis: new Redis(process.env.REDIS_URL),
})

// 2. Register a trigger type (a Zod payloadSchema can be added; see Signals)
engine.registerTrigger('heartbeat.missed', {
  source: 'heartbeat',
  resourceType: 'service',
})

// 3. Register an action handler
engine.registerAction('create_incident', async (ctx) => {
  const incident = await createIncident({
    service: ctx.trigger.resourceId,
    tenant: ctx.tenantId,
    reason: 'Heartbeat missed',
  })
  ctx.log('Incident created', { incidentId: incident.id })
  return { success: true, data: { incidentId: incident.id } }
}, { replaySafe: true })

// 4. Register a condition
engine.registerCondition('is_still_failing', async (ctx) => {
  const status = await checkServiceHealth(ctx.trigger.resourceId)
  return status === 'unhealthy'
})

// 5. Run migrations (creates pulse_* tables)
await engine.migrate()

// 6. Create a workflow
await engine.createWorkflow({
  tenantId: 'workspace_1',
  name: 'Incident on missed heartbeat',
  triggerType: 'heartbeat.missed',
  steps: [
    { type: 'delay', name: 'wait_5m', delayMs: 5 * 60 * 1000 },
    { type: 'condition', name: 'is_still_failing' },
    { type: 'action', name: 'create_incident' },
  ],
  config: {},
  isEnabled: true,
})

// 7. Start the engine (BullMQ workers begin processing)
await engine.start()

// 8. Emit signals from anywhere in your application
await engine.emit({
  tenantId: 'workspace_1',
  source: 'heartbeat',
  type: 'heartbeat.missed',
  resourceType: 'service',
  resourceId: 'api-server',
  payload: { lastSeenAt: new Date().toISOString() },
})

// 9. Graceful shutdown
await engine.stop()

Installation

Pulse is a private package. Install from GitHub:

# package.json
"@kylebegeman/pulse": "github:mrbagels/pulse"

# or via SSH
npm install git+ssh://git@github.com:mrbagels/pulse.git

Peer requirements:

| Dependency | Purpose |
|---|---|
| pg | PostgreSQL client; the engine uses your connection pool |
| ioredis | Redis client; used by BullMQ for job queuing |


Core Concepts

Signals (Triggers)

A signal is a structured event emitted by your application. Every signal includes a tenant, a source system, a type, and optional resource and payload data.

await engine.emit({
  tenantId: 'workspace_1',       // Required — tenant scope
  source: 'heartbeat',           // Required — originating system
  type: 'heartbeat.missed',      // Required — event type
  resourceType: 'service',       // Optional — what kind of resource
  resourceId: 'api-server',      // Optional — which specific resource
  environment: 'production',     // Optional — environment scope
  payload: { lastSeenAt: '...' } // Optional — arbitrary data
})

Triggers are registered to declare their source, expected resource type, and optional payload schema (validated with Zod):

import { z } from 'zod'

engine.registerTrigger('heartbeat.missed', {
  source: 'heartbeat',
  resourceType: 'service',
  payloadSchema: z.object({
    lastSeenAt: z.string().datetime(),
  }),
})

All emitted signals are persisted to the database for auditing and replay.


Workflows

A workflow is a named sequence of steps that runs when a matching signal is emitted. Workflows are stored in the database and scoped to a tenant.

await engine.createWorkflow({
  tenantId: 'workspace_1',
  name: 'Incident on missed heartbeat',
  triggerType: 'heartbeat.missed',        // Matches signals of this type
  environmentFilter: 'production',         // Optional — only match this environment
  resourceTypeFilter: 'service',           // Optional — only match this resource type
  steps: [
    { type: 'delay', name: 'wait_5m', delayMs: 300_000 },
    { type: 'condition', name: 'is_still_failing' },
    { type: 'action', name: 'create_incident' },
  ],
  config: { severity: 'high' },            // Passed to handlers via ctx.config
  isEnabled: true,
})

When a signal is emitted, the engine finds all enabled workflows matching the signal type (and optional environment/resource filters), creates a run for each, and begins scheduling steps.
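
The matching rule described above can be sketched as a pure predicate. This is a minimal illustration, not Pulse's matcher: `SignalLike`, `WorkflowLike`, and `matchesWorkflow` are hypothetical names, and both the tenant check and the treatment of an unset filter as "match anything" are assumptions drawn from the tenant-scoping description.

```typescript
// Local, simplified shapes based on the Type Reference; not imported from Pulse.
interface SignalLike {
  tenantId: string
  type: string
  resourceType?: string
  environment?: string
}

interface WorkflowLike {
  tenantId: string
  triggerType: string
  environmentFilter?: string
  resourceTypeFilter?: string
  isEnabled: boolean
}

// Hypothetical helper: returns true when a workflow should run for a signal.
function matchesWorkflow(signal: SignalLike, workflow: WorkflowLike): boolean {
  if (!workflow.isEnabled) return false
  if (workflow.tenantId !== signal.tenantId) return false // assumed tenant scoping
  if (workflow.triggerType !== signal.type) return false
  // Assumption: an unset filter matches anything; a set filter must equal the signal's value.
  if (workflow.environmentFilter !== undefined &&
      workflow.environmentFilter !== signal.environment) return false
  if (workflow.resourceTypeFilter !== undefined &&
      workflow.resourceTypeFilter !== signal.resourceType) return false
  return true
}
```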

Workflows can be enabled/disabled at runtime:

await engine.disableWorkflow('wfd_abc123')
await engine.enableWorkflow('wfd_abc123')

Steps

Each workflow contains an ordered list of steps. Steps execute sequentially — each step must complete before the next begins.

action — Execute logic

Calls a registered handler function. The handler receives a WorkflowContext and returns an ActionResult.

engine.registerAction('send_alert', async (ctx) => {
  await sendSlackMessage(ctx.config.channel, `Alert for ${ctx.trigger.resourceId}`)
  return { success: true, data: { sent: true } }
}, { replaySafe: false }) // Will be skipped during replay
// In workflow steps:
{ type: 'action', name: 'send_alert' }

Retry policy: Action steps can define a retry policy for automatic retries on failure:

{
  type: 'action',
  name: 'send_alert',
  retryPolicy: { maxAttempts: 3, backoffMs: 5000 },
  timeoutMs: 30_000,  // Optional step timeout
}
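
The Type Reference annotates `maxAttempts` as 1-10 and `backoffMs` as 100-300000 ms. A small pre-flight check along these lines can catch out-of-range values before a workflow is created. `validateRetryPolicy` is a hypothetical helper, and requiring `maxAttempts` to be an integer is an assumption.

```typescript
interface RetryPolicy {
  maxAttempts: number
  backoffMs: number
}

// Hypothetical helper: returns a list of problems, empty when the policy is within bounds.
function validateRetryPolicy(policy: RetryPolicy): string[] {
  const problems: string[] = []
  const attempts = policy.maxAttempts
  if (Math.floor(attempts) !== attempts || attempts < 1 || attempts > 10) {
    problems.push('maxAttempts must be an integer from 1 to 10')
  }
  if (!isFinite(policy.backoffMs) || policy.backoffMs < 100 || policy.backoffMs > 300_000) {
    problems.push('backoffMs must be between 100 and 300000')
  }
  return problems
}
```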

condition — Branch on logic

Evaluates a boolean. If the condition returns false, the workflow completes early by default (status: completed, not failed).

engine.registerCondition('monitor_still_failing', async (ctx) => {
  const health = await checkHealth(ctx.trigger.resourceId)
  return health.status === 'down'
})
// In workflow steps:
{ type: 'condition', name: 'monitor_still_failing' }

onFalse behavior: Control what happens when a condition returns false:

// Default: complete the workflow early
{ type: 'condition', name: 'check', onFalse: 'complete' }

// Skip the next step and continue
{ type: 'condition', name: 'check', onFalse: 'skip' }

// Skip the next N steps
{ type: 'condition', name: 'check', onFalse: 3 }
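
To make the three `onFalse` modes concrete, the sketch below computes which step would run next after a condition returns false. `nextStepAfterFalse` is a hypothetical helper, and the index arithmetic is one reading of "skip the next N steps", not a description of the engine's internals.

```typescript
type OnFalse = 'complete' | 'skip' | number

// Hypothetical helper. Returns the index of the next step to run after the
// condition at `conditionIndex` evaluates to false, or null when nothing is left.
function nextStepAfterFalse(
  conditionIndex: number,
  totalSteps: number,
  onFalse: OnFalse = 'complete',
): number | null {
  if (onFalse === 'complete') return null
  const skipCount = onFalse === 'skip' ? 1 : onFalse
  const next = conditionIndex + 1 + skipCount
  // Skipping past the last step leaves nothing to run.
  return next < totalSteps ? next : null
}
```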

delay — Wait before continuing

Pauses execution for a duration in milliseconds. Uses BullMQ delayed jobs for reliable scheduling. Maximum delay: 30 days.

// In workflow steps:
{ type: 'delay', name: 'wait_5_minutes', delayMs: 5 * 60 * 1000 }
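
Raw millisecond arithmetic is easy to get wrong, so a thin wrapper can make delays readable and enforce the 30-day ceiling before the definition reaches the engine. The helpers below (`minutes`, `hours`, `delayStep`) are hypothetical, and rejecting negative values is an assumption.

```typescript
const MAX_DELAY_MS = 30 * 24 * 60 * 60 * 1000 // 30 days, per the limit stated above

// Hypothetical helpers for readable durations.
const minutes = (n: number): number => n * 60 * 1000
const hours = (n: number): number => n * 60 * 60 * 1000

// Builds a delay step, throwing if the duration is outside the accepted range.
function delayStep(name: string, delayMs: number) {
  if (!isFinite(delayMs) || delayMs < 0 || delayMs > MAX_DELAY_MS) {
    throw new RangeError(`delayMs must be between 0 and ${MAX_DELAY_MS}`)
  }
  return { type: 'delay' as const, name, delayMs }
}
```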

parallel — Execute branches concurrently

Runs multiple branches of steps simultaneously. See Parallel Steps below.


Parallel Steps

The parallel step type executes multiple branches concurrently. Each branch is an independent sequence of steps. All branches must complete before the workflow continues to the next top-level step.

await engine.createWorkflow({
  tenantId: 'workspace_1',
  name: 'Multi-channel notification',
  triggerType: 'alert.triggered',
  steps: [
    {
      type: 'parallel',
      name: 'notify_all_channels',
      branches: [
        // Branch 0: Email notification
        [
          { type: 'action', name: 'send_email' },
          { type: 'action', name: 'log_email_sent' },
        ],
        // Branch 1: Slack notification
        [
          { type: 'action', name: 'send_slack' },
        ],
        // Branch 2: SMS notification
        [
          { type: 'action', name: 'send_sms' },
        ],
      ],
    },
    // This step runs after ALL branches complete
    { type: 'action', name: 'mark_notified' },
  ],
  config: {},
  isEnabled: true,
})

Behavior:

  • Each branch executes independently and concurrently via BullMQ jobs
  • Branches can contain any step types (action, condition, delay)
  • All branches share the run's context object
  • The parallel step completes when all branches complete
  • If any branch fails, the parallel step fails and the run fails
  • A parallel step must have at least 2 branches

Branch step tracking: Each step within a branch is tracked with a branchIndex and parentStepRunId, linking it back to the parallel step that spawned it. This is visible in the step runs returned by getRunSteps().
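
Given that tracking, the flat list of step runs can be regrouped into branches for display. This is a sketch over a simplified local type; `groupByBranch` is a hypothetical helper, and ordering within a branch by `stepIndex` is an assumption.

```typescript
// Simplified subset of WorkflowStepRun from the Type Reference.
interface StepRunLike {
  id: string
  stepIndex: number
  stepName: string
  branchIndex?: number
  parentStepRunId?: string
}

// Hypothetical helper: groups the branch steps spawned by one parallel step run.
function groupByBranch(steps: StepRunLike[], parallelStepRunId: string): Map<number, StepRunLike[]> {
  const branches = new Map<number, StepRunLike[]>()
  for (const step of steps) {
    // Top-level steps and steps of other parallel blocks are ignored.
    if (step.parentStepRunId !== parallelStepRunId || step.branchIndex === undefined) continue
    const bucket = branches.get(step.branchIndex) ?? []
    bucket.push(step)
    branches.set(step.branchIndex, bucket)
  }
  // Assumed ordering: position within the branch's own step sequence.
  branches.forEach((bucket) => bucket.sort((a, b) => a.stepIndex - b.stepIndex))
  return branches
}
```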


Cron-Triggered Workflows

Workflows can be scheduled to run on a recurring basis using cron expressions. When a cron fires, the engine automatically emits a trigger of the workflow's triggerType, which matches and creates a new run.

await engine.createWorkflow({
  tenantId: 'workspace_1',
  name: 'Daily cleanup',
  triggerType: 'maintenance.cleanup',
  steps: [
    { type: 'action', name: 'cleanup_old_records' },
    { type: 'action', name: 'send_cleanup_report' },
  ],
  config: {},
  isEnabled: true,
  cronExpression: '0 2 * * *',  // Run daily at 2:00 AM
})

Behavior:

  • Cron jobs are managed via BullMQ repeatable jobs on a dedicated cron queue
  • When engine.start() is called, all enabled cron definitions are restored
  • Enabling/disabling a workflow also starts/stops its cron job
  • The cron fires a trigger with source: 'cron' and the workflow's triggerType
  • Standard cron expression format: minute hour dayOfMonth month dayOfWeek

Examples of cron expressions:

  • * * * * * — every minute
  • 0 */6 * * * — every 6 hours
  • 0 9 * * 1-5 — weekdays at 9:00 AM
  • 0 0 1 * * — first day of every month at midnight
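
When reading or reviewing schedules, it can help to label the five fields explicitly. The sketch below only splits an expression and checks the field count; it does not validate field values and is not how Pulse or BullMQ parse cron. `splitCron` and `CronFields` are hypothetical names.

```typescript
interface CronFields {
  minute: string
  hour: string
  dayOfMonth: string
  month: string
  dayOfWeek: string
}

// Hypothetical helper: splits a five-field expression and names each field.
// It checks only the number of fields, not whether each value is valid.
function splitCron(expression: string): CronFields {
  const parts = expression.trim().split(/\s+/)
  if (parts.length !== 5) {
    throw new Error(`Expected 5 cron fields, got ${parts.length}`)
  }
  const [minute, hour, dayOfMonth, month, dayOfWeek] = parts
  return { minute, hour, dayOfMonth, month, dayOfWeek }
}
```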

Handler Context

Every action and condition handler receives a WorkflowContext object:

engine.registerAction('my_action', async (ctx) => {
  ctx.tenantId     // string — current tenant
  ctx.trigger      // TriggerEnvelope — the signal that started this run
  ctx.run          // WorkflowRun — current run state
  ctx.step         // WorkflowStepRun — current step state
  ctx.config       // Record<string, unknown> — workflow-level config
  ctx.isReplay     // boolean — true if this is a replay execution
  ctx.emit(...)    // Emit another signal (enables workflow chaining)
  ctx.log(...)     // Write to execution log

  return { success: true }
})

The emit function on the context lets actions trigger other workflows, enabling powerful workflow chaining patterns. See the chaining example.
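
A minimal sketch of chaining, written against simplified local types so it stands alone. The handler name, the `incident.created` signal type, the `incidents` source, and the placeholder ID format are all hypothetical; only the `ctx.emit` and `ctx.log` shapes come from the Type Reference.

```typescript
// Local, simplified shapes based on the Type Reference; not imported from Pulse.
interface TriggerInput {
  tenantId: string
  source: string
  type: string
  resourceType?: string
  resourceId?: string
  payload?: Record<string, unknown>
}

interface ContextLike {
  tenantId: string
  trigger: TriggerInput
  emit: (trigger: TriggerInput) => Promise<unknown>
  log: (message: string, data?: Record<string, unknown>) => void
}

// Hypothetical action: after handling the original signal, emit a follow-up
// signal so that a separate workflow (for example, paging) can react to it.
async function createIncidentAndChain(ctx: ContextLike) {
  const incidentId = `inc_${ctx.trigger.resourceId ?? 'unknown'}` // placeholder ID
  await ctx.emit({
    tenantId: ctx.tenantId,
    source: 'incidents',
    type: 'incident.created',
    resourceType: 'incident',
    resourceId: incidentId,
    payload: { causedBy: ctx.trigger.type },
  })
  ctx.log('Chained incident.created', { incidentId })
  return { success: true, data: { incidentId } }
}
```

A handler like this would be passed to `engine.registerAction`. A chained workflow should not emit its own trigger type unless a condition step eventually stops the chain, since it would otherwise keep re-triggering itself.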


Lifecycle Hooks

The engine supports lifecycle hooks for observability and integration:

const engine = createEngine({
  db: pool,
  redis: redisClient,
  onStepComplete: (event) => {
    // Called after each step completes, fails, or is skipped
    console.log(`Step ${event.step.stepName}: ${event.step.status}`)
  },
  onRunComplete: (event) => {
    // Called when a run reaches completed, failed, or canceled status
    console.log(`Run ${event.run.id}: ${event.status}`)
  },
})

Hooks are fire-and-forget — errors in hooks do not affect workflow execution.
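
One way to picture fire-and-forget invocation is a wrapper that swallows both synchronous throws and asynchronous rejections. This is an illustrative sketch, not the engine's code: `invokeHook` is hypothetical, and the `LifecycleHook` signature is assumed because the README names the type without defining it.

```typescript
// Assumed hook shape; the README names LifecycleHook but does not define it.
type LifecycleHook<E> = (event: E) => void | Promise<void>

// Hypothetical helper: invokes a hook without letting its failure propagate.
function invokeHook<E>(hook: LifecycleHook<E> | undefined, event: E): void {
  if (!hook) return
  try {
    // Promise.resolve covers sync and async hooks; catch handles rejections.
    Promise.resolve(hook(event)).catch((err: unknown) => {
      console.error('lifecycle hook failed:', err)
    })
  } catch (err) {
    // A synchronous throw inside the hook lands here.
    console.error('lifecycle hook failed:', err)
  }
}
```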


Replay

All signals are persisted, enabling replay — re-processing a historical signal through the matching pipeline as if it were emitted again.

// Full replay — creates new runs and executes all steps
await engine.replay('trg_abc123')

// Dry run — creates runs but skips all actions (logs only)
await engine.replay('trg_abc123', { dryRun: true })

Replay safety: Each action declares whether it is safe to re-execute via the replaySafe option:

// Safe to replay — idempotent or read-only
engine.registerAction('create_incident', handler, { replaySafe: true })

// Not safe to replay — side effects like emails, charges
engine.registerAction('send_email', handler, { replaySafe: false })

During replay, actions with replaySafe: false are skipped (status: skipped). This prevents duplicate emails, charges, or other non-idempotent side effects.

Runs created by replay are marked with isReplay: true and handlers can check ctx.isReplay to adjust behavior.
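
The replay rules above reduce to a small decision function. The sketch below restates them for illustration only; `decideAction` and its input shape are hypothetical names, not part of the Pulse API.

```typescript
interface ReplayDecisionInput {
  isReplay: boolean
  dryRun?: boolean
  replaySafe: boolean
}

type ActionDecision = 'execute' | 'skip'

// Hypothetical helper: should an action step run, given the replay settings?
function decideAction({ isReplay, dryRun = false, replaySafe }: ReplayDecisionInput): ActionDecision {
  if (!isReplay) return 'execute'        // normal runs always execute
  if (dryRun) return 'skip'              // dry runs skip every action
  return replaySafe ? 'execute' : 'skip' // replays skip unsafe actions
}
```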


Run Timeline

The run timeline provides a chronological view of everything that happened during a workflow run — useful for debugging dashboards and audit trails.

const timeline = await engine.getRunTimeline('run_abc123')

for (const entry of timeline) {
  console.log(`[${entry.timestamp}] ${entry.type}`, {
    stepName: entry.stepName,
    detail: entry.detail,
  })
}

Entry types:

  • run_created — run was created
  • step_scheduled — step was queued for execution
  • step_started — step began executing
  • step_completed — step finished successfully
  • step_failed — step failed with an error
  • step_skipped — step was skipped (condition false, replay, etc.)
  • run_completed — run finished successfully
  • run_failed — run ended due to a step failure
  • run_canceled — run was canceled (includes cancel reason in detail)
  • log — execution log entry from ctx.log() (includes level, message, data in detail)

The timeline is assembled with a single SQL query across the runs, step runs, and execution logs tables, ordered chronologically.
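
Because entries arrive in chronological order, per-step durations can be derived by pairing each `step_started` entry with the step's terminal entry. This sketch uses a simplified local type; `stepDurations` is a hypothetical helper, and if a step is retried it reports only the latest attempt.

```typescript
// Simplified subset of RunTimelineEntry from the Type Reference.
interface TimelineEntryLike {
  timestamp: Date
  type: string
  stepName?: string
}

// Hypothetical helper: milliseconds between step_started and the step's terminal entry.
function stepDurations(timeline: TimelineEntryLike[]): Record<string, number> {
  const startedAt: Record<string, number> = {}
  const durations: Record<string, number> = {}
  for (const entry of timeline) {
    if (!entry.stepName) continue
    if (entry.type === 'step_started') {
      startedAt[entry.stepName] = entry.timestamp.getTime()
    } else if (entry.type === 'step_completed' || entry.type === 'step_failed') {
      const start = startedAt[entry.stepName]
      if (start !== undefined) durations[entry.stepName] = entry.timestamp.getTime() - start
    }
  }
  return durations
}
```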


Cancel & Retry

Canceling a Running Workflow

Cancel an in-progress run to stop further step execution:

const canceledRun = await engine.cancelRun('run_abc123', 'No longer needed')

Behavior:

  • Sets the run status to canceled with a timestamp and optional reason
  • Marks all pending/scheduled step runs as skipped
  • Removes pending BullMQ jobs for that run
  • The executor checks for canceled status before executing each step
  • Fires the onRunComplete hook with status: 'canceled'
  • Cannot cancel runs that are already completed, failed, or canceled

Retrying Failed Runs

Resume a failed run from the step that failed:

const retriedRun = await engine.retryRun('run_abc123')

Behavior:

  • Finds the first failed step in the run
  • Resets that step to pending status (clears error, timestamps)
  • Sets the run back to running status
  • Re-enqueues the step in BullMQ with its original retry policy
  • Cannot retry runs that aren't in failed status

Listing Failed Runs

Query failed runs for monitoring dashboards:

// All failed runs
const allFailed = await engine.getFailedRuns()

// Failed runs for a specific tenant
const tenantFailed = await engine.getFailedRuns('workspace_1')

API Reference

createEngine(config)

Creates and returns an engine instance.

import { createEngine } from '@kylebegeman/pulse'

const engine = createEngine({
  db: pool,              // Required — pg Pool instance
  redis: redisClient,    // Required — ioredis instance
  tablePrefix: 'pulse_',    // Optional — table name prefix (default: 'pulse_')
  queuePrefix: 'pulse',     // Optional — BullMQ queue prefix (default: 'pulse')
  concurrency: 5,        // Optional — step worker concurrency (default: 5)
  onStepComplete: (e) => {},  // Optional — lifecycle hook
  onRunComplete: (e) => {},   // Optional — lifecycle hook
})

Engine Methods

Registration

| Method | Description |
|---|---|
| registerTrigger(type, registration) | Register a signal type with source, optional resource type, and optional Zod payload schema |
| registerAction(name, handler, options?) | Register an action handler. Set replaySafe: true/false in options |
| registerCondition(name, handler) | Register a condition handler that returns a boolean |

Signals

| Method | Returns | Description |
|---|---|---|
| emit(trigger) | Promise<TriggerEnvelope> | Emit a signal. Persists the trigger, matches workflows, and creates runs |

Lifecycle

| Method | Returns | Description |
|---|---|---|
| start() | Promise<void> | Start BullMQ workers and restore cron jobs. Call after registering handlers |
| stop() | Promise<void> | Graceful shutdown: waits for active jobs to complete, stops cron workers |

Queries

| Method | Returns | Description |
|---|---|---|
| getRun(runId) | Promise<WorkflowRun \| null> | Get a workflow run by ID |
| getRunSteps(runId) | Promise<WorkflowStepRun[]> | Get all step runs for a workflow run (includes branch steps) |
| getRunsByTrigger(triggerId) | Promise<WorkflowRun[]> | Get all runs spawned by a trigger |
| getTrigger(triggerId) | Promise<TriggerEnvelope \| null> | Get a persisted trigger by ID |

Timeline

| Method | Returns | Description |
|---|---|---|
| getRunTimeline(runId) | Promise<RunTimelineEntry[]> | Chronological lifecycle view of a run (steps, logs, status changes) |

Cancel & Retry

| Method | Returns | Description |
|---|---|---|
| cancelRun(runId, reason?) | Promise<WorkflowRun> | Cancel an in-progress run. Removes pending jobs |
| retryRun(runId) | Promise<WorkflowRun> | Retry a failed run from the failed step |
| getFailedRuns(tenantId?) | Promise<WorkflowRun[]> | List failed runs, optionally filtered by tenant |

Replay

| Method | Returns | Description |
|---|---|---|
| replay(triggerId, options?) | Promise<WorkflowRun[]> | Re-emit a historical trigger. Pass { dryRun: true } to skip actions |

Workflow Management

| Method | Returns | Description |
|---|---|---|
| createWorkflow(definition) | Promise<WorkflowDefinition> | Create a new workflow definition. Schedules cron if applicable |
| enableWorkflow(definitionId) | Promise<void> | Enable a workflow definition. Starts cron if defined |
| disableWorkflow(definitionId) | Promise<void> | Disable a workflow definition. Stops cron if defined |

Schema

| Method | Returns | Description |
|---|---|---|
| migrate() | Promise<void> | Create pulse_* tables in your database (idempotent) |


Type Reference

interface TriggerInput {
  tenantId: string
  source: string
  type: string
  resourceType?: string
  resourceId?: string
  environment?: string
  payload?: Record<string, unknown>
}
interface TriggerEnvelope extends TriggerInput {
  id: string          // Auto-generated (e.g., 'trg_...')
  createdAt: Date
}
interface TriggerRegistration {
  source: string
  resourceType?: string
  payloadSchema?: ZodSchema    // Optional Zod schema for payload validation
}
interface WorkflowDefinition {
  id: string
  tenantId: string
  name: string
  description?: string
  triggerType: string
  environmentFilter?: string
  resourceTypeFilter?: string
  steps: WorkflowStep[]
  config: Record<string, unknown>
  isEnabled: boolean
  cronExpression?: string       // Cron schedule (e.g., '0 2 * * *')
  createdAt: Date
  updatedAt: Date
}
type StepType = 'action' | 'condition' | 'delay' | 'parallel'

interface WorkflowStep {
  type: StepType
  name: string
  config?: Record<string, unknown>
  delayMs?: number                      // Only for 'delay' steps
  timeoutMs?: number                    // Step execution timeout
  retryPolicy?: RetryPolicy             // Retry on failure
  onFalse?: 'complete' | 'skip' | number  // Condition false behavior
  branches?: WorkflowStep[][]           // Only for 'parallel' steps
}
type WorkflowStatus = 'pending' | 'running' | 'waiting' | 'completed' | 'failed' | 'canceled'

interface WorkflowRun {
  id: string
  definitionId: string
  tenantId: string
  triggerId: string
  status: WorkflowStatus
  context: Record<string, unknown>
  currentStepIndex: number
  isReplay: boolean
  definitionSnapshot: DefinitionSnapshot
  startedAt?: Date
  completedAt?: Date
  failedAt?: Date
  canceledAt?: Date               // When the run was canceled
  cancelReason?: string           // Why the run was canceled
  createdAt: Date
  updatedAt: Date
}
type StepStatus = 'pending' | 'scheduled' | 'running' | 'completed' | 'failed' | 'skipped'

interface WorkflowStepRun {
  id: string
  runId: string
  tenantId: string
  stepIndex: number
  stepType: StepType
  stepName: string
  status: StepStatus
  scheduledFor?: Date
  startedAt?: Date
  completedAt?: Date
  result?: Record<string, unknown>
  errorMessage?: string
  branchIndex?: number            // Branch index for parallel step branches
  parentStepRunId?: string        // Parent step run ID for branch steps
  createdAt: Date
}
interface RunTimelineEntry {
  timestamp: Date
  type:
    | 'run_created' | 'run_completed' | 'run_failed' | 'run_canceled'
    | 'step_scheduled' | 'step_started' | 'step_completed'
    | 'step_failed' | 'step_skipped' | 'log'
  stepIndex?: number
  stepName?: string
  stepType?: string
  detail?: Record<string, unknown>
}
interface DefinitionSnapshot {
  steps: WorkflowStep[]
  config: Record<string, unknown>
  triggerType: string
}
interface RetryPolicy {
  maxAttempts: number    // 1-10
  backoffMs: number      // 100-300000 ms
}
interface WorkflowContext {
  tenantId: string
  trigger: TriggerEnvelope
  run: WorkflowRun
  step: WorkflowStepRun
  config: Record<string, unknown>
  isReplay: boolean
  emit: (trigger: TriggerInput) => Promise<TriggerEnvelope>
  log: (message: string, data?: Record<string, unknown>) => void
}
interface ActionResult {
  success: boolean
  data?: Record<string, unknown>
  error?: string
}
interface ActionOptions {
  replaySafe: boolean    // Whether this action can be re-executed during replay
}
interface EngineConfig {
  db: Pool                    // pg Pool instance
  redis: unknown              // ioredis instance
  tablePrefix?: string        // Default: 'pulse_'
  queuePrefix?: string        // Default: 'pulse'
  concurrency?: number        // Step worker concurrency (default: 5)
  onStepComplete?: LifecycleHook<StepCompleteEvent>
  onRunComplete?: LifecycleHook<RunCompleteEvent>
}
interface StepCompleteEvent {
  run: WorkflowRun
  step: WorkflowStepRun
}

interface RunCompleteEvent {
  run: WorkflowRun
  status: 'completed' | 'failed' | 'canceled'
}
interface ReplayOptions {
  dryRun?: boolean    // If true, skip all actions (log only)
}
interface ExecutionLog {
  id: string
  runId: string
  stepRunId?: string
  tenantId: string
  level: 'info' | 'warn' | 'error'
  message: string
  data?: Record<string, unknown>
  createdAt: Date
}

Database Schema

The engine creates tables in your PostgreSQL database with the prefix pulse_ (configurable). All tables use TEXT primary keys with auto-generated IDs. Migrations are idempotent — safe to call on every startup.

await engine.migrate()

pulse_triggers

Persisted signal records for auditing and replay.

| Column | Type | Description |
|---|---|---|
| id | TEXT PK | Trigger ID (e.g., trg_...) |
| tenant_id | TEXT | Tenant scope |
| source | TEXT | Originating system |
| type | TEXT | Signal type |
| resource_type | TEXT | Resource category |
| resource_id | TEXT | Specific resource |
| environment | TEXT | Environment scope |
| payload | JSONB | Arbitrary event data |
| created_at | TIMESTAMPTZ | When the signal was emitted |

Indexes: (tenant_id, type), (created_at)

pulse_workflow_definitions

Workflow templates that define step sequences and matching rules.

| Column | Type | Description |
|---|---|---|
| id | TEXT PK | Definition ID |
| tenant_id | TEXT | Tenant scope |
| name | TEXT | Human-readable name |
| description | TEXT | Optional description |
| trigger_type | TEXT | Signal type to match |
| environment_filter | TEXT | Optional environment filter |
| resource_type_filter | TEXT | Optional resource type filter |
| steps | JSONB | Ordered step definitions |
| config | JSONB | Config passed to handlers |
| is_enabled | BOOLEAN | Whether the workflow is active |
| cron_expression | TEXT | Optional cron schedule for automatic trigger emission |
| created_at | TIMESTAMPTZ | Creation time |
| updated_at | TIMESTAMPTZ | Last update time |

Indexes: (tenant_id, trigger_type, is_enabled)

pulse_workflow_runs

Execution records — one per workflow triggered by a signal.

| Column | Type | Description |
|---|---|---|
| id | TEXT PK | Run ID |
| definition_id | TEXT FK | Workflow definition |
| tenant_id | TEXT | Tenant scope |
| trigger_id | TEXT FK | Signal that started this run |
| status | TEXT | pending / running / waiting / completed / failed / canceled |
| context | JSONB | Shared run context |
| current_step_index | INTEGER | Active step position |
| is_replay | BOOLEAN | Whether this is a replay run |
| definition_snapshot | JSONB | Frozen copy of workflow definition at run creation |
| started_at | TIMESTAMPTZ | When execution began |
| completed_at | TIMESTAMPTZ | When execution completed |
| failed_at | TIMESTAMPTZ | When execution failed |
| canceled_at | TIMESTAMPTZ | When the run was canceled |
| cancel_reason | TEXT | Why the run was canceled |
| created_at | TIMESTAMPTZ | Creation time |
| updated_at | TIMESTAMPTZ | Last update time |

Indexes: (tenant_id, status), (trigger_id)

pulse_workflow_step_runs

Individual step execution records within a run.

| Column | Type | Description |
|---|---|---|
| id | TEXT PK | Step run ID |
| run_id | TEXT FK | Parent workflow run |
| tenant_id | TEXT | Tenant scope |
| step_index | INTEGER | Position in step sequence |
| step_type | TEXT | action / condition / delay / parallel |
| step_name | TEXT | Registered handler name |
| status | TEXT | pending / scheduled / running / completed / failed / skipped |
| scheduled_for | TIMESTAMPTZ | When the step should execute (for delays) |
| started_at | TIMESTAMPTZ | When execution began |
| completed_at | TIMESTAMPTZ | When execution completed |
| result | JSONB | Action result data |
| error_message | TEXT | Error details on failure |
| branch_index | INTEGER | Branch index within a parallel step (null for top-level) |
| parent_step_run_id | TEXT | Parent parallel step run ID (null for top-level) |
| created_at | TIMESTAMPTZ | Creation time |

Indexes: (run_id, step_index), (status, scheduled_for) WHERE status = 'scheduled'

pulse_execution_logs

Structured execution logs written by ctx.log() calls.

| Column | Type | Description |
|---|---|---|
| id | TEXT PK | Log entry ID |
| run_id | TEXT FK | Parent workflow run |
| step_run_id | TEXT FK | Associated step (optional) |
| tenant_id | TEXT | Tenant scope |
| level | TEXT | info / warn / error |
| message | TEXT | Log message |
| data | JSONB | Structured log data |
| created_at | TIMESTAMPTZ | When the log was written |

Indexes: (run_id, created_at)


Runtime Guarantees

Execution Semantics

Pulse provides at-least-once execution semantics. If a worker crashes after an action completes but before the engine records success, the step may execute again on retry. Design your actions to be idempotent where possible — especially for side effects like sending emails, charging customers, or creating external resources.

You can derive idempotency keys from the workflow context passed to every action:

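engine.registerAction('send_email', async (ctx) => {
  // WorkflowStepRun exposes the step's name as stepName
  const idempotencyKey = `${ctx.run.id}:${ctx.step.stepName}`
  // Use this key with your email provider to prevent duplicates
  return { success: true, data: { idempotencyKey } }
})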
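
When a provider does not accept idempotency keys, the same key can guard the side effect locally. The sketch below uses an in-memory set purely for illustration. Because a crash would lose it, a real deployment would need a durable store such as a table with a unique key. `idempotencyKey` and `claimOnce` are hypothetical helpers.

```typescript
// In-memory stand-in for a durable store (for example, a unique-keyed table).
const processedKeys = new Set<string>()

// Builds the run-and-step key suggested above.
function idempotencyKey(runId: string, stepName: string): string {
  return `${runId}:${stepName}`
}

// Returns true the first time a key is seen, false on every later attempt.
function claimOnce(key: string): boolean {
  if (processedKeys.has(key)) return false
  processedKeys.add(key)
  return true
}
```

An action would call `claimOnce` before performing the side effect and return early with `success: true` when it yields false.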

Step Claiming

Every step is claimed atomically before execution using a compare-and-set query:

UPDATE pulse_workflow_step_runs SET status = 'running', started_at = NOW()
WHERE id = $1 AND status IN ('pending', 'scheduled')
RETURNING *

If two workers pick up the same job from the queue (e.g., due to retry or crash recovery), only one will successfully claim the step. The other receives a null result and silently no-ops.
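
The compare-and-set behavior can be modeled in memory to show why a second claim returns nothing. This is only an illustration with hypothetical names (`StepRow`, `claimStep`). In the real system the atomicity comes from PostgreSQL executing the guarded UPDATE, not from application code.

```typescript
type StepStatus = 'pending' | 'scheduled' | 'running' | 'completed' | 'failed' | 'skipped'

interface StepRow {
  id: string
  status: StepStatus
  startedAt?: Date
}

// In-memory stand-in for the guarded UPDATE ... RETURNING query.
function claimStep(rows: Map<string, StepRow>, id: string): StepRow | null {
  const row = rows.get(id)
  // Mirrors the WHERE clause: only pending or scheduled steps can be claimed.
  if (!row || (row.status !== 'pending' && row.status !== 'scheduled')) return null
  row.status = 'running'
  row.startedAt = new Date()
  return row
}
```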

Run State Transitions

Run status changes are guarded by the current status. Invalid transitions (e.g., completing an already-canceled run) are rejected:

pending  → running, canceled
running  → waiting, completed, failed, canceled
waiting  → running, canceled
failed   → running (retry)

Attempts to transition from an unexpected state throw an error rather than silently corrupting run state.
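
The transition table translates directly into a lookup plus a guard. The sketch below is illustrative: `ALLOWED_TRANSITIONS` and `assertTransition` are hypothetical names, and treating `completed` and `canceled` as terminal is inferred from the table listing no outgoing transitions for them.

```typescript
type WorkflowStatus = 'pending' | 'running' | 'waiting' | 'completed' | 'failed' | 'canceled'

const ALLOWED_TRANSITIONS: Record<WorkflowStatus, WorkflowStatus[]> = {
  pending: ['running', 'canceled'],
  running: ['waiting', 'completed', 'failed', 'canceled'],
  waiting: ['running', 'canceled'],
  failed: ['running'], // retry
  completed: [],       // assumed terminal
  canceled: [],        // assumed terminal
}

// Throws when the requested change is not listed for the current status.
function assertTransition(from: WorkflowStatus, to: WorkflowStatus): void {
  if (ALLOWED_TRANSITIONS[from].indexOf(to) === -1) {
    throw new Error(`Invalid run transition: ${from} -> ${to}`)
  }
}
```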

Cancellation

When a run is canceled:

  1. The run status is set to canceled with a timestamp and optional reason.
  2. All pending and scheduled step runs are marked as skipped.
  3. A step that is already running will complete its current execution — the engine checks run status before scheduling the next step.

Retry

Retrying a failed run:

  1. Finds the first failed step run and resets it to pending.
  2. Sets the run status back to running.
  3. The step is re-enqueued and executes with the original retry policy.
  4. Execution continues from the failed step — previously completed steps are not re-run.

Replay

Replay re-emits a historical trigger to create a new run:

  • A new run is created with isReplay: true.
  • Actions flagged with replaySafe: false are skipped during replay execution.
  • All other steps execute normally against the replayed trigger data.
  • Replay is useful for debugging, backfilling, and testing new workflows against historical signals.

Parallel Execution

Parallel steps fan out into concurrent branches:

  • Each branch executes independently with its own step runs.
  • All branches must complete for the parallel step to succeed.
  • If any branch fails, pending sibling branch steps are canceled and the run is marked as failed.
  • Context from each branch is merged — each step writes to context[actionName], so branch steps should have unique action names.

Context Accumulation

Each action's return value is stored in the run context keyed by action name:

// After "check_status" action returns { healthy: true }
// context.check_status === { healthy: true }

// Next action can access it:
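engine.registerAction('notify', async (ctx) => {
  // Per the Type Reference, accumulated context lives on the run and is untyped
  const checkStatus = ctx.run.context.check_status as { healthy: boolean } | undefined
  if (checkStatus?.healthy) { /* ... */ }
  return { success: true }
})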

Step names within a workflow must be unique, preventing key collisions.
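
Since the run context is typed as `Record<string, unknown>`, small typed accessors can keep handlers tidy. This sketch models accumulation locally; `recordResult` and `readResult` are hypothetical helpers, and throwing on a duplicate key is an assumption about how a collision might be surfaced.

```typescript
type RunContext = Record<string, unknown>

// Hypothetical helper: store an action's result under its step name.
function recordResult(context: RunContext, stepName: string, data: Record<string, unknown>): RunContext {
  if (stepName in context) throw new Error(`Duplicate context key: ${stepName}`)
  // Return a new object so earlier snapshots stay unchanged.
  return { ...context, [stepName]: data }
}

// Hypothetical helper: read a prior result with a caller-supplied type.
function readResult<T>(context: RunContext, stepName: string): T | undefined {
  return context[stepName] as T | undefined
}
```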


Architecture

┌─────────────────────────────────────────────────────────────────┐
│                        Your Application                         │
│                                                                 │
│   engine.emit({ type: 'heartbeat.missed', ... })                │
│       │                                                         │
│       ▼                                                         │
│   ┌──────────┐     ┌───────────┐     ┌───────────────────────┐  │
│   │ Registry │     │  Matcher  │────▶│   Run Manager         │  │
│   │          │     │           │     │   (state machine)     │  │
│   │ triggers │     │ finds     │     │                       │  │
│   │ actions  │     │ matching  │     │ pending → running     │  │
│   │ conditions│    │ workflows │     │ → waiting → completed │  │
│   └──────────┘     └───────────┘     └───────┬───────────────┘  │
│                                              │                  │
│                                              ▼                  │
│   ┌──────────────────────┐     ┌─────────────────────────────┐  │
│   │   Step Scheduler     │     │     Step Executor           │  │
│   │                      │────▶│                             │  │
│   │ BullMQ delayed jobs  │     │ delay → condition → action  │  │
│   │ + parallel branches  │     │ + parallel branch dispatch  │  │
│   └──────────────────────┘     └─────────────────────────────┘  │
│                                                                 │
│   ┌────────────────────┐  ┌─────────────────────────────────┐   │
│   │  Replay Manager    │  │       Execution Logs            │   │
│   │                    │  │                                 │   │
│   │ re-emit historical │  │  structured logs per run/step   │   │
│   │ signals with safety│  │ ctx.log() → pulse_execution_logs│   │
│   └────────────────────┘  └─────────────────────────────────┘   │
│                                                                 │
│   ┌────────────────────┐  ┌─────────────────────────────────┐   │
│   │   Cron Manager     │  │       Run Timeline              │   │
│   │                    │  │                                 │   │
│   │ BullMQ repeatable  │  │  chronological event view       │   │
│   │ jobs → auto-emit   │  │  for debugging & dashboards     │   │
│   └────────────────────┘  └─────────────────────────────────┘   │
│                                                                 │
│   ┌──────────────┐        ┌────────────────┐                    │
│   │  PostgreSQL  │        │     Redis      │                    │
│   │  (your DB)   │        │  (BullMQ jobs) │                    │
│   └──────────────┘        └────────────────┘                    │
└─────────────────────────────────────────────────────────────────┘
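
The Run Manager box describes a state machine. The sketch below shows one way such a machine could be expressed. The status names `pending`, `running`, `waiting`, and `completed` come from the diagram, and `failed` and `canceled` appear elsewhere in this README, but the transition table itself is an assumption made for illustration; `RunStatus`, `allowedTransitions`, `canTransition`, and `transition` are hypothetical names.

```typescript
type RunStatus = 'pending' | 'running' | 'waiting' | 'completed' | 'failed' | 'canceled'

// Assumed transition table; the real engine may allow a different set.
const allowedTransitions: Record<RunStatus, RunStatus[]> = {
  pending: ['running', 'canceled'],
  running: ['waiting', 'completed', 'failed', 'canceled'],
  waiting: ['running', 'canceled'],
  completed: [],
  failed: ['running'], // assumption: a retry resumes a failed run
  canceled: [],
}

function canTransition(from: RunStatus, to: RunStatus): boolean {
  return allowedTransitions[from].indexOf(to) !== -1
}

// Apply a transition, rejecting anything the table does not allow.
function transition(from: RunStatus, to: RunStatus): RunStatus {
  if (!canTransition(from, to)) {
    throw new Error(`Invalid run transition: ${from} -> ${to}`)
  }
  return to
}
```

Keeping the allowed transitions in one table makes terminal states such as `completed` and `canceled` explicit: they simply have no outgoing entries.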

Module Breakdown

| Module | File | Purpose |
|---|---|---|
| Engine Factory | src/index.ts | createEngine() wires everything together |
| Types | src/types.ts | All TypeScript interfaces and type definitions |
| Registry | src/registry.ts | Trigger, action, and condition registration |
| Matcher | src/matcher.ts | Signal-to-workflow matching, run creation, and validation |
| Run Manager | src/runs.ts | Workflow run lifecycle, state machine, timeline, cancel |
| Scheduler | src/scheduler.ts | Step scheduling with BullMQ delayed jobs + parallel branches |
| Executor | src/executor.ts | Step dispatch: delay, condition, action, parallel |
| Cron Manager | src/cron.ts | Cron-triggered workflow scheduling via BullMQ repeatable jobs |
| Replay | src/replay.ts | Trigger persistence and replay support |
| Queue | src/queue.ts | BullMQ queue and worker setup (match, step, cron queues) |
| Logs | src/logs.ts | Execution logging and query helpers |
| Schema | src/schema/ | Table definitions and migration runner |


Examples

Monitoring: Incident on Missed Heartbeat

A service stops sending heartbeats. Wait 5 minutes, check whether it is still down, then create an incident and page the on-call engineer.

// Register handlers
engine.registerTrigger('heartbeat.missed', {
  source: 'heartbeat',
  resourceType: 'service',
})

engine.registerCondition('service_still_down', async (ctx) => {
  const status = await healthCheck(ctx.trigger.resourceId)
  return status !== 'healthy'
})

engine.registerAction('create_incident', async (ctx) => {
  const incident = await db.incidents.create({
    tenantId: ctx.tenantId,
    serviceId: ctx.trigger.resourceId,
    severity: ctx.config.severity || 'medium',
    title: `Service ${ctx.trigger.resourceId} is unresponsive`,
  })
  ctx.log('Incident created', { incidentId: incident.id })
  return { success: true, data: { incidentId: incident.id } }
}, { replaySafe: true })

engine.registerAction('notify_oncall', async (ctx) => {
  await pagerduty.trigger({
    service: ctx.trigger.resourceId,
    severity: ctx.config.severity,
  })
  return { success: true }
}, { replaySafe: false }) // Don't re-page during replay

// Create the workflow
await engine.createWorkflow({
  tenantId: 'workspace_1',
  name: 'Incident on missed heartbeat',
  triggerType: 'heartbeat.missed',
  environmentFilter: 'production',
  steps: [
    { type: 'delay', name: 'wait_5m', delayMs: 5 * 60 * 1000 },
    { type: 'condition', name: 'service_still_down' },
    { type: 'action', name: 'create_incident' },
    { type: 'action', name: 'notify_oncall' },
  ],
  config: { severity: 'high' },
  isEnabled: true,
})

Billing: Trial Expiration Workflow

A trial is about to expire. Send a warning email, wait 3 days, then downgrade the tenant to the free plan if it is still on the trial.

engine.registerTrigger('trial.expiring', {
  source: 'billing',
  resourceType: 'subscription',
})

engine.registerAction('send_trial_warning', async (ctx) => {
  await emails.send({
    to: ctx.trigger.payload.email,
    template: 'trial-expiring',
    data: { daysLeft: ctx.trigger.payload.daysLeft },
  })
  return { success: true }
}, { replaySafe: false })

engine.registerCondition('has_not_upgraded', async (ctx) => {
  const sub = await billing.getSubscription(ctx.tenantId)
  return sub.plan === 'trial'
})

engine.registerAction('downgrade_to_free', async (ctx) => {
  await billing.changePlan(ctx.tenantId, 'free')
  ctx.log('Downgraded to free plan')
  return { success: true }
}, { replaySafe: true })

await engine.createWorkflow({
  tenantId: 'workspace_1',
  name: 'Trial expiration',
  triggerType: 'trial.expiring',
  steps: [
    { type: 'action', name: 'send_trial_warning' },
    { type: 'delay', name: 'wait_3_days', delayMs: 3 * 24 * 60 * 60 * 1000 },
    { type: 'condition', name: 'has_not_upgraded' },
    { type: 'action', name: 'downgrade_to_free' },
  ],
  config: {},
  isEnabled: true,
})

Chaining: Workflow That Triggers Another Workflow

Actions can emit signals, allowing workflows to trigger other workflows.

// First workflow: detect issue → emit resolution signal
engine.registerAction('attempt_auto_fix', async (ctx) => {
  const result = await autoRemediate(ctx.trigger.resourceId)

  if (result.fixed) {
    // Emit a new signal — this will match other workflows
    await ctx.emit({
      tenantId: ctx.tenantId,
      source: 'auto-remediation',
      type: 'service.recovered',
      resourceType: 'service',
      resourceId: ctx.trigger.resourceId,
      payload: { fix: result.action },
    })
  }

  return { success: true, data: result }
}, { replaySafe: true })

// Second workflow: triggered by the recovery signal
engine.registerAction('close_incident', async (ctx) => {
  await db.incidents.close({
    serviceId: ctx.trigger.resourceId,
    resolution: ctx.trigger.payload.fix,
  })
  return { success: true }
}, { replaySafe: true })

await engine.createWorkflow({
  tenantId: 'workspace_1',
  name: 'Auto-close on recovery',
  triggerType: 'service.recovered',
  steps: [
    { type: 'action', name: 'close_incident' },
  ],
  config: {},
  isEnabled: true,
})

Parallel: Multi-Channel Notification

Send notifications across multiple channels simultaneously, then mark the alert as notified.

engine.registerAction('send_email', async (ctx) => {
  await emails.send({ to: ctx.config.alertEmail, subject: 'Alert' })
  return { success: true }
}, { replaySafe: false })

engine.registerAction('send_slack', async (ctx) => {
  await slack.post({ channel: ctx.config.slackChannel, text: 'Alert triggered' })
  return { success: true }
}, { replaySafe: false })

engine.registerAction('send_sms', async (ctx) => {
  await sms.send({ to: ctx.config.phoneNumber, body: 'Alert triggered' })
  return { success: true }
}, { replaySafe: false })

engine.registerAction('mark_notified', async (ctx) => {
  await db.alerts.update(ctx.trigger.resourceId, { notified: true })
  return { success: true }
}, { replaySafe: true })

await engine.createWorkflow({
  tenantId: 'workspace_1',
  name: 'Multi-channel alert',
  triggerType: 'alert.triggered',
  steps: [
    {
      type: 'parallel',
      name: 'notify_all',
      branches: [
        [{ type: 'action', name: 'send_email' }],
        [{ type: 'action', name: 'send_slack' }],
        [{ type: 'action', name: 'send_sms' }],
      ],
    },
    { type: 'action', name: 'mark_notified' },
  ],
  config: {
    alertEmail: 'alerts@example.com',
    slackChannel: '#alerts',
    phoneNumber: '+1234567890',
  },
  isEnabled: true,
})

Cron: Daily Cleanup Job

Run a cleanup workflow on a schedule without needing an external signal.

engine.registerTrigger('maintenance.cleanup', {
  source: 'cron',
})

engine.registerAction('cleanup_old_records', async (ctx) => {
  const deleted = await db.query(
    'DELETE FROM temp_data WHERE created_at < NOW() - INTERVAL \'30 days\' RETURNING id'
  )
  ctx.log('Cleaned up old records', { count: deleted.rowCount })
  return { success: true, data: { deletedCount: deleted.rowCount } }
}, { replaySafe: true })

engine.registerAction('send_cleanup_report', async (ctx) => {
  await emails.send({
    to: 'ops@example.com',
    subject: 'Daily cleanup report',
    // Context is keyed by action name (see Context Accumulation)
    body: `Deleted ${ctx.context.cleanup_old_records?.deletedCount ?? 0} old records`,
  })
  return { success: true }
}, { replaySafe: false })

await engine.createWorkflow({
  tenantId: 'workspace_1',
  name: 'Daily cleanup',
  triggerType: 'maintenance.cleanup',
  steps: [
    { type: 'action', name: 'cleanup_old_records' },
    { type: 'action', name: 'send_cleanup_report' },
  ],
  config: {},
  isEnabled: true,
  cronExpression: '0 2 * * *',  // Every day at 2 AM
})
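
The `cronExpression` in this example uses the standard five-field cron form: minute, hour, day of month, month, day of week. The helper below is a small sketch for inspecting such an expression before passing it to a workflow; `CronFields` and `splitCron` are hypothetical and not part of Pulse, and whether the engine also accepts other cron variants (such as a leading seconds field) is not covered here.

```typescript
interface CronFields {
  minute: string
  hour: string
  dayOfMonth: string
  month: string
  dayOfWeek: string
}

// Split a five-field cron expression into named fields.
// This checks only the field count, not the validity of each field.
function splitCron(expression: string): CronFields {
  const parts = expression.trim().split(/\s+/)
  if (parts.length !== 5) {
    throw new Error(`Expected 5 cron fields, got ${parts.length}`)
  }
  const [minute, hour, dayOfMonth, month, dayOfWeek] = parts
  return { minute, hour, dayOfMonth, month, dayOfWeek }
}

// minute 0, hour 2, every day of month, every month, every day of week
const dailyAtTwo = splitCron('0 2 * * *')
```

A check like this catches a miscounted expression (for example `'0 2 * *'`) before the workflow is created.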

Observability: Run Timeline Dashboard

Use the timeline API to build a debugging view of what happened during a run.

const timeline = await engine.getRunTimeline('run_abc123')

// Display in a dashboard
for (const entry of timeline) {
  const time = entry.timestamp.toISOString()

  switch (entry.type) {
    case 'run_created':
      console.log(`${time} Run started`)
      break
    case 'step_started':
      console.log(`${time} Step "${entry.stepName}" (${entry.stepType}) started`)
      break
    case 'step_completed':
      console.log(`${time} Step "${entry.stepName}" completed`, entry.detail)
      break
    case 'step_failed':
      console.log(`${time} Step "${entry.stepName}" FAILED:`, entry.detail?.error)
      break
    case 'log':
      console.log(`${time} [${entry.detail?.level}] ${entry.detail?.message}`)
      break
    case 'run_completed':
      console.log(`${time} Run completed successfully`)
      break
    case 'run_canceled':
      console.log(`${time} Run canceled: ${entry.detail?.reason}`)
      break
  }
}
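
A dashboard often needs per-step durations as well as raw events. The sketch below derives them from timeline entries, assuming the entry shape used in the loop above (`type`, `timestamp` as a `Date`, and `stepName` on step events). `TimelineEntry` here is a local approximation, not the library's exported type, and `stepDurations` is a hypothetical helper.

```typescript
// Entry shape inferred from the example above; the real type may carry more fields.
interface TimelineEntry {
  type: string
  timestamp: Date
  stepName?: string
}

// Pair each step_started with the next step_completed or step_failed for the
// same step name and return the elapsed milliseconds per step.
function stepDurations(timeline: TimelineEntry[]): Record<string, number> {
  const startedAt: Record<string, number> = {}
  const result: Record<string, number> = {}
  for (const entry of timeline) {
    if (entry.stepName === undefined) continue
    if (entry.type === 'step_started') {
      startedAt[entry.stepName] = entry.timestamp.getTime()
    } else if (entry.type === 'step_completed' || entry.type === 'step_failed') {
      const start = startedAt[entry.stepName]
      if (start !== undefined) {
        result[entry.stepName] = entry.timestamp.getTime() - start
      }
    }
  }
  return result
}

// Hand-written sample data for illustration only.
const sampleTimeline: TimelineEntry[] = [
  { type: 'run_created', timestamp: new Date('2024-01-01T00:00:00.000Z') },
  { type: 'step_started', stepName: 'create_incident', timestamp: new Date('2024-01-01T00:00:01.000Z') },
  { type: 'step_completed', stepName: 'create_incident', timestamp: new Date('2024-01-01T00:00:03.500Z') },
]

const sampleDurations = stepDurations(sampleTimeline) // { create_incident: 2500 }
```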

Error Recovery: Retry Failed Runs

List failed runs and retry them, either from an automated job or by hand.

// List all failed runs for a tenant
const failed = await engine.getFailedRuns('workspace_1')

for (const run of failed) {
  console.log(`Run ${run.id} failed at ${run.failedAt}`)

  // Retry the run — resumes from the failed step
  try {
    const retried = await engine.retryRun(run.id)
    console.log(`Retried run ${retried.id}, now ${retried.status}`)
  } catch (err) {
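    // `err` is `unknown` under strict TypeScript, so narrow it before reading `.message`
    console.error(`Cannot retry: ${err instanceof Error ? err.message : String(err)}`)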
  }
}

// Cancel a run that's stuck or no longer needed
await engine.cancelRun('run_xyz789', 'Superseded by newer deployment')

Requirements

| Requirement | Version | Notes |
|---|---|---|
| Node.js | 18+ | ESM module |
| PostgreSQL | 12+ | Engine creates pulse_* tables |
| Redis | 6+ | Used by BullMQ for job queuing |
| TypeScript | 5.0+ | Full type definitions included |


License

MIT