@nvent-addon/adapter-store-postgres
v0.5.15
Published
PostgreSQL store adapter for nvent with automatic schema management
Readme
@nvent-addon/adapter-store-postgres
PostgreSQL store adapter for nvent providing a complete three-tier storage solution with optimized flat schema design for maximum performance.
Status: Production Ready ✅
- ✅ Optimized for PostgreSQL - Native columns for hot paths (20-50x faster updates)
- ✅ Intelligent field routing - Automatic mapping between metadata and database columns
- ✅ All data types working - Flows, triggers, scheduler, events all persisting correctly
- ✅ Performance optimized - Strategic indexes, atomic operations, minimal JSONB usage
- 🚀 Performance: Simple updates ~0.5ms, complex updates ~5ms (vs 10-20ms with all-JSONB)
Key Features
Optimized Flat Schema Design
Frequently accessed metadata fields (status, counters, timestamps) are stored as native PostgreSQL columns for maximum performance. JSONB is used only for truly nested complex data.
Performance Gains:
- 20-50x faster for simple updates (status changes, counter increments)
- 2-3x faster for complex updates (narrower JSONB merge scope)
- Native indexing on all hot-path columns
- Atomic operations without expensive deep merges
Intelligent Field Routing
The adapter automatically routes metadata fields to the optimal storage:
- Flat columns: status, timestamps, counters (90% of operations)
- JSONB fields: emittedEvents, subscriptions, config (10% of operations)
Timestamp Normalization
Automatic bidirectional conversion between ISO strings (app) and BIGINT milliseconds (database)
Production-Ready Features
- Connection pooling with configurable size
- Optimistic locking with automatic retry
- Strategic partial indexes for common queries
- Atomic counter operations
- Version-tracked migrations
Features
- Optimized Flat Schema: Native columns for status, stats, timestamps (90% of operations)
- Minimal JSONB Usage: Only for complex nested data (emittedEvents, awaitingSteps)
- Event Stream Storage: Append-only tables with BIGSERIAL IDs and optimized indexes
- Sorted Index Storage: Specialized tables per use case with intelligent field routing
- Key-Value Store: Simple table with TTL support and atomic operations
- Automatic Schema Management: Versioned migrations with auto-migration on startup
- Optimistic Locking: Version-based concurrency control for index updates
- Atomic Operations: Native PostgreSQL atomic increments on stat columns
- Pattern-based Routing: Subject patterns automatically route to correct tables
- Connection Pooling: Built-in connection pool management
Installation
pnpm add @nvent-addon/adapter-store-postgres pgNote:
pgis a required peer dependency.
Basic Usage
Add to your nuxt.config.ts:
export default defineNuxtConfig({
modules: [
'nvent',
'@nvent-addon/adapter-store-postgres'
],
nvent: {
store: {
adapter: 'postgres',
prefix: 'nvent' // Optional: table name prefix
}
},
nventStorePostgres: {
connection: {
host: 'localhost',
port: 5432,
database: 'nvent',
user: 'postgres',
password: process.env.POSTGRES_PASSWORD
},
prefix: 'nvent', // Optional: overrides nvent.store.prefix
autoMigrate: true, // Run migrations on startup
poolSize: 10 // Connection pool size
}
})Advanced Configuration
Connection String
You can use a connection string instead of individual options:
export default defineNuxtConfig({
nventStorePostgres: {
connection: 'postgresql://user:password@localhost:5432/nvent'
}
})Multiple Connection Sources
The adapter can pull connection settings from multiple locations (in order of precedence):
export default defineNuxtConfig({
nvent: {
// Option 1: Direct store connection
store: {
connection: {
host: 'postgres.example.com',
port: 5432,
database: 'nvent'
}
},
// Option 2: Shared connections config
connections: {
postgres: {
host: 'postgres.example.com',
port: 5432,
database: 'nvent',
user: 'nvent_user',
password: process.env.POSTGRES_PASSWORD
}
}
},
// Option 3: Module-specific connection
nventStorePostgres: {
connection: {
host: 'localhost',
port: 5432,
database: 'nvent'
}
}
})Disabling Auto-Migration
For production environments where you want to control migrations manually:
export default defineNuxtConfig({
nventStorePostgres: {
connection: process.env.DATABASE_URL,
autoMigrate: false // Disable automatic migrations
}
})You can then run migrations manually using the adapter's migration tools.
Configuration Reference
Connection Options
| Option | Type | Description |
|--------|------|-------------|
| connection | PoolConfig \| string | PostgreSQL connection config or connection string |
| host | string | PostgreSQL server host |
| port | number | PostgreSQL server port (default: 5432) |
| database | string | Database name |
| user | string | Database user |
| password | string | Database password |
| ssl | boolean \| TLSOptions | SSL/TLS configuration |
Adapter Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| prefix | string | 'nvent' | Prefix for all table names |
| autoMigrate | boolean | true | Run migrations automatically on startup |
| poolSize | number | 10 | Maximum number of connections in pool |
Database Schema
The adapter uses a specialized table design optimized for different use cases. Frequently accessed metadata fields are stored as native PostgreSQL columns for maximum performance.
Schema Design Philosophy
- Use-case driven tables: 7 specialized tables instead of generic single table
- Flat columns for hot paths: status, stats, timestamps as native columns (20-50x faster)
- JSONB only when needed: Complex nested data only (emittedEvents, awaitingSteps)
- Native indexing: All hot-path columns are indexed for fast queries
- Optimistic locking: Version fields on all index tables
1. Flow Run Events Table
Append-only event log for flow execution tracking.
CREATE TABLE {prefix}_flow_events (
id BIGSERIAL PRIMARY KEY,
run_id TEXT NOT NULL,
flow_name TEXT NOT NULL,
ts BIGINT NOT NULL,
type TEXT NOT NULL,
step_name TEXT,
step_id TEXT,
attempt INTEGER,
data JSONB,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
-- Optimized indexes
CREATE INDEX idx_{prefix}_flow_events_run ON {prefix}_flow_events(run_id, ts DESC);
CREATE INDEX idx_{prefix}_flow_events_flow_type ON {prefix}_flow_events(flow_name, type, ts DESC);2. Flow Runs Table - OPTIMIZED FLAT DESIGN ⚡
Tracks individual flow run state with flat columns for all frequently accessed fields.
CREATE TABLE {prefix}_flow_runs (
flow_name TEXT NOT NULL,
run_id TEXT NOT NULL,
-- Flat columns for hot path (direct updates, fast!)
status TEXT NOT NULL DEFAULT 'running',
started_at BIGINT NOT NULL,
completed_at BIGINT,
step_count INTEGER,
completed_steps INTEGER DEFAULT 0,
last_activity_at BIGINT,
-- JSONB only for complex nested data
emitted_events JSONB DEFAULT '{}'::jsonb,
awaiting_steps JSONB DEFAULT '{}'::jsonb,
-- Optimistic locking
version INTEGER DEFAULT 0,
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
PRIMARY KEY (flow_name, run_id)
);
-- Fast status queries
CREATE INDEX idx_{prefix}_flow_runs_status ON {prefix}_flow_runs(status)
WHERE status IN ('running', 'awaiting');
-- Fast active runs queries
CREATE INDEX idx_{prefix}_flow_runs_active ON {prefix}_flow_runs(flow_name, started_at DESC)
WHERE status IN ('running', 'awaiting');Performance Impact:
- Status updates:
0.5ms(was 10-20ms with deep merge) - Counter increments:
0.5ms(was 10-20ms) - Complex updates:
5ms(was 10-20ms)
3. Flows Table - OPTIMIZED STATS ⚡
Flow metadata with stats as individual columns for atomic increments.
CREATE TABLE {prefix}_flows (
flow_name TEXT PRIMARY KEY,
-- Flat columns
display_name TEXT,
flow_version INTEGER DEFAULT 1,
registered_at BIGINT NOT NULL,
last_run_at BIGINT,
-- Stats as flat columns (atomic increments, no merge!)
stats_total INTEGER DEFAULT 0,
stats_success INTEGER DEFAULT 0,
stats_failure INTEGER DEFAULT 0,
stats_running INTEGER DEFAULT 0,
stats_awaiting INTEGER DEFAULT 0,
stats_cancel INTEGER DEFAULT 0,
-- Versioning
version INTEGER DEFAULT 0,
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
CREATE INDEX idx_{prefix}_flows_last_run ON {prefix}_flows(last_run_at DESC NULLS LAST);Performance Impact: Stats increments are now atomic column updates instead of expensive JSONB deep merges.
4. Trigger Events Table
Append-only log for trigger events.
CREATE TABLE {prefix}_trigger_events (
id BIGSERIAL PRIMARY KEY,
trigger_name TEXT NOT NULL,
ts BIGINT NOT NULL,
type TEXT NOT NULL,
data JSONB,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
CREATE INDEX idx_{prefix}_trigger_events_trigger ON {prefix}_trigger_events(trigger_name, ts DESC);5. Triggers Table - OPTIMIZED FLAT DESIGN ⚡
CREATE TABLE {prefix}_triggers (
trigger_name TEXT PRIMARY KEY,
-- Flat columns
trigger_type TEXT,
status TEXT DEFAULT 'active',
registered_at BIGINT NOT NULL,
-- Stats as flat columns
stats_total_fires INTEGER DEFAULT 0,
stats_last_fired_at BIGINT,
-- Versioning
version INTEGER DEFAULT 0,
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);6. Scheduler Jobs Table - OPTIMIZED FLAT DESIGN ⚡
CREATE TABLE {prefix}_scheduler_jobs (
job_id TEXT PRIMARY KEY,
-- Flat columns for scheduling
job_name TEXT,
schedule TEXT,
scheduled_at BIGINT NOT NULL,
last_run_at BIGINT,
next_run_at BIGINT,
status TEXT DEFAULT 'pending',
-- Job configuration (complex, needs JSONB)
config JSONB,
-- Versioning
version INTEGER DEFAULT 0,
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
CREATE INDEX idx_{prefix}_scheduler_jobs_next_run ON {prefix}_scheduler_jobs(next_run_at)
WHERE status = 'pending';7. Key-Value Store Table
Simple key-value storage with TTL support.
CREATE TABLE {prefix}_kv (
key TEXT PRIMARY KEY,
value JSONB NOT NULL,
expires_at TIMESTAMP WITH TIME ZONE,
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
CREATE INDEX idx_{prefix}_kv_expires ON {prefix}_kv(expires_at)
WHERE expires_at IS NOT NULL;8. Schema Version Table
Tracks applied migrations.
CREATE TABLE {prefix}_schema_version (
version INTEGER PRIMARY KEY,
applied_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
description TEXT
);Helper Functions
jsonb_deep_merge()
For the remaining JSONB fields (emittedEvents, awaitingSteps), a PostgreSQL function provides deep merge capability:
CREATE FUNCTION jsonb_deep_merge(target jsonb, source jsonb) RETURNS jsonbThis function is only used for complex nested updates. 90% of updates now bypass this by using flat columns.
Storage Architecture & Field Routing
Intelligent Field Mapping
The adapter automatically routes metadata fields to the optimal storage location:
Flow Runs (nvent:flow:runs:{flowName}):
- Flat columns:
status,startedAt,completedAt,stepCount,completedSteps,lastActivityAt - JSONB fields:
emittedEvents,awaitingSteps
Flows (nvent:flows):
- Flat columns:
displayName,flowVersion,registeredAt,lastRunAt - Flat stats:
stats.total,stats.success,stats.failure,stats.running,stats.awaiting,stats.cancel
Triggers (nvent:triggers):
- Flat columns:
triggerType,status,registeredAt - Flat stats:
stats.totalFires,stats.lastFiredAt
Scheduler Jobs (nvent:scheduler:jobs):
- Flat columns:
jobName,schedule,scheduledAt,lastRunAt,nextRunAt,status - JSONB fields:
config
Pattern-Based Table Routing
Subject patterns automatically route to specialized tables:
// Flow run events → nvent_flow_events
'nvent:flow:run:abc123'
// Flow run index → nvent_flow_runs
'nvent:flow:runs:myflow'
// Flows index → nvent_flows
'nvent:flows'
// Trigger events → nvent_trigger_events
'nvent:trigger:event:mytrigger'
// Triggers index → nvent_triggers
'nvent:triggers'
// Scheduler jobs → nvent_scheduler_jobs
'nvent:scheduler:jobs'1. Event Streams
Append-only event storage with automatic routing to specialized tables.
Operations:
stream.append()- Insert event with auto-generated ID and timestampstream.read()- Query events with filtering, ordering, and paginationstream.delete()- Remove all events for a subject
Performance: Optimized indexes on (run_id, ts) and (flow_name, type, ts) for fast queries.
2. Sorted Indexes with Intelligent Field Routing ⚡
The adapter automatically splits metadata between flat columns and JSONB:
Operations:
index.add()- REPLACES metadata completely (extracts to flat columns + JSONB)index.get()- Reconstructs metadata object from columns + JSONBindex.read()- Range query with automatic metadata reconstructionindex.update()- DEEP MERGES metadata (uses flat columns when possible, jsonb_deep_merge for complex fields)index.updateWithRetry()- Automatic retry on version conflictsindex.increment()- Fast path: Atomic column increment for mapped stats, slow path: JSONB for unmapped fieldsindex.remove()- Remove entry
Metadata Semantics (critical for correctness):
add(): Completely replaces all metadataupdate(): Deep merges new values into existing metadata
Features:
- Optimistic locking via version field
- 20-50x faster simple updates (status, counters)
- 2-3x faster complex updates (smaller JSONB scope)
- Nested object support (dot notation for updates)
- Atomic counter increments
- Exponential backoff for retries
- GIN index for metadata queries
3. Key-Value Store
Simple key-value storage with TTL support. Keys are used as-is without additional prefixing.
Example keys: nvent:scheduler:lock:xyz, nvent:kv:config
Operations:
kv.get()- Retrieve value (auto-cleans expired entries)kv.set()- Store value with optional TTLkv.delete()- Remove keykv.clear()- Pattern-based deletion (supports LIKE patterns)kv.increment()- Atomic increment
Features:
- Automatic JSON serialization/deserialization
- TTL support with automatic cleanup on read
- Pattern-based batch deletion (convert Redis-style
*to SQL%) - Atomic increments with UPSERT
- Partial index for TTL cleanup efficiency
Data Structures
Event Record
{
id: string // Auto-generated BIGSERIAL ID (e.g., "12345")
ts: number // Unix timestamp in milliseconds
type: string // Event type
runId?: string // Flow run ID
flowName?: string // Flow name
stepName?: string // Step name (for step events)
stepId?: string // Step instance ID (for step events)
attempt?: number // Retry attempt
data?: any // Event payload (stored as JSONB)
}Index Entry
{
id: string // Entry identifier
score: number // Sort score (typically timestamp)
metadata?: { // Optional metadata (stored as JSONB)
version: number // Auto-managed for optimistic locking
[key: string]: any // User-defined fields (supports nesting)
}
}Examples
Working with Event Streams
// Append event to stream
await store.stream.append('nvent:flow:run123', {
type: 'flow:started',
flowName: 'my-flow',
runId: 'run123',
data: { input: 'test' }
})
// Read recent events
const events = await store.stream.read('nvent:flow:run123', {
limit: 50,
order: 'desc',
types: ['flow:started', 'flow:completed']
})
// Read events in time range
const events = await store.stream.read('nvent:flow:run123', {
from: Date.now() - 3600000, // Last hour
to: Date.now(),
limit: 100
})
// Read events after a specific ID
const events = await store.stream.read('nvent:flow:run123', {
after: '12345',
limit: 100
})
// Delete entire event stream
await store.stream.delete('nvent:flow:run123')Working with Indexes
// Add indexed entry with metadata
await store.index.add(
'nvent:scheduler:trigger:flow1',
'trigger-123',
Date.now(),
{
cron: '0 * * * *',
enabled: true,
stats: {
totalFires: 0,
lastFire: null
}
}
)
// Get single entry
const entry = await store.index.get(
'nvent:scheduler:trigger:flow1',
'trigger-123'
)
// Update with optimistic locking
const success = await store.index.update(
'nvent:scheduler:trigger:flow1',
'trigger-123',
{ enabled: false }
)
if (!success) {
console.log('Version conflict - entry was updated by another process')
}
// Update with automatic retry on conflict
await store.index.updateWithRetry(
'nvent:scheduler:trigger:flow1',
'trigger-123',
{ 'stats.totalFires': 5 } // Dot notation for nested updates
)
// Atomic increment
const newValue = await store.index.increment(
'nvent:scheduler:trigger:flow1',
'trigger-123',
'stats.totalFires',
1
)
// List entries (newest first)
const entries = await store.index.read(
'nvent:scheduler:trigger:flow1',
{ offset: 0, limit: 50 }
)
// Delete entry
await store.index.delete(
'nvent:scheduler:trigger:flow1',
'trigger-123'
)Working with Key-Value Store
// Set with TTL (60 seconds)
await store.kv.set('nvent:scheduler:lock:flow1', { owner: 'worker-1' }, 60)
// Get value (auto-cleans expired entries)
const lock = await store.kv.get('nvent:scheduler:lock:flow1')
// Set without TTL
await store.kv.set('nvent:config:flows', { maxRetries: 3 })
// Increment counter (creates if doesn't exist)
const count = await store.kv.increment('nvent:counter:processed', 1)
// Delete single key
await store.kv.delete('nvent:temp:data')
// Clear by pattern (Redis-style wildcards converted to SQL LIKE)
const deleted = await store.kv.clear('nvent:temp:*') // Deletes all keys matching patternSchema Migrations
The adapter uses a versioned migration system:
Migration Structure
Migrations are defined in /src/runtime/migrations.ts:
export const migrations: Migration[] = [
{
version: 1,
name: 'initial_schema',
up: async (pool: Pool, prefix: string) => {
// Create tables and indexes
},
down: async (pool: Pool, prefix: string) => {
// Rollback (optional)
}
}
]Automatic Migration
By default, migrations run automatically on adapter initialization:
const adapter = new PostgresStoreAdapter({
connection: 'postgresql://...',
autoMigrate: true // Default
})
await adapter.init() // Runs pending migrationsManual Migration
For production environments:
import { runMigrations, getCurrentVersion, rollbackTo } from '@nvent-addon/adapter-store-postgres/runtime/migrations'
import { Pool } from 'pg'
const pool = new Pool({ connectionString: process.env.DATABASE_URL })
// Check current version
const version = await getCurrentVersion(pool, 'nvent')
console.log(`Current schema version: ${version}`)
// Run migrations
await runMigrations(pool, 'nvent')
// Rollback to version 0 (drops all tables)
await rollbackTo(pool, 'nvent', 0)Adding New Migrations
When adding new features, create a new migration:
{
version: 2,
name: 'add_metadata_index',
up: async (pool: Pool, prefix: string) => {
await pool.query(`
CREATE INDEX idx_${prefix}_events_metadata
ON ${prefix}_events USING GIN(data)
`)
},
down: async (pool: Pool, prefix: string) => {
await pool.query(`DROP INDEX idx_${prefix}_events_metadata`)
}
}Performance Considerations
Subject-Based Tables (Natural Isolation)
The adapter uses subject-based tables to handle millions of events efficiently:
How It Works
Instead of one massive events table, each subject type gets its own table:
nvent_stream_flow- All flow events (nq:flow:*)nvent_stream_trigger- All trigger events (nq:trigger:*)nvent_stream_await- All await events (nq:await:*)
Tables are created automatically on first use - no manual setup required.
Key Benefits
- Natural Isolation: Each subject type is physically separated
- Smaller Tables: Queries scan only relevant data (10-100x faster)
- Smaller Indexes: Each table has its own indexes (faster lookups)
- Faster Vacuums: Maintenance operations work on smaller tables
- Easy Cleanup:
TRUNCATEorDROPentire subject types - Flexible Retention: Different policies per subject type
Table Management
List all stream tables:
import { listStreamTables } from '@nvent-addon/adapter-store-postgres/migrations'
import { Pool } from 'pg'
const pool = new Pool({ /* your config */ })
// List all stream tables
const tables = await listStreamTables(pool, 'nvent')
console.log(tables)
// [
// { subjectPrefix: 'nq:flow', tableName: 'nvent_stream_flow' },
// { subjectPrefix: 'nq:trigger', tableName: 'nvent_stream_trigger' }
// ]Drop a specific subject type:
import { dropStreamTable } from '@nvent-addon/adapter-store-postgres/migrations'
// Drop all flow events
await dropStreamTable(pool, 'nvent', 'nq:flow')Performance Example
With 10M events across 3 subject types:
❌ Single table approach:
- 10M rows in one table
- Full table scans for queries
- 500MB+ indexes
- Slow vacuums
✅ Subject-based approach:
- 3 tables with ~3.3M rows each
- Queries only touch relevant table
- ~150MB indexes per table
- Fast, parallel vacuums
Retention Policies
Implement different retention per subject type:
// Keep flow runs for 30 days
await pool.query(`
DELETE FROM nvent_stream_flow
WHERE created_at < NOW() - INTERVAL '30 days'
`)
// Keep trigger events for 90 days
await pool.query(`
DELETE FROM nvent_stream_trigger
WHERE created_at < NOW() - INTERVAL '90 days'
`)
// Or simply truncate to clear all
await pool.query(`TRUNCATE nvent_stream_flow`)Indexing Strategy
Each stream table gets optimized indexes:
- Primary Key: BIGSERIAL id for sequential inserts
- Subject + Timestamp: Fast lookups by subject with time ordering
- Type Index: Filter by event type within subject
- Run ID Index: Partial index (WHERE run_id IS NOT NULL) for flow queries
Additional indexes:
- KV Store: Primary key on
keyand partial index for TTL cleanup - Index Store: Composite primary key (key, id) and score-based index
- Metadata: GIN index for JSONB queries
Connection Pooling
Configure pool size based on your workload:
{
poolSize: 20 // Increase for high-concurrency workloads
}Optimistic Locking
The version-based locking reduces contention:
- Updates fail fast on conflicts
- Automatic retry with exponential backoff (handles "not found" race conditions)
- No database-level locks needed
TTL Cleanup
Expired KV entries are cleaned up lazily on read operations, avoiding background cleanup jobs.
Query Optimization
- Use
limitto prevent large result sets - Use timestamp-based queries for event streams
- Use pattern-based deletion sparingly (requires table scan)
- Queries automatically target the correct subject table (no joins needed)
Production Configuration
Recommended Connection Pool Settings
export default defineNuxtConfig({
nventStorePostgres: {
connection: {
connectionString: process.env.DATABASE_URL,
max: 20, // Max connections
min: 5, // Min connections
idleTimeoutMillis: 30000, // Close idle after 30s
connectionTimeoutMillis: 5000,
ssl: { rejectUnauthorized: false }
},
poolSize: 20,
autoMigrate: false // Control migrations manually in production
}
})PostgreSQL Server Tuning
Add to postgresql.conf:
# Memory (adjust based on available RAM)
shared_buffers = 256MB # 25% of RAM
effective_cache_size = 1GB # 50-75% of RAM
work_mem = 16MB
maintenance_work_mem = 128MB
# Checkpoints
checkpoint_completion_target = 0.9
wal_buffers = 16MB
min_wal_size = 1GB
max_wal_size = 4GB
# Query planner (for SSD)
random_page_cost = 1.1
effective_io_concurrency = 200Performance Monitoring
-- Check index usage
SELECT schemaname, tablename, indexname, idx_scan
FROM pg_stat_user_indexes
WHERE schemaname = 'public' AND tablename LIKE 'nvent_%'
ORDER BY idx_scan DESC;
-- Check table sizes
SELECT tablename,
pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) as total_size
FROM pg_tables
WHERE schemaname = 'public' AND tablename LIKE 'nvent_%'
ORDER BY pg_total_relation_size(schemaname||'.'||tablename) DESC;Production Checklist
- ✅ Set
autoMigrate: falseand control migrations manually - ✅ Configure connection pool size (20-50 for production)
- ✅ Enable SSL for database connections
- ✅ Set up automated backups
- ✅ Configure auto-vacuum settings
- ✅ Monitor connection pool usage
- ✅ Set up read replicas for scaling
- ✅ Implement retention policy for old data
- ✅ Monitor query performance with
pg_stat_statements
Comparison with Redis Adapter
| Feature | PostgreSQL | Redis | |---------|-----------|-------| | Durability | ✅ ACID compliant | ⚠️ Depends on persistence config | | Queries | ✅ Complex SQL queries | ⚠️ Limited to data structure ops | | Transactions | ✅ Full ACID | ⚠️ Lua scripts only | | Storage | ✅ Unlimited (disk-based) | ⚠️ Limited to RAM | | Scalability | ✅ Subject-based tables for isolation | ⚠️ Single-instance memory limit | | Indexing | ✅ Multiple index types | ⚠️ Data structure specific | | Schema Evolution | ✅ Versioned migrations | ❌ No schema | | Data Retention | ✅ Per-subject cleanup/truncate | ⚠️ Manual cleanup required | | Performance | ✅ Excellent with subject isolation | ✅✅ Faster (in-memory) | | Complexity | ⚠️ Requires DB management | ✅ Simple setup |
Recommendation: Use PostgreSQL for production systems with:
- Long-term data retention requirements
- Millions of events
- Need for complex queries and reporting
- ACID compliance requirements
Use Redis for:
- Development/testing
- Short retention periods
- Ultra-low latency requirements
- Simple event streaming
Troubleshooting
Connection Issues
// Enable detailed logging
const adapter = new PostgresStoreAdapter({
connection: {
host: 'localhost',
port: 5432,
database: 'nvent',
// Add logging
log: (msg) => console.log('PG:', msg)
}
})Migration Failures
Check the schema version table:
SELECT * FROM nvent_schema_version ORDER BY version;Performance Issues
Use EXPLAIN ANALYZE to diagnose slow queries:
EXPLAIN ANALYZE
SELECT * FROM nvent_events
WHERE subject = 'nvent:flow:abc123'
ORDER BY ts DESC
LIMIT 50;License
MIT
