@rotorsoft/act-pg
v0.13.1
Published
act pg adapters
Readme
@rotorsoft/act-pg
PostgreSQL event store adapter for @rotorsoft/act. Provides persistent, production-ready event storage with ACID guarantees, connection pooling, and distributed stream processing.
Installation
npm install @rotorsoft/act @rotorsoft/act-pg
# or
pnpm add @rotorsoft/act @rotorsoft/act-pgRequirements: Node.js >= 22.18.0, PostgreSQL >= 14
Usage
import { act, state, store } from "@rotorsoft/act";
import { PostgresStore } from "@rotorsoft/act-pg";
import { z } from "zod";
// Inject the PostgreSQL store before building your app
store(new PostgresStore({
host: "localhost",
port: 5432,
database: "myapp",
user: "postgres",
password: "secret",
}));
// Initialize tables (creates schema, events table, streams table, and indexes)
await store().seed();
// Build and use your app as normal
const Counter = state({ Counter: z.object({ count: z.number() }) })
.init(() => ({ count: 0 }))
.emits({ Incremented: z.object({ amount: z.number() }) })
.patch({ Incremented: ({ data }, s) => ({ count: s.count + data.amount }) }) // optional — only for custom reducers
.on({ increment: z.object({ by: z.number() }) })
.emit((action) => ["Incremented", { amount: action.by }])
.build();
const app = act().withState(Counter).build();
await app.do("increment", { stream: "counter1", actor: { id: "1", name: "User" } }, { by: 1 });Configuration
All configuration fields are optional and have sensible defaults:
| Option | Default | Description |
|--------|---------|-------------|
| host | localhost | PostgreSQL host |
| port | 5432 | PostgreSQL port |
| database | postgres | Database name |
| user | postgres | Database user |
| password | postgres | Database password |
| schema | public | Schema for event tables |
| table | events | Base name for event tables |
Custom Schema and Table Names
const pgStore = new PostgresStore({
host: "db.example.com",
database: "production",
user: "app_user",
password: process.env.DB_PASSWORD,
schema: "events", // custom schema
table: "act_events", // creates act_events and act_events_streams tables
});Environment-Based Configuration
if (process.env.NODE_ENV === "production") {
store(new PostgresStore({
host: process.env.DB_HOST,
port: parseInt(process.env.DB_PORT || "5432"),
database: process.env.DB_NAME,
user: process.env.DB_USER,
password: process.env.DB_PASSWORD,
}));
}
// In development, the default InMemoryStore is usedFeatures
- ACID Transactions - Events are committed atomically within PostgreSQL transactions
- Optimistic Concurrency - Version-based conflict detection prevents lost updates
- Connection Pooling - Uses node-postgres Pool for efficient connection management
- Atomic Stream Claiming - Zero-contention competing consumers via
FOR UPDATE SKIP LOCKED - Auto Schema Setup -
seed()creates all required tables, indexes, and schema - NOTIFY/LISTEN - Real-time event notifications via PostgreSQL channels
- Multi-Tenant - Isolate tenants using separate schemas
Database Schema
Calling seed() creates two tables:
Events table ({schema}.{table}) - stores all committed events:
id(serial) - global event sequencename- event type namedata(jsonb) - event payloadstream- stream identifierversion- per-stream sequence numbercreated(timestamptz) - event timestampmeta(jsonb) - correlation, causation, and actor metadata
Streams table ({schema}.{table}_streams) - tracks stream processing state for reactions:
stream- stream identifierat- last processed event positionleased_by/leased_until- distributed processing claim infoblocked/error- error tracking for failed streams
Competing Consumer Pattern
The PostgreSQL adapter uses FOR UPDATE SKIP LOCKED for atomic stream claiming — the idiomatic PostgreSQL competing consumer pattern. The claim() method discovers streams with pending events and locks them in a single query:
- Workers never block each other — locked rows are silently skipped
- No race between discovery and locking (unlike a separate poll + lease)
- Same pattern used by pgBoss, Graphile Worker, and other production job queues
- Enables horizontal scaling by simply adding more workers
This replaces the previous two-step poll/lease approach, eliminating contention and simplifying the drain cycle.
Related
- @rotorsoft/act - Core framework
- Documentation
- Examples
