@rodgerai/workflows

v2.0.0

Workflow orchestration for AI agents - framework agnostic, production ready

rodger/workflows

Workflow orchestration for AI agents. Build complex, multi-step processes with suspend/resume, conditional branching, and state persistence. Framework agnostic and production ready.

Features

  • Framework Agnostic - Works with Rodger agents, AI SDK, Mastra, or plain functions
  • Beautiful Fluent API - Reads like your execution flow
  • Conditional Branching - Different paths based on previous results
  • Suspend/Resume - Human-in-the-loop workflows with state persistence
  • Streaming Support - Real-time workflow execution events
  • Retry & Error Handling - Per-step retry logic with exponential backoff
  • Backend Integration - Seamless integration with @rodger/backend
  • TypeScript First - Full type inference and safety
  • Production Ready - State persistence, auth, rate limiting built-in

Installation

npm install rodger @rodger/core
# or
pnpm add @rodger/workflows @rodger/core

Quick Start

1. Create a Simple Workflow

import { createWorkflow } from 'rodger/workflows';
import { myAgent } from './agent';

const workflow = createWorkflow('data-pipeline')
  .step('extract', async (ctx) => {
    return await fetchData();
  })
  .then('transform', myAgent)
  .then('load', async (ctx) => {
    const data = ctx.previousResults.transform;
    await saveToDatabase(data);
    return { success: true };
  })
  .commit();

// Execute
const result = await workflow.execute({ source: 'api' });
console.log('Result:', result.stepResults);

2. Integrate with Backend

// app/api/workflow/route.ts
import { handleWorkflow } from 'rodger/backend';
import { workflow } from '@/lib/workflow';
import { storage } from '@/lib/storage';

export async function POST(request: Request) {
  return handleWorkflow(request, {
    workflow,
    storage, // Optional: for state persistence
  });
}

Core Concepts

Steps

Steps are the building blocks of workflows. Each step can be an agent, a function, or an async operation.

const workflow = createWorkflow('example')
  // Agent step
  .step('analyze', analyzerAgent)

  // Function step
  .then('process', async (ctx) => {
    return processData(ctx.input);
  })

  // Agent with context
  .then('summarize', summarizerAgent, {
    context: (ctx) => {
      // Pass previous results to agent
      return `Process result: ${ctx.previousResults.process}`;
    }
  })

  .commit();

Context

The workflow context carries state between steps:

const workflow = createWorkflow('example')
  .step('first', async (ctx) => {
    console.log('Input:', ctx.input); // Original input
    console.log('Session:', ctx.sessionId);
    console.log('User:', ctx.userId);
    return { value: 42 };
  })
  .then('second', async (ctx) => {
    // Access previous step results
    const firstResult = ctx.previousResults.first;
    console.log('Previous:', firstResult.value); // 42

    // Access all completed steps
    console.log('Completed:', ctx.completedSteps); // ['first']

    return { doubled: firstResult.value * 2 };
  })
  .commit();

Conditional Branching

Execute different steps based on conditions:

const workflow = createWorkflow('approval')
  .step('analyze', analyzerAgent)

  // Branch 1: High confidence path
  .then('auto-approve', async (ctx) => {
    return { approved: true };
  }, {
    when: (ctx) => ctx.previousResults.analyze.confidence > 0.9
  })

  // Branch 2: Manual review path
  .then('manual-review', async (ctx) => {
    await ctx.suspend(); // Pause for human review
    return { approved: true };
  }, {
    when: (ctx) => ctx.previousResults.analyze.confidence <= 0.9
  })

  // Both branches converge here
  .after(['auto-approve', 'manual-review'])
    .step('notify', notifierAgent)

  .commit();
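
The two when conditions above are meant to be mutually exclusive. One way to keep them in sync is to derive both from a single threshold, as in the minimal sketch below. BranchContext, CONFIDENCE_THRESHOLD, isHighConfidence, and needsManualReview are illustrative names, not part of the package.

```typescript
// Hypothetical minimal shape of the context read by the `when` predicates.
interface BranchContext {
  previousResults: { analyze: { confidence: number } };
}

// Single source of truth for the branch boundary (illustrative value).
const CONFIDENCE_THRESHOLD = 0.9;

const isHighConfidence = (ctx: BranchContext): boolean =>
  ctx.previousResults.analyze.confidence > CONFIDENCE_THRESHOLD;

// Defined as the negation, so exactly one branch matches for any input.
// A NaN confidence also lands here instead of matching neither branch.
const needsManualReview = (ctx: BranchContext): boolean => !isHighConfidence(ctx);
```

These predicates could then be passed as the when option of the auto-approve and manual-review steps.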

Suspend/Resume

Pause workflows for human approval or external data:

// Create workflow with suspension
const workflow = createWorkflow('approval')
  .step('analyze', analyzerAgent)
  .then('review', async (ctx) => {
    if (ctx.previousResults.analyze.needsReview) {
      // Suspend - saves snapshot if storage provided
      await ctx.suspend();
    }
    return { reviewed: true };
  })
  .then('execute', executorAgent)
  .commit();

// Initial execution
const result = await workflow.execute(input, { storage: workflowStorage });

if (result.status === 'suspended') {
  console.log('Awaiting review:', result.executionId);

  // Later, resume after approval
  const resumed = await workflow.resume(result.executionId, workflowStorage);
  console.log('Completed:', resumed.result);
}

Complete Example: Loan Processing

import { createAgent } from 'rodger';
import { createWorkflow } from 'rodger/workflows';

// Create specialized agents for each step
const infoCollector = createAgent({
  name: 'Info Collector',
  llm: { provider: 'openai', model: 'gpt-4o' },
  instructions: 'Collect loan application information from the user.'
});

const riskAnalyzer = createAgent({
  name: 'Risk Analyzer',
  llm: { provider: 'openai', model: 'gpt-4o' },
  instructions: 'Analyze risk factors for the loan application.',
  tools: { calculateRisk, checkCreditScore }
});

const termsGenerator = createAgent({
  name: 'Terms Generator',
  llm: { provider: 'openai', model: 'gpt-4o' },
  instructions: 'Generate loan terms based on risk analysis.',
  tools: { generateTerms }
});

// Build the workflow
const loanWorkflow = createWorkflow('loan-processing')
  // Step 1: Collect information
  .step('collect', infoCollector, {
    retry: { attempts: 2, delay: 1000 },
    timeout: 60000
  })

  // Step 2: Analyze risk
  .then('analyze', riskAnalyzer, {
    retry: { attempts: 3, delay: 500, backoff: 'exponential' }
  })

  // Step 3a: Auto-approve (low risk)
  .then('auto-approve', termsGenerator, {
    when: (ctx) => ctx.previousResults.analyze.riskScore < 0.3,
    onError: async (ctx, error) => {
      // Fallback to standard terms
      return { interestRate: 5.5, error: 'Using standard terms' };
    }
  })

  // Step 3b: Manual review (high risk)
  .after('analyze')
  .step('manual-review', async (ctx) => {
    // Suspend for human approval
    await ctx.suspend();
    return { approved: true };
  }, {
    when: (ctx) => ctx.previousResults.analyze.riskScore >= 0.3
  })

  // Step 4: Send notification (after either path)
  .after(['auto-approve', 'manual-review'])
    .step('notify', async (ctx) => {
      const hasTerms = ctx.previousResults['auto-approve'];
      const wasReviewed = ctx.previousResults['manual-review'];

      if (hasTerms) {
        await sendApprovalEmail(ctx.userId, hasTerms);
      } else if (wasReviewed) {
        await sendReviewCompleteEmail(ctx.userId);
      }

      return { notified: true };
    })

  .commit();

// Execute the workflow
const result = await loanWorkflow.execute(
  { loanAmount: 50000, creditScore: 720 },
  { sessionId: 'session-123', userId: 'user-456' }
);

console.log('Status:', result.status);
console.log('Duration:', result.duration);
console.log('Results:', result.stepResults);

Workflow Storage

Implement WorkflowStorage to enable suspend/resume across server restarts.

Interface

import type { WorkflowSnapshot } from 'rodger/workflows';

// Shape of the WorkflowStorage interface exported by 'rodger/workflows'
interface WorkflowStorage {
  saveSnapshot(snapshot: WorkflowSnapshot): Promise<void>;
  getSnapshot(executionId: string): Promise<WorkflowSnapshot | null>;
}
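
For unit tests or local development, the interface can be satisfied by a simple in-memory store. This is a sketch under stated assumptions: the WorkflowSnapshot fields below are only those referenced elsewhere in this README (the real type likely has more, and the type of updatedAt is a guess), and InMemoryWorkflowStorage is a hypothetical name. State held this way does not survive a server restart.

```typescript
// Assumed snapshot shape: only the fields this README refers to.
interface WorkflowSnapshot {
  executionId: string;
  workflowId: string;
  status: string;
  updatedAt: string; // assumed to be an ISO timestamp string
}

interface WorkflowStorage {
  saveSnapshot(snapshot: WorkflowSnapshot): Promise<void>;
  getSnapshot(executionId: string): Promise<WorkflowSnapshot | null>;
}

// Hypothetical in-memory implementation, keyed by executionId.
class InMemoryWorkflowStorage implements WorkflowStorage {
  private snapshots = new Map<string, string>();

  async saveSnapshot(snapshot: WorkflowSnapshot): Promise<void> {
    // Serialize so later mutation of the caller's object cannot alter stored state.
    this.snapshots.set(snapshot.executionId, JSON.stringify(snapshot));
  }

  async getSnapshot(executionId: string): Promise<WorkflowSnapshot | null> {
    const raw = this.snapshots.get(executionId);
    return raw === undefined ? null : (JSON.parse(raw) as WorkflowSnapshot);
  }
}
```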

Redis Implementation

import { Redis } from 'ioredis';
import type { WorkflowStorage, WorkflowSnapshot } from 'rodger/workflows';

export class RedisWorkflowStorage implements WorkflowStorage {
  constructor(private redis: Redis) {}

  async saveSnapshot(snapshot: WorkflowSnapshot): Promise<void> {
    const key = `workflow:${snapshot.executionId}`;
    // 24-hour TTL: the snapshot expires 24 hours after it was last saved
    await this.redis.setex(key, 86400, JSON.stringify(snapshot));
  }

  async getSnapshot(executionId: string): Promise<WorkflowSnapshot | null> {
    const key = `workflow:${executionId}`;
    const data = await this.redis.get(key);
    return data ? JSON.parse(data) : null;
  }
}

// Usage
const storage = new RedisWorkflowStorage(redis);
const result = await workflow.execute(input, { storage });

Supabase Implementation

import { createClient, SupabaseClient } from '@supabase/supabase-js';
import type { WorkflowStorage, WorkflowSnapshot } from 'rodger/workflows';

export class SupabaseWorkflowStorage implements WorkflowStorage {
  constructor(private supabase: SupabaseClient) {}

  async saveSnapshot(snapshot: WorkflowSnapshot): Promise<void> {
    // supabase-js returns errors instead of throwing, so check explicitly
    const { error } = await this.supabase
      .from('workflow_snapshots')
      .upsert({
        execution_id: snapshot.executionId,
        workflow_id: snapshot.workflowId,
        data: snapshot, // Store entire snapshot as JSONB
        status: snapshot.status,
        updated_at: snapshot.updatedAt,
      });
    if (error) throw error;
  }

  async getSnapshot(executionId: string): Promise<WorkflowSnapshot | null> {
    // maybeSingle() yields null data (not an error) when no row matches
    const { data, error } = await this.supabase
      .from('workflow_snapshots')
      .select('data')
      .eq('execution_id', executionId)
      .maybeSingle();
    if (error) throw error;

    return data?.data ?? null;
  }
}

// Usage
const storage = new SupabaseWorkflowStorage(supabase);

Database Schema (Supabase/Postgres)

CREATE TABLE workflow_snapshots (
  execution_id TEXT PRIMARY KEY,
  workflow_id TEXT NOT NULL,
  data JSONB NOT NULL,
  status TEXT NOT NULL,
  created_at TIMESTAMPTZ DEFAULT NOW(),
  updated_at TIMESTAMPTZ NOT NULL
);

CREATE INDEX idx_workflow_snapshots_workflow ON workflow_snapshots(workflow_id);
CREATE INDEX idx_workflow_snapshots_status ON workflow_snapshots(status);
CREATE INDEX idx_workflow_snapshots_updated ON workflow_snapshots(updated_at);

Streaming Execution

Get real-time events as the workflow executes:

const workflow = createWorkflow('streaming')
  .step('step1', agent1)
  .then('step2', agent2)
  .then('step3', agent3)
  .commit();

// Stream execution events
for await (const event of workflow.executeStream(input)) {
  switch (event.type) {
    case 'workflow-start':
      console.log('Started:', event.workflowId);
      break;

    case 'step-start':
      console.log('Step starting:', event.stepName);
      break;

    case 'step-complete':
      console.log('Step done:', event.stepName, event.result);
      break;

    case 'step-error':
      console.error('Step failed:', event.stepName, event.error);
      break;

    case 'workflow-complete':
      console.log('Workflow done:', event.result);
      break;

    case 'workflow-suspended':
      console.log('Suspended at:', event.stepName);
      console.log('Resume with:', event.executionId);
      break;
  }
}
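
To forward these events to a browser, one option is to encode each one as a Server-Sent Events frame. The sketch below is not part of the package: the WorkflowStreamEvent union is inferred from the switch above and the real type may carry more fields, and toSseFrame and toSseStream are hypothetical helper names.

```typescript
// Assumed event shapes, inferred from the fields read in the switch above.
type WorkflowStreamEvent =
  | { type: 'workflow-start'; workflowId: string }
  | { type: 'step-start'; stepName: string }
  | { type: 'step-complete'; stepName: string; result: unknown }
  | { type: 'step-error'; stepName: string; error: unknown }
  | { type: 'workflow-complete'; result: unknown }
  | { type: 'workflow-suspended'; stepName: string; executionId: string };

// Encode one event as an SSE frame: an "event:" line, a "data:" line, a blank line.
// Note: Error objects serialize to {} under JSON.stringify, so an error field
// may need to be mapped to a plain message before encoding.
function toSseFrame(event: WorkflowStreamEvent): string {
  return `event: ${event.type}\ndata: ${JSON.stringify(event)}\n\n`;
}

// Turn any async iterable of events into an async iterable of SSE frames.
async function* toSseStream(
  events: AsyncIterable<WorkflowStreamEvent>,
): AsyncGenerator<string> {
  for await (const event of events) {
    yield toSseFrame(event);
  }
}
```

The output of workflow.executeStream(input) could be passed to toSseStream and written to a response with the text/event-stream content type.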

API Reference

createWorkflow(id: string)

Create a new workflow builder.

const workflow = createWorkflow('my-workflow')
  .step('first', myAgent)
  .then('second', myFunction)
  .commit();

.step(name, executor, options?)

Add a step to the workflow.

Parameters:

  • name: Unique step identifier
  • executor: Agent, function, or async operation
  • options: Step configuration

Options:

interface StepOptions {
  // Conditional execution
  when?: (ctx: WorkflowContext) => boolean | Promise<boolean>;

  // Retry configuration
  retry?: {
    attempts: number;
    delay: number;
    backoff?: 'linear' | 'exponential';
  };

  // Timeout in milliseconds
  timeout?: number;

  // Error handling
  onError?: (ctx: WorkflowContext, error: Error) => Promise<any>;

  // Context customization (for agents)
  context?: (ctx: WorkflowContext) => string;
}
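
The README does not spell out how delay and backoff combine, so the following is only a sketch of one common interpretation. It assumes that attempts counts the first try, that linear backoff grows the delay by one multiple per retry, and that exponential backoff doubles it. retryDelays is a hypothetical helper, not a package export.

```typescript
type Backoff = 'linear' | 'exponential';

interface RetryConfig {
  attempts: number; // assumed: total tries, including the first
  delay: number; // base delay in milliseconds
  backoff?: Backoff;
}

// Returns the wait (in ms) before each retry, i.e. after failed try 1, 2, ...
function retryDelays({ attempts, delay, backoff }: RetryConfig): number[] {
  const delays: number[] = [];
  for (let failed = 1; failed < attempts; failed++) {
    if (backoff === 'exponential') {
      delays.push(delay * 2 ** (failed - 1)); // delay, 2x, 4x, ...
    } else if (backoff === 'linear') {
      delays.push(delay * failed); // delay, 2x, 3x, ...
    } else {
      delays.push(delay); // no backoff: constant delay
    }
  }
  return delays;
}

console.log(retryDelays({ attempts: 3, delay: 500, backoff: 'exponential' })); // [ 500, 1000 ]
```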

.then(name, executor, options?)

Add a sequential step (alias for .step()).

.after(stepNames)

Create steps that run after specific step(s) complete.

workflow
  .step('step1', agent1)
  .step('step2', agent2)
  .after(['step1', 'step2'])
    .step('step3', agent3) // Runs after both complete
  .commit();

.commit()

Finalize and return the executable workflow.

workflow.execute(input, options?)

Execute the workflow.

Parameters:

interface ExecutionOptions {
  sessionId?: string;
  userId?: string;
  metadata?: Record<string, any>;
  storage?: WorkflowStorage;
  trace?: TraceConfig;
}

Returns:

interface WorkflowResult {
  workflowId: string;
  executionId: string;
  status: 'completed' | 'failed' | 'suspended';
  result?: any;
  error?: Error;
  stepResults: Map<string, any>;
  startedAt: Date;
  completedAt: Date;
  duration: number;
  traces?: ExecutionTraceEntry[];
}
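
Because status is a union of three literals, callers can branch on it exhaustively. A minimal sketch follows, using a local copy of only the fields it reads; WorkflowResultLike and describeResult are illustrative names.

```typescript
// Local subset of the WorkflowResult fields shown above.
interface WorkflowResultLike {
  executionId: string;
  status: 'completed' | 'failed' | 'suspended';
  error?: Error;
  stepResults: Map<string, unknown>;
  duration: number;
}

// Produce a one-line summary for logging, covering every status.
function describeResult(r: WorkflowResultLike): string {
  switch (r.status) {
    case 'completed':
      return `completed (duration ${r.duration}, ${r.stepResults.size} step result(s))`;
    case 'failed':
      return `failed: ${r.error?.message ?? 'unknown error'}`;
    case 'suspended':
      return `suspended; resume with executionId ${r.executionId}`;
  }
}
```

Since stepResults is typed as a Map, individual step outputs would be read with stepResults.get('stepName') rather than property access.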

workflow.executeStream(input, options?)

Execute with streaming events.

Returns: AsyncIterable<WorkflowStreamEvent>

workflow.resume(executionId, storage)

Resume a suspended workflow.

const result = await workflow.resume(
  'exec-123',
  workflowStorage
);

Best Practices

1. Use Specialized Agents per Step

// ✅ Good: Focused agents
const workflow = createWorkflow('process')
  .step('extract', extractorAgent)
  .then('analyze', analyzerAgent)
  .then('summarize', summarizerAgent)
  .commit();

// ❌ Bad: One agent for everything
const workflow = createWorkflow('process')
  .step('everything', generalAgent)
  .commit();

2. Implement Proper Error Handling

const workflow = createWorkflow('robust')
  .step('critical', criticalAgent, {
    retry: { attempts: 3, delay: 1000, backoff: 'exponential' },
    onError: async (ctx, error) => {
      // Fallback logic
      await notifyAdmin(error);
      return { fallback: true };
    }
  })
  .commit();
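
The relationship between retry and onError can be pictured as "try up to N times, then fall back". The README does not state the exact order the package uses, so the sketch below is an assumption: onError runs once, after every attempt has failed. runWithFallback is a hypothetical helper, and delays between attempts are omitted for brevity.

```typescript
// Run `fn` up to `attempts` times; if every attempt throws,
// return the value produced by `onError` instead.
async function runWithFallback<T>(
  fn: () => Promise<T>,
  attempts: number,
  onError: (error: Error) => Promise<T>,
): Promise<T> {
  let lastError = new Error('no attempts were made');
  for (let i = 0; i < attempts; i++) {
    try {
      return await fn(); // first success wins
    } catch (err) {
      // Normalize non-Error throws so the fallback always receives an Error.
      lastError = err instanceof Error ? err : new Error(String(err));
    }
  }
  return onError(lastError);
}
```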

3. Use Conditional Branching

const workflow = createWorkflow('adaptive')
  .step('analyze', analyzerAgent)
  .then('fast-path', fastAgent, {
    when: (ctx) => ctx.previousResults.analyze.simple === true
  })
  .then('complex-path', complexAgent, {
    when: (ctx) => ctx.previousResults.analyze.simple === false
  })
  .commit();

4. Store Workflow State for Resume

// Always provide storage for workflows that might suspend
const result = await workflow.execute(input, {
  storage: workflowStorage, // Required for suspend/resume
  sessionId: 'session-123',
  userId: 'user-456'
});

Examples

See the examples directory for complete examples.

License

MIT