@kylebegeman/pulse
v0.5.0
Universal workflow engine — signal-driven automation for multi-tenant applications
Overview
Pulse is a TypeScript library that powers event-driven workflow automation. Your application emits signals (triggers), and the engine matches them to registered workflows, scheduling and executing steps in sequence with full persistence, observability, and replay support.
Signal → Match → Schedule Steps → Evaluate Conditions → Execute Actions → Record Results

Built for multi-tenant SaaS applications. Each signal, workflow, and run is scoped to a tenant. The engine runs in-process alongside your application and uses your existing PostgreSQL database and Redis instance.
Table of Contents
- Quick Start
- Installation
- Core Concepts
- API Reference
- Type Reference
- Database Schema
- Runtime Guarantees
- Architecture
- Examples
- Requirements
- License
Quick Start
import { createEngine } from '@kylebegeman/pulse'
import { Pool } from 'pg'
import Redis from 'ioredis'
// 1. Create the engine
const engine = createEngine({
db: new Pool({ connectionString: process.env.DATABASE_URL }),
redis: new Redis(process.env.REDIS_URL),
})
// 2. Register a trigger type with optional schema validation
engine.registerTrigger('heartbeat.missed', {
source: 'heartbeat',
resourceType: 'service',
})
// 3. Register an action handler
engine.registerAction('create_incident', async (ctx) => {
const incident = await createIncident({
service: ctx.trigger.resourceId,
tenant: ctx.tenantId,
reason: 'Heartbeat missed',
})
ctx.log('Incident created', { incidentId: incident.id })
return { success: true, data: { incidentId: incident.id } }
}, { replaySafe: true })
// 4. Register a condition
engine.registerCondition('is_still_failing', async (ctx) => {
const status = await checkServiceHealth(ctx.trigger.resourceId)
return status === 'unhealthy'
})
// 5. Run migrations (creates pulse_* tables)
await engine.migrate()
// 6. Create a workflow
await engine.createWorkflow({
tenantId: 'workspace_1',
name: 'Incident on missed heartbeat',
triggerType: 'heartbeat.missed',
steps: [
{ type: 'delay', name: 'wait_5m', delayMs: 5 * 60 * 1000 },
{ type: 'condition', name: 'is_still_failing' },
{ type: 'action', name: 'create_incident' },
],
config: {},
isEnabled: true,
})
// 7. Start the engine (BullMQ workers begin processing)
await engine.start()
// 8. Emit signals from anywhere in your application
await engine.emit({
tenantId: 'workspace_1',
source: 'heartbeat',
type: 'heartbeat.missed',
resourceType: 'service',
resourceId: 'api-server',
payload: { lastSeenAt: new Date().toISOString() },
})
// 9. Graceful shutdown
await engine.stop()

Installation
Pulse is a private package. Install from GitHub:
# package.json
"@kylebegeman/pulse": "github:mrbagels/pulse"
# or via SSH
npm install git+ssh://git@github.com:mrbagels/pulse.git

Peer requirements:
| Dependency | Purpose |
|---|---|
| pg | PostgreSQL client — engine uses your connection pool |
| ioredis | Redis client — used by BullMQ for job queuing |
Core Concepts
Signals (Triggers)
A signal is a structured event emitted by your application. Every signal includes a tenant, a source system, a type, and optional resource and payload data.
await engine.emit({
tenantId: 'workspace_1', // Required — tenant scope
source: 'heartbeat', // Required — originating system
type: 'heartbeat.missed', // Required — event type
resourceType: 'service', // Optional — what kind of resource
resourceId: 'api-server', // Optional — which specific resource
environment: 'production', // Optional — environment scope
payload: { lastSeenAt: '...' } // Optional — arbitrary data
})

Triggers are registered to declare their source, expected resource type, and optional payload schema (validated with Zod):
import { z } from 'zod'
engine.registerTrigger('heartbeat.missed', {
source: 'heartbeat',
resourceType: 'service',
payloadSchema: z.object({
lastSeenAt: z.string().datetime(),
}),
})

All emitted signals are persisted to the database for auditing and replay.
Workflows
A workflow is a named sequence of steps that runs when a matching signal is emitted. Workflows are stored in the database and scoped to a tenant.
await engine.createWorkflow({
tenantId: 'workspace_1',
name: 'Incident on missed heartbeat',
triggerType: 'heartbeat.missed', // Matches signals of this type
environmentFilter: 'production', // Optional — only match this environment
resourceTypeFilter: 'service', // Optional — only match this resource type
steps: [
{ type: 'delay', name: 'wait_5m', delayMs: 300_000 },
{ type: 'condition', name: 'is_still_failing' },
{ type: 'action', name: 'create_incident' },
],
config: { severity: 'high' }, // Passed to handlers via ctx.config
isEnabled: true,
})

When a signal is emitted, the engine finds all enabled workflows matching the signal type (and optional environment/resource filters), creates a run for each, and begins scheduling steps.
Workflows can be enabled/disabled at runtime:
await engine.disableWorkflow('wfd_abc123')
await engine.enableWorkflow('wfd_abc123')

Steps
Each workflow contains an ordered list of steps. Steps execute sequentially — each step must complete before the next begins.
action — Execute logic
Calls a registered handler function. The handler receives a WorkflowContext and returns an ActionResult.
engine.registerAction('send_alert', async (ctx) => {
await sendSlackMessage(ctx.config.channel, `Alert for ${ctx.trigger.resourceId}`)
return { success: true, data: { sent: true } }
}, { replaySafe: false }) // Will be skipped during replay

// In workflow steps:
{ type: 'action', name: 'send_alert' }

Retry policy: Action steps can define a retry policy for automatic retries on failure:
{
type: 'action',
name: 'send_alert',
retryPolicy: { maxAttempts: 3, backoffMs: 5000 },
timeoutMs: 30_000, // Optional step timeout
}

condition — Branch on logic
Evaluates a boolean. If the condition returns false, the workflow completes early by default (status: completed, not failed).
engine.registerCondition('monitor_still_failing', async (ctx) => {
const health = await checkHealth(ctx.trigger.resourceId)
return health.status === 'down'
})

// In workflow steps:
{ type: 'condition', name: 'monitor_still_failing' }

onFalse behavior: Control what happens when a condition returns false:
// Default: complete the workflow early
{ type: 'condition', name: 'check', onFalse: 'complete' }
// Skip the next step and continue
{ type: 'condition', name: 'check', onFalse: 'skip' }
// Skip the next N steps
{ type: 'condition', name: 'check', onFalse: 3 }

delay — Wait before continuing
Pauses execution for a duration in milliseconds. Uses BullMQ delayed jobs for reliable scheduling. Maximum delay: 30 days.
// In workflow steps:
{ type: 'delay', name: 'wait_5_minutes', delayMs: 5 * 60 * 1000 }

parallel — Execute branches concurrently
Runs multiple branches of steps simultaneously. See Parallel Steps below.
Parallel Steps
The parallel step type executes multiple branches concurrently. Each branch is an independent sequence of steps. All branches must complete before the workflow continues to the next top-level step.
await engine.createWorkflow({
tenantId: 'workspace_1',
name: 'Multi-channel notification',
triggerType: 'alert.triggered',
steps: [
{
type: 'parallel',
name: 'notify_all_channels',
branches: [
// Branch 0: Email notification
[
{ type: 'action', name: 'send_email' },
{ type: 'action', name: 'log_email_sent' },
],
// Branch 1: Slack notification
[
{ type: 'action', name: 'send_slack' },
],
// Branch 2: SMS notification
[
{ type: 'action', name: 'send_sms' },
],
],
},
// This step runs after ALL branches complete
{ type: 'action', name: 'mark_notified' },
],
config: {},
isEnabled: true,
})

Behavior:
- Each branch executes independently and concurrently via BullMQ jobs
- Branches can contain any step types (action, condition, delay)
- All branches share the run's context object
- The parallel step completes when all branches complete
- If any branch fails, the parallel step fails and the run fails
- A parallel step must have at least 2 branches
Branch step tracking: Each step within a branch is tracked with a branchIndex and parentStepRunId, linking it back to the parallel step that spawned it. This is visible in the step runs returned by getRunSteps().
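The tracking fields make it possible to reconstruct branches on the consumer side. A minimal sketch (the groupByBranch helper is hypothetical, not part of the package; field names follow the step-run shape described here):

```typescript
// Illustrative helper: group step runs from getRunSteps() into their
// parallel branches using branchIndex. Top-level steps have no
// branchIndex and are skipped.
interface StepRunLike { stepName: string; branchIndex?: number }

function groupByBranch(steps: StepRunLike[]): Map<number, StepRunLike[]> {
  const branches = new Map<number, StepRunLike[]>()
  for (const s of steps) {
    if (s.branchIndex === undefined) continue // top-level step, not in a branch
    const list = branches.get(s.branchIndex) ?? []
    list.push(s)
    branches.set(s.branchIndex, list)
  }
  return branches
}
```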
Cron-Triggered Workflows
Workflows can be scheduled to run on a recurring basis using cron expressions. When a cron fires, the engine automatically emits a trigger of the workflow's triggerType, which matches and creates a new run.
await engine.createWorkflow({
tenantId: 'workspace_1',
name: 'Daily cleanup',
triggerType: 'maintenance.cleanup',
steps: [
{ type: 'action', name: 'cleanup_old_records' },
{ type: 'action', name: 'send_cleanup_report' },
],
config: {},
isEnabled: true,
cronExpression: '0 2 * * *', // Run daily at 2:00 AM
})

Behavior:
- Cron jobs are managed via BullMQ repeatable jobs on a dedicated cron queue
- When engine.start() is called, all enabled cron definitions are restored
- Enabling/disabling a workflow also starts/stops its cron job
- The cron fires a trigger with source: 'cron' and the workflow's triggerType
- Standard cron expression format: minute hour dayOfMonth month dayOfWeek
Examples of cron expressions:
- * * * * * — every minute
- 0 */6 * * * — every 6 hours
- 0 9 * * 1-5 — weekdays at 9:00 AM
- 0 0 1 * * — first day of every month at midnight
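For intuition, the five-field format can be evaluated against a Date by checking each field. This toy matcher is illustrative only — it handles just *, */n, a-b, and plain numbers, and is not how the engine schedules crons (that is delegated to BullMQ repeatable jobs):

```typescript
// Toy matcher for the standard 5-field cron format:
// minute hour dayOfMonth month dayOfWeek
function fieldMatches(field: string, value: number): boolean {
  if (field === '*') return true
  if (field.startsWith('*/')) return value % Number(field.slice(2)) === 0
  if (field.includes('-')) {
    const [lo, hi] = field.split('-').map(Number)
    return value >= lo && value <= hi
  }
  return Number(field) === value
}

function cronMatches(expr: string, date: Date): boolean {
  const [min, hour, dom, month, dow] = expr.split(' ')
  return (
    fieldMatches(min, date.getMinutes()) &&
    fieldMatches(hour, date.getHours()) &&
    fieldMatches(dom, date.getDate()) &&
    fieldMatches(month, date.getMonth() + 1) && // cron months are 1-12
    fieldMatches(dow, date.getDay())            // 0 = Sunday
  )
}
```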
Handler Context
Every action and condition handler receives a WorkflowContext object:
engine.registerAction('my_action', async (ctx) => {
ctx.tenantId // string — current tenant
ctx.trigger // TriggerEnvelope — the signal that started this run
ctx.run // WorkflowRun — current run state
ctx.step // WorkflowStepRun — current step state
ctx.config // Record<string, unknown> — workflow-level config
ctx.isReplay // boolean — true if this is a replay execution
ctx.emit(...) // Emit another signal (enables workflow chaining)
ctx.log(...) // Write to execution log
return { success: true }
})

The emit function on the context lets actions trigger other workflows, enabling powerful workflow chaining patterns. See the chaining example.
Lifecycle Hooks
The engine supports lifecycle hooks for observability and integration:
const engine = createEngine({
db: pool,
redis: redisClient,
onStepComplete: (event) => {
// Called after each step completes, fails, or is skipped
console.log(`Step ${event.step.stepName}: ${event.step.status}`)
},
onRunComplete: (event) => {
// Called when a run reaches completed, failed, or canceled status
console.log(`Run ${event.run.id}: ${event.status}`)
},
})

Hooks are fire-and-forget — errors in hooks do not affect workflow execution.
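That guarantee amounts to wrapping each hook invocation so errors are contained. A minimal sketch of the idea (fireHook is a hypothetical helper, not the package's internals):

```typescript
// Invoke a user-supplied lifecycle hook so that a throwing hook can
// never propagate into the engine's execution path.
type Hook<E> = (event: E) => void | Promise<void>

async function fireHook<E>(hook: Hook<E> | undefined, event: E): Promise<void> {
  if (!hook) return
  try {
    await hook(event) // await so async rejections are also contained
  } catch {
    // swallowed — hook failures must not affect workflow execution
  }
}
```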
Replay
All signals are persisted, enabling replay — re-processing a historical signal through the matching pipeline as if it were emitted again.
// Full replay — creates new runs and executes all steps
await engine.replay('trg_abc123')
// Dry run — creates runs but skips all actions (logs only)
await engine.replay('trg_abc123', { dryRun: true })

Replay safety: Each action declares whether it is safe to re-execute via the replaySafe option:
// Safe to replay — idempotent or read-only
engine.registerAction('create_incident', handler, { replaySafe: true })
// Not safe to replay — side effects like emails, charges
engine.registerAction('send_email', handler, { replaySafe: false })

During replay, actions with replaySafe: false are skipped (status: skipped). This prevents duplicate emails, charges, or other non-idempotent side effects.
Runs created by replay are marked with isReplay: true and handlers can check ctx.isReplay to adjust behavior.
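The skip decision can be summarized as a pure function of the replay flags and the action's replaySafe option. A sketch under those assumptions (shouldExecuteAction is illustrative, not the engine's source):

```typescript
// Hypothetical decision function: should an action handler actually run?
interface ReplayFlags { isReplay: boolean; dryRun: boolean }

function shouldExecuteAction(replaySafe: boolean, flags: ReplayFlags): boolean {
  if (flags.dryRun) return false                  // dry run skips every action
  if (flags.isReplay && !replaySafe) return false // protect non-idempotent side effects
  return true                                     // normal execution
}
```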
Run Timeline
The run timeline provides a chronological view of everything that happened during a workflow run — useful for debugging dashboards and audit trails.
const timeline = await engine.getRunTimeline('run_abc123')
for (const entry of timeline) {
console.log(`[${entry.timestamp}] ${entry.type}`, {
stepName: entry.stepName,
detail: entry.detail,
})
}

Entry types:
- run_created — run was created
- step_scheduled — step was queued for execution
- step_started — step began executing
- step_completed — step finished successfully
- step_failed — step failed with an error
- step_skipped — step was skipped (condition false, replay, etc.)
- run_completed — run finished successfully
- run_failed — run ended due to a step failure
- run_canceled — run was canceled (includes cancel reason in detail)
- log — execution log entry from ctx.log() (includes level, message, data in detail)
The timeline is assembled from a single SQL query across runs, steps, and execution logs tables, ordered chronologically.
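Conceptually, the assembly is a merge-and-sort over per-source event lists. A simplified in-memory sketch (the real engine does this in one SQL query, as noted above):

```typescript
// Merge entries from multiple sources (runs, steps, logs) into one
// chronologically ordered timeline.
interface TimelineEntry {
  timestamp: Date
  type: string
  detail?: Record<string, unknown>
}

function assembleTimeline(...sources: TimelineEntry[][]): TimelineEntry[] {
  return sources
    .reduce((all, s) => all.concat(s), [] as TimelineEntry[])
    .sort((a, b) => a.timestamp.getTime() - b.timestamp.getTime())
}
```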
Cancel & Retry
Canceling a Running Workflow
Cancel an in-progress run to stop further step execution:
const canceledRun = await engine.cancelRun('run_abc123', 'No longer needed')

Behavior:
- Sets the run status to canceled with a timestamp and optional reason
- Marks all pending/scheduled step runs as skipped
- Removes pending BullMQ jobs for that run
- The executor checks for canceled status before executing each step
- Fires the onRunComplete hook with status: 'canceled'
- Cannot cancel runs that are already completed, failed, or canceled
Retrying Failed Runs
Resume a failed run from the step that failed:
const retriedRun = await engine.retryRun('run_abc123')

Behavior:
- Finds the first failed step in the run
- Resets that step to pending status (clears error, timestamps)
- Sets the run back to running status
- Re-enqueues the step in BullMQ with its original retry policy
- Cannot retry runs that aren't in failed status
Listing Failed Runs
Query failed runs for monitoring dashboards:
// All failed runs
const allFailed = await engine.getFailedRuns()
// Failed runs for a specific tenant
const tenantFailed = await engine.getFailedRuns('workspace_1')

API Reference
createEngine(config)
Creates and returns an engine instance.
import { createEngine } from '@kylebegeman/pulse'
const engine = createEngine({
db: pool, // Required — pg Pool instance
redis: redisClient, // Required — ioredis instance
tablePrefix: 'pulse_', // Optional — table name prefix (default: 'pulse_')
queuePrefix: 'pulse', // Optional — BullMQ queue prefix (default: 'pulse')
concurrency: 5, // Optional — step worker concurrency (default: 5)
onStepComplete: (e) => {}, // Optional — lifecycle hook
onRunComplete: (e) => {}, // Optional — lifecycle hook
})

Engine Methods
Registration
| Method | Description |
|---|---|
| registerTrigger(type, registration) | Register a signal type with source, optional resource type, and optional Zod payload schema |
| registerAction(name, handler, options?) | Register an action handler. Set replaySafe: true/false in options |
| registerCondition(name, handler) | Register a condition handler that returns a boolean |
Signals
| Method | Returns | Description |
|---|---|---|
| emit(trigger) | Promise<TriggerEnvelope> | Emit a signal. Persists the trigger, matches workflows, and creates runs |
Lifecycle
| Method | Returns | Description |
|---|---|---|
| start() | Promise<void> | Start BullMQ workers and restore cron jobs. Call after registering handlers |
| stop() | Promise<void> | Graceful shutdown — waits for active jobs to complete, stops cron workers |
Queries
| Method | Returns | Description |
|---|---|---|
| getRun(runId) | Promise<WorkflowRun \| null> | Get a workflow run by ID |
| getRunSteps(runId) | Promise<WorkflowStepRun[]> | Get all step runs for a workflow run (includes branch steps) |
| getRunsByTrigger(triggerId) | Promise<WorkflowRun[]> | Get all runs spawned by a trigger |
| getTrigger(triggerId) | Promise<TriggerEnvelope \| null> | Get a persisted trigger by ID |
Timeline
| Method | Returns | Description |
|---|---|---|
| getRunTimeline(runId) | Promise<RunTimelineEntry[]> | Chronological lifecycle view of a run — steps, logs, status changes |
Cancel & Retry
| Method | Returns | Description |
|---|---|---|
| cancelRun(runId, reason?) | Promise<WorkflowRun> | Cancel an in-progress run. Removes pending jobs |
| retryRun(runId) | Promise<WorkflowRun> | Retry a failed run from the failed step |
| getFailedRuns(tenantId?) | Promise<WorkflowRun[]> | List failed runs, optionally filtered by tenant |
Replay
| Method | Returns | Description |
|---|---|---|
| replay(triggerId, options?) | Promise<WorkflowRun[]> | Re-emit a historical trigger. Pass { dryRun: true } to skip actions |
Workflow Management
| Method | Returns | Description |
|---|---|---|
| createWorkflow(definition) | Promise<WorkflowDefinition> | Create a new workflow definition. Schedules cron if applicable |
| enableWorkflow(definitionId) | Promise<void> | Enable a workflow definition. Starts cron if defined |
| disableWorkflow(definitionId) | Promise<void> | Disable a workflow definition. Stops cron if defined |
Schema
| Method | Returns | Description |
|---|---|---|
| migrate() | Promise<void> | Create pulse_* tables in your database (idempotent) |
Type Reference
interface TriggerInput {
tenantId: string
source: string
type: string
resourceType?: string
resourceId?: string
environment?: string
payload?: Record<string, unknown>
}

interface TriggerEnvelope extends TriggerInput {
id: string // Auto-generated (e.g., 'trg_...')
createdAt: Date
}

interface TriggerRegistration {
source: string
resourceType?: string
payloadSchema?: ZodSchema // Optional Zod schema for payload validation
}

interface WorkflowDefinition {
id: string
tenantId: string
name: string
description?: string
triggerType: string
environmentFilter?: string
resourceTypeFilter?: string
steps: WorkflowStep[]
config: Record<string, unknown>
isEnabled: boolean
cronExpression?: string // Cron schedule (e.g., '0 2 * * *')
createdAt: Date
updatedAt: Date
}

type StepType = 'action' | 'condition' | 'delay' | 'parallel'
interface WorkflowStep {
type: StepType
name: string
config?: Record<string, unknown>
delayMs?: number // Only for 'delay' steps
timeoutMs?: number // Step execution timeout
retryPolicy?: RetryPolicy // Retry on failure
onFalse?: 'complete' | 'skip' | number // Condition false behavior
branches?: WorkflowStep[][] // Only for 'parallel' steps
}

type WorkflowStatus = 'pending' | 'running' | 'waiting' | 'completed' | 'failed' | 'canceled'
interface WorkflowRun {
id: string
definitionId: string
tenantId: string
triggerId: string
status: WorkflowStatus
context: Record<string, unknown>
currentStepIndex: number
isReplay: boolean
definitionSnapshot: DefinitionSnapshot
startedAt?: Date
completedAt?: Date
failedAt?: Date
canceledAt?: Date // When the run was canceled
cancelReason?: string // Why the run was canceled
createdAt: Date
updatedAt: Date
}

type StepStatus = 'pending' | 'scheduled' | 'running' | 'completed' | 'failed' | 'skipped'
interface WorkflowStepRun {
id: string
runId: string
tenantId: string
stepIndex: number
stepType: StepType
stepName: string
status: StepStatus
scheduledFor?: Date
startedAt?: Date
completedAt?: Date
result?: Record<string, unknown>
errorMessage?: string
branchIndex?: number // Branch index for parallel step branches
parentStepRunId?: string // Parent step run ID for branch steps
createdAt: Date
}

interface RunTimelineEntry {
timestamp: Date
type:
| 'run_created' | 'run_completed' | 'run_failed' | 'run_canceled'
| 'step_scheduled' | 'step_started' | 'step_completed'
| 'step_failed' | 'step_skipped' | 'log'
stepIndex?: number
stepName?: string
stepType?: string
detail?: Record<string, unknown>
}

interface DefinitionSnapshot {
steps: WorkflowStep[]
config: Record<string, unknown>
triggerType: string
}

interface RetryPolicy {
maxAttempts: number // 1-10
backoffMs: number // 100-300000 ms
}

interface WorkflowContext {
tenantId: string
trigger: TriggerEnvelope
run: WorkflowRun
step: WorkflowStepRun
config: Record<string, unknown>
isReplay: boolean
emit: (trigger: TriggerInput) => Promise<TriggerEnvelope>
log: (message: string, data?: Record<string, unknown>) => void
}

interface ActionResult {
success: boolean
data?: Record<string, unknown>
error?: string
}

interface ActionOptions {
replaySafe: boolean // Whether this action can be re-executed during replay
}

interface EngineConfig {
db: Pool // pg Pool instance
redis: unknown // ioredis instance
tablePrefix?: string // Default: 'pulse_'
queuePrefix?: string // Default: 'pulse'
concurrency?: number // Step worker concurrency (default: 5)
onStepComplete?: LifecycleHook<StepCompleteEvent>
onRunComplete?: LifecycleHook<RunCompleteEvent>
}

interface StepCompleteEvent {
run: WorkflowRun
step: WorkflowStepRun
}
interface RunCompleteEvent {
run: WorkflowRun
status: 'completed' | 'failed' | 'canceled'
}

interface ReplayOptions {
dryRun?: boolean // If true, skip all actions (log only)
}

interface ExecutionLog {
id: string
runId: string
stepRunId?: string
tenantId: string
level: 'info' | 'warn' | 'error'
message: string
data?: Record<string, unknown>
createdAt: Date
}

Database Schema
The engine creates tables in your PostgreSQL database with the prefix pulse_ (configurable). All tables use TEXT primary keys with auto-generated IDs. Migrations are idempotent — safe to call on every startup.
await engine.migrate()

pulse_triggers
Persisted signal records for auditing and replay.
| Column | Type | Description |
|---|---|---|
| id | TEXT PK | Trigger ID (e.g., trg_...) |
| tenant_id | TEXT | Tenant scope |
| source | TEXT | Originating system |
| type | TEXT | Signal type |
| resource_type | TEXT | Resource category |
| resource_id | TEXT | Specific resource |
| environment | TEXT | Environment scope |
| payload | JSONB | Arbitrary event data |
| created_at | TIMESTAMPTZ | When the signal was emitted |
Indexes: (tenant_id, type), (created_at)
pulse_workflow_definitions
Workflow templates that define step sequences and matching rules.
| Column | Type | Description |
|---|---|---|
| id | TEXT PK | Definition ID |
| tenant_id | TEXT | Tenant scope |
| name | TEXT | Human-readable name |
| description | TEXT | Optional description |
| trigger_type | TEXT | Signal type to match |
| environment_filter | TEXT | Optional environment filter |
| resource_type_filter | TEXT | Optional resource type filter |
| steps | JSONB | Ordered step definitions |
| config | JSONB | Config passed to handlers |
| is_enabled | BOOLEAN | Whether the workflow is active |
| cron_expression | TEXT | Optional cron schedule for automatic trigger emission |
| created_at | TIMESTAMPTZ | Creation time |
| updated_at | TIMESTAMPTZ | Last update time |
Indexes: (tenant_id, trigger_type, is_enabled)
pulse_workflow_runs
Execution records — one per workflow triggered by a signal.
| Column | Type | Description |
|---|---|---|
| id | TEXT PK | Run ID |
| definition_id | TEXT FK | Workflow definition |
| tenant_id | TEXT | Tenant scope |
| trigger_id | TEXT FK | Signal that started this run |
| status | TEXT | pending / running / waiting / completed / failed / canceled |
| context | JSONB | Shared run context |
| current_step_index | INTEGER | Active step position |
| is_replay | BOOLEAN | Whether this is a replay run |
| definition_snapshot | JSONB | Frozen copy of workflow definition at run creation |
| started_at | TIMESTAMPTZ | When execution began |
| completed_at | TIMESTAMPTZ | When execution completed |
| failed_at | TIMESTAMPTZ | When execution failed |
| canceled_at | TIMESTAMPTZ | When the run was canceled |
| cancel_reason | TEXT | Why the run was canceled |
| created_at | TIMESTAMPTZ | Creation time |
| updated_at | TIMESTAMPTZ | Last update time |
Indexes: (tenant_id, status), (trigger_id)
pulse_workflow_step_runs
Individual step execution records within a run.
| Column | Type | Description |
|---|---|---|
| id | TEXT PK | Step run ID |
| run_id | TEXT FK | Parent workflow run |
| tenant_id | TEXT | Tenant scope |
| step_index | INTEGER | Position in step sequence |
| step_type | TEXT | action / condition / delay / parallel |
| step_name | TEXT | Registered handler name |
| status | TEXT | pending / scheduled / running / completed / failed / skipped |
| scheduled_for | TIMESTAMPTZ | When the step should execute (for delays) |
| started_at | TIMESTAMPTZ | When execution began |
| completed_at | TIMESTAMPTZ | When execution completed |
| result | JSONB | Action result data |
| error_message | TEXT | Error details on failure |
| branch_index | INTEGER | Branch index within a parallel step (null for top-level) |
| parent_step_run_id | TEXT | Parent parallel step run ID (null for top-level) |
| created_at | TIMESTAMPTZ | Creation time |
Indexes: (run_id, step_index), (status, scheduled_for) WHERE status = 'scheduled'
pulse_execution_logs
Structured execution logs written by ctx.log() calls.
| Column | Type | Description |
|---|---|---|
| id | TEXT PK | Log entry ID |
| run_id | TEXT FK | Parent workflow run |
| step_run_id | TEXT FK | Associated step (optional) |
| tenant_id | TEXT | Tenant scope |
| level | TEXT | info / warn / error |
| message | TEXT | Log message |
| data | JSONB | Structured log data |
| created_at | TIMESTAMPTZ | When the log was written |
Indexes: (run_id, created_at)
Runtime Guarantees
Execution Semantics
Pulse provides at-least-once execution semantics. If a worker crashes after an action completes but before the engine records success, the step may execute again on retry. Design your actions to be idempotent where possible — especially for side effects like sending emails, charging customers, or creating external resources.
You can derive idempotency keys from the workflow context passed to every action:
engine.registerAction('send_email', async (ctx) => {
const idempotencyKey = `${ctx.run.id}:${ctx.step.name}`
// Use this key with your email provider to prevent duplicates
})

Step Claiming
Every step is claimed atomically before execution using a compare-and-set query:
UPDATE step_runs SET status = 'running', started_at = NOW()
WHERE id = $1 AND status IN ('pending', 'scheduled')
RETURNING *

If two workers pick up the same job from the queue (e.g., due to retry or crash recovery), only one will successfully claim the step. The other receives a null result and silently no-ops.
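The claim semantics can be pictured as a conditional status flip — only the first caller wins. An in-memory illustration (in the engine this is the single atomic SQL UPDATE above, not application code):

```typescript
// Compare-and-set claim: flip 'pending'/'scheduled' to 'running';
// any later claimer sees 'running' and no-ops.
type Status = 'pending' | 'scheduled' | 'running' | 'completed'

function claimStep(step: { status: Status }): boolean {
  if (step.status !== 'pending' && step.status !== 'scheduled') return false
  step.status = 'running' // made atomic in SQL by the conditional UPDATE
  return true
}
```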
Run State Transitions
Run status changes are guarded by the current status. Invalid transitions (e.g., completing an already-canceled run) are rejected:
pending → running, canceled
running → waiting, completed, failed, canceled
waiting → running, canceled
failed → running (retry)

Attempts to transition from an unexpected state throw an error rather than silently corrupting run state.
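The guard amounts to a lookup table of allowed transitions. A sketch mirroring the table above (assertTransition is illustrative, not the package's source):

```typescript
// Allowed run-status transitions, as documented above. Terminal
// statuses (completed, canceled) allow no further transitions.
type RunStatus = 'pending' | 'running' | 'waiting' | 'completed' | 'failed' | 'canceled'

const ALLOWED: Record<RunStatus, RunStatus[]> = {
  pending: ['running', 'canceled'],
  running: ['waiting', 'completed', 'failed', 'canceled'],
  waiting: ['running', 'canceled'],
  failed: ['running'], // retry
  completed: [],
  canceled: [],
}

function assertTransition(from: RunStatus, to: RunStatus): void {
  if (!ALLOWED[from].includes(to)) {
    throw new Error(`Invalid run transition: ${from} -> ${to}`)
  }
}
```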
Cancellation
When a run is canceled:
- The run status is set to canceled with a timestamp and optional reason.
- All pending and scheduled step runs are marked as skipped.
- A step that is already running will complete its current execution — the engine checks run status before scheduling the next step.
Retry
Retrying a failed run:
- Finds the first failed step run and resets it to pending.
- Sets the run status back to running.
- The step is re-enqueued and executes with the original retry policy.
- Execution continues from the failed step — previously completed steps are not re-run.
Replay
Replay re-emits a historical trigger to create a new run:
- A new run is created with isReplay: true.
- Actions flagged with replaySafe: false are skipped during replay execution.
- All other steps execute normally against the replayed trigger data.
- Replay is useful for debugging, backfilling, and testing new workflows against historical signals.
Parallel Execution
Parallel steps fan out into concurrent branches:
- Each branch executes independently with its own step runs.
- All branches must complete for the parallel step to succeed.
- If any branch fails, pending sibling branch steps are canceled and the run is marked as failed.
- Context from each branch is merged — each step writes to context[actionName], so branch steps should have unique action names.
Context Accumulation
Each action's return value is stored in the run context keyed by action name:
// After "check_status" action returns { healthy: true }
// context.check_status === { healthy: true }
// Next action can access it:
engine.registerAction('notify', async (ctx) => {
if (ctx.context.check_status.healthy) { /* ... */ }
})

Step names within a workflow must be unique, preventing key collisions.
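The accumulation rule can be stated as a one-line reducer: each result is stored under its step's action name. A sketch of the idea (recordResult is a hypothetical helper, not the engine's source):

```typescript
// Store an action's result data in the run context, keyed by the
// action name, without mutating the previous context object.
type RunContext = Record<string, unknown>

function recordResult(context: RunContext, actionName: string, data: unknown): RunContext {
  return { ...context, [actionName]: data }
}
```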
Architecture
┌─────────────────────────────────────────────────────────────────┐
│ Your Application │
│ │
│ engine.emit({ type: 'heartbeat.missed', ... }) │
│ │ │
│ ▼ │
│ ┌──────────┐ ┌───────────┐ ┌───────────────────────┐ │
│ │ Registry │ │ Matcher │────▶│ Run Manager │ │
│ │ │ │ │ │ (state machine) │ │
│ │ triggers │ │ finds │ │ │ │
│ │ actions │ │ matching │ │ pending → running │ │
│ │ conditions│ │ workflows │ │ → waiting → completed │ │
│ └──────────┘ └───────────┘ └───────┬───────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────┐ ┌─────────────────────────────┐ │
│ │ Step Scheduler │ │ Step Executor │ │
│ │ │────▶│ │ │
│ │ BullMQ delayed jobs │ │ delay → condition → action │ │
│ │ + parallel branches │ │ + parallel branch dispatch │ │
│ └──────────────────────┘ └─────────────────────────────┘ │
│ │
│ ┌────────────────────┐ ┌─────────────────────────────────┐ │
│ │ Replay Manager │ │ Execution Logs │ │
│ │ │ │ │ │
│ │ re-emit historical │ │ structured logs per run/step │ │
│ │ signals with safety│ │ ctx.log() → pulse_execution_logs│ │
│ └────────────────────┘ └─────────────────────────────────┘ │
│ │
│ ┌────────────────────┐ ┌─────────────────────────────────┐ │
│ │ Cron Manager │ │ Run Timeline │ │
│ │ │ │ │ │
│ │ BullMQ repeatable │ │ chronological event view │ │
│ │ jobs → auto-emit │ │ for debugging & dashboards │ │
│ └────────────────────┘ └─────────────────────────────────┘ │
│ │
│ ┌──────────────┐ ┌────────────────┐ │
│ │ PostgreSQL │ │ Redis │ │
│ │ (your DB) │ │ (BullMQ jobs) │ │
│ └──────────────┘ └────────────────┘ │
└─────────────────────────────────────────────────────────────────┘

Module Breakdown
| Module | File | Purpose |
|---|---|---|
| Engine Factory | src/index.ts | createEngine() — wires everything together |
| Types | src/types.ts | All TypeScript interfaces and type definitions |
| Registry | src/registry.ts | Trigger, action, and condition registration |
| Matcher | src/matcher.ts | Signal-to-workflow matching, run creation, and validation |
| Run Manager | src/runs.ts | Workflow run lifecycle, state machine, timeline, cancel |
| Scheduler | src/scheduler.ts | Step scheduling with BullMQ delayed jobs + parallel branches |
| Executor | src/executor.ts | Step dispatch — delay, condition, action, parallel |
| Cron Manager | src/cron.ts | Cron-triggered workflow scheduling via BullMQ repeatable jobs |
| Replay | src/replay.ts | Trigger persistence and replay support |
| Queue | src/queue.ts | BullMQ queue and worker setup (match, step, cron queues) |
| Logs | src/logs.ts | Execution logging and query helpers |
| Schema | src/schema/ | Table definitions and migration runner |
Examples
Monitoring: Incident on Missed Heartbeat
A service stops sending heartbeats. Wait 5 minutes, check if it's still down, then create an incident.
// Register handlers
engine.registerTrigger('heartbeat.missed', {
source: 'heartbeat',
resourceType: 'service',
})
engine.registerCondition('service_still_down', async (ctx) => {
const status = await healthCheck(ctx.trigger.resourceId)
return status !== 'healthy'
})
engine.registerAction('create_incident', async (ctx) => {
const incident = await db.incidents.create({
tenantId: ctx.tenantId,
serviceId: ctx.trigger.resourceId,
severity: ctx.config.severity || 'medium',
title: `Service ${ctx.trigger.resourceId} is unresponsive`,
})
ctx.log('Incident created', { incidentId: incident.id })
return { success: true, data: { incidentId: incident.id } }
}, { replaySafe: true })
engine.registerAction('notify_oncall', async (ctx) => {
await pagerduty.trigger({
service: ctx.trigger.resourceId,
severity: ctx.config.severity,
})
return { success: true }
}, { replaySafe: false }) // Don't re-page during replay
// Create the workflow
await engine.createWorkflow({
tenantId: 'workspace_1',
name: 'Incident on missed heartbeat',
triggerType: 'heartbeat.missed',
environmentFilter: 'production',
steps: [
{ type: 'delay', name: 'wait_5m', delayMs: 5 * 60 * 1000 },
{ type: 'condition', name: 'service_still_down' },
{ type: 'action', name: 'create_incident' },
{ type: 'action', name: 'notify_oncall' },
],
config: { severity: 'high' },
isEnabled: true,
})

Billing: Trial Expiration Workflow
A trial is about to expire. Send a warning email, wait, then downgrade if they haven't upgraded.
engine.registerTrigger('trial.expiring', {
source: 'billing',
resourceType: 'subscription',
})
engine.registerAction('send_trial_warning', async (ctx) => {
await emails.send({
to: ctx.trigger.payload.email,
template: 'trial-expiring',
data: { daysLeft: ctx.trigger.payload.daysLeft },
})
return { success: true }
}, { replaySafe: false })
engine.registerCondition('has_not_upgraded', async (ctx) => {
const sub = await billing.getSubscription(ctx.tenantId)
return sub.plan === 'trial'
})
engine.registerAction('downgrade_to_free', async (ctx) => {
await billing.changePlan(ctx.tenantId, 'free')
ctx.log('Downgraded to free plan')
return { success: true }
}, { replaySafe: true })
await engine.createWorkflow({
tenantId: 'workspace_1',
name: 'Trial expiration',
triggerType: 'trial.expiring',
steps: [
{ type: 'action', name: 'send_trial_warning' },
{ type: 'delay', name: 'wait_3_days', delayMs: 3 * 24 * 60 * 60 * 1000 },
{ type: 'condition', name: 'has_not_upgraded' },
{ type: 'action', name: 'downgrade_to_free' },
],
config: {},
isEnabled: true,
})

Chaining: Workflow That Triggers Another Workflow
Actions can emit signals, allowing workflows to trigger other workflows.
// First workflow: detect issue → emit resolution signal
engine.registerAction('attempt_auto_fix', async (ctx) => {
const result = await autoRemediate(ctx.trigger.resourceId)
if (result.fixed) {
// Emit a new signal — this will match other workflows
await ctx.emit({
tenantId: ctx.tenantId,
source: 'auto-remediation',
type: 'service.recovered',
resourceType: 'service',
resourceId: ctx.trigger.resourceId,
payload: { fix: result.action },
})
}
return { success: true, data: result }
}, { replaySafe: true })
// Second workflow: triggered by the recovery signal
engine.registerAction('close_incident', async (ctx) => {
await db.incidents.close({
serviceId: ctx.trigger.resourceId,
resolution: ctx.trigger.payload.fix,
})
return { success: true }
}, { replaySafe: true })
await engine.createWorkflow({
tenantId: 'workspace_1',
name: 'Auto-close on recovery',
triggerType: 'service.recovered',
steps: [
{ type: 'action', name: 'close_incident' },
],
config: {},
isEnabled: true,
})

Parallel: Multi-Channel Notification
Send notifications across multiple channels simultaneously, then mark the alert as notified.
engine.registerAction('send_email', async (ctx) => {
await emails.send({ to: ctx.config.alertEmail, subject: 'Alert' })
return { success: true }
}, { replaySafe: false })
engine.registerAction('send_slack', async (ctx) => {
await slack.post({ channel: ctx.config.slackChannel, text: 'Alert triggered' })
return { success: true }
}, { replaySafe: false })
engine.registerAction('send_sms', async (ctx) => {
await sms.send({ to: ctx.config.phoneNumber, body: 'Alert triggered' })
return { success: true }
}, { replaySafe: false })
engine.registerAction('mark_notified', async (ctx) => {
await db.alerts.update(ctx.trigger.resourceId, { notified: true })
return { success: true }
}, { replaySafe: true })
await engine.createWorkflow({
tenantId: 'workspace_1',
name: 'Multi-channel alert',
triggerType: 'alert.triggered',
steps: [
{
type: 'parallel',
name: 'notify_all',
branches: [
[{ type: 'action', name: 'send_email' }],
[{ type: 'action', name: 'send_slack' }],
[{ type: 'action', name: 'send_sms' }],
],
},
{ type: 'action', name: 'mark_notified' },
],
config: {
alertEmail: '[email protected]',
slackChannel: '#alerts',
phoneNumber: '+1234567890',
},
isEnabled: true,
})

Cron: Daily Cleanup Job
Run a cleanup workflow on a schedule without needing an external signal.
engine.registerTrigger('maintenance.cleanup', {
source: 'cron',
})
engine.registerAction('cleanup_old_records', async (ctx) => {
const deleted = await db.query(
'DELETE FROM temp_data WHERE created_at < NOW() - INTERVAL \'30 days\' RETURNING id'
)
ctx.log('Cleaned up old records', { count: deleted.rowCount })
return { success: true, data: { deletedCount: deleted.rowCount } }
}, { replaySafe: true })
engine.registerAction('send_cleanup_report', async (ctx) => {
await emails.send({
to: '[email protected]',
subject: 'Daily cleanup report',
body: `Deleted ${ctx.run.context.deletedCount ?? 0} old records`,
})
return { success: true }
}, { replaySafe: false })
await engine.createWorkflow({
tenantId: 'workspace_1',
name: 'Daily cleanup',
triggerType: 'maintenance.cleanup',
steps: [
{ type: 'action', name: 'cleanup_old_records' },
{ type: 'action', name: 'send_cleanup_report' },
],
config: {},
isEnabled: true,
cronExpression: '0 2 * * *', // Every day at 2 AM
})

Observability: Run Timeline Dashboard
Use the timeline API to build a debugging view of what happened during a run.
const timeline = await engine.getRunTimeline('run_abc123')
// Display in a dashboard
for (const entry of timeline) {
const time = entry.timestamp.toISOString()
switch (entry.type) {
case 'run_created':
console.log(`${time} Run started`)
break
case 'step_started':
console.log(`${time} Step "${entry.stepName}" (${entry.stepType}) started`)
break
case 'step_completed':
console.log(`${time} Step "${entry.stepName}" completed`, entry.detail)
break
case 'step_failed':
console.log(`${time} Step "${entry.stepName}" FAILED:`, entry.detail?.error)
break
case 'log':
console.log(`${time} [${entry.detail?.level}] ${entry.detail?.message}`)
break
case 'run_completed':
console.log(`${time} Run completed successfully`)
break
case 'run_canceled':
console.log(`${time} Run canceled: ${entry.detail?.reason}`)
break
}
}

Error Recovery: Retry Failed Runs
Monitor for failed runs and retry them, either manually or automatically.
// List all failed runs for a tenant
const failed = await engine.getFailedRuns('workspace_1')
for (const run of failed) {
console.log(`Run ${run.id} failed at ${run.failedAt}`)
// Retry the run — resumes from the failed step
try {
const retried = await engine.retryRun(run.id)
console.log(`Retried run ${retried.id}, now ${retried.status}`)
} catch (err) {
console.error(`Cannot retry: ${err.message}`)
}
}
// Cancel a run that's stuck or no longer needed
await engine.cancelRun('run_xyz789', 'Superseded by newer deployment')

Requirements
| Requirement | Version | Notes |
|---|---|---|
| Node.js | 18+ | Published as an ES module (ESM) |
| PostgreSQL | 12+ | Engine creates pulse_* tables |
| Redis | 6+ | Used by BullMQ for job queuing |
| TypeScript | 5.0+ | Full type definitions included |
License
MIT
