npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@flowmonkey/postgres

v0.0.1

Published

PostgreSQL storage adapters for FlowMonkey

Readme

@flowmonkey/postgres

PostgreSQL persistence layer for FlowMonkey workflows.

This package provides production-ready PostgreSQL implementations of FlowMonkey's storage interfaces for executions, flows, jobs, events, and resume tokens.

Table of Contents

Installation

pnpm add @flowmonkey/postgres pg
pnpm add -D @types/pg

Quick Start

import { Pool } from 'pg';
import { Engine, DefaultFlowRegistry, DefaultHandlerRegistry } from '@flowmonkey/core';
import { createPgStores, applySchema } from '@flowmonkey/postgres';

// Create connection pool
const pool = new Pool({
  connectionString: process.env.DATABASE_URL,
  max: 20,
  idleTimeoutMillis: 30000,
  connectionTimeoutMillis: 2000,
});

// Apply schema (run once at startup or via migrations)
await applySchema(pool);

// Create stores using factory
const { executionStore, flowStore, jobStore, eventStore } = createPgStores(pool);

// Set up engine with PostgreSQL persistence
const handlers = new DefaultHandlerRegistry();
const flows = new DefaultFlowRegistry();

const engine = new Engine(executionStore, handlers, flows);

// Use normally - all state persisted to PostgreSQL
const { execution } = await engine.create('my-flow', { data: 'value' });
await engine.run(execution.id);

Schema

The package creates the following tables. You can apply the schema using applySchema(pool) or by running the SQL directly in your migration tool.

Executions Table

Stores execution state and context:

CREATE TABLE fm_executions (
  id                    TEXT PRIMARY KEY,
  flow_id               TEXT NOT NULL,
  flow_version          TEXT NOT NULL,
  current_step          TEXT NOT NULL,
  status                TEXT NOT NULL,
  context               JSONB NOT NULL DEFAULT '{}',
  wake_at               BIGINT,
  wait_reason           TEXT,
  error                 JSONB,
  step_count            INTEGER NOT NULL DEFAULT 0,
  history               JSONB,
  tenant_id             TEXT,
  metadata              JSONB,
  created_at            BIGINT NOT NULL,
  updated_at            BIGINT NOT NULL,
  
  -- Idempotency
  idempotency_key       TEXT,
  idempotency_expires_at BIGINT,
  
  -- Cancellation
  cancellation          JSONB,
  
  -- Parent-child relationships
  parent_execution_id   TEXT,
  
  -- Wait tracking
  wait_started_at       BIGINT,
  
  -- Timeouts
  timeout_config        JSONB
);

-- Indexes for common queries
CREATE INDEX idx_fm_exec_status ON fm_executions(status);
CREATE INDEX idx_fm_exec_wake ON fm_executions(wake_at) WHERE wake_at IS NOT NULL;
CREATE INDEX idx_fm_exec_tenant ON fm_executions(tenant_id) WHERE tenant_id IS NOT NULL;
CREATE INDEX idx_fm_exec_idemp ON fm_executions(flow_id, idempotency_key) 
  WHERE idempotency_key IS NOT NULL;
CREATE INDEX idx_fm_exec_parent ON fm_executions(parent_execution_id) 
  WHERE parent_execution_id IS NOT NULL;

Flows Table

Stores flow definitions with versioning:

CREATE TABLE fm_flows (
  id          TEXT NOT NULL,
  version     TEXT NOT NULL,
  name        TEXT,
  definition  JSONB NOT NULL,
  created_at  BIGINT NOT NULL,
  PRIMARY KEY (id, version)
);

Jobs Table

Stores background jobs for stateful handlers:

CREATE TABLE fm_jobs (
  id            TEXT PRIMARY KEY,
  execution_id  TEXT NOT NULL REFERENCES fm_executions(id),
  step_id       TEXT NOT NULL,
  handler       TEXT NOT NULL,
  status        TEXT NOT NULL,
  input         JSONB NOT NULL,
  result        JSONB,
  error         JSONB,
  runner_id     TEXT,
  heartbeat_at  BIGINT,
  attempts      INTEGER NOT NULL DEFAULT 0,
  max_attempts  INTEGER NOT NULL DEFAULT 3,
  created_at    BIGINT NOT NULL,
  updated_at    BIGINT NOT NULL
);

