ai-workflows

v2.1.3

Event-driven workflows with state machine support

Event-driven AI workflows shouldn't require a PhD in distributed systems.

You have business logic that needs to react to events, run on schedules, and coordinate parallel tasks. Traditional workflow engines make you wade through XML configs, learn proprietary DSLs, and debug mysterious state machines. You just want to write $.on.Order.placed(handler) and have it work.

import { Workflow } from 'ai-workflows'

const workflow = Workflow($ => {
  $.on.Customer.created(async (customer, $) => {
    await $.send('Email.welcome', { to: customer.email })
  })

  $.every.Monday.at9am(async ($) => {
    $.log('Weekly standup reminder')
  })
})

await workflow.start()

That's it. No YAML. No state machine diagrams. Just JavaScript.

Installation

npm install ai-workflows

Quick Start

Event Handlers

React to events with the $.on pattern. Events follow Noun.verb naming:

Workflow($ => {
  $.on.Order.placed(async (order, $) => {
    $.log('Processing order', order.id)
    await $.send('Inventory.reserve', { items: order.items })
    await $.send('Payment.charge', { amount: order.total })
  })

  $.on.Payment.completed(async (payment, $) => {
    await $.send('Order.fulfill', { orderId: payment.orderId })
  })

  $.on.Payment.failed(async (payment, $) => {
    await $.send('Customer.notify', {
      message: 'Payment failed',
      orderId: payment.orderId
    })
  })
})
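The `on.Noun.verb(handler)` shape maps naturally onto a JavaScript `Proxy`. The block below is a minimal sketch of that idea under stated assumptions, not the package's actual implementation: `createEventBus` and `registry` are hypothetical names introduced here, and the real `$` context likely carries more than `send`.

```javascript
// Hypothetical sketch: how an `on.Noun.verb(handler)` proxy could turn
// property access into event names. Not the ai-workflows source.
function createEventBus() {
  const registry = new Map() // 'Noun.verb' -> array of handlers

  // First property access yields the noun, the second yields the verb,
  // and calling the result registers the handler under 'Noun.verb'.
  const on = new Proxy({}, {
    get: (_, noun) => new Proxy({}, {
      get: (_, verb) => (handler) => {
        const name = `${String(noun)}.${String(verb)}`
        if (!registry.has(name)) registry.set(name, [])
        registry.get(name).push(handler)
      }
    })
  })

  // Deliver an event to every handler registered under its name.
  // Each handler receives the payload and a small context with `send`.
  async function send(name, data) {
    const handlers = registry.get(name) ?? []
    for (const handler of handlers) await handler(data, { send })
  }

  return { on, send, registry }
}
```

With this sketch, `on.Order.placed(fn)` stores `fn` under the key `'Order.placed'`, and `send('Order.placed', order)` awaits each stored handler in registration order.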

Scheduled Tasks

Natural scheduling with $.every:

Workflow($ => {
  // Simple intervals
  $.every.hour(async ($) => {
    $.log('Hourly health check')
  })

  // Day + time combinations
  $.every.Monday.at9am(async ($) => {
    const report = await $.do('Analytics.weeklyReport', {})
    await $.send('Slack.post', { channel: '#metrics', report })
  })

  $.every.weekday.at8am(async ($) => {
    $.log('Good morning! Time to standup.')
  })

  // Precise intervals
  $.every.minutes(30)(async ($) => {
    await $.send('Cache.refresh', {})
  })
})

Available schedules:

| Intervals | Days | Times |
|-----------|------|-------|
| $.every.second | $.every.Monday | .at6am .at7am .at8am |
| $.every.minute | $.every.Tuesday | .at9am .at10am .at11am |
| $.every.hour | $.every.Wednesday | .at12pm .atnoon |
| $.every.day | $.every.Thursday | .at1pm .at2pm .at3pm |
| $.every.week | $.every.Friday | .at4pm .at5pm .at6pm |
| $.every.month | $.every.Saturday | .at7pm .at8pm .at9pm |
| $.every.minutes(n) | $.every.Sunday | .atmidnight |
| $.every.hours(n) | $.every.weekday | |
| | $.every.weekend | |
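To make the day and time names concrete, here is a rough sketch of how a pair such as `Monday` + `at9am` could translate to a standard five-field cron expression. This is an assumption about one possible mapping, not a description of how the package schedules work internally; `toCron`, `timeToHour`, and `DAYS` are hypothetical names.

```javascript
// Hypothetical sketch: translate day/time names into five-field cron
// (minute hour day-of-month month day-of-week). Not the library's code.
const DAYS = new Map([
  ['Sunday', '0'], ['Monday', '1'], ['Tuesday', '2'], ['Wednesday', '3'],
  ['Thursday', '4'], ['Friday', '5'], ['Saturday', '6'],
  ['weekday', '1-5'], ['weekend', '0,6'],
])

// Convert names like 'at9am', 'at12pm', 'atnoon' to a 24-hour value.
function timeToHour(time) {
  if (time === 'atnoon') return 12
  if (time === 'atmidnight') return 0
  const match = /^at(\d{1,2})(am|pm)$/.exec(time)
  if (!match) throw new Error(`Unknown time: ${time}`)
  const hour = Number(match[1]) % 12 // 12am and 12pm both start from 0
  return match[2] === 'pm' ? hour + 12 : hour
}

function toCron(day, time) {
  if (!DAYS.has(day)) throw new Error(`Unknown day: ${day}`)
  return `0 ${timeToHour(time)} * * ${DAYS.get(day)}`
}

// toCron('Monday', 'at9am')  => '0 9 * * 1'
// toCron('weekday', 'at8am') => '0 8 * * 1-5'
```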

The Cascade Pattern

Not every problem can be solved with code. Some need AI. Some need human judgment. The cascade executor tries each tier in sequence, escalating only when needed:

Code -> Generative AI -> Agentic AI -> Human

import { CascadeExecutor } from 'ai-workflows'

const processRefund = new CascadeExecutor({
  cascadeName: 'refund-processor',

  tiers: {
    // Tier 1: Deterministic rules (fastest, cheapest)
    code: {
      name: 'rule-based-refund',
      execute: async (request) => {
        if (request.amount < 50 && request.reason === 'defective') {
          return { approved: true, method: 'original-payment' }
        }
        throw new Error('Rules inconclusive')
      }
    },

    // Tier 2: AI analysis for complex cases
    generative: {
      name: 'ai-refund-analysis',
      execute: async (request, ctx) => {
        const analysis = await analyzeRefundRequest(request)
        if (analysis.confidence > 0.9) {
          return { approved: analysis.shouldApprove, reason: analysis.explanation }
        }
        throw new Error('Confidence too low')
      }
    },

    // Tier 3: Agent with tool access
    agentic: {
      name: 'refund-agent',
      execute: async (request, ctx) => {
        return await refundAgent.process(request)
      }
    },

    // Tier 4: Human review for edge cases
    human: {
      name: 'human-review',
      execute: async (request) => {
        return await createHumanTask({
          type: 'refund-review',
          data: request,
          assignTo: 'support-team'
        })
      }
    }
  },

  // Default timeouts per tier
  useDefaultTimeouts: true,  // code: 5s, generative: 30s, agentic: 5m, human: 24h
})

const result = await processRefund.execute(refundRequest)
console.log(`Resolved by ${result.tier} tier in ${result.metrics.totalDuration}ms`)

Cascade Features

  • Automatic escalation - Failed tiers escalate to the next level
  • Tier timeouts - Each tier has configurable time limits
  • Retry support - Configure retries with exponential backoff per tier
  • Skip conditions - Skip tiers based on input characteristics
  • 5W+H audit trail - Full event log: who, what, when, where, why, how
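The escalation and timeout behavior listed above can be pictured as a loop over the tiers. The following is a simplified sketch, assuming each tier signals "escalate" by throwing or timing out; `runCascade` and `withTimeout` are hypothetical helpers, and the real `CascadeExecutor` also covers retries, skip conditions, and the audit trail, which are omitted here.

```javascript
// Hypothetical sketch of tiered escalation; not the CascadeExecutor source.
const TIER_ORDER = ['code', 'generative', 'agentic', 'human']

// Reject if `promise` does not settle within `ms` milliseconds.
function withTimeout(promise, ms) {
  let timer
  const timeout = new Promise((_, reject) => {
    timer = setTimeout(() => reject(new Error(`Timed out after ${ms}ms`)), ms)
  })
  return Promise.race([promise, timeout]).finally(() => clearTimeout(timer))
}

// Try each configured tier in order; a thrown error or a timeout
// escalates to the next tier. `timeouts` maps tier name to milliseconds.
async function runCascade(tiers, timeouts, input) {
  const errors = []
  for (const tier of TIER_ORDER) {
    const handler = tiers[tier]
    if (!handler) continue // tier not configured, skip it
    try {
      const value = await withTimeout(handler.execute(input), timeouts[tier])
      return { tier, value, errors }
    } catch (error) {
      errors.push({ tier, message: error.message }) // record, then escalate
    }
  }
  throw new Error('All tiers failed')
}
```

The returned `errors` array shows which tiers were attempted before one succeeded, which is a much smaller stand-in for the audit trail described above.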

Dependency Graphs

For complex workflows with interdependent steps, use the dependency graph to ensure correct execution order:

import { DependencyGraph, getExecutionLevels } from 'ai-workflows'

const graph = new DependencyGraph()

// Steps with no dependencies run first (level 0)
graph.addNode('fetch-user')
graph.addNode('fetch-products')

// Dependent steps run after their dependencies complete
graph.addNode('validate-cart', { dependsOn: ['fetch-user', 'fetch-products'] })
graph.addNode('calculate-shipping', { dependsOn: 'fetch-products' })
graph.addNode('apply-discounts', { dependsOn: 'validate-cart' })
graph.addNode('process-payment', { dependsOn: ['apply-discounts', 'calculate-shipping'] })

// Automatic cycle detection
try {
  graph.addNode('bad-step', { dependsOn: 'process-payment' })
  graph.addEdge('bad-step', 'fetch-user')  // Would create a cycle!
} catch (e) {
  console.log('Caught circular dependency:', e.cyclePath)
}
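Cycle detection of this kind usually comes down to a reachability check before an edge is added. The sketch below illustrates that technique with a depth-first search over a plain `Map`; it is not the `DependencyGraph` implementation, and `addDependency` and `findDependencyPath` are hypothetical names. The `cyclePath` property simply mirrors the one read in the example above.

```javascript
// Hypothetical sketch: reject an edge if it would close a loop.
// `deps` is a Map of node -> array of nodes it depends on.
function findDependencyPath(deps, from, target, seen = new Set()) {
  if (from === target) return [from]
  if (seen.has(from)) return null // already explored, no path this way
  seen.add(from)
  for (const next of deps.get(from) ?? []) {
    const path = findDependencyPath(deps, next, target, seen)
    if (path) return [from, ...path]
  }
  return null
}

function addDependency(deps, node, dependency) {
  // If `dependency` already (transitively) depends on `node`,
  // making `node` depend on it would create a cycle.
  const path = findDependencyPath(deps, dependency, node)
  if (path) {
    const cyclePath = [node, ...path]
    const error = new Error(`Circular dependency: ${cyclePath.join(' -> ')}`)
    error.cyclePath = cyclePath
    throw error
  }
  deps.set(node, [...(deps.get(node) ?? []), dependency])
}
```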

Topological Sort

Execute steps in dependency order:

import { topologicalSort, getExecutionLevels } from 'ai-workflows'

const steps = [
  { id: 'A', dependencies: [] },
  { id: 'B', dependencies: ['A'] },
  { id: 'C', dependencies: ['A'] },
  { id: 'D', dependencies: ['B', 'C'] },
]

// Linear execution order
const { order } = topologicalSort(steps)
// => ['A', 'B', 'C', 'D']

// Parallel execution groups
const levels = getExecutionLevels(steps)
// => [
//   { level: 0, nodes: ['A'] },        // Run first
//   { level: 1, nodes: ['B', 'C'] },   // Run in parallel
//   { level: 2, nodes: ['D'] }         // Run after B and C complete
// ]
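For intuition about how such levels can be derived, here is a small sketch that repeatedly peels off the steps whose dependencies are already satisfied (a level-by-level variant of Kahn's algorithm). It is an illustration only, not the package's `getExecutionLevels`; the function name `executionLevels` is hypothetical, and the input shape is borrowed from the example above.

```javascript
// Hypothetical sketch: group steps into levels that can run in parallel.
function executionLevels(steps) {
  // Track, per step, the dependencies that have not completed yet.
  const remaining = new Map(steps.map((s) => [s.id, new Set(s.dependencies)]))
  const levels = []

  while (remaining.size > 0) {
    // Steps with no outstanding dependencies are ready to run together.
    const ready = [...remaining]
      .filter(([, deps]) => deps.size === 0)
      .map(([id]) => id)

    // Nothing ready but steps remain: a cycle, or a dependency on an
    // id that was never declared.
    if (ready.length === 0) throw new Error('Unresolvable dependencies')

    for (const id of ready) remaining.delete(id)
    for (const deps of remaining.values()) {
      for (const id of ready) deps.delete(id)
    }
    levels.push({ level: levels.length, nodes: ready })
  }
  return levels
}

// Flattening the levels gives one valid linear order.
function linearOrder(steps) {
  return executionLevels(steps).flatMap((l) => l.nodes)
}
```

Applied to the four steps `A` through `D` above, this sketch produces three levels (`A`, then `B` and `C`, then `D`).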

Barriers and Joins

Coordinate parallel operations with barrier semantics:

import { waitForAll, waitForAny, Barrier, withConcurrencyLimit } from 'ai-workflows'

// Wait for all parallel tasks
const results = await waitForAll([
  fetchUserData(userId),
  fetchOrderHistory(userId),
  fetchRecommendations(userId),
], { timeout: 5000 })

// Wait for N of M (e.g., 2 of 3 replicas)
const { completed, pending } = await waitForAny(2, [
  writeToReplica1(data),
  writeToReplica2(data),
  writeToReplica3(data),
])

// Manual barrier for complex coordination
const barrier = new Barrier(3, {
  timeout: 10000,
  onProgress: ({ arrived, expected, percentage }) => {
    console.log(`${arrived}/${expected} (${percentage}%)`)
  }
})

// In parallel handlers...
barrier.arrive(resultFromWorker1)
barrier.arrive(resultFromWorker2)
barrier.arrive(resultFromWorker3)

// Wait for all to arrive
const allResults = await barrier.wait()
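The "N of M" join is the least familiar of these primitives, so here is a minimal sketch of how it can be built on plain promises. It assumes a result shape of `{ completed, pending }` like the one destructured above, but it is not the package's `waitForAny`; `waitForCount` is a hypothetical name, and timeouts are left out.

```javascript
// Hypothetical sketch: resolve once `n` of the given promises fulfill.
function waitForCount(n, promises) {
  return new Promise((resolve, reject) => {
    if (n < 1 || n > promises.length) {
      reject(new RangeError(`n must be between 1 and ${promises.length}`))
      return
    }
    const completed = [] // fulfilled values, in completion order
    const settled = new Set() // indexes that have fulfilled or rejected
    let failures = 0

    promises.forEach((promise, index) => {
      Promise.resolve(promise).then(
        (value) => {
          settled.add(index)
          completed.push(value)
          if (completed.length === n) {
            resolve({
              completed: [...completed],
              pending: promises.filter((_, i) => !settled.has(i)),
            })
          }
        },
        (error) => {
          settled.add(index)
          failures += 1
          // Too many failures: `n` successes are no longer reachable.
          if (promises.length - failures < n) reject(error)
        }
      )
    })
  })
}
```

One design choice worth noting: the still-running promises are returned rather than cancelled, since plain promises cannot be cancelled; the caller decides whether to await or ignore them.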

Concurrency Control

Limit parallel executions to prevent overwhelming downstream services:

const urls = [/* 100 URLs */]

// Process 5 at a time
const results = await withConcurrencyLimit(
  urls.map(url => () => fetch(url)),
  5,  // max concurrent
  { collectErrors: true }  // don't fail fast
)
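Passing functions (`() => fetch(url)`) rather than promises matters here: a promise has already started, while a function lets the limiter decide when to start it. The sketch below shows the common worker-pool approach to this. It is an illustration, not the package's `withConcurrencyLimit`; `runWithLimit` is a hypothetical name, and unlike the `collectErrors` option above, this version fails fast.

```javascript
// Hypothetical sketch: run task functions with at most `limit` in flight.
async function runWithLimit(tasks, limit) {
  if (limit < 1) throw new RangeError('limit must be at least 1')
  const results = new Array(tasks.length)
  let next = 0 // index of the next unclaimed task

  // Each worker claims the next index and runs that task to completion
  // before claiming another. Claiming is safe without locks because
  // JavaScript runs this code on a single thread.
  async function worker() {
    while (next < tasks.length) {
      const index = next++
      results[index] = await tasks[index]()
    }
  }

  const workers = Array.from({ length: Math.min(limit, tasks.length) }, () => worker())
  await Promise.all(workers) // rejects on the first task failure
  return results
}
```

Results are stored by task index, so the output order matches the input order even when tasks finish out of order.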

Standalone API

Use on, every, and send for global registration outside of a workflow:

import { on, every, send } from 'ai-workflows'

// Register handlers
on.Customer.created(async (customer, $) => {
  await $.send('Email.welcome', { to: customer.email })
})

every.hour(async ($) => {
  $.log('Background task running')
})

// Emit events from anywhere
await send('Customer.created', { name: 'Alice', email: 'alice@example.com' })

Configuration

Custom Cron Converter

Enable natural language scheduling with an AI-powered cron converter:

import { setCronConverter } from 'ai-workflows'

setCronConverter(async (description) => {
  // Use your AI service to convert natural language to cron
  const response = await ai.complete(`Convert to cron: "${description}"`)
  return response.cron
})

// Now, inside a Workflow($ => { ... }) callback, you can use natural language
$.every('first Monday of the month at 9am', handler)
$.every('every 15 minutes during business hours', handler)

Cascade Timeouts

Configure per-tier and total timeouts:

const executor = new CascadeExecutor({
  tiers: { /* ... */ },

  // Custom timeouts per tier (milliseconds)
  timeouts: {
    code: 2000,        // 2 seconds
    generative: 15000, // 15 seconds
    agentic: 60000,    // 1 minute
    human: 3600000,    // 1 hour
  },

  // Or use defaults
  useDefaultTimeouts: true,  // code: 5s, generative: 30s, agentic: 5m, human: 24h

  // Total cascade timeout
  totalTimeout: 300000,  // 5 minutes max for entire cascade
})

Retry Configuration

Add retries with exponential backoff:

const executor = new CascadeExecutor({
  tiers: { /* ... */ },

  retryConfig: {
    code: { maxRetries: 2, baseDelay: 100 },
    generative: { maxRetries: 3, baseDelay: 1000, multiplier: 2 },
    agentic: { maxRetries: 1, baseDelay: 5000 },
  }
})
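As a worked example of what exponential backoff means for these settings, the sketch below computes the wait before each retry as `baseDelay * multiplier ** attempt`. This formula and the default `multiplier` of 2 are assumptions for illustration; the source does not state the package's exact formula or defaults. `backoffDelays` and `retry` are hypothetical helpers.

```javascript
// Hypothetical sketch: delays between retries, growing geometrically.
// Assumes multiplier defaults to 2 when omitted (not confirmed by the docs).
function backoffDelays({ maxRetries, baseDelay, multiplier = 2 }) {
  return Array.from(
    { length: maxRetries },
    (_, attempt) => baseDelay * multiplier ** attempt
  )
}

// Call `fn` once, then retry up to `maxRetries` times, sleeping between
// attempts. `sleep` is injectable so tests do not have to wait.
async function retry(fn, config, sleep = (ms) => new Promise((r) => setTimeout(r, ms))) {
  const delays = backoffDelays(config)
  for (let attempt = 0; ; attempt++) {
    try {
      return await fn(attempt)
    } catch (error) {
      if (attempt >= delays.length) throw error // retries exhausted
      await sleep(delays[attempt])
    }
  }
}

// backoffDelays({ maxRetries: 3, baseDelay: 1000, multiplier: 2 })
// => [1000, 2000, 4000]
```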

Testing

Create isolated contexts for testing:

import { createTestContext } from 'ai-workflows'

const $ = createTestContext()

// Call your handler
await orderHandler({ id: '123', total: 99.99 }, $)

// Assert on emitted events
expect($.emittedEvents).toContainEqual({
  event: 'Payment.charge',
  data: { amount: 99.99 },
})
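The snippet above relies on an `orderHandler` and on a context that records what was sent. To show the idea end to end without the package, here is a self-contained sketch of a recording context and a sample handler. Both `makeRecordingContext` and `orderHandler` are hypothetical stand-ins; only the `{ event, data }` shape of `emittedEvents` is taken from the example above.

```javascript
// Hypothetical sketch: a stand-in context that records sends and logs
// instead of dispatching them. Not the package's createTestContext.
function makeRecordingContext() {
  const emittedEvents = []
  const logs = []
  return {
    emittedEvents,
    logs,
    log: (...args) => { logs.push(args) },
    send: async (event, data) => { emittedEvents.push({ event, data }) },
  }
}

// Sample handler under test (illustrative only).
async function orderHandler(order, $) {
  $.log('Processing order', order.id)
  await $.send('Payment.charge', { amount: order.total })
}

// Usage: call the handler with the recording context, then inspect it.
async function demo() {
  const $ = makeRecordingContext()
  await orderHandler({ id: '123', total: 99.99 }, $)
  return $.emittedEvents
}
```

Because the handler only talks to `$`, it can be exercised without timers, network calls, or a running workflow.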

API Reference

Core Workflow

| Export | Description |
|--------|-------------|
| Workflow($) | Create a workflow with $ context |
| on | Standalone event registration proxy |
| every | Standalone schedule registration proxy |
| send(event, data) | Emit events globally |
| createTestContext() | Create isolated $ for testing |

Cascade Executor

| Export | Description |
|--------|-------------|
| CascadeExecutor | Tiered execution: code -> AI -> agent -> human |
| TIER_ORDER | ['code', 'generative', 'agentic', 'human'] |
| DEFAULT_TIER_TIMEOUTS | Default timeout per tier |

Dependency Graph

| Export | Description |
|--------|-------------|
| DependencyGraph | DAG for workflow step dependencies |
| topologicalSort(nodes) | Sort nodes in dependency order |
| getExecutionLevels(nodes) | Group nodes for parallel execution |
| CircularDependencyError | Thrown when a cycle is detected |

Barriers

| Export | Description |
|--------|-------------|
| Barrier | Manual synchronization point |
| waitForAll(promises) | Wait for all with timeout support |
| waitForAny(n, promises) | Wait for N of M to complete |
| withConcurrencyLimit(tasks, n) | Limit parallel executions |

Related Packages