@fuzzy-street/saga
v0.0.0
Zero-infrastructure durable execution library for distributed workflows
📚🐻 Sagas
A lightweight, fully type-safe, minimum-dependency library for building durable workflows that survive crashes, coordinate across services, and handle real-world timing.
Complete with:
- Automatic compensation on failure
- Event-driven coordination
- Multi-day workflows
- Scheduled checkups
- Out-of-order event handling
- Storage-agnostic persistence
It's a fuzzy approach to reliable orchestration: with it, we can craft distributed systems that handle failure gracefully without the complexity and tax of heavyweight workflow engines.
🔍 Overview
This wee library aims to provide an elegant solution for coordinating multi-step workflows across services, databases, and external APIs in TypeScript applications. It addresses the common problems of building reliable distributed systems while maintaining full type safety and simplicity.
Like heavyweight workflow engines (Temporal, Akka, AWS Step Functions, Inngest, Trigger), this library provides powerful primitives that enable the following:
- Durable execution that survives process crashes and restarts
- Automatic rollback when steps fail (compensation in reverse order)
- Real-world waits for webhooks, user actions, or time-based events
- Event buffering to handle events that arrive before you're ready
- Scheduled checkups for periodic status checks or reminders
- Storage flexibility to run anywhere (Redis, SQL, filesystem, memory) using unstorage
- Zero infrastructure - just a library you drop into your existing services, serverless functions, or containers; you bring your own storage adapter.
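The event-buffering point above is what makes out-of-order delivery safe. Here is a minimal standalone sketch of the idea, assuming a simple in-memory map keyed by saga id; `EventBuffer`, `push`, and `takeFirst` are hypothetical names for illustration, not part of this library's API:

```typescript
// Hypothetical shape of a buffered event, loosely modelled on the
// `eventBuffer.find(e => e.type === ...)` usage shown in this README.
interface BufferedEvent {
  type: string;
  payload: unknown;
  receivedAt: number;
}

// Minimal in-memory buffer keyed by saga id (illustration only).
class EventBuffer {
  private readonly events = new Map<string, BufferedEvent[]>();

  // Store an event even if the saga is not waiting for it yet.
  push(sagaId: string, type: string, payload: unknown): void {
    const list = this.events.get(sagaId) ?? [];
    list.push({ type, payload, receivedAt: Date.now() });
    this.events.set(sagaId, list);
  }

  // Remove and return the first event of the given type, if one arrived early.
  takeFirst(sagaId: string, type: string): BufferedEvent | undefined {
    const list = this.events.get(sagaId);
    if (!list) return undefined;
    const index = list.findIndex((e) => e.type === type);
    if (index === -1) return undefined;
    const [event] = list.splice(index, 1);
    return event;
  }
}

const buffer = new EventBuffer();
// The webhook fires before the saga reaches its waiting step...
buffer.push('order_123', 'delivery.confirmed', { deliveredAt: 1700000000000 });
// ...and the step still finds it when it finally executes.
const early = buffer.takeFirst('order_123', 'delivery.confirmed');
console.log(early?.payload); // { deliveredAt: 1700000000000 }
```

Removing the event when it is taken is one design choice (it stops a re-run step from consuming the same event twice). The `eventBuffer` this library passes to `execute` is shown in the examples as an array you `find` over, so its actual retention semantics may differ.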
✨ Features
🧙‍♂️ Type-Safe Context - Your workflow context evolves with full TypeScript inference through each step
🔄 Automatic Compensation - Steps define rollback logic that runs in reverse order when a step fails, ensuring consistency throughout the workflow
⏸️ Durable Waiting - Pause and park your workflow for hours or days while it waits for webhooks, user input, or scheduled events
📬 Event Buffering - Events can arrive before you're ready for them; they'll be waiting in the buffer when your step executes
⏰ Scheduled Checkups - Wake your saga at T+anytime (T+2h, T+24h, T+7d) for status checks, reminders, or escalations
🎯 Correlation Keys - Route events to waiting sagas with flexible key matching: a saga can wait on several keys, and an event can match any one of them
💾 Storage Agnostic - Leveraging unstorage, we can use Redis for speed, our own PostgreSQL layer for durability, the filesystem for development, or memory for tests. In effect, we have decoupled the storage layer from the execution context.
🔁 Retry Logic - Configure exponential backoff and max attempts per step, with complete circuit breaker support!
⚡ Concurrency Safe - Optimistic locking with version numbers prevents race conditions when working across distributed workers
🪝 Rich Hooks - Observe every step start, completion, error, and compensation for metrics, logging, observability and anything else you might need. Just hook into any part of the saga lifecycle
🔍 Query & Inspect - Find sagas by status, labels, or correlation keys; inspect their current state and history
🧹 Background Maintenance - A specialised Janitor handles checkups, timeouts, and cleanup automatically
💻 Developer Experience - A simple API and clear patterns; runs anywhere Node.js runs
🤏 Minimal Dependencies - Just one peer dependency (unstorage) for storage abstraction and one for errors; everything else is built in
💚 Universal Compatibility - Works in Node.js, serverless functions, containers, or wherever TypeScript runs
🪖 Battle-Tested - A really comprehensive test suite with real-world examples
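To make the retry behaviour concrete, here is a standalone sketch of exponential backoff. The option names mirror the `retry` field in the Step Definition of the API Reference (`maxAttempts`, `backoffMs`, `backoffMultiplier`), but the delay formula and the default multiplier of 2 are assumptions of this sketch, and it signals failure with thrown errors rather than the `{ status: 'error' }` results that steps return:

```typescript
// Field names mirror the `retry` option in this README's Step Definition;
// the default multiplier of 2 is an assumption of this sketch.
interface RetryOptions {
  maxAttempts: number;
  backoffMs: number;
  backoffMultiplier?: number;
}

// Delay before retry number `retry` (1 = first retry after the initial attempt).
function backoffDelay(retry: number, opts: RetryOptions): number {
  const multiplier = opts.backoffMultiplier ?? 2;
  return opts.backoffMs * multiplier ** (retry - 1);
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Run `fn` up to `maxAttempts` times, waiting exponentially longer between tries.
// If every attempt fails, the last error is rethrown.
async function withRetry<T>(fn: (attempt: number) => Promise<T>, opts: RetryOptions): Promise<T> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= opts.maxAttempts; attempt++) {
    try {
      return await fn(attempt);
    } catch (error) {
      lastError = error;
      if (attempt < opts.maxAttempts) {
        await sleep(backoffDelay(attempt, opts));
      }
    }
  }
  throw lastError;
}

// Quick Start values: the delays between the three attempts.
console.log([1, 2].map((n) => backoffDelay(n, { maxAttempts: 3, backoffMs: 1000 }))); // [ 1000, 2000 ]
```

With the Quick Start's `{ maxAttempts: 3, backoffMs: 1000 }`, this sketch waits 1 s and then 2 s between the three attempts before giving up.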
📦 Installation
# npm
npm install @fuzzy-street/saga @fuzzy-street/results @fuzzy-street/errors unstorage
# pnpm
pnpm add @fuzzy-street/saga @fuzzy-street/results @fuzzy-street/errors unstorage
# yarn
yarn add @fuzzy-street/saga @fuzzy-street/results @fuzzy-street/errors unstorage
# Or use brace expansion (bash/zsh)
npm install @fuzzy-street/{saga,results,errors} unstorage
🔍 Core Concepts
Sagas orchestrate multi-step workflows where each step can succeed or fail. When a step fails, the saga automatically compensates (rolls back) all previous steps in reverse order. When a step needs to wait for an external event, the saga parks itself and can be resumed later - even days later.
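Parking and resuming hinge on correlation keys. As a rough, in-memory sketch of how a `sendEvent`-style call could find the right parked saga (`CorrelationRouter`, `park`, and `route` are hypothetical names, not this library's API; a real deployment would keep this index in shared storage):

```typescript
// A parked saga, as this sketch models it: which events it waits for and under which keys.
interface ParkedSaga {
  sagaId: string;
  eventTypes: string[];
  correlationKeys: string[];
}

// Hypothetical in-memory router (illustration only).
class CorrelationRouter {
  private readonly byKey = new Map<string, ParkedSaga[]>();

  // Index the saga under every correlation key it waits on.
  park(saga: ParkedSaga): void {
    for (const key of saga.correlationKeys) {
      const list = this.byKey.get(key) ?? [];
      list.push(saga);
      this.byKey.set(key, list);
    }
  }

  // Return the ids of sagas waiting on `key` for `eventType`, and un-park them.
  route(key: string, eventType: string): string[] {
    const waiting = (this.byKey.get(key) ?? []).filter((s) => s.eventTypes.includes(eventType));
    for (const saga of waiting) this.unpark(saga);
    return waiting.map((s) => s.sagaId);
  }

  // Remove the saga from all of its keys so it is resumed only once.
  private unpark(saga: ParkedSaga): void {
    for (const key of saga.correlationKeys) {
      const remaining = (this.byKey.get(key) ?? []).filter((s) => s !== saga);
      if (remaining.length > 0) this.byKey.set(key, remaining);
      else this.byKey.delete(key);
    }
  }
}

const router = new CorrelationRouter();
router.park({
  sagaId: 'order_123',
  eventTypes: ['payment.completed'],
  correlationKeys: ['order:123', 'transaction:txn_456'],
});
console.log(router.route('transaction:txn_456', 'payment.completed')); // [ 'order_123' ]
console.log(router.route('order:123', 'payment.completed')); // []
```

Because the saga is removed from every key once it is routed, a later event on `order:123` no longer matches, so the saga is resumed only once.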
The Pattern
// Define workflow steps
Step 1: Reserve inventory (if fails → nothing to compensate)
Step 2: Charge payment (if fails → release inventory)
Step 3: Create shipment (if fails → refund payment, release inventory)
Step 4: Wait for delivery (if fails → cancel shipment, refund, release)
Step 5: Send receipt (if fails → all of the above)
// Each step knows how to undo itself
// Compensation runs in reverse: 4, 3, 2, 1
This allows you to:
- Coordinate distributed operations with automatic cleanup
- Handle failures gracefully without manual intervention
- Wait for real-world events (webhooks, user actions, time delays)
- Build complex workflows from simple, composable steps
- Ensure consistency across services without distributed transactions
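The reverse-order rule in the pattern above can be sketched in a few lines. This is an illustration under simplifying assumptions, not this library's engine: `SketchStep` and `runSaga` are hypothetical, failures are thrown errors rather than returned results, and nothing is persisted between steps:

```typescript
// Simplified step shape for this sketch only: the library's steps return Result
// objects, whereas here a thrown error signals failure.
interface SketchStep<C> {
  id: string;
  execute: (ctx: C) => Promise<void>;
  compensate?: (ctx: C) => Promise<void>;
}

// Run steps in order; on failure, undo the completed ones in reverse order.
async function runSaga<C>(steps: SketchStep<C>[], ctx: C): Promise<{ ok: boolean; compensated: string[] }> {
  const completed: SketchStep<C>[] = [];
  for (const step of steps) {
    try {
      await step.execute(ctx);
      completed.push(step);
    } catch {
      const compensated: string[] = [];
      // The failed step never completed, so only earlier steps are compensated.
      for (const done of [...completed].reverse()) {
        if (done.compensate) {
          await done.compensate(ctx);
          compensated.push(done.id);
        }
      }
      return { ok: false, compensated };
    }
  }
  return { ok: true, compensated: [] };
}

// Each step logs what it did (or undid) into the shared context.
const makeStep = (id: string, fails = false): SketchStep<string[]> => ({
  id,
  execute: async (log) => {
    if (fails) throw new Error(`${id} failed`);
    log.push(`do:${id}`);
  },
  compensate: async (log) => {
    log.push(`undo:${id}`);
  },
});

// Mirrors the pattern above: step 5 fails, so steps 4, 3, 2, 1 are undone.
const trace: string[] = [];
void runSaga(
  [makeStep('reserve'), makeStep('charge'), makeStep('ship'), makeStep('deliver'), makeStep('receipt', true)],
  trace,
).then((result) => console.log(result.compensated)); // [ 'deliver', 'ship', 'charge', 'reserve' ]
```

Only steps that completed are compensated, which is why a failure in step 1 leaves nothing to undo.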
🚀 Quick Start
import { createSagaManager, step, wait, checkup } from '@fuzzy-street/saga';
import { fromPromise } from '@fuzzy-street/results';
import memoryDriver from 'unstorage/drivers/memory';
// `inventory` and `payment` below stand in for your own service clients
// Create manager (use redisDriver in production)
const manager = createSagaManager(memoryDriver(), 'orders');
// Define workflow
manager.register({
name: 'fulfill-order',
steps: [
step({
id: 'reserve-inventory',
execute: async (ctx) => {
const result = await fromPromise(inventory.reserve(ctx.items))();
// We pass through errors to allow the supervisor to handle
if (result.status === 'error') return result;
ctx.reservationId = result.data.id;
return result;
},
compensate: async (result, ctx) => {
await inventory.release(ctx.reservationId);
return { status: 'success', data: undefined };
},
retry: { maxAttempts: 3, backoffMs: 1000 }
}),
step({
id: 'charge-payment',
execute: async (ctx) => {
const result = await fromPromise(payment.charge(ctx.total))();
if (result.status === 'error') return result;
ctx.transactionId = result.data.id;
return result;
},
compensate: async (result, ctx) => {
await payment.refund(ctx.transactionId);
return { status: 'success', data: undefined };
}
}),
step({
id: 'wait-for-delivery',
execute: async (ctx, eventBuffer) => {
// Assumes an earlier shipment step (omitted here) set ctx.trackingNumber
// Check if already delivered
const delivered = eventBuffer.find(e => e.type === 'delivery.confirmed');
if (delivered) return { status: 'success', data: delivered.payload };
// Park and wait
return wait({
eventTypes: ['delivery.confirmed'],
correlationKeys: [`tracking:${ctx.trackingNumber}`],
timeoutMs: 14 * 24 * 60 * 60 * 1000, // 14 days
checkups: [
checkup('check-status', 24 * 60 * 60 * 1000) // Daily check
]
});
}
})
]
});
// Start a workflow
await manager.start('fulfill-order', 'order_123', {
items: [{ id: 'prod_1', qty: 2 }],
total: 99.99
});
// Resume from webhook (days later, different process, whatever)
await manager.sendEvent('tracking:1Z999', 'delivery.confirmed', {
deliveredAt: Date.now()
});
📚 Key Patterns
1. Steps with Compensation
Every step that modifies state should provide compensation logic:
step({
id: 'create-user-account',
execute: async (ctx) => {
const user = await db.createUser(ctx.email);
ctx.userId = user.id;
return { status: 'success', data: user };
},
compensate: async (result, ctx) => {
await db.deleteUser(ctx.userId);
return { status: 'success', data: undefined };
}
})
2. Waiting States
Workflows can park and wait for external events:
step({
id: 'wait-for-approval',
execute: async (ctx, eventBuffer) => {
// Check if already approved using the `eventBuffer` (it's like a mailbox, but not quite)
const approval = eventBuffer.find(e => e.type === 'approval.granted');
if (approval) {
return { status: 'success', data: approval.payload };
}
// Still waiting
return wait({
eventTypes: ['approval.granted', 'approval.denied'],
correlationKeys: [`request:${ctx.requestId}`],
timeoutMs: 48 * 60 * 60 * 1000 // 48 hours
});
}
})
3. Scheduled Checkups
Wake your saga at specific intervals to take action:
step({
id: 'monitor-delivery',
execute: async (ctx, eventBuffer) => {
// Track waiting time
if (!ctx.waitStartedAt) {
ctx.waitStartedAt = Date.now();
ctx.checkCount = 0;
}
// Take action based on elapsed time
const elapsed = Date.now() - ctx.waitStartedAt;
const hours = elapsed / (60 * 60 * 1000);
if (hours >= 24 && ctx.checkCount === 0) {
await sendStatusUpdate(ctx.trackingNumber);
ctx.checkCount = 1;
}
// Continue waiting
return wait({
eventTypes: ['delivery.confirmed'],
correlationKeys: [`tracking:${ctx.trackingNumber}`],
checkups: [
checkup('status-check', 24 * 60 * 60 * 1000), // Day 1
checkup('status-check', 7 * 24 * 60 * 60 * 1000) // Day 7
]
});
}
})
4. Context Evolution
Your context accumulates data as it flows through steps:
interface OrderContext {
orderId: string;
items: Item[];
// Populated by step 1
reservationId?: string;
// Populated by step 2
transactionId?: string;
// Populated by step 3
trackingNumber?: string;
}
5. Event Correlation
Route events to waiting sagas with flexible keys:
// Saga waits with multiple correlation keys
return wait({
eventTypes: ['payment.completed'],
correlationKeys: [
`order:${ctx.orderId}`,
`user:${ctx.userId}`,
`transaction:${ctx.transactionId}`
]
});
// Event can match any of them
await manager.sendEvent('order:123', 'payment.completed', {...});
// OR
await manager.sendEvent('transaction:txn_456', 'payment.completed', {...});
🗄️ Storage Options
Development (Memory)
import memoryDriver from 'unstorage/drivers/memory';
const manager = createSagaManager(memoryDriver(), 'my-app');
Zero setup, perfect for local development and testing.
Production (Redis)
import redisDriver from 'unstorage/drivers/redis';
const manager = createSagaManager(
redisDriver({ url: process.env.REDIS_URL }),
'my-app'
);
Fast, atomic operations, automatic expiration via TTL.
Docker quick start:
docker run -d -p 6379:6379 redis:alpine
🎯 Real-World Examples
We've included examples that demonstrate conceptual patterns:
Run them:
npx tsx examples/*
All examples use memory storage by default. Swap in Redis or any other unstorage driver for production.
📐 API Reference
SagaManager
const manager = createSagaManager(driver, namespace);
// Register saga configuration
manager.register<TContext>({
name: string,
labels?: Record<string, string>,
timeoutMs?: number,
ttl?: { execution: number, retention: number },
hooks?: {
onStepStart?: (stepId, ctx) => void,
onStepComplete?: (stepId, result, ctx) => void,
onStepError?: (stepId, error, ctx) => void,
onComplete?: (ctx) => void,
onFailed?: (error, ctx) => void,
},
steps: StepDefinition<TContext>[]
})
// Start new saga
await manager.start(name, id, context)
// Send event to waiting saga
await manager.sendEvent(correlationKey, eventType, payload)
// Query sagas
await manager.query({ status, labels })
// Get saga state
await manager.get(id)
// Cancel saga
await manager.cancel(id)
// Replay/retry saga
await manager.replay(id, { fromStep?, updateContext? })
// Start background maintenance
manager.startJanitor({ checkupInterval, cleanupInterval })
Step Definition
step<TContext>({
id: string,
execute: (ctx: TContext, eventBuffer: SagaEvent[], signal: AbortSignal) =>
Promise<Result<any> | WaitingResult>,
compensate?: (result: any, ctx: TContext) => Promise<Result<void>>,
retry?: {
maxAttempts: number,
backoffMs: number,
backoffMultiplier?: number
},
timeoutMs?: number,
circuitBreaker?: {
failureThreshold: number,
resetTimeoutMs: number,
onOpen?: () => void,
onClose?: () => void
}
})
Helpers
// Create waiting result
wait({
eventTypes: string[],
correlationKeys: string[],
timeoutMs?: number,
checkups?: Checkup[]
})
// Create checkup
checkup(handler: string, afterMs: number, metadata?: Record<string, any>)
🌟 Advanced Patterns
Circuit Breaker
step({
id: 'call-flaky-api',
execute: async (ctx) => {
const result = await fromPromise(flakyApi.call())();
return result;
},
circuitBreaker: {
failureThreshold: 5,
resetTimeoutMs: 60000,
onOpen: () => console.log('Circuit opened'),
onClose: () => console.log('Circuit closed')
}
})
Parallel Steps
step({
id: 'notify-users',
parallel: [
{ id: 'email', execute: async (ctx) => await sendEmail(ctx) },
{ id: 'sms', execute: async (ctx) => await sendSMS(ctx) },
{ id: 'push', execute: async (ctx) => await sendPush(ctx) }
]
})
Conditional Execution
step({
id: 'premium-feature',
condition: async (ctx) => ctx.user.tier === 'premium',
execute: async (ctx) => {
// Only runs for premium users
return { status: 'success', data: undefined };
}
})
Schema Versioning
manager.register({
name: 'my-saga',
version: 2,
migrate: async (state, fromVersion) => {
if (fromVersion === 1) {
state.context.newField = 'default';
}
return state;
},
steps: [...]
})
🧪 Testing
# Run all tests
pnpm test
# Run specific suite
pnpm run test:engine
pnpm run test:storage
pnpm run test:integration
# Watch mode
pnpm run test:watch
🎓 When to Use This
✅ Good fit:
- Multi-step business processes
- Cross-service coordination
- Workflows with waiting states
- Operations requiring rollback on failure
- Event-driven architectures
- Long-running tasks (hours/days)
❌ Not a good fit:
- Simple CRUD operations
- Real-time streaming
- High-frequency state machines (use an in-process FSM like XState or Robot)
- Operations requiring strict serializability (use database transactions)
Known Limitations:
- ⚠️ Memory driver not production-safe - Race conditions under concurrent load due to the lack of an atomic compare-and-swap (CAS)
- ⚠️ Single-node only - No distributed coordination
- ⚠️ No production storage drivers (Postgres/Redis) yet
- ⚠️ Event store lacks incremental snapshots (full state copy)
- ⚠️ Janitor doesn't scale horizontally
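The missing atomic CAS in the first limitation is exactly what optimistic locking relies on. A minimal sketch of version-number locking (`VersionedStore` and `compareAndSet` are hypothetical names, not this library's storage API) shows why: two workers read the same version, and only the first write may succeed:

```typescript
// A versioned record: every successful write bumps the version.
interface Versioned<T> {
  version: number;
  value: T;
}

// Hypothetical store: `compareAndSet` succeeds only if nobody wrote in between.
// The check and the write run synchronously, so they are atomic within one JS process.
class VersionedStore<T> {
  private readonly data = new Map<string, Versioned<T>>();

  get(key: string): Versioned<T> | undefined {
    return this.data.get(key);
  }

  compareAndSet(key: string, expectedVersion: number, value: T): boolean {
    const current = this.data.get(key);
    const currentVersion = current?.version ?? 0;
    if (currentVersion !== expectedVersion) return false; // someone else won the race
    this.data.set(key, { version: currentVersion + 1, value });
    return true;
  }
}

const store = new VersionedStore<string>();
store.compareAndSet('saga:order_123', 0, 'running');

// Two workers read the same version...
const seenByA = store.get('saga:order_123')!.version;
const seenByB = store.get('saga:order_123')!.version;

// ...but only the first write is accepted; the second must re-read and retry.
console.log(store.compareAndSet('saga:order_123', seenByA, 'step-2')); // true
console.log(store.compareAndSet('saga:order_123', seenByB, 'compensating')); // false
console.log(store.get('saga:order_123')); // { version: 2, value: 'step-2' }
```

Because the check and the write are only atomic within a single JavaScript process here, the same guarantee across processes has to come from the backend, for example Redis `WATCH`/`MULTI`/`EXEC` or a SQL `UPDATE ... WHERE version = ?`.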
Use Case: Suitable for development, testing, and low-concurrency production workloads (<10 concurrent sagas). Not recommended for high-throughput production systems until v1.0.
🤝 Contributing
Contributions are always welcome! This library stands on the shoulders of giants - inspired by the saga pattern from academic research, workflow engines like Temporal and Inngest, and the many developers who've built distributed systems before us.
The goal is to keep it:
- Minimal - No unnecessary complexity
- Portable - Runs anywhere TypeScript runs
- Extensible - Easy to add storage adapters and patterns
- Type-safe - Leveraging TypeScript's full power
- Infra-agnostic - Not to couple or introduce infrastructure tax
I want to thank the hundreds of speakers on YouTube who have covered this subject. Their material was critical to the learning and the building that this undertaking required.
Remember to stay fuzzy, friends
💚
📜 License
MIT
