@queuert/postgres
v0.7.0
PostgreSQL adapter for Queuert job queue system
PostgreSQL state adapter and notify adapter for Queuert - a TypeScript library for database-backed job queues.
What does this do?
Queuert uses adapters to store job state and coordinate workers. This package provides two adapters:
State Adapter - Stores jobs in PostgreSQL tables:
- Creating and updating jobs with full ACID transactions
- Tracking job status (blocked/pending→running→completed)
- Managing job leases with FOR UPDATE SKIP LOCKED for distributed workers
- Storing job chains and blocker relationships
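To illustrate how FOR UPDATE SKIP LOCKED lets several workers lease jobs without blocking each other, here is a minimal sketch of a lease query builder. The table name (jobs), the column names (status, leased_by, leased_until, created_at), and the function name buildLeaseQuery are assumptions made for this example; they are not Queuert's actual schema or API.

```typescript
// Sketch only: table, columns, and function name are hypothetical,
// not the schema that the Queuert state adapter creates.
interface LeaseQuery {
  text: string;
  values: [workerId: string, leaseSeconds: number, batchSize: number];
}

function buildLeaseQuery(
  workerId: string,
  leaseSeconds: number,
  batchSize: number,
): LeaseQuery {
  const text = `
    WITH candidates AS (
      SELECT id
      FROM jobs
      WHERE status = 'pending'
      ORDER BY created_at
      LIMIT $3
      -- Rows locked by another worker's transaction are skipped, not waited on.
      FOR UPDATE SKIP LOCKED
    )
    UPDATE jobs
    SET status = 'running',
        leased_by = $1,
        leased_until = now() + make_interval(secs => $2)
    FROM candidates
    WHERE jobs.id = candidates.id
    RETURNING jobs.id`;
  return { text, values: [workerId, leaseSeconds, batchSize] };
}
```

The returned object has the { text, values } shape that node-postgres accepts as a query config, so it could be passed to a client's query method inside a transaction. Two workers running this statement at the same time would each receive a disjoint set of job IDs.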
Notify Adapter - Coordinates workers via PostgreSQL LISTEN/NOTIFY:
- Broadcasts job scheduling events so workers wake up immediately
- Signals chain completion for awaitJobChain
- Uses 3 fixed channels with payload-based filtering
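Payload-based filtering on a fixed set of channels can be pictured as a small router: every listener on a channel receives every notification, and the payload decides which in-process subscribers are called. The sketch below assumes the payload is a plain key such as a chain ID. The class name ChannelRouter and the channel name used in the usage note are hypothetical and do not reflect the adapter's internals.

```typescript
// Sketch only: ChannelRouter is a hypothetical helper, not part of @queuert/postgres.
type Listener = (payload: string) => void;

class ChannelRouter {
  // channel -> payload key -> subscribed listeners
  private readonly listeners = new Map<string, Map<string, Set<Listener>>>();

  // Registers a listener for one payload key on one channel.
  // Returns a function that removes the listener again.
  subscribe(channel: string, key: string, listener: Listener): () => void {
    let byKey = this.listeners.get(channel);
    if (!byKey) {
      byKey = new Map();
      this.listeners.set(channel, byKey);
    }
    let set = byKey.get(key);
    if (!set) {
      set = new Set();
      byKey.set(key, set);
    }
    set.add(listener);
    const owned = set;
    return () => {
      owned.delete(listener);
    };
  }

  // Called for every incoming notification.
  // Returns the number of listeners whose key matched the payload.
  dispatch(channel: string, payload: string): number {
    const matched = this.listeners.get(channel)?.get(payload);
    if (!matched) return 0;
    for (const listener of matched) listener(payload);
    return matched.size;
  }
}
```

With this shape, a caller waiting on one chain would subscribe with that chain's ID on a channel such as "queuert_chain_completed" (a made-up name) and ignore all other payloads. The number of PostgreSQL channels stays fixed no matter how many chains are being awaited.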
When to use PostgreSQL
- Production deployments - Battle-tested, ACID-compliant, scales well
- Distributed workers - Multiple workers across machines with proper locking
- Existing PostgreSQL infrastructure - No additional services needed if you already use PostgreSQL
For simpler setups, consider SQLite. For high-throughput pub/sub, consider Redis or NATS notify adapters alongside the PostgreSQL state adapter.
Requirements
- Node.js 22 or later
- TypeScript 5.0+ (recommended)
- PostgreSQL 14 or later
Installation
npm install @queuert/postgres
Peer dependencies: queuert
Quick Start
import { createClient, createConsoleLog, defineJobTypeRegistry } from "queuert";
import { createPgStateAdapter, createPgNotifyAdapter } from "@queuert/postgres";

const jobTypeRegistry = defineJobTypeRegistry<{
  "send-email": { entry: true; input: { to: string }; output: { sent: true } };
}>();

const stateAdapter = await createPgStateAdapter({
  stateProvider: myPgStateProvider, // You provide this - see below
});
await stateAdapter.migrateToLatest();

const notifyAdapter = await createPgNotifyAdapter({
  provider: myPgNotifyProvider, // You provide this - see below
});

const client = await createClient({
  stateAdapter,
  notifyAdapter,
  registry: jobTypeRegistry,
  log: createConsoleLog(),
});

Configuration
State Adapter
const stateAdapter = await createPgStateAdapter({
  stateProvider: myPgStateProvider,
  schema: "queuert", // Schema name (default: "queuert")
  tablePrefix: "", // Table name prefix (default: "")
  idType: "uuid", // SQL type for job IDs (default: "uuid")
  idDefault: "gen_random_uuid()", // SQL DEFAULT expression (default: "gen_random_uuid()")
});

Notify Adapter
const notifyAdapter = await createPgNotifyAdapter({
  provider: myPgNotifyProvider,
  channelPrefix: "queuert", // Channel prefix (default: "queuert")
});

The notify adapter uses LISTEN/NOTIFY, which is fire-and-forget: a notification sent while a worker is disconnected is not redelivered. Workers have built-in polling as a fallback for reliability.
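The combination of notifications and a polling fallback can be sketched as a wait that ends either when a notification arrives or when the poll interval elapses, whichever comes first. The class WakeSignal and its method names are hypothetical and only illustrate the idea; they are not how Queuert workers are implemented.

```typescript
// Sketch only: WakeSignal is a hypothetical helper, not part of Queuert.
type WakeReason = "notified" | "poll-timeout";

class WakeSignal {
  private readonly waiters = new Set<(reason: WakeReason) => void>();

  // Resolves early when notify() is called, otherwise after pollIntervalMs.
  wait(pollIntervalMs: number): Promise<WakeReason> {
    return new Promise((resolve) => {
      const timer = setTimeout(() => {
        this.waiters.delete(wake);
        resolve("poll-timeout");
      }, pollIntervalMs);
      const wake = (reason: WakeReason) => {
        clearTimeout(timer);
        resolve(reason);
      };
      this.waiters.add(wake);
    });
  }

  // Wakes every current waiter and returns how many were woken.
  notify(): number {
    const woken = this.waiters.size;
    for (const wake of this.waiters) wake("notified");
    this.waiters.clear();
    return woken;
  }
}
```

A worker loop built on this would check for jobs, call wait with its poll interval, and check again regardless of the reason it woke. A lost notification then costs at most one poll interval of latency rather than a stuck job.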
State Provider
You need to implement a state provider that bridges your PostgreSQL client (raw pg, Drizzle, Prisma, etc.) with this adapter. The provider handles transaction management and SQL execution. See the examples for complete implementations.
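As a rough illustration of the transaction management a provider has to supply, the sketch below wraps a unit of work in BEGIN/COMMIT/ROLLBACK. It is not Queuert's provider interface: SqlClient and withTransaction are names invented for this example, with query modeled loosely on the query(text, values) call in node-postgres. The actual provider shape is defined in the linked examples and reference.

```typescript
// Sketch only: SqlClient and withTransaction are hypothetical names,
// not the provider interface expected by createPgStateAdapter.
interface SqlClient {
  query(text: string, values?: unknown[]): Promise<unknown>;
}

// Runs `work` inside a transaction on a single connection.
// Commits if `work` resolves and rolls back if it throws.
async function withTransaction<T>(
  client: SqlClient,
  work: (tx: SqlClient) => Promise<T>,
): Promise<T> {
  await client.query("BEGIN");
  try {
    const result = await work(client);
    await client.query("COMMIT");
    return result;
  } catch (error) {
    await client.query("ROLLBACK");
    throw error;
  }
}
```

With a connection pool, the client passed in should be one checked-out connection, since BEGIN and COMMIT must run on the same session as the statements between them.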
API Reference
For the full API reference with types and signatures, see the @queuert/postgres reference.
Documentation
For full documentation and examples, see the Queuert documentation.
