@rodgerai/workflows
v2.0.0
Workflow orchestration for AI agents - framework agnostic, production ready
rodger/workflows
Workflow orchestration for AI agents. Build complex, multi-step processes with suspend/resume, conditional branching, and state persistence. Framework agnostic and production ready.
Features
- Framework Agnostic - Works with Rodger agents, AI SDK, Mastra, or plain functions
- Beautiful Fluent API - Reads like your execution flow
- Conditional Branching - Different paths based on previous results
- Suspend/Resume - Human-in-the-loop workflows with state persistence
- Streaming Support - Real-time workflow execution events
- Retry & Error Handling - Per-step retry logic with exponential backoff
- Backend Integration - Seamless integration with @rodger/backend
- TypeScript First - Full type inference and safety
- Production Ready - State persistence, auth, rate limiting built-in
Installation
npm install rodger @rodger/core
# or
pnpm add @rodger/workflows @rodger/core
Quick Start
1. Create a Simple Workflow
import { createWorkflow } from 'rodger/workflows';
import { myAgent } from './agent';
const workflow = createWorkflow('data-pipeline')
  .step('extract', async (ctx) => {
    return await fetchData();
  })
  .then('transform', myAgent)
  .then('load', async (ctx) => {
    const data = ctx.previousResults.transform;
    await saveToDatabase(data);
    return { success: true };
  })
  .commit();
// Execute
const result = await workflow.execute({ source: 'api' });
console.log('Result:', result.stepResults);
2. Integrate with Backend
// app/api/workflow/route.ts
import { handleWorkflow } from 'rodger/backend';
import { workflow } from '@/lib/workflow';
import { storage } from '@/lib/storage';
export async function POST(request: Request) {
  return handleWorkflow(request, {
    workflow,
    storage, // Optional: for state persistence
  });
}
Core Concepts
Steps
Steps are the building blocks of workflows. Each step can be an agent, a function, or an async operation.
const workflow = createWorkflow('example')
  // Agent step
  .step('analyze', analyzerAgent)
  // Function step
  .then('process', async (ctx) => {
    return processData(ctx.input);
  })
  // Agent with context
  .then('summarize', summarizerAgent, {
    context: (ctx) => {
      // Pass previous results to agent
      return `Process result: ${ctx.previousResults.process}`;
    }
  })
  .commit();
Context
The workflow context carries state between steps:
const workflow = createWorkflow('example')
  .step('first', async (ctx) => {
    console.log('Input:', ctx.input); // Original input
    console.log('Session:', ctx.sessionId);
    console.log('User:', ctx.userId);
    return { value: 42 };
  })
  .then('second', async (ctx) => {
    // Access previous step results
    const firstResult = ctx.previousResults.first;
    console.log('Previous:', firstResult.value); // 42
    // Access all completed steps
    console.log('Completed:', ctx.completedSteps); // ['first']
    return { doubled: firstResult.value * 2 };
  })
  .commit();
Conditional Branching
Execute different steps based on conditions:
const workflow = createWorkflow('approval')
  .step('analyze', analyzerAgent)
  // Branch 1: High confidence path
  .then('auto-approve', async (ctx) => {
    return { approved: true };
  }, {
    when: (ctx) => ctx.previousResults.analyze.confidence > 0.9
  })
  // Branch 2: Manual review path
  .then('manual-review', async (ctx) => {
    await ctx.suspend(); // Pause for human review
    return { approved: true };
  }, {
    when: (ctx) => ctx.previousResults.analyze.confidence <= 0.9
  })
  // Both branches converge here
  .after(['auto-approve', 'manual-review'])
  .step('notify', notifierAgent)
  .commit();
Suspend/Resume
Pause workflows for human approval or external data:
// Create workflow with suspension
const workflow = createWorkflow('approval')
  .step('analyze', analyzerAgent)
  .then('review', async (ctx) => {
    if (ctx.previousResults.analyze.needsReview) {
      // Suspend - saves snapshot if storage provided
      await ctx.suspend();
    }
    return { reviewed: true };
  })
  .then('execute', executorAgent)
  .commit();
// Initial execution
const result = await workflow.execute(input, { storage: workflowStorage });
if (result.status === 'suspended') {
  console.log('Awaiting review:', result.executionId);
  // Later, resume after approval
  const resumed = await workflow.resume(result.executionId, workflowStorage);
  console.log('Completed:', resumed.result);
}
Complete Example: Loan Processing
import { createAgent } from 'rodger';
import { createWorkflow } from 'rodger/workflows';
// The tools (calculateRisk, checkCreditScore, generateTerms) and the email
// helpers used below are assumed to be defined elsewhere in your application.
// Create specialized agents for each step
const infoCollector = createAgent({
  name: 'Info Collector',
  llm: { provider: 'openai', model: 'gpt-4o' },
  instructions: 'Collect loan application information from the user.'
});
const riskAnalyzer = createAgent({
  name: 'Risk Analyzer',
  llm: { provider: 'openai', model: 'gpt-4o' },
  instructions: 'Analyze risk factors for the loan application.',
  tools: { calculateRisk, checkCreditScore }
});
const termsGenerator = createAgent({
  name: 'Terms Generator',
  llm: { provider: 'openai', model: 'gpt-4o' },
  instructions: 'Generate loan terms based on risk analysis.',
  tools: { generateTerms }
});
// Build the workflow
const loanWorkflow = createWorkflow('loan-processing')
  // Step 1: Collect information
  .step('collect', infoCollector, {
    retry: { attempts: 2, delay: 1000 },
    timeout: 60000
  })
  // Step 2: Analyze risk
  .then('analyze', riskAnalyzer, {
    retry: { attempts: 3, delay: 500, backoff: 'exponential' }
  })
  // Step 3a: Auto-approve (low risk)
  .then('auto-approve', termsGenerator, {
    when: (ctx) => ctx.previousResults.analyze.riskScore < 0.3,
    onError: async (ctx, error) => {
      // Fallback to standard terms
      return { interestRate: 5.5, error: 'Using standard terms' };
    }
  })
  // Step 3b: Manual review (high risk)
  .after('analyze')
  .step('manual-review', async (ctx) => {
    // Suspend for human approval
    await ctx.suspend();
    return { approved: true };
  }, {
    when: (ctx) => ctx.previousResults.analyze.riskScore >= 0.3
  })
  // Step 4: Send notification (after either path)
  .after(['auto-approve', 'manual-review'])
  .step('notify', async (ctx) => {
    const hasTerms = ctx.previousResults['auto-approve'];
    const wasReviewed = ctx.previousResults['manual-review'];
    if (hasTerms) {
      await sendApprovalEmail(ctx.userId, hasTerms);
    } else if (wasReviewed) {
      await sendReviewCompleteEmail(ctx.userId);
    }
    return { notified: true };
  })
  .commit();
// Execute the workflow
const result = await loanWorkflow.execute(
  { loanAmount: 50000, creditScore: 720 },
  { sessionId: 'session-123', userId: 'user-456' }
);
console.log('Status:', result.status);
console.log('Duration:', result.duration);
console.log('Results:', result.stepResults);
Workflow Storage
Implement WorkflowStorage to enable suspend/resume across server restarts.
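For local development and unit tests, a process-local store can stand in for a real backend. The following is a minimal sketch, not part of the package: InMemoryWorkflowStorage is a hypothetical name, and the two types are re-declared locally (with only the snapshot fields this README's examples use) so the block runs on its own. It keeps everything in memory, so it does not survive a server restart.

```typescript
// Local stand-ins for the package types; the field list is an assumption
// based on the fields used by the storage examples in this README.
interface WorkflowSnapshot {
  executionId: string;
  workflowId: string;
  status: string;
  updatedAt: string;
  [key: string]: unknown;
}

interface WorkflowStorage {
  saveSnapshot(snapshot: WorkflowSnapshot): Promise<void>;
  getSnapshot(executionId: string): Promise<WorkflowSnapshot | null>;
}

// Hypothetical in-memory implementation, intended for tests only.
class InMemoryWorkflowStorage implements WorkflowStorage {
  // Snapshots are stored as JSON strings to mimic a networked store:
  // callers get back a copy, never a shared mutable object.
  private readonly snapshots = new Map<string, string>();

  async saveSnapshot(snapshot: WorkflowSnapshot): Promise<void> {
    this.snapshots.set(snapshot.executionId, JSON.stringify(snapshot));
  }

  async getSnapshot(executionId: string): Promise<WorkflowSnapshot | null> {
    const raw = this.snapshots.get(executionId);
    return raw === undefined ? null : (JSON.parse(raw) as WorkflowSnapshot);
  }
}
```

Because the class satisfies the same two-method shape, it could be passed wherever a storage object is expected in tests, then swapped for a persistent implementation in production.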
Interface
import type { WorkflowStorage, WorkflowSnapshot } from 'rodger/workflows';
interface WorkflowStorage {
  saveSnapshot(snapshot: WorkflowSnapshot): Promise<void>;
  getSnapshot(executionId: string): Promise<WorkflowSnapshot | null>;
}
Redis Implementation
import { Redis } from 'ioredis';
import type { WorkflowStorage, WorkflowSnapshot } from 'rodger/workflows';
export class RedisWorkflowStorage implements WorkflowStorage {
  constructor(private redis: Redis) {}
  async saveSnapshot(snapshot: WorkflowSnapshot): Promise<void> {
    const key = `workflow:${snapshot.executionId}`;
    // Expire the snapshot 24 hours after its last save
    // (SETEX resets the TTL on every write)
    await this.redis.setex(key, 86400, JSON.stringify(snapshot));
  }
  async getSnapshot(executionId: string): Promise<WorkflowSnapshot | null> {
    const key = `workflow:${executionId}`;
    const data = await this.redis.get(key);
    return data ? JSON.parse(data) : null;
  }
}
// Usage (assumes an existing ioredis client named `redis`)
const storage = new RedisWorkflowStorage(redis);
const result = await workflow.execute(input, { storage });
Supabase Implementation
import { createClient, SupabaseClient } from '@supabase/supabase-js';
import type { WorkflowStorage, WorkflowSnapshot } from 'rodger/workflows';
export class SupabaseWorkflowStorage implements WorkflowStorage {
  constructor(private supabase: SupabaseClient) {}
  async saveSnapshot(snapshot: WorkflowSnapshot): Promise<void> {
    const { error } = await this.supabase
      .from('workflow_snapshots')
      .upsert({
        execution_id: snapshot.executionId,
        workflow_id: snapshot.workflowId,
        data: snapshot, // Store entire snapshot as JSONB
        status: snapshot.status,
        updated_at: snapshot.updatedAt,
      });
    // supabase-js reports failures in `error` rather than throwing
    if (error) throw error;
  }
  async getSnapshot(executionId: string): Promise<WorkflowSnapshot | null> {
    const { data, error } = await this.supabase
      .from('workflow_snapshots')
      .select('data')
      .eq('execution_id', executionId)
      .maybeSingle(); // null data (not an error) when no row matches
    if (error) throw error;
    return data?.data ?? null;
  }
}
// Usage (assumes an existing Supabase client named `supabase`)
const storage = new SupabaseWorkflowStorage(supabase);
Database Schema (Supabase/Postgres)
CREATE TABLE workflow_snapshots (
  execution_id TEXT PRIMARY KEY,
  workflow_id TEXT NOT NULL,
  data JSONB NOT NULL,
  status TEXT NOT NULL,
  created_at TIMESTAMPTZ DEFAULT NOW(),
  updated_at TIMESTAMPTZ NOT NULL
);
CREATE INDEX idx_workflow_snapshots_workflow ON workflow_snapshots(workflow_id);
CREATE INDEX idx_workflow_snapshots_status ON workflow_snapshots(status);
CREATE INDEX idx_workflow_snapshots_updated ON workflow_snapshots(updated_at);
Streaming Execution
Get real-time events as the workflow executes:
const workflow = createWorkflow('streaming')
  .step('step1', agent1)
  .then('step2', agent2)
  .then('step3', agent3)
  .commit();
// Stream execution events
for await (const event of workflow.executeStream(input)) {
  switch (event.type) {
    case 'workflow-start':
      console.log('Started:', event.workflowId);
      break;
    case 'step-start':
      console.log('Step starting:', event.stepName);
      break;
    case 'step-complete':
      console.log('Step done:', event.stepName, event.result);
      break;
    case 'step-error':
      console.error('Step failed:', event.stepName, event.error);
      break;
    case 'workflow-complete':
      console.log('Workflow done:', event.result);
      break;
    case 'workflow-suspended':
      console.log('Suspended at:', event.stepName);
      console.log('Resume with:', event.executionId);
      break;
  }
}
API Reference
createWorkflow(id: string)
Create a new workflow builder.
const workflow = createWorkflow('my-workflow')
  .step('first', myAgent)
  .then('second', myFunction)
  .commit();
.step(name, executor, options?)
Add a step to the workflow.
Parameters:
- name: Unique step identifier
- executor: Agent, function, or async operation
- options: Step configuration
Options:
interface StepOptions {
  // Conditional execution
  when?: (ctx: WorkflowContext) => boolean | Promise<boolean>;
  // Retry configuration
  retry?: {
    attempts: number;
    delay: number;
    backoff?: 'linear' | 'exponential';
  };
  // Timeout in milliseconds
  timeout?: number;
  // Error handling
  onError?: (ctx: WorkflowContext, error: Error) => Promise<any>;
  // Context customization (for agents)
  context?: (ctx: WorkflowContext) => string;
}
.then(name, executor, options?)
Add a sequential step (alias for .step()).
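The retry option accepted by .step() and .then() is described here only by its shape, so the exact delay schedule is not documented. As an illustration, the sketch below assumes that attempts counts total tries, that 'linear' multiplies delay by the retry number, and that 'exponential' doubles the delay on each retry. retryDelays and withRetry are hypothetical helpers written for this example, not library functions.

```typescript
type Backoff = 'linear' | 'exponential';

interface RetryConfig {
  attempts: number; // assumed to mean total tries, including the first
  delay: number; // base delay in milliseconds
  backoff?: Backoff;
}

// Compute the wait (ms) before each retry under the assumed semantics.
function retryDelays({ attempts, delay, backoff }: RetryConfig): number[] {
  const delays: number[] = [];
  for (let retry = 1; retry < attempts; retry++) {
    if (backoff === 'exponential') {
      delays.push(delay * 2 ** (retry - 1));
    } else if (backoff === 'linear') {
      delays.push(delay * retry);
    } else {
      delays.push(delay); // no backoff: constant delay
    }
  }
  return delays;
}

// Run `fn` until it succeeds or the attempts are exhausted,
// then rethrow the last error.
async function withRetry<T>(fn: () => Promise<T>, config: RetryConfig): Promise<T> {
  const delays = retryDelays(config);
  let lastError: unknown;
  for (let attempt = 0; attempt < config.attempts; attempt++) {
    try {
      return await fn();
    } catch (error) {
      lastError = error;
      const wait = delays[attempt];
      if (wait !== undefined) {
        await new Promise<void>((resolve) => setTimeout(resolve, wait));
      }
    }
  }
  throw lastError;
}
```

Under these assumptions, { attempts: 3, delay: 500, backoff: 'exponential' } would wait 500 ms and then 1000 ms between tries. The library's real schedule may differ, for example by adding jitter or a maximum delay.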
.after(stepNames)
Create steps that run after specific step(s) complete.
workflow
  .step('step1', agent1)
  .step('step2', agent2)
  .after(['step1', 'step2'])
  .step('step3', agent3) // Runs after both complete
  .commit();
.commit()
Finalize and return the executable workflow.
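To make the builder lifecycle concrete, here is a toy version of the step / then / commit pattern. It is a sketch under assumptions, not the library's implementation: MiniWorkflowBuilder and its context shape are hypothetical, and conditions, retries, and suspension are left out.

```typescript
// Hypothetical context shape, modeled on the fields used in this README.
type StepContext = {
  input: unknown;
  previousResults: Record<string, unknown>;
  completedSteps: string[];
};

type StepFn = (ctx: StepContext) => unknown | Promise<unknown>;

class MiniWorkflowBuilder {
  private readonly steps: Array<{ name: string; run: StepFn }> = [];

  step(name: string, run: StepFn): this {
    if (this.steps.some((s) => s.name === name)) {
      throw new Error(`Duplicate step name: ${name}`);
    }
    this.steps.push({ name, run });
    return this; // returning the builder is what enables chaining
  }

  // Alias for step(); reads more naturally in a sequential chain.
  then(name: string, run: StepFn): this {
    return this.step(name, run);
  }

  // Copy the step list so later builder calls cannot change
  // a workflow that has already been committed.
  commit() {
    const frozen = [...this.steps];
    return {
      async execute(input: unknown) {
        const previousResults: Record<string, unknown> = {};
        const completedSteps: string[] = [];
        for (const { name, run } of frozen) {
          previousResults[name] = await run({
            input,
            previousResults,
            completedSteps: [...completedSteps],
          });
          completedSteps.push(name);
        }
        return { status: 'completed' as const, stepResults: previousResults };
      },
    };
  }
}
```

The point of the separate commit() step in this sketch is that the object you execute is distinct from the object you build with, so a committed workflow stays stable even if the builder is reused.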
workflow.execute(input, options?)
Execute the workflow.
Parameters:
interface ExecutionOptions {
  sessionId?: string;
  userId?: string;
  metadata?: Record<string, any>;
  storage?: WorkflowStorage;
  trace?: TraceConfig;
}
Returns:
interface WorkflowResult {
  workflowId: string;
  executionId: string;
  status: 'completed' | 'failed' | 'suspended';
  result?: any;
  error?: Error;
  stepResults: Map<string, any>;
  startedAt: Date;
  completedAt: Date;
  duration: number;
  traces?: ExecutionTraceEntry[];
}
workflow.executeStream(input, options?)
Execute with streaming events.
Returns: AsyncIterable<WorkflowStreamEvent>
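A consumer can narrow on event.type to get typed access to each event's payload. The sketch below assumes the event names and fields used in the Streaming Execution example; the WorkflowStreamEvent union, fakeExecuteStream, and summarizeStream are local stand-ins written for illustration, not the package's actual exports.

```typescript
// Assumed event shapes, inferred from the Streaming Execution example.
type WorkflowStreamEvent =
  | { type: 'workflow-start'; workflowId: string }
  | { type: 'step-start'; stepName: string }
  | { type: 'step-complete'; stepName: string; result: unknown }
  | { type: 'step-error'; stepName: string; error: Error }
  | { type: 'workflow-complete'; result: unknown }
  | { type: 'workflow-suspended'; stepName: string; executionId: string };

// Stand-in for workflow.executeStream(input): emits a fixed event sequence.
async function* fakeExecuteStream(): AsyncGenerator<WorkflowStreamEvent> {
  yield { type: 'workflow-start', workflowId: 'streaming' };
  yield { type: 'step-start', stepName: 'step1' };
  yield { type: 'step-complete', stepName: 'step1', result: 1 };
  yield { type: 'step-start', stepName: 'step2' };
  yield { type: 'step-complete', stepName: 'step2', result: 2 };
  yield { type: 'workflow-complete', result: 2 };
}

// Fold a stream of events into a small summary object.
async function summarizeStream(stream: AsyncIterable<WorkflowStreamEvent>) {
  const completed: string[] = [];
  const errored: string[] = [];
  let status: 'completed' | 'suspended' | 'unknown' = 'unknown';
  for await (const event of stream) {
    switch (event.type) {
      case 'step-complete':
        completed.push(event.stepName); // stepName is typed on this branch
        break;
      case 'step-error':
        errored.push(event.stepName);
        break;
      case 'workflow-complete':
        status = 'completed';
        break;
      case 'workflow-suspended':
        status = 'suspended';
        break;
    }
  }
  return { completed, errored, status };
}
```

Collecting events into a summary like this can be handy for progress UIs or logging, where only the list of finished steps and the final status matter.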
workflow.resume(executionId, storage)
Resume a suspended workflow.
const result = await workflow.resume(
  'exec-123',
  workflowStorage
);
Best Practices
1. Use Specialized Agents per Step
// ✅ Good: Focused agents
const focusedWorkflow = createWorkflow('process')
  .step('extract', extractorAgent)
  .then('analyze', analyzerAgent)
  .then('summarize', summarizerAgent)
  .commit();
// ❌ Bad: One agent for everything
const monolithicWorkflow = createWorkflow('process')
  .step('everything', generalAgent)
  .commit();
2. Implement Proper Error Handling
const workflow = createWorkflow('robust')
  .step('critical', criticalAgent, {
    retry: { attempts: 3, delay: 1000, backoff: 'exponential' },
    onError: async (ctx, error) => {
      // Fallback logic
      await notifyAdmin(error);
      return { fallback: true };
    }
  })
  .commit();
3. Use Conditional Branching
const workflow = createWorkflow('adaptive')
  .step('analyze', analyzerAgent)
  .then('fast-path', fastAgent, {
    when: (ctx) => ctx.previousResults.analyze.simple === true
  })
  .then('complex-path', complexAgent, {
    when: (ctx) => ctx.previousResults.analyze.simple === false
  })
  .commit();
4. Store Workflow State for Resume
// Always provide storage for workflows that might suspend
const result = await workflow.execute(input, {
  storage: workflowStorage, // Required for suspend/resume
  sessionId: 'session-123',
  userId: 'user-456'
});
Examples
See complete examples in the examples directory:
- Loan Assistant - Multi-step loan processing with conditional approval
- Knowledge Agent - Document processing with RAG
- Tool Agent - Custom tool integration
Related Packages
- rodger - Agent framework with LLM and tools
- rodger/backend - Backend handlers including handleWorkflow
- rodger/testing - Testing utilities
License
MIT
