@loop-engine/adapter-postgres
v0.2.0
PostgreSQL loop state store for durable production workloads.
PostgreSQL storage adapter for Loop Engine - persist loop state and transitions to Postgres.
Install
npm install @loop-engine/adapter-postgres @loop-engine/sdk pg
Quick Start
import { createLoopSystem } from "@loop-engine/sdk";
import {
  createPool,
  postgresStore,
  runMigrations
} from "@loop-engine/adapter-postgres";
const pool = createPool({ connectionString: process.env.DATABASE_URL });
await runMigrations(pool);
const { engine } = await createLoopSystem({
  loops: [loopDefinition],
  store: postgresStore(pool)
});
createPool(...) applies loop-engine-opinionated defaults; see the
"Pool configuration" section below. Consumers who want full control can
still new Pool(...) themselves and pass the result to
postgresStore(pool).
Schema migrations
The adapter ships versioned, idempotent SQL migrations and a small custom
runner. createSchema(pool) is a convenience that applies every pending
migration; runMigrations(pool) is the lower-level API that returns
structured information about which migrations were newly applied vs.
already recorded.
import { Pool } from "pg";
import { runMigrations } from "@loop-engine/adapter-postgres";
const pool = new Pool({ connectionString: process.env.DATABASE_URL });
const result = await runMigrations(pool);
console.log(result);
// { applied: ["001_schema_migrations", "002_loop_instances", "003_loop_transitions",
//             "004_idx_loop_instances_loop_id_status"],
//   skipped: [] }
Key properties:
- Idempotent: calling runMigrations twice on the same database is a no-op (all migrations are returned under skipped).
- Transactional: each migration applies inside its own transaction; a crash mid-migration leaves the database unchanged and the next run retries cleanly.
- Concurrent-safe: a Postgres advisory lock serializes concurrent callers, so two processes starting up simultaneously will not race into a duplicate-key error.
- Drift-protected: migrations are content-hashed with SHA-256; the runner refuses to proceed if an already-recorded migration's SQL has been edited on disk. To change schema, add a new migration file with a later numeric prefix.
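The drift check described above can be pictured with a small sketch. This is not the adapter's runner: the MigrationFile shape, the planMigrations name, and the in-memory map standing in for the schema_migrations table are all assumptions made for illustration. Only the idea (compare a stored SHA-256 against the hash of the file as it exists now) comes from the text.

```typescript
import { createHash } from "node:crypto";

// Hypothetical shape for illustration; the adapter's real tracking-table
// columns and runner internals may differ.
interface MigrationFile {
  name: string; // e.g. "002_loop_instances"
  sql: string;
}

function sha256Hex(text: string): string {
  return createHash("sha256").update(text, "utf8").digest("hex");
}

// Splits files into pending and already-recorded, and throws if a recorded
// migration's current hash no longer matches the hash stored when it ran.
function planMigrations(
  files: MigrationFile[],
  recorded: Map<string, string> // name -> hash stored at apply time
): { pending: string[]; skipped: string[] } {
  const pending: string[] = [];
  const skipped: string[] = [];
  // Zero-padded numeric prefixes sort correctly as plain strings.
  const ordered = [...files].sort((a, b) =>
    a.name < b.name ? -1 : a.name > b.name ? 1 : 0
  );
  for (const file of ordered) {
    const storedHash = recorded.get(file.name);
    if (storedHash === undefined) {
      pending.push(file.name);
    } else if (storedHash !== sha256Hex(file.sql)) {
      throw new Error(`Migration ${file.name} was edited after being applied`);
    } else {
      skipped.push(file.name);
    }
  }
  return { pending, skipped };
}
```

In this model, editing the SQL of an applied migration changes its hash and stops the plan, which is why schema changes go into a new file with a later prefix.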
Migrations are read from dist/migrations/sql/ at runtime. The adapter
currently ships four:
- 001_schema_migrations.sql: the schema_migrations tracking table used by the runner itself.
- 002_loop_instances.sql: one row per loop aggregate; upserted by LoopStore.saveInstance.
- 003_loop_transitions.sql: append-only transition history.
- 004_idx_loop_instances_loop_id_status.sql: composite B-tree index on loop_instances (loop_id, status) supporting the listOpenInstances(loopId) query path. Without this index the planner falls back to a sequential scan; the adapter's integration tests assert that the plan selects this index and does not seq-scan loop_instances on realistically sized tables.
Supported Postgres versions: the adapter is tested against 15 and 16 and is documented to support 13+.
Pool configuration
createPool(options?) is a thin wrapper around pg.Pool that applies
four opinionated defaults. All pg.PoolConfig fields pass through
unchanged, plus a first-class statement_timeout knob.
import { createPool, DEFAULT_POOL_OPTIONS } from "@loop-engine/adapter-postgres";
// Defaults:
// {
//   max: 10,
//   idleTimeoutMillis: 30_000,
//   connectionTimeoutMillis: 5_000,
//   statement_timeout: 30_000
// }
console.log(DEFAULT_POOL_OPTIONS);
// Typical usage:
const pool = createPool({
  connectionString: process.env.DATABASE_URL
});
// Override defaults:
const biggerPool = createPool({
  connectionString: process.env.DATABASE_URL,
  max: 25,
  statement_timeout: 5_000,
  options: "-c search_path=app_schema"
});
Rationale per default:
- max: 10 is suitable for a single app instance against a standard Postgres deployment. Raise this in coordination with the server's max_connections when scaling horizontally.
- idleTimeoutMillis: 30_000 (30 seconds) balances connection reuse under burst traffic against reclaiming slots during lulls.
- connectionTimeoutMillis: 5_000 (5 seconds) is where pool exhaustion should fail loudly instead of silently accumulating request latency.
- statement_timeout: 30_000 caps worst-case query latency; runaway queries are canceled server-side rather than holding a connection hostage. Wired via the libpq options connection parameter (-c statement_timeout=N), so it applies at connection init, with no per-query SET round-trip.
A consumer-supplied options string (e.g., -c search_path=...) is
preserved and the statement_timeout clause is appended alongside it.
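As a rough illustration of that merge, the sketch below builds the final options string. The helper name withStatementTimeout, the ordering (consumer options first), and the single-space separator are assumptions; the adapter may format the string differently.

```typescript
// Hypothetical helper; the adapter's internal name, ordering, and exact
// formatting are assumptions made for illustration.
function withStatementTimeout(
  userOptions: string | undefined,
  statementTimeoutMs: number
): string {
  // Postgres reads a bare statement_timeout value as milliseconds.
  const clause = `-c statement_timeout=${statementTimeoutMs}`;
  const existing = (userOptions ?? "").trim();
  // Keep whatever the consumer passed and append the timeout clause after it.
  return existing.length > 0 ? `${existing} ${clause}` : clause;
}

console.log(withStatementTimeout("-c search_path=app_schema", 5_000));
// -c search_path=app_schema -c statement_timeout=5000
```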
Transactions
postgresStore(pool) returns a PostgresStore — a LoopStore plus a
withTransaction(fn) method for atomically sequencing multiple
LoopStore operations against a single pg-acquired client.
import { postgresStore } from "@loop-engine/adapter-postgres";
const store = postgresStore(pool);
await store.withTransaction(async (tx) => {
  await tx.saveInstance(updatedInstance);
  await tx.saveTransitionRecord(transitionRecord);
});
The callback receives a TransactionClient with the same five
LoopStore methods, each routed through the transactional client.
Semantics:
- Commit on success: fn resolves → COMMIT → changes persist.
- Rollback on error: fn rejects → ROLLBACK → changes discarded → original error propagates unchanged.
- Isolation: Postgres's default READ COMMITTED applies, so writes inside fn are invisible to reads on the outer pool until COMMIT.
- Return-value propagation: fn's return value is the withTransaction return value.
- Nesting via the outer store is independent: calling store.withTransaction(...) inside another store.withTransaction acquires a second client and runs in its own transaction. To extend atomicity across nested scopes, pass the outer tx to the inner operation and call its LoopStore methods instead.
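The commit, rollback, and return-value rules, plus the advice to pass the outer tx into helpers, can be modeled without a database. Everything in the sketch below is a toy stand-in: TxLike, InMemoryStore, and recordTransition are hypothetical names, and only two of the LoopStore methods are modeled.

```typescript
// Toy in-memory model, NOT the adapter: it only mirrors the commit,
// rollback, and return-value rules listed above.
interface TxLike {
  saveInstance(instance: { id: string }): Promise<void>;
  saveTransitionRecord(record: { id: string }): Promise<void>;
}

class InMemoryStore {
  readonly committed: string[] = [];

  async withTransaction<T>(fn: (tx: TxLike) => Promise<T>): Promise<T> {
    const staged: string[] = []; // writes stay invisible until "COMMIT"
    const tx: TxLike = {
      saveInstance: async (instance) => {
        staged.push(`instance:${instance.id}`);
      },
      saveTransitionRecord: async (record) => {
        staged.push(`transition:${record.id}`);
      },
    };
    // If fn rejects, the line below is never reached: staged writes are
    // dropped ("ROLLBACK") and the original error propagates unchanged.
    const result = await fn(tx);
    this.committed.push(...staged); // "COMMIT"
    return result;
  }
}

// Accepts the caller's tx, so both writes join the caller's transaction
// instead of opening a second, independent one.
async function recordTransition(tx: TxLike, id: string): Promise<string> {
  await tx.saveInstance({ id });
  await tx.saveTransitionRecord({ id });
  return id;
}
```

A helper written like recordTransition composes with other work in the same callback; a helper that called store.withTransaction itself would commit or roll back on its own.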
TransactionClient is intentionally narrow — it exposes LoopStore
methods only, with no raw pg.PoolClient escape hatch. Consumers
needing LISTEN/NOTIFY or other non-LoopStore Postgres operations should
manage their own pg.Pool alongside the adapter's.
Error classification
The adapter exports a minimal classification surface for retry logic.
Routine pg errors pass through unchanged (including constraint
violations, data errors, access violations, etc.) — consumers who
want typed handling of specific SQLSTATE codes inspect .code
themselves.
import { classifyError, isTransientError } from "@loop-engine/adapter-postgres";
try {
  await store.saveTransitionRecord(record);
} catch (err) {
  if (isTransientError(err)) {
    // Connection drop, server lifecycle event, or deadlock;
    // retry with a fresh attempt is likely to succeed.
    return retryWithBackoff(() => store.saveTransitionRecord(record));
  }
  throw err;
}
Transient classification (retry likely to help):
- Node connection-level errors: ECONNRESET, ECONNREFUSED, ETIMEDOUT, ENOTFOUND, EHOSTUNREACH, ENETUNREACH.
- Postgres server-lifecycle SQLSTATEs: 57P01 (admin_shutdown), 57P02 (crash_shutdown), 57P03 (cannot_connect_now).
- Deadlocks: 40P01 (deadlock_detected). Postgres aborted one transaction to break a deadlock; a retry resolves it.
- pg errors string-matched on "Connection terminated unexpectedly".
Everything else that looks like a SQLSTATE is classified "permanent"
(including constraint violations, invalid-input errors, etc.). Shapes
the classifier doesn't recognize are "unknown"; treat these as
permanent for retry-loop safety unless context argues otherwise.
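To make the three-way split concrete, here is a re-statement of the documented rules as a standalone function. It is a sketch rather than the adapter's source: sketchClassify, shouldRetry, and the SQLSTATE regular expression are assumptions, and the real classifyError may inspect errors differently.

```typescript
type ErrorKind = "transient" | "permanent" | "unknown";

// Codes copied from the transient lists above.
const TRANSIENT_CODES = new Set([
  "ECONNRESET", "ECONNREFUSED", "ETIMEDOUT",
  "ENOTFOUND", "EHOSTUNREACH", "ENETUNREACH",
  "57P01", "57P02", "57P03", "40P01",
]);

// SQLSTATE codes are five characters drawn from digits and upper-case letters.
// Caveat of this sketch: five-letter Node codes such as EPIPE match it too.
const SQLSTATE_PATTERN = /^[0-9A-Z]{5}$/;

// Illustrative re-statement of the documented rules; not the adapter's source.
function sketchClassify(err: unknown): ErrorKind {
  if (typeof err !== "object" || err === null) return "unknown";
  const { code, message } = err as { code?: unknown; message?: unknown };
  if (typeof code === "string") {
    if (TRANSIENT_CODES.has(code)) return "transient";
    if (SQLSTATE_PATTERN.test(code)) return "permanent";
  }
  if (
    typeof message === "string" &&
    message.includes("Connection terminated unexpectedly")
  ) {
    return "transient";
  }
  return "unknown";
}

// Retry-loop guard: "unknown" is treated like "permanent", as recommended above.
function shouldRetry(err: unknown): boolean {
  return sketchClassify(err) === "transient";
}
```

Under these assumptions a unique violation (23505) is classified as permanent and a plain Error with no code is unknown, so neither would be retried.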
TransactionIntegrityError
withTransaction throws TransactionIntegrityError (a subclass of
PostgresStoreError) when the adapter cannot confirm a definite
terminal state for the transaction:
- fn threw, and the subsequent ROLLBACK also failed. The transaction's server-side state is indeterminate.
- fn succeeded, but COMMIT failed with a connection-level error. The transaction may have been committed server-side before the connection dropped (we never received the ACK).
kind is "transient", and the original cause is preserved at
.cause. Consumers are responsible for the retry's idempotency story
— append-only writes like saveTransitionRecord should either ride
on upstream idempotency guards or use unique-constraint surfaces to
reject duplicates.
Non-indeterminate failures pass through unchanged:
- fn threw + ROLLBACK succeeded → original fn error (no wrap).
- COMMIT failed with a non-connection error (e.g., deferred constraint violation) → Postgres definitively rolled back; the commit error propagates with its SQLSTATE intact.
Documentation link
https://loopengine.io/docs/integrations/postgres
License
Apache-2.0 © Better Data, Inc.