CREATE INDEX idx_fm_jobs_status ON fm_jobs(status);
CREATE INDEX idx_fm_jobs_exec ON fm_jobs(execution_id);
CREATE INDEX idx_fm_jobs_stalled ON fm_jobs(heartbeat_at) 
  WHERE status = 'running';

Events Table

Stores audit events:

CREATE TABLE fm_events (
  id            BIGSERIAL PRIMARY KEY,
  execution_id  TEXT NOT NULL,
  event_type    TEXT NOT NULL,
  step_id       TEXT,
  data          JSONB,
  created_at    BIGINT NOT NULL
);

CREATE INDEX idx_fm_events_exec ON fm_events(execution_id);
CREATE INDEX idx_fm_events_type ON fm_events(event_type);

Resume Tokens Table

Stores tokens for resuming paused executions:

CREATE TABLE fm_resume_tokens (
  token         TEXT PRIMARY KEY,
  execution_id  TEXT NOT NULL REFERENCES fm_executions(id),
  step_id       TEXT NOT NULL,
  status        TEXT NOT NULL,
  metadata      JSONB,
  created_at    BIGINT NOT NULL,
  expires_at    BIGINT NOT NULL
);

CREATE INDEX idx_fm_tokens_exec ON fm_resume_tokens(execution_id);
CREATE INDEX idx_fm_tokens_expires ON fm_resume_tokens(expires_at);

Stores

PgExecutionStore

Implements StateStore for execution persistence:

import { PgExecutionStore } from '@flowmonkey/postgres';

const store = new PgExecutionStore(pool);

// Basic CRUD
await store.create(execution);
const exec = await store.get(executionId);
await store.update(execution);
await store.delete(executionId);

// Query by status
const running = await store.findByStatus('running', 100);
const waiting = await store.findByStatus('waiting', 100);

// Find executions ready to wake
const toWake = await store.findWaiting(100);

// Idempotency lookup
const existing = await store.findByIdempotencyKey('flow-id', 'unique-key');

// Parent-child relationships
const children = await store.findChildren(parentExecutionId);

// Timeout queries
const timedOutExecutions = await store.findTimedOutExecutions(100);
const timedOutWaits = await store.findTimedOutWaits(100);

The store includes automatic JSON serialization for context, history, error, and other complex fields.

PgFlowStore

Stores flow definitions with versioning:

import { PgFlowStore } from '@flowmonkey/postgres';

const flowStore = new PgFlowStore(pool);

// Save a flow (creates new version if exists)
await flowStore.save(flow);

// Retrieve flows
const flow = flowStore.get('order-flow');          // Latest version
const v1 = flowStore.get('order-flow', '1.0.0');   // Specific version
const latest = flowStore.latest('order-flow');     // Latest version info
const versions = flowStore.versions('order-flow'); // All versions

// List all flows
const allFlows = flowStore.list();

// Load all flows from database into memory
await flowStore.loadAll();

The flow store caches flows in memory after loading. Call loadAll() at startup to populate the cache.

PgJobStore

Manages background jobs for stateful handlers:

import { PgJobStore } from '@flowmonkey/postgres';

const jobStore = new PgJobStore(pool);

// Create a job
const job = await jobStore.create({
  executionId: 'exec-123',
  stepId: 'send-email',
  handler: 'email-send',
  input: { to: '[email protected]', subject: 'Hello' },
  maxAttempts: 3,
});

// Claim a job for processing (atomic operation)
const claimed = await jobStore.claim(job.id, 'worker-1');

// Update heartbeat while processing
await jobStore.heartbeat(job.id);

// Complete successfully
await jobStore.complete(job.id, { sent: true, messageId: 'msg-456' });

// Or fail with error
await jobStore.fail(job.id, { code: 'SMTP_ERROR', message: 'Connection refused' });

// Query jobs
const pending = await jobStore.listByStatus('pending', 10);
const stalled = await jobStore.findStalled(Date.now() - 60000, 10);
const forExecution = await jobStore.getByExecution('exec-123');

