@rotorsoft/act-pg
v1.0.0
PostgreSQL event store for @rotorsoft/act. ACID, connection-pooled, multi-process — production default for Act deployments. Lane-aware claim/ack via streams.lane + streams_lane_ix since v0.25.0 (ACT-1103).
Why this package
Act's in-memory store is fine for development and tests, but production needs durable events, cross-process coordination, and a query path that scales past a single Node process. PostgresStore is the canonical production implementation of Act's Store port: full ACID guarantees from PG, atomic stream claiming via FOR UPDATE SKIP LOCKED (no application-layer locking required), optional LISTEN/NOTIFY for sub-poll cross-process wakeup, and auto-managed schema via seed().
The adapter passes the same conformance suite (@rotorsoft/act-tck) as InMemoryStore and SqliteStore, so swapping it in is a one-line bootstrap change.
Installation
pnpm add @rotorsoft/act @rotorsoft/act-pg
Quick start
import { act, state, store } from "@rotorsoft/act";
import { PostgresStore } from "@rotorsoft/act-pg";
import { z } from "zod";
store(new PostgresStore({
host: "localhost",
port: 5432,
database: "myapp",
user: "postgres",
password: "secret",
}));
// One-time schema setup (idempotent — safe to leave in your bootstrap).
await store().seed();
// From here, the framework is identical to the InMemory version.
const Counter = state({ Counter: z.object({ count: z.number() }) })
.init(() => ({ count: 0 }))
.emits({ Incremented: z.object({ amount: z.number() }) })
.patch({ Incremented: ({ data }, s) => ({ count: s.count + data.amount }) })
.on({ increment: z.object({ by: z.number() }) })
.emit((a) => ["Incremented", { amount: a.by }])
.build();
const app = act().withState(Counter).build();
await app.do("increment", { stream: "c1", actor: { id: "1", name: "u" } }, { by: 1 });
API
- PostgresStore: class implementing Act's Store port. Construct once, pass to store().
- PostgresConfig: constructor options (host/port/db/user/password/schema/table/notify).
Full type reference: typedoc.
Configuration
All fields are optional and have sensible defaults:
| Option | Default | Description |
|---|---|---|
| host | localhost | PostgreSQL host |
| port | 5432 | PostgreSQL port |
| database | postgres | Database name |
| user | postgres | Database user |
| password | postgres | Database password |
| schema | public | Schema for event + streams tables |
| table | events | Base name (<table> for events, <table>_streams for subscriptions) |
| notify | false | Opt-in LISTEN/NOTIFY for cross-process commit wakeup (see below) |
| max, idleTimeoutMillis, …pg.PoolConfig | (pg defaults) | Pass-through to node-postgres pool config |
// Production deployment via env vars
store(new PostgresStore({
host: process.env.DB_HOST,
port: Number(process.env.DB_PORT ?? 5432),
database: process.env.DB_NAME,
user: process.env.DB_USER,
password: process.env.DB_PASSWORD,
schema: process.env.DB_SCHEMA ?? "public",
max: 20, // pool size — raise for drain-heavy workloads
}));
Multi-tenant deployments often want one schema per tenant. The store accepts both a schema and a table option; use them rather than namespacing stream IDs.
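As a minimal sketch of the per-tenant idea, the helper below derives `schema` and `table` options from a tenant id. The helper name `tenantStoreOptions`, the `tenant_` prefix, and the validation rule are hypothetical choices for illustration; only the `schema` and `table` option names come from the configuration table above.

```typescript
// Hypothetical helper: derive per-tenant store options from a tenant id.
// Only the `schema` and `table` option names come from this README; the
// "tenant_" prefix and the validation rule are illustrative assumptions.
type TenantStoreOptions = { schema: string; table: string };

function tenantStoreOptions(tenantId: string, table = "events"): TenantStoreOptions {
  // Allow only lowercase letters, digits, and underscores, and cap the length
  // so the resulting schema name stays within PostgreSQL's 63-byte identifier limit.
  if (!/^[a-z][a-z0-9_]{0,54}$/.test(tenantId)) {
    throw new Error(`invalid tenant id: ${tenantId}`);
  }
  return { schema: `tenant_${tenantId}`, table };
}

// Possible usage, assuming the imports from the quick start and a shared
// `baseConfig` object holding host, port, user, and password:
// store(new PostgresStore({ ...baseConfig, ...tenantStoreOptions("acme") }));
```

Validating the id before it becomes a schema name keeps untrusted input out of SQL identifiers. How a process selects its tenant is outside the scope of this sketch.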
Common patterns
Cross-process LISTEN/NOTIFY (opt-in)
For multi-instance deployments, PostgresStore implements the optional Store.notify hook via LISTEN/NOTIFY so the orchestrator wakes settle() immediately on commits from other processes — no polling delay. Off by default to keep single-instance deployments allocation-free; enable on every store instance in a multi-process app:
const config = { schema: "myapp", table: "events", notify: true };
// Worker A (writer)
store(new PostgresStore(config));
const app = act().withState(Order).build();
await app.do("placeOrder", { stream: "order-1", actor }, payload);
// Worker B (reactions, separate process)
store(new PostgresStore(config));
const app = act()
.withState(Order)
.on("OrderPlaced").do(reduceInventory).to("inventory-1")
.build();
// Worker B wakes within ~10ms of Worker A's commit (vs. ≥ poll interval).
app.on("notified", (n) => sse.broadcast(n)); // optional fan-out
With notify: true, commit() issues one NOTIFY act_commit_<schema>_<table> per transaction with the full event batch as JSON. The store self-filters its own commits (per-instance UUID), so the "notified" lifecycle event surfaces only cross-process activity. Size your pool to account for one extra dedicated LISTEN client per process.
notify is a hint, not a contract — lost notifications fall back to the existing debounce/poll path. Correctness is preserved.
Build-time contract: call store(adapter) before act()…build(). The orchestrator binds notify to whichever store is current at construction time.
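The channel naming and self-filtering described above can be modeled in a few lines. This is an illustrative sketch, not the adapter's code: the payload field names `origin` and `events` and both function names are assumptions, since the README only states that the payload is JSON and that each instance filters by its own UUID.

```typescript
// Illustrative model of the notify path; not the adapter's implementation.
// The channel format and self-filtering come from this README. The payload
// shape (`origin`, `events`) is an assumption made for this sketch.
type CommitNotification = {
  origin: string; // id of the store instance that committed
  events: { stream: string; name: string }[];
};

// Builds the channel name in the act_commit_<schema>_<table> format.
function commitChannel(schema: string, table: string): string {
  return `act_commit_${schema}_${table}`;
}

// Returns the parsed notification, or undefined when it originated from
// this instance and should therefore not surface as cross-process activity.
function acceptNotification(
  payload: string,
  ownInstanceId: string
): CommitNotification | undefined {
  const parsed = JSON.parse(payload) as CommitNotification;
  return parsed.origin === ownInstanceId ? undefined : parsed;
}
```

Because a dropped notification only delays a wakeup until the next poll, a filter like this can stay simple: it never has to guarantee delivery.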
Competing consumer (free horizontal scaling)
claim() uses FOR UPDATE SKIP LOCKED — the idiomatic Postgres competing-consumer pattern. Workers never block each other; locked rows are silently skipped. Same approach as pgBoss and Graphile Worker.
Add a second pod, run the same Act app — drain workload splits with zero application-layer coordination. No external job queue, no Redis lock.
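To make the competing-consumer pattern concrete, here is a sketch of what a lease-taking claim query could look like. Column names are taken from the schema reference in this README; the lease predicate, the column types, the parameter layout, and the `buildClaimQuery` name are assumptions, and the adapter's actual SQL may differ.

```typescript
// Sketch of a competing-consumer claim query; not the adapter's actual SQL.
// Assumes `blocked` is boolean, `leased_until` is a timestamp, and that
// `schema` and `table` are trusted values (they are interpolated as identifiers).
type Query = { text: string; values: (string | number)[] };

function buildClaimQuery(
  schema: string,
  table: string,
  leasedBy: string,
  limit: number,
  leaseMillis: number
): Query {
  const streams = `"${schema}"."${table}_streams"`;
  const text = `
WITH candidates AS (
  SELECT stream
  FROM ${streams}
  WHERE blocked = false
    AND (leased_until IS NULL OR leased_until < now())
  ORDER BY priority DESC, "at"
  LIMIT $1
  FOR UPDATE SKIP LOCKED -- rows locked by another worker are skipped, not waited on
)
UPDATE ${streams} AS s
SET leased_by = $2,
    leased_until = now() + ($3::double precision * interval '1 millisecond')
FROM candidates AS c
WHERE s.stream = c.stream
RETURNING s.stream`;
  return { text, values: [limit, leasedBy, leaseMillis] };
}

// Possible usage with a node-postgres pool (not executed here):
// const { text, values } = buildClaimQuery("public", "events", "worker-1", 10, 30_000);
// const { rows } = await pool.query(text, values);
```

Selecting and leasing in one statement means two workers running the query at the same time receive disjoint sets of streams, which is what lets a second pod share the drain workload without extra coordination.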
Schema setup
await store().seed();
Idempotent. Creates the events table, the streams (subscription) table, and the indexes that support the claim and notify paths. Safe to leave in your bootstrap. The store transparently runs ADD COLUMN IF NOT EXISTS migrations for new optional columns (e.g. priority for priority lanes), so existing deployments upgrade in place.
Database schema reference
Created by seed():
- Events ({schema}.{table}): id (serial PK), name, data (jsonb), stream, version, created (timestamptz), meta (jsonb). Unique index on (stream, version).
- Streams ({schema}.{table}_streams): stream (PK), source, at, retry, blocked, error, leased_by, leased_until, priority. Composite index on (blocked, priority DESC, at) for the saturated-claim ordering.
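The idempotent setup described above can be sketched as a list of DDL statements that are safe to run on every start. This is not the DDL that seed() actually runs: the column types marked as assumed, the index names, and the `buildSeedStatements` helper are hypothetical, and only the column names and the types shown in the schema reference come from this README.

```typescript
// Sketch of idempotent schema setup; not the statements seed() actually runs.
// Types for id, data, meta, and created follow the schema reference. All other
// column types and both index names are assumptions for illustration.
function buildSeedStatements(schema: string, table: string): string[] {
  const events = `"${schema}"."${table}"`;
  const streams = `"${schema}"."${table}_streams"`;
  return [
    `CREATE TABLE IF NOT EXISTS ${events} (
       id serial PRIMARY KEY,
       name text NOT NULL,         -- assumed type
       data jsonb,
       stream text NOT NULL,       -- assumed type
       version int NOT NULL,       -- assumed type
       created timestamptz NOT NULL DEFAULT now(),
       meta jsonb
     )`,
    // Uniqueness on (stream, version) rejects two writers appending the same version.
    `CREATE UNIQUE INDEX IF NOT EXISTS "${table}_stream_version_ix"
       ON ${events} (stream, version)`,
    `CREATE TABLE IF NOT EXISTS ${streams} (
       stream text PRIMARY KEY,    -- remaining column types are assumed
       source text,
       "at" int NOT NULL DEFAULT -1,
       retry int NOT NULL DEFAULT 0,
       blocked boolean NOT NULL DEFAULT false,
       error text,
       leased_by text,
       leased_until timestamptz
     )`,
    // In-place upgrade path: adding an optional column is a no-op when it already exists.
    `ALTER TABLE ${streams} ADD COLUMN IF NOT EXISTS priority int NOT NULL DEFAULT 0`,
    `CREATE INDEX IF NOT EXISTS "${table}_streams_claim_ix"
       ON ${streams} (blocked, priority DESC, "at")`,
  ];
}
```

Every statement carries an IF NOT EXISTS guard, which is what makes rerunning the whole list on each bootstrap harmless.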
When to use this vs act-sqlite
| You want… | Use |
|---|---|
| Multi-server deployment, distributed processing | act-pg |
| Sub-poll cross-process reaction latency | act-pg (with notify: true) |
| Embedded / single-server / edge | act-sqlite |
| Zero-config local dev / tests | The default InMemoryStore |
Both adapters pass the same conformance suite — your application code doesn't change.
Compatibility
- Node: >=22.18.0
- PostgreSQL: >=14 (uses FOR UPDATE SKIP LOCKED, LISTEN/NOTIFY, JSONB)
- Peer: @rotorsoft/act >=0.39.0, zod ^4.4.3
- Bundled deps: pg ^8.20.0
- Module formats: ESM + CJS
Stability
Public API governed by the Act Stability Charter. PostgresStore implements the Store contract from @rotorsoft/act and is validated against @rotorsoft/act-tck across PostgreSQL 14/15/16/17 in CI. Charter is in effect as of 1.0.0; the milestone tracker is milestone 1.0.
Related packages
- @rotorsoft/act: the framework whose Store port this implements.
- @rotorsoft/act-sqlite: sibling store adapter for single-node / edge deployments.
- @rotorsoft/act-tck: conformance suite. PostgresStore passes runStoreTck with capabilities: { notify: true }.
- @rotorsoft/act-pino: pino logger adapter, common pairing for production deployments.
Documentation
- Production checklist: operator-facing guide for taking an Act app to production with this store.
- Cross-process reactions: when to enable notify, what the latency looks like.
- Concurrency model: lease lifecycle, claim/ack/block/timeout, optimistic concurrency.
- Writing a custom Store adapter: the recipe PostgresStore itself follows, for authors building against other databases.
- PERFORMANCE.md: measured throughput numbers, including the notify latency benchmark.
License
MIT
