@intent-driven/bridge-postgres
v0.6.0
Postgres ↔ IDF bridge: CDC mirror (pg_logical → Φ event-log) and gateway mode (Fold runtime on top of an existing database)
A bridge between a Postgres database and the IDF Φ event-log. It offers two modes: CDC mirror (read-only, the database stays primary) and gateway mode (Fold accepts agent calls, validates them, and writes to the database on confirm).
→ Migration guide: decision tree for mirror vs gateway, step-by-step roll-out, operational concerns.
→ Express + Fold gateway example: docker-compose with Postgres + API + outbox-worker.
Status
| Feature | Status |
|---------|--------|
| pgoutput decoder + createMirror skeleton | ✅ PR 5a |
| createPgLogicalMirror + idempotent setup helpers | ✅ PR 5b |
| Schema-drift detector (detectDrift) | ✅ PR 5c |
| Gateway mode + transactional outbox | ✅ PR 5d |
| Reference deployment + migration guide | ✅ PR 5e |
Gateway mode (PR 5d)
import {
  ensureOutboxTable,
  createGateway,
  createOutboxProcessor,
} from "@intent-driven/bridge-postgres";
import pg from "pg";
// connectionString, ontology and foldRuntime are supplied by your application.
const pool = new pg.Pool({ connectionString });
const setupClient = await pool.connect();
await ensureOutboxTable(setupClient);
setupClient.release();
const gateway = createGateway({ pool, ontology });
// Agent call:
const result = await gateway.execute({
  intentName: "create_order",
  params: { amount: 100, status: "new" },
  ctx: { idempotencyKey: "uuid-...", role: { base: "agent" } },
});
// → { ok: true, result: <row>, outboxId: 42 }
// OR
// → { ok: false, code: "preapproval_denied", message, details }
// Outbox processor (separate worker):
const processor = createOutboxProcessor({
  pool,
  publish: async (row) => { await foldRuntime.publishEffect(row); },
});
await processor.start();
Atomicity: the business write and the outbox row are committed in a single transaction. If the validator rejects the call (preapproval / invariants / irreversibility), nothing is written. Idempotency is enforced by a UNIQUE constraint on idempotency_key, so replays are safe.
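The guarantees above (both writes or neither, no writes on reject, safe replay) can be illustrated with a small in-memory model. This is only a sketch of the semantics, not the package's implementation: `createInMemoryGateway`, the `validate` callback and the array-backed "tables" are hypothetical stand-ins for the real Postgres transaction and UNIQUE constraint.

```javascript
// In-memory model of a transactional outbox. Hypothetical names throughout;
// the real gateway performs these steps inside one Postgres transaction.
function createInMemoryGateway(validate) {
  const rows = [];          // stands in for the business table
  const outbox = [];        // stands in for the outbox table
  const byKey = new Map();  // stands in for UNIQUE(idempotency_key)

  return {
    rows,
    outbox,
    execute({ intentName, params, ctx }) {
      // Replay: a known idempotency key returns the stored result unchanged.
      if (byKey.has(ctx.idempotencyKey)) return byKey.get(ctx.idempotencyKey);

      // Validator reject: return before anything is written.
      const verdict = validate(intentName, params);
      if (!verdict.ok) return { ok: false, code: verdict.code };

      // "Transaction": the business row and the outbox row are added together.
      const row = { id: rows.length + 1, ...params };
      const outboxRow = { id: outbox.length + 1, intentName, payload: row };
      rows.push(row);
      outbox.push(outboxRow);

      const result = { ok: true, result: row, outboxId: outboxRow.id };
      byKey.set(ctx.idempotencyKey, result);
      return result;
    },
  };
}
```

Calling `execute` twice with the same `idempotencyKey` leaves exactly one business row and one outbox row in this model, which is the behaviour the UNIQUE constraint is meant to provide.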
Mirror API (PR 5a)
import { createMirror } from "@intent-driven/bridge-postgres";
const mirror = createMirror({
  stream: someAsyncIterableOfPgoutputBuffers,
  async onEffect(effect, ctx) {
    // effect: { alpha: "insert"|"replace"|"remove", target, value?, key?, ... }
    // ctx: { transaction: { xid, finalLSN, commitTimestamp }, relation }
    await foldRuntime.proposeEffect(effect);
  },
  async onCommit(info) {
    // commit batch boundary: flush to the Φ event-log
  },
});
await mirror.start();
console.log(mirror.stats);
// { decoded: 1234, insert: 100, update: 80, delete: 5, relation: 3, begin: 50, commit: 50 }
Mirror via pg_logical (PR 5b)
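The setup helpers used in this section are described as idempotent. A minimal sketch of what that could mean: only emit SQL for objects that do not exist yet. Everything here is an assumption for illustration. `planReplicationSetup` is a hypothetical helper, the real `ensurePublication` / `ensureSlot` may work differently, and `FOR ALL TABLES` is just one possible publication scope. The existing names would typically be read from the `pg_publication` and `pg_replication_slots` catalogs.

```javascript
// Sketch only: plans the statements an idempotent setup might run.
function quoteIdent(name) {
  // Quoted identifiers escape embedded double quotes by doubling them.
  return '"' + String(name).replace(/"/g, '""') + '"';
}

function quoteLiteral(value) {
  // String literals escape embedded single quotes by doubling them.
  return "'" + String(value).replace(/'/g, "''") + "'";
}

function planReplicationSetup({ publicationName, slotName, existingPublications, existingSlots }) {
  const statements = [];
  if (!existingPublications.includes(publicationName)) {
    // Assumption: publish every table; a real setup may list tables explicitly.
    statements.push(`CREATE PUBLICATION ${quoteIdent(publicationName)} FOR ALL TABLES`);
  }
  if (!existingSlots.includes(slotName)) {
    // pgoutput is Postgres's built-in logical decoding output plugin.
    statements.push(
      `SELECT pg_create_logical_replication_slot(${quoteLiteral(slotName)}, 'pgoutput')`
    );
  }
  return statements;
}
```

Running the plan a second time, once both objects exist, yields an empty list, so repeated setup calls are harmless.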
import { createPgLogicalMirror, ensurePublication, ensureSlot } from "@intent-driven/bridge-postgres";
await ensurePublication(setupClient, "idf_pub");
await ensureSlot(setupClient, "idf_mirror");
const mirror = createPgLogicalMirror({
  connectionString: "postgres://repluser:pass@host/db",
  slotName: "idf_mirror",
  publicationName: "idf_pub",
  onEffect, onCommit,
});
await mirror.start();
Schema drift detection (PR 5c)
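Drift detection compares what the ontology expects with what the database actually contains. The sketch below shows the general idea on plain objects. It is not the package's `detectDrift`: `diffSchemas`, the `{ table: { column: type } }` shape and the issue kinds are hypothetical. Only the `ok` flag mirrors the result field used in this section. In practice the actual columns could be read from `information_schema.columns`.

```javascript
// Sketch only: compare expected columns against the live schema.
// `expected` and `actual` map table name -> { columnName: typeName }.
function diffSchemas(expected, actual) {
  const has = (obj, key) => Object.prototype.hasOwnProperty.call(obj, key);
  const issues = [];

  for (const [table, columns] of Object.entries(expected)) {
    if (!has(actual, table)) {
      issues.push({ kind: "missing_table", table });
      continue;
    }
    const live = actual[table];
    for (const [column, type] of Object.entries(columns)) {
      if (!has(live, column)) {
        issues.push({ kind: "missing_column", table, column });
      } else if (live[column] !== type) {
        issues.push({ kind: "type_mismatch", table, column, expected: type, actual: live[column] });
      }
    }
  }

  // ok is true only when no issue was found, so it can gate a CI job.
  return { ok: issues.length === 0, issues };
}
```

Extra tables or columns in the database are ignored here. Whether those count as drift is a design choice this sketch leaves open.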
import { detectDrift, renderDriftReport } from "@intent-driven/bridge-postgres";
const result = await detectDrift({ ontology, client });
console.log(renderDriftReport(result, { sourceLabel: "production-db" }));
if (!result.ok) process.exit(1); // CI gate
Files
src/: implementation
docs/migration-guide.md: full migration playbook
examples/express-gateway/: docker-compose demo
License
MIT.