// Cleanup
const deleted = await jobStore.deleteOld(Date.now() - 7 * 24 * 60 * 60 * 1000);

Job states: pending, running, completed, failed

PgEventStore

Records and queries execution events:

import { PgEventStore } from '@flowmonkey/postgres';

const eventStore = new PgEventStore(pool);

// Record an event
await eventStore.record({
  executionId: 'exec-123',
  eventType: 'step.completed',
  stepId: 'validate-order',
  data: {
    duration: 150,
    output: { validated: true },
  },
});

// Query events
const events = await eventStore.query({
  executionId: 'exec-123',
  eventTypes: ['step.started', 'step.completed', 'step.failed'],
  after: startTimestamp,
  before: endTimestamp,
  limit: 100,
});

// Get all events for an execution
const allEvents = await eventStore.getByExecution('exec-123');

// Cleanup old events
const deleted = await eventStore.deleteOld(Date.now() - 30 * 24 * 60 * 60 * 1000);

Event types include:

  • execution.created, execution.started, execution.completed, execution.failed, execution.cancelled
  • step.started, step.completed, step.failed, step.waiting
  • job.created, job.claimed, job.completed, job.failed

PgContextStorage

Stores large context data separately from executions:

import { PgContextStorage } from '@flowmonkey/postgres';

const contextStorage = new PgContextStorage(pool);

// Store context data
await contextStorage.set(executionId, 'largeData', bigJsonObject);

// Retrieve context data
const data = await contextStorage.get(executionId, 'largeData');

// Delete context data
await contextStorage.delete(executionId, 'largeData');

// Clear all context for an execution
await contextStorage.clearExecution(executionId);

Use this for storing large intermediate results that should not bloat the main execution record.

PgResumeTokenManager

Manages resume tokens for paused executions:

import { PgResumeTokenManager } from '@flowmonkey/postgres';

const tokenManager = new PgResumeTokenManager(pool);

// Generate a token
const token = await tokenManager.generate('exec-123', 'approval-step', {
  expiresIn: 7 * 24 * 60 * 60 * 1000, // 7 days
  metadata: {
    approver: '[email protected]',
    requestedBy: '[email protected]',
  },
});

// Get token info
const info = await tokenManager.get(token);
// { executionId, stepId, status, metadata, createdAt, expiresAt }

// Validate before use
const isValid = await tokenManager.validate(token);

// Mark as used
await tokenManager.markUsed(token);

// Revoke (cancel) a token
await tokenManager.revoke(token);

// Get all tokens for an execution
const tokens = await tokenManager.getByExecution('exec-123');

// Cleanup expired tokens
const cleaned = await tokenManager.cleanupExpired();

Token states: pending, used, revoked, expired

Factory Function

The createPgStores function creates all stores with shared configuration:

import { createPgStores } from '@flowmonkey/postgres';

const stores = createPgStores(pool, {
  tablePrefix: 'fm_',  // Optional: customize table prefix
});

// Destructure the stores you need
const {
  executionStore,
  flowStore,
  jobStore,
  eventStore,
  contextStorage,
  resumeTokenManager,
} = stores;

Connection Management

Production Configuration

import { Pool } from 'pg';

const pool = new Pool({
  connectionString: process.env.DATABASE_URL,
  
  // Pool sizing - adjust based on your workload
  max: 20,                        // Maximum connections
  min: 5,                         // Minimum connections
  idleTimeoutMillis: 30000,       // Close idle connections after 30s
  connectionTimeoutMillis: 5000,  // Connection attempt timeout
  
  // SSL for production
  ssl: process.env.NODE_ENV === 'production' ? {
    rejectUnauthorized: true,
    ca: process.env.PG_CA_CERT,
  } : false,
  
  // Statement timeout prevents runaway queries
  statement_timeout: 30000,
});

// Handle pool errors
pool.on('error', (err) => {
  console.error('Unexpected pool error:', err);
  // Consider alerting/restarting based on error type
});

