transactional-ai
v0.2.2
A reliability protocol for AI Agents. Saga pattern with persistent rollback.
Transactional AI
A reliability protocol for AI Agents. Implement the Saga Pattern with persistent rollback and state recovery for long-running Large Language Model (LLM) operations.
Why use this?
AI Agents are flaky. Steps fail, APIs time out, and hallucinations happen.
transactional-ai gives you:
- Automatic Rollbacks: If step 3 fails, steps 2 and 1 are compensated (undone) automatically.
- Concurrency Safety: Distributed locking prevents race conditions when scaling workers.
- Persistence: Transactions survive process crashes using Redis, Postgres, or File storage.
- Resilience: Built-in retry policies for flaky LLM APIs.
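The automatic rollback behavior listed above can be illustrated with a minimal, library-free sketch. The `Step` type and `runSaga` function are hypothetical names used only for this illustration; they are not transactional-ai's implementation, which also handles persistence, locking, and retries.

```typescript
// Hypothetical minimal saga runner, for illustration only.
type Step = {
  name: string;
  execute: () => Promise<unknown>;
  compensate: (result: unknown) => Promise<void>;
};

async function runSaga(steps: Step[]): Promise<void> {
  // Completed steps and their results, in execution order.
  const completed: { step: Step; result: unknown }[] = [];
  try {
    for (const step of steps) {
      const result = await step.execute();
      completed.push({ step, result });
    }
  } catch (err) {
    // Undo completed steps in reverse order, then surface the original error.
    for (const { step, result } of completed.reverse()) {
      await step.compensate(result);
    }
    throw err;
  }
}
```

If the third of three steps throws, this sketch compensates step 2 and then step 1, each receiving the value its own `execute` returned.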
Installation
npm install transactional-ai

Quick Start
1. The "Litmus Test" (Basic Usage)
Define a transaction where every action has a compensating rollback action.
import { Transaction } from "transactional-ai";

// 1. Create a named transaction (required for resumability)
const agent = new Transaction("user-onboarding-123");

agent
  .step({
    name: "create-file",
    execute: async (ctx) => {
      // Do the work
      const file = await googleDrive.createFile("report.txt");
      return file.id;
    },
    compensate: async (fileId) => {
      // Undo the work
      await googleDrive.deleteFile(fileId);
    },
  })
  .step({
    name: "email-report",
    execute: async (ctx) => {
      // Use previous results via context or external state
      await emailService.send(ctx.result);
    },
    compensate: async () => {
      await emailService.recallLast();
    },
  });

// 2. Run it
await agent.run({ initialData: "foo" });

Adding Persistence (Redis)
To survive process crashes, simply provide a storage adapter.
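As a rough mental model of what resuming after a crash involves, the sketch below records each completed step in a store and skips recorded steps on the next run. Everything in it is an assumption made for illustration: `runResumable`, `StepRecord`, and the in-memory `store` are hypothetical, and the `Map` stands in for a durable backend such as Redis, Postgres, or a file.

```typescript
// Hypothetical sketch of resumability; these names are not part of transactional-ai.
type StepRecord = { name: string; status: "completed"; result: unknown };

// Stand-in for a durable store keyed by transaction ID.
const store = new Map<string, StepRecord[]>();

async function runResumable(
  txId: string,
  steps: { name: string; execute: () => Promise<unknown> }[],
): Promise<string[]> {
  const history = store.get(txId) ?? [];
  const done = new Set(history.map((r) => r.name));
  const executed: string[] = [];
  for (const step of steps) {
    if (done.has(step.name)) continue; // finished in an earlier run: skip it
    const result = await step.execute();
    history.push({ name: step.name, status: "completed", result });
    store.set(txId, history); // persist after every step so only the in-flight step is lost
    executed.push(step.name);
  }
  return executed; // names of the steps that actually ran this time
}
```

With transactional-ai, the equivalent behavior comes from passing a storage adapter to the Transaction constructor: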
import { Transaction, RedisStorage } from "transactional-ai";

const storage = new RedisStorage("redis://localhost:6379");
const agent = new Transaction("workflow-id-555", storage);

// If the process crashes here, running this code again
// will automatically SKIP completed steps and resume at the failure point.
agent
  .step({
    /* ... */
  })
  .step({
    /* ... */
  });

await agent.run();

Enterprise Features (v0.2.0)
1. Distributed Locking (Redis)
Prevent race conditions when running multiple agent workers on the same Transaction ID.
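To make the idea of a lock with a time-to-live concrete, here is a small in-memory model. It is only a sketch: `TtlLock` is a hypothetical class, and how RedisLock works internally is not described here. The model mirrors the common "set if absent, with expiry" pattern, which in Redis is typically expressed as `SET key value NX PX <ms>`.

```typescript
// Hypothetical in-memory model of a TTL lock, for illustration only.
class TtlLock {
  private locks = new Map<string, { owner: string; expiresAt: number }>();

  // Returns true if `owner` obtained the lock for `ttlMs` milliseconds.
  acquire(key: string, owner: string, ttlMs: number, now = Date.now()): boolean {
    const current = this.locks.get(key);
    if (current && current.expiresAt > now) return false; // held and not yet expired
    this.locks.set(key, { owner, expiresAt: now + ttlMs });
    return true;
  }

  // Only the current owner may release, so a slow worker cannot free another worker's lock.
  release(key: string, owner: string): boolean {
    const current = this.locks.get(key);
    if (!current || current.owner !== owner) return false;
    this.locks.delete(key);
    return true;
  }
}
```

The expiry matters because a worker that crashes while holding the lock would otherwise block the transaction forever. In transactional-ai, the lock and its TTL are supplied through the Transaction options: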
import { Transaction, RedisStorage, RedisLock } from 'transactional-ai';

const connection = 'redis://localhost:6379';
const storage = new RedisStorage(connection);
const lock = new RedisLock(connection);

const agent = new Transaction('tx-unique-id', storage, {
  lock: lock,
  lockTTL: 30000 // Hold lock for 30s
});

// Safe to run across multiple processes/servers
await agent.run();

2. Postgres Storage (SQL)
Use Postgres for strict ACID compliance and auditability.
Prerequisite: Run schema.sql in your database.
import { Transaction, PostgresStorage } from 'transactional-ai';

const storage = new PostgresStorage('postgresql://user:pass@localhost:5432/mydb');
const agent = new Transaction('tx-id', storage);

Setting Up Postgres
Before using PostgresStorage, create the required table in your database:
Option 1: Run the provided schema
# From project root
psql -U your_user -d your_database -f schema.sql

Option 2: Copy-paste this SQL
CREATE TABLE IF NOT EXISTS transactions (
  id VARCHAR(255) PRIMARY KEY,
  status VARCHAR(50) NOT NULL,
  step_stack JSONB NOT NULL,
  created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
  updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX IF NOT EXISTS idx_transactions_status ON transactions(status);

Important: The table must exist before running your first transaction. It will NOT be auto-created.
3. Retry Policies
Handle flaky APIs (e.g., OpenAI 500 errors) automatically without failing the transaction.
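Conceptually, a retry policy with a fixed backoff behaves like the helper below. This is a sketch under stated assumptions: `withRetry` and `sleep` are hypothetical helpers, `attempts` is assumed to be at least 1, and the library's internal retry logic may differ (for example in how it spaces attempts).

```typescript
// Hypothetical helpers, for illustration only.
const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

async function withRetry<T>(
  fn: () => Promise<T>,
  attempts: number,
  backoffMs: number,
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= attempts; attempt++) {
    try {
      return await fn();
    } catch (err) {
      lastError = err;
      // Wait before the next attempt, but not after the final one.
      if (attempt < attempts) await sleep(backoffMs);
    }
  }
  // Every attempt failed: surface the last error so the caller can roll back.
  throw lastError;
}
```

In transactional-ai, the same kind of policy is declared on the step through the retry option: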
agent.step({
  name: 'generate-text',
  execute: async () => await openai.createCompletion(...),
  compensate: async () => { /* ... */ },
  retry: {
    attempts: 3, // Try 3 times
    backoffMs: 1000 // Wait 1s between attempts
  }
});

4. Step Timeouts (NEW in v0.2.1)
Prevent steps from hanging indefinitely with configurable timeouts.
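A common way to bound how long a promise is awaited is to race it against a timer, as in the sketch below. `withTimeout` is a hypothetical helper, not the library's implementation. Note that this technique only stops waiting: it does not cancel the underlying work, and whether transactional-ai aborts the in-flight operation is not stated here.

```typescript
// Hypothetical helper, for illustration only.
async function withTimeout<T>(work: Promise<T>, timeoutMs: number): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error(`Timed out after ${timeoutMs}ms`)), timeoutMs);
  });
  try {
    // Whichever promise settles first decides the outcome.
    return await Promise.race([work, timeout]);
  } finally {
    clearTimeout(timer); // do not keep the timer alive once the race is decided
  }
}
```

In transactional-ai, the limit is set per step through the timeout option: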
await tx.step('call-external-api', {
  do: async () => await fetch('https://api.example.com'),
  undo: async () => {},
  timeout: 30000 // Kill after 30 seconds
});

5. Observability Hooks (NEW in v0.2.1)
Integrate with your logging, metrics, and alerting systems using event hooks.
import { Transaction, TransactionEvents } from 'transactional-ai';

const events: TransactionEvents = {
  onStepComplete: (stepName, result, durationMs) => {
    logger.info(`Step ${stepName} completed in ${durationMs}ms`);
    metrics.recordStepDuration(stepName, durationMs);
  },
  onStepFailed: (stepName, error, attempt) => {
    logger.error(`Step ${stepName} failed:`, error);
    alerting.sendAlert(`Step failure: ${stepName}`);
  },
  onStepTimeout: (stepName, timeoutMs) => {
    alerting.sendCriticalAlert(`Step ${stepName} timed out after ${timeoutMs}ms`);
  }
};

const tx = new Transaction('workflow-123', storage, { events });

Available events:
- onTransactionStart, onTransactionComplete, onTransactionFailed
- onStepStart, onStepComplete, onStepFailed, onStepRetry, onStepSkipped, onStepTimeout
- onCompensationStart, onCompensationComplete, onCompensationFailed
See full example: npm run example:observability
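If you do not have a metrics backend yet, hooks like these can feed a small in-memory aggregator. The sketch below is an assumption-laden illustration: `StepHooks` is a local stand-in that copies the `onStepComplete` and `onStepFailed` signatures from the example above (the real TransactionEvents type may differ), and `createStepStats` is a hypothetical helper.

```typescript
// Local stand-in for the hook shapes shown above; not the library's own type.
type StepHooks = {
  onStepComplete?: (stepName: string, result: unknown, durationMs: number) => void;
  onStepFailed?: (stepName: string, error: unknown, attempt: number) => void;
};

// Hypothetical helper: collects per-step durations and failure counts in memory.
function createStepStats() {
  const durations = new Map<string, number[]>();
  const failures = new Map<string, number>();

  const events: StepHooks = {
    onStepComplete: (stepName, _result, durationMs) => {
      durations.set(stepName, [...(durations.get(stepName) ?? []), durationMs]);
    },
    onStepFailed: (stepName) => {
      failures.set(stepName, (failures.get(stepName) ?? 0) + 1);
    },
  };

  // Mean duration for a step, or undefined if it never completed.
  const averageMs = (stepName: string): number | undefined => {
    const samples = durations.get(stepName);
    if (!samples || samples.length === 0) return undefined;
    return samples.reduce((sum, d) => sum + d, 0) / samples.length;
  };

  const failureCount = (stepName: string): number => failures.get(stepName) ?? 0;

  return { events, averageMs, failureCount };
}
```

Assuming the real hook signatures match, the returned `events` object could be passed in the Transaction options in place of the logging hooks shown earlier.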
CLI Inspector
You don't need a complex dashboard to see what your agents are doing. Use the included CLI to inspect transaction logs directly from your terminal.
# Inspect a specific transaction ID (File Storage)
npx tai-inspect workflow-id-555
# Inspect using Redis
export REDIS_URL="redis://localhost:6379"
npx tai-inspect workflow-id-555

Output:
🔍 Inspecting: workflow-id-555
Source: RedisStorage
STEP NAME | STATUS
------------------------------------
├── create-file | ✅ completed
│ └-> Result: "file_xyz123"
├── email-report | ❌ failed
└── (comp) create-f..| ✅ completed

Advanced Usage
Audit Mode (Governance)
By default, logs are cleared upon success to save storage space. To keep a permanent audit trail for compliance (e.g., "Why did the agent do this?"), enable Audit Mode:
const agent = new Transaction("id", storage, {
  cleanupOnSuccess: false,
});

Manual Rollbacks
The library handles rollbacks automatically on error. You can trigger them manually by throwing an error inside any step:
// Define a step that throws an error to trigger rollback
agent.step({
  name: 'check-balance',
  execute: async (ctx) => {
    const balance = await getBalance(ctx.userId);
    if (balance < 10) {
      // Throwing an error automatically triggers the compensation
      // for all previous steps.
      throw new Error("Insufficient funds");
    }
  },
  compensate: async () => {
    // No compensation needed for a read-only check
  }
});

Testing Utilities (NEW in v0.2.1)
Write fast, isolated tests for your transaction workflows with built-in testing utilities.
import { Transaction, MemoryStorage, MockLock, createEventSpy } from 'transactional-ai';

describe('My Workflow', () => {
  test('Should complete successfully', async () => {
    // Use in-memory storage (no files/Redis needed)
    const storage = new MemoryStorage();
    const lock = new MockLock();
    const eventSpy = createEventSpy();

    const tx = new Transaction('test-123', storage, {
      lock,
      events: eventSpy.events
    });

    await tx.run(async (t) => {
      await t.step('step-1', {
        do: async () => 'result',
        undo: async () => {}
      });
    });

    // Verify state
    const state = await storage.load('test-123');
    expect(state).toHaveLength(1);
    expect(state[0].status).toBe('completed');

    // Verify events
    expect(eventSpy.wasCalled('onStepComplete')).toBe(true);
    expect(eventSpy.getCallCount('onStepStart')).toBe(1);
  });
});

Testing utilities:
- MemoryStorage: Fast in-memory storage for tests
- MockLock: Simulated distributed lock
- createEventSpy(): Track event emissions
Roadmap
[x] Core Saga Engine (Execute/Compensate)
[x] Persistence Adapters (File, Redis)
[x] Resumability (Skip completed steps)
[x] CLI Inspector (tai-inspect)
[x] Concurrent Transaction Locking
[x] Postgres/SQL Storage Adapter
[x] Step Retry Policies
[x] Step Timeouts (v0.2.1)
[x] Observability Event Hooks (v0.2.1)
[x] Testing Utilities (v0.2.1)
License
MIT
