@equationalapplications/prisma-outbox
v4.9.0
Prisma adapter for the expo-llm-wiki transactional outbox pattern.
Polls the SQLite outbox table written by @equationalapplications/core-llm-wiki and syncs events to your Prisma-backed system inside a Prisma transaction, with configurable batch size, poll interval, error handling, and a concurrency guard.
Installation
npm install @equationalapplications/prisma-outbox
# peer deps
npm install @equationalapplications/core-llm-wiki @prisma/client
Quick start
import { PrismaOutboxWorker } from '@equationalapplications/prisma-outbox';
import { WikiMemory } from '@equationalapplications/core-llm-wiki';
import { PrismaClient } from '@prisma/client';
// `db` and `llmProvider` are assumed to be created elsewhere in your app.
const wiki = new WikiMemory(db, {
  llmProvider,
  config: { enableOutbox: true },
});
await wiki.setup();

const prisma = new PrismaClient();
const worker = new PrismaOutboxWorker({
  wikiMemory: wiki,
  prisma,
  mapEvent: async (event, tx) => {
    // mapEvent must be idempotent: at-least-once delivery means the same event
    // can be retried if acknowledgement fails after the Prisma transaction commits.
    if (event.operation === 'INSERT' && event.table_name.includes('entries')) {
      await tx.wikiEntry.upsert({
        where: { id: event.record_id },
        create: { id: event.record_id, ...(event.payload as Record<string, unknown>) },
        update: {},
      });
    }
  },
  pollIntervalMs: 5000,
  batchSize: 100,
  onError: (err, event) => {
    console.error('Outbox event failed', event.id, err);
    return false; // halt to preserve ordering; return true to skip a poison-pill event
  },
});
worker.start();
// On shutdown:
worker.stop();
API
PrismaOutboxWorker
| Method | Description |
|--------|-------------|
| start() | Begins polling on the configured interval. Idempotent. |
| stop() | Clears the poll interval and any pending backlog timeout. |
| syncBatch() | Manually trigger one poll cycle (useful for testing). |
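The `start()`/`stop()` behaviour and the concurrency guard mentioned in the introduction can be illustrated with a small self-contained model. This is a sketch under assumptions, not the package's implementation: `PollLoop`, `tick`, and `isStarted` are hypothetical names, and the guard is assumed to be a simple in-flight flag that drops overlapping cycles.

```typescript
// Hypothetical model of an interval-driven worker loop (not the package's code).
class PollLoop {
  private timer: ReturnType<typeof setInterval> | undefined;
  private running = false;
  private readonly cycle: () => Promise<void>;
  private readonly intervalMs: number;

  constructor(cycle: () => Promise<void>, intervalMs: number) {
    this.cycle = cycle;
    this.intervalMs = intervalMs;
  }

  // Idempotent: a second call while already started does nothing.
  start(): void {
    if (this.timer !== undefined) return;
    this.timer = setInterval(() => {
      // A real worker would report failures; this sketch just swallows them.
      this.tick().catch(() => {});
    }, this.intervalMs);
  }

  // Clears the interval so no further cycles are scheduled.
  stop(): void {
    if (this.timer !== undefined) clearInterval(this.timer);
    this.timer = undefined;
  }

  get isStarted(): boolean {
    return this.timer !== undefined;
  }

  // Concurrency guard: a tick that overlaps a running cycle is dropped.
  // Returns true if the cycle ran, false if it was skipped.
  async tick(): Promise<boolean> {
    if (this.running) return false;
    this.running = true;
    try {
      await this.cycle();
      return true;
    } finally {
      this.running = false;
    }
  }
}
```

In this model, calling `tick()` directly plays the role that `syncBatch()` plays in tests: it runs exactly one cycle without waiting for the interval.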
PrismaOutboxConfig
| Field | Type | Default | Description |
|-------|------|---------|-------------|
| wikiMemory | WikiMemory | required | The WikiMemory instance to poll. |
| prisma | PrismaLike<TTx> | required | Any Prisma client with a $transaction method (your generated PrismaClient satisfies this). |
| mapEvent | (event, tx: TTx) => Promise<void> | required | Maps one outbox event to Prisma operations inside a transaction. tx is inferred from your PrismaClient. |
| batchSize | number | 100 | Max events fetched per cycle. |
| pollIntervalMs | number | 5000 | Milliseconds between poll cycles. |
| onError | (err, event) => boolean \| undefined | — | Return true to skip a failing event; false/undefined to halt. |
| onWorkerError | (err: Error) => void | — | Called for worker-level errors (SQLite read/ack failures) not delivered to onError. |
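The `onError` contract in the table can be illustrated with a small self-contained model. This is a sketch, not the package's code: `processBatch`, `OutboxEventLike`, and `BatchResult` are hypothetical names. It only shows the decision described above, where returning `true` skips the failing event and `false` or `undefined` halts the batch so later events are not applied out of order.

```typescript
// Hypothetical event shape; only `id` matters for this sketch.
interface OutboxEventLike {
  id: number;
}

type OnError = (err: unknown, event: OutboxEventLike) => boolean | undefined;

interface BatchResult {
  processed: number[]; // ids whose handler succeeded
  skipped: number[];   // ids whose failure onError chose to skip
  haltedAt?: number;   // id at which the batch stopped, if any
}

// Illustrative model of the skip-or-halt decision.
async function processBatch(
  events: OutboxEventLike[],
  handle: (event: OutboxEventLike) => Promise<void>,
  onError?: OnError,
): Promise<BatchResult> {
  const result: BatchResult = { processed: [], skipped: [] };
  for (const event of events) {
    try {
      await handle(event);
      result.processed.push(event.id);
    } catch (err) {
      // true => skip this event and continue; false/undefined => halt.
      const skip = onError ? onError(err, event) === true : false;
      if (!skip) {
        result.haltedAt = event.id;
        break;
      }
      result.skipped.push(event.id);
    }
  }
  return result;
}
```

Halting is the conservative choice when events for the same record must be applied in order; skipping trades ordering guarantees for progress past an event that can never succeed.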
How it works
- Every `WikiMemory` mutation (when `enableOutbox: true`) atomically writes an event to the SQLite `outbox` table in the same transaction as the domain write.
- `PrismaOutboxWorker` polls `getUnprocessedOutboxEvents()` and calls `mapEvent` inside a Prisma transaction for each event.
- Successfully processed event IDs are passed to `markOutboxEventsProcessed()`, which deletes them from SQLite.
- If a full batch is consumed without error, an immediate follow-up cycle runs (backlog optimization) to drain queues faster than the poll interval.
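The steps above can be modelled with an in-memory outbox. This is a minimal sketch under assumptions: `InMemoryOutbox`, `drain`, and their methods are hypothetical stand-ins for the SQLite table and the worker, error handling is omitted, and the follow-up cycle is written as a plain loop rather than a scheduled timeout.

```typescript
// Hypothetical stand-in for the SQLite outbox table.
interface OutboxEvent {
  id: number;
  payload: string;
}

class InMemoryOutbox {
  private events: OutboxEvent[] = [];
  private nextId = 1;

  // A mutation appends an event (the real store does this in the same
  // transaction as the domain write).
  append(payload: string): void {
    this.events.push({ id: this.nextId++, payload });
  }

  // Fetch up to `limit` unprocessed events, oldest first.
  getUnprocessed(limit: number): OutboxEvent[] {
    return this.events.slice(0, limit);
  }

  // Acknowledged events are deleted.
  markProcessed(ids: number[]): void {
    const done = new Set(ids);
    this.events = this.events.filter((e) => !done.has(e.id));
  }

  get size(): number {
    return this.events.length;
  }
}

// Runs poll cycles until a batch comes back short; returns the cycle count.
async function drain(
  outbox: InMemoryOutbox,
  mapEvent: (event: OutboxEvent) => Promise<void>,
  batchSize: number,
): Promise<number> {
  let cycles = 0;
  while (true) {
    const batch = outbox.getUnprocessed(batchSize);
    cycles++;
    const processed: number[] = [];
    for (const event of batch) {
      await mapEvent(event); // the real worker wraps this in a Prisma transaction
      processed.push(event.id);
    }
    outbox.markProcessed(processed);
    // A full batch suggests a backlog, so run another cycle immediately.
    if (batch.length < batchSize) return cycles;
  }
}
```

With five queued events and a batch size of two, this model runs three cycles (two full batches, then a short one) instead of waiting three poll intervals.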
Limitations
- Single-instance only. The worker does not use row-level locking or leases. Running two `PrismaOutboxWorker` instances against the same SQLite file will cause duplicate Prisma writes. Run exactly one worker per SQLite database. `mapEvent` must still be idempotent to tolerate at-least-once delivery (acknowledgement can fail after a successful Prisma commit).
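To see why idempotency matters under at-least-once delivery, the following sketch applies the same event twice to an in-memory table. `FakeTable`, `Row`, and `deliverTwice` are hypothetical names used only for illustration; `upsertCreateOnly` mimics the Quick start's `upsert` with an empty `update`, while `create` mimics a plain insert.

```typescript
// Hypothetical in-memory table keyed by record id, standing in for a Prisma model.
type Row = { id: string; title: string };

class FakeTable {
  readonly rows = new Map<string, Row>();

  // Like a plain insert: fails on a duplicate primary key.
  create(row: Row): void {
    if (this.rows.has(row.id)) throw new Error(`duplicate id ${row.id}`);
    this.rows.set(row.id, row);
  }

  // Like upsert with an empty update: inserts once, then leaves the row untouched.
  upsertCreateOnly(row: Row): void {
    if (!this.rows.has(row.id)) this.rows.set(row.id, row);
  }
}

// Simulates at-least-once delivery by applying the same event twice.
// Returns true if both deliveries succeeded.
function deliverTwice(apply: (row: Row) => void, row: Row): boolean {
  try {
    apply(row);
    apply(row); // redelivery after a failed acknowledgement
    return true;
  } catch {
    return false;
  }
}
```

The upsert-style handler tolerates the redelivery and leaves a single row; the insert-style handler fails on the second delivery, which in a real worker would surface through `onError`.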