// Graceful shutdown
process.on('SIGTERM', async () => {
  console.log('Shutting down gracefully...');
  await pool.end();
  process.exit(0);
});

Connection Health Check

async function checkDatabaseHealth(): Promise<boolean> {
  try {
    const client = await pool.connect();
    await client.query('SELECT 1');
    client.release();
    return true;
  } catch (error) {
    console.error('Database health check failed:', error);
    return false;
  }
}

Migrations

Using applySchema

For development or simple deployments:

import { applySchema } from '@flowmonkey/postgres';

// Creates all tables if they don't exist
await applySchema(pool);

Using Migration Tools

For production, use a migration tool like node-pg-migrate or Knex:

// migrations/001_initial_flowmonkey.ts
import { schema } from '@flowmonkey/postgres';

export async function up(pool: Pool) {
  await pool.query(schema);
}

export async function down(pool: Pool) {
  await pool.query(`
    DROP TABLE IF EXISTS fm_resume_tokens;
    DROP TABLE IF EXISTS fm_events;
    DROP TABLE IF EXISTS fm_jobs;
    DROP TABLE IF EXISTS fm_flows;
    DROP TABLE IF EXISTS fm_executions;
  `);
}

Schema Versioning

The package exports the schema version for tracking:

import { schemaVersion } from '@flowmonkey/postgres';

console.log(`FlowMonkey schema version: ${schemaVersion}`);

Performance

Indexing Recommendations

The default schema includes indexes for common queries. Add custom indexes based on your query patterns:

-- If you frequently query by tenant
CREATE INDEX idx_exec_tenant_status ON fm_executions(tenant_id, status);

-- If you query by metadata fields
CREATE INDEX idx_exec_customer ON fm_executions((metadata->>'customerId'));

-- If you have many events
CREATE INDEX idx_events_time ON fm_events(created_at DESC);

Partitioning for Large Tables

For high-volume deployments, partition the events table:

-- Partition by month
CREATE TABLE fm_events (
  id BIGSERIAL,
  execution_id TEXT NOT NULL,
  event_type TEXT NOT NULL,
  step_id TEXT,
  data JSONB,
  created_at BIGINT NOT NULL
) PARTITION BY RANGE (created_at);

-- Create monthly partitions
CREATE TABLE fm_events_2024_01 PARTITION OF fm_events
  FOR VALUES FROM (1704067200000) TO (1706745600000);
  
CREATE TABLE fm_events_2024_02 PARTITION OF fm_events
  FOR VALUES FROM (1706745600000) TO (1709251200000);

Archiving Old Data

Move old completed executions to an archive table:

-- Create archive table with same structure
CREATE TABLE fm_executions_archive (LIKE fm_executions INCLUDING ALL);

-- Archive completed executions older than 30 days
INSERT INTO fm_executions_archive
SELECT * FROM fm_executions
WHERE status IN ('completed', 'failed', 'cancelled')
  AND updated_at < extract(epoch from now() - interval '30 days') * 1000;

DELETE FROM fm_executions
WHERE status IN ('completed', 'failed', 'cancelled')
  AND updated_at < extract(epoch from now() - interval '30 days') * 1000;

Monitoring Connection Pool

setInterval(() => {
  console.log('Pool stats:', {
    total: pool.totalCount,
    idle: pool.idleCount,
    waiting: pool.waitingCount,
  });
}, 60000);

API Reference

Exports

// Schema
export { schema, schemaVersion, applySchema } from './schema';

// Stores
export { PgExecutionStore } from './execution-store';
export { PgFlowStore } from './flow-store';
export { PgJobStore } from './job-store';
export { PgEventStore } from './event-store';
export { PgContextStorage } from './context-storage';
export { PgResumeTokenManager } from './resume-token-manager';

// Factory
export { createPgStores } from './factory';

// Types
export type { PgStoreOptions } from './factory';

Store Interfaces

All stores implement their corresponding interfaces from @flowmonkey/core:

  • PgExecutionStore implements StateStore
  • PgFlowStore implements FlowRegistry
  • PgJobStore implements JobStore
  • PgEventStore implements EventStore
  • PgResumeTokenManager implements ResumeTokenManager

License

MIT