@saga-engine/core
v0.1.1
Published
Transactional middleware for AI agents - Saga pattern with automatic rollback
Maintainers
Readme
Saga Engine
Transactional middleware for AI agents — Ctrl+Z for the real world.
Saga Engine implements the Saga pattern for AI agents and autonomous systems. When your agent books a flight, reserves a hotel, and then fails to rent a car — Saga Engine automatically cancels the hotel and flight in reverse order.
Why Saga Engine?
AI agents are increasingly performing real-world actions: booking travel, processing payments, managing infrastructure. But what happens when step 5 of 7 fails?
Without Saga Engine: Manual cleanup, inconsistent state, frustrated users.
With Saga Engine: Automatic rollback, consistent state, reliable operations.
const result = await orchestrator.execute(bookTripSaga, {
flight: { from: 'NYC', to: 'LAX' },
hotel: { city: 'Los Angeles', nights: 3 },
car: { pickup: 'LAX' }
});
if (!result.success) {
// Flight and hotel automatically cancelled
console.log('Booking failed, all changes rolled back');
}Features
- Automatic Compensation — Define rollback logic once, executed automatically on failure
- Type-Safe — Full TypeScript support with generics for input/output types
- Pluggable Storage — In-memory for dev, Redis/Postgres for production
- Observable — Rich event system for monitoring and debugging
- Crash Recovery — Resume interrupted sagas after process restart
- Battle-Tested — 193+ tests covering edge cases and production scenarios
Installation
npm install @saga-engine/core
# or
pnpm add @saga-engine/core
# or
yarn add @saga-engine/coreQuick Start
import { Saga, SagaOrchestrator, InMemoryStore } from '@saga-engine/core';
// 1. Define your saga with steps and compensations
const orderSaga = new Saga<{ orderId: string; amount: number }>('process-order')
.step({
name: 'reserve-inventory',
execute: async (ctx) => {
const reservation = await inventoryService.reserve(ctx.input.orderId);
return { reservationId: reservation.id };
},
compensate: async (ctx, result) => {
await inventoryService.release(result.reservationId);
}
})
.step({
name: 'charge-payment',
execute: async (ctx) => {
const charge = await paymentService.charge(ctx.input.amount);
return { chargeId: charge.id };
},
compensate: async (ctx, result) => {
await paymentService.refund(result.chargeId);
}
})
.step({
name: 'send-confirmation',
execute: async (ctx) => {
await emailService.send(ctx.input.orderId, 'Order confirmed!');
return { sent: true };
}
});
// 2. Create orchestrator with storage
const orchestrator = new SagaOrchestrator({
store: new InMemoryStore()
});
// 3. Execute the saga
const result = await orchestrator.execute(orderSaga, {
orderId: 'ORD-123',
amount: 99.99
});
if (result.success) {
console.log('Order processed:', result.stepResults);
} else {
console.log('Order failed:', result.error.message);
console.log('Compensated:', result.compensated);
}Core Concepts
Sagas
A saga is a sequence of steps that form a logical transaction. Each step can have:
- execute — The action to perform
- compensate — The rollback action (optional)
const saga = new Saga<InputType>('saga-name')
.step({ name: 'step-1', execute: async (ctx) => {...}, compensate: async (ctx, result) => {...} })
.step({ name: 'step-2', execute: async (ctx) => {...} });Context
Each step receives a context with:
sagaId— Unique execution IDinput— The input provided when starting the sagastepResults— Results from previous steps (Map)
execute: async (ctx) => {
const previousResult = ctx.stepResults.get('previous-step');
return { processed: ctx.input.data };
}Compensation
When a step fails, compensation runs in reverse order for all completed steps:
Step 1: reserve-inventory ✅
Step 2: charge-payment ✅
Step 3: send-email ❌ FAILED
Compensation:
→ charge-payment.compensate() // Refund
→ reserve-inventory.compensate() // ReleaseCompensation Strategies
const orchestrator = new SagaOrchestrator({
store: new InMemoryStore(),
onCompensationFailure: 'continue', // 'retry' | 'continue' | 'halt'
compensationRetries: 3, // For 'retry' strategy
compensationRetryDelay: 1000 // Milliseconds between retries
});Events
Monitor saga execution with events:
orchestrator.on('saga:started', (state) => {
console.log(`Saga ${state.sagaName} started`);
});
orchestrator.on('step:executed', (state, stepName, result) => {
console.log(`Step ${stepName} completed:`, result);
});
orchestrator.on('step:failed', (state, stepName, error) => {
console.log(`Step ${stepName} failed:`, error.message);
});
orchestrator.on('compensation:started', (state) => {
console.log('Starting rollback...');
});
orchestrator.on('compensation:completed', (state) => {
console.log('Rollback complete');
});
orchestrator.on('saga:completed', (state) => {
console.log('Saga completed successfully');
});
orchestrator.on('saga:failed', (state, error) => {
console.log('Saga failed:', error.message);
});Storage Adapters
InMemoryStore (Development)
import { InMemoryStore } from '@saga-engine/core';
const store = new InMemoryStore();RedisStore (Production) — Coming Soon
import { RedisStore } from '@saga-engine/redis';
const store = new RedisStore({
url: 'redis://localhost:6379',
keyPrefix: 'saga:'
});PostgresStore (Enterprise) — Coming Soon
import { PostgresStore } from '@saga-engine/postgres';
const store = new PostgresStore({
connectionString: process.env.DATABASE_URL
});Custom Store
Implement the StateStore interface:
interface StateStore {
create(state: SagaState): Promise<void>;
get(sagaId: string): Promise<SagaState | undefined>;
updateStatus(sagaId: string, status: SagaStatus, completedAt?: Date): Promise<void>;
updateStep(sagaId: string, stepName: string, stepState: Partial<StepState>): Promise<void>;
getPendingSagas(): Promise<SagaState[]>;
delete?(sagaId: string): Promise<void>;
close?(): Promise<void>;
}Crash Recovery
Recover interrupted sagas on startup:
const orchestrator = new SagaOrchestrator({ store: redisStore });
// On application startup
await orchestrator.recover();Examples
E-Commerce Order Processing
const processOrderSaga = new Saga<OrderInput>('process-order')
.step({
name: 'validate-cart',
execute: async (ctx) => {
const valid = await cartService.validate(ctx.input.cartId);
if (!valid) throw new Error('Invalid cart');
return { validated: true };
}
})
.step({
name: 'reserve-inventory',
execute: async (ctx) => {
const items = await inventoryService.reserve(ctx.input.items);
return { reservedItems: items };
},
compensate: async (ctx, result) => {
await inventoryService.release(result.reservedItems);
}
})
.step({
name: 'process-payment',
execute: async (ctx) => {
const payment = await paymentService.charge({
amount: ctx.input.total,
customerId: ctx.input.customerId
});
return { paymentId: payment.id };
},
compensate: async (ctx, result) => {
await paymentService.refund(result.paymentId);
}
})
.step({
name: 'create-shipment',
execute: async (ctx) => {
const shipment = await shippingService.create({
address: ctx.input.shippingAddress,
items: ctx.stepResults.get('reserve-inventory').reservedItems
});
return { trackingNumber: shipment.tracking };
},
compensate: async (ctx, result) => {
await shippingService.cancel(result.trackingNumber);
}
});AI Agent Tool Execution
const agentTaskSaga = new Saga<AgentTask>('agent-task')
.step({
name: 'acquire-resources',
execute: async (ctx) => {
const resources = await resourceManager.acquire(ctx.input.requirements);
return { resourceIds: resources.map(r => r.id) };
},
compensate: async (ctx, result) => {
await resourceManager.release(result.resourceIds);
}
})
.step({
name: 'execute-action',
execute: async (ctx) => {
const result = await actionExecutor.run(ctx.input.action);
return { actionResult: result };
},
compensate: async (ctx, result) => {
await actionExecutor.undo(result.actionResult);
}
})
.step({
name: 'commit-changes',
execute: async (ctx) => {
await changeTracker.commit(ctx.sagaId);
return { committed: true };
}
});Integration with AI Frameworks
LangChain
import { Tool } from 'langchain/tools';
const bookingTool = new Tool({
name: 'book-travel',
description: 'Book flights and hotels with automatic rollback on failure',
func: async (input: string) => {
const params = JSON.parse(input);
const result = await orchestrator.execute(bookTripSaga, params);
return JSON.stringify(result);
}
});Vercel AI SDK
import { tool } from 'ai';
const processOrderTool = tool({
description: 'Process an order with inventory, payment, and shipping',
parameters: z.object({
orderId: z.string(),
items: z.array(z.object({ sku: z.string(), quantity: z.number() })),
customerId: z.string()
}),
execute: async (params) => {
return orchestrator.execute(orderSaga, params);
}
});API Reference
Saga
new Saga<TInput>(name: string)
.step<TResult>(definition: StepDefinition<TInput, TResult>): Saga<TInput>
.addSteps(definitions: StepDefinition[]): Saga<TInput>
.getStep(name: string): SagaStep | undefined
.hasSteps(): boolean
.steps: ReadonlyArray<SagaStep>
.stepCount: number
.name: stringSagaOrchestrator
new SagaOrchestrator(options: OrchestratorOptions & { store: StateStore })
.execute<TInput, TResult>(saga: Saga<TInput>, input: TInput): Promise<SagaResult<TResult>>
.recover(): Promise<void>
.on<K extends keyof SagaEvents>(event: K, callback: SagaEvents[K]): void
.off<K extends keyof SagaEvents>(event: K, callback: SagaEvents[K]): voidTypes
interface SagaResult<TResult> {
success: boolean;
sagaId: string;
result?: TResult; // On success
stepResults?: Map<string, unknown>;
error?: Error; // On failure
failedStep?: string;
compensated?: boolean;
compensationErrors?: Array<{ step: string; error: Error }>;
}
interface SagaContext<TInput> {
sagaId: string;
input: TInput;
stepResults: Map<string, unknown>;
}
interface StepDefinition<TInput, TResult> {
name: string;
execute: (ctx: SagaContext<TInput>) => Promise<TResult>;
compensate?: (ctx: SagaContext<TInput>, result: TResult) => Promise<void>;
}Enterprise Features
Looking for advanced features? Check out Saga Engine Cloud:
- Hosted State Management — Durable storage with global replication
- Observability Dashboard — Real-time monitoring, tracing, and analytics
- Team Collaboration — Role-based access control and audit logs
- SLA Guarantees — 99.99% uptime with dedicated support
Contact us for Enterprise pricing →
Contributing
We welcome contributions! Please see our Contributing Guide for details.
# Clone the repository
git clone https://github.com/Chetan-svg/saga-engine.git
cd saga-engine
# Install dependencies
pnpm install
# Run tests
pnpm test
# Build
pnpm buildLicense
MIT © Verto AI LLC
