pg-boss-bull-board
v1.0.2
A BullMQ-compatible job queue using PostgreSQL (pg-boss) as the backend, with seamless bull-board integration.
Drop-in replacement for BullMQ + Redis. No Redis required — just PostgreSQL.
Features
- ✅ BullMQ-compatible API — Easy migration from BullMQ
- ✅ PostgreSQL backend — No Redis dependency, uses pg-boss under the hood
- ✅ bull-board integration — Monitor queues with the bull-board UI
- ✅ Flow support — Parent-child job relationships
- ✅ Retries & backoff — Exponential and fixed backoff strategies
- ✅ Job priorities — Process important jobs first
- ✅ Delayed jobs — Schedule jobs for future execution
- ✅ Concurrency control — Process multiple jobs in parallel
- ✅ TypeScript support — Full type definitions included
Installation
npm install pg-boss-bull-board
For bull-board integration (optional):
npm install @bull-board/api @bull-board/express
Quick Start
1. Initialize the Library
import fs from "fs"; // needed for the readFileSync calls in the advanced SSL example
import { init } from "pg-boss-bull-board";
// Basic initialization
init({
connectionString: "postgresql://user:password@localhost:5432/mydb",
});
// With SSL enabled (simple)
init({
connectionString: "postgresql://user:password@localhost:5432/mydb",
ssl: true,
});
// With SSL configuration (advanced)
init({
connectionString: "postgresql://user:password@localhost:5432/mydb",
ssl: {
require: true,
rejectUnauthorized: true,
ca: fs.readFileSync("/path/to/ca-certificate.crt").toString(),
cert: fs.readFileSync("/path/to/client-certificate.crt").toString(),
key: fs.readFileSync("/path/to/client-key.key").toString(),
},
});
2. Create a Producer
import { Queue } from "pg-boss-bull-board";
const emailQueue = new Queue("emails");
// Add a job
await emailQueue.add("send-welcome", {
to: "user@example.com",
subject: "Welcome!",
});
// Add a job with options
await emailQueue.add(
"send-newsletter",
{ userId: 123 },
{
delay: 60000, // Delay 1 minute
attempts: 3, // Retry up to 3 times
priority: 10, // Higher priority
backoff: {
type: "exponential",
delay: 1000,
},
}
);
// Add multiple jobs at once
await emailQueue.addBulk([
{ name: "send-welcome", data: { to: "user1@example.com" } },
{ name: "send-welcome", data: { to: "user2@example.com" } },
]);
3. Create a Worker
import { Worker } from "pg-boss-bull-board";
const worker = new Worker(
"emails",
async (job) => {
console.log(`Processing job ${job.id}: ${job.name}`);
console.log("Data:", job.data);
// Your job logic here (sendEmail stands in for your own function)
await sendEmail(job.data.to, job.data.subject);
return { success: true, sentAt: new Date() };
},
{
concurrency: 5, // Process 5 jobs in parallel
pollingIntervalSeconds: 2, // Check for jobs every 2 seconds
}
);
// Graceful shutdown
process.on("SIGTERM", async () => {
await worker.close();
});
4. Add bull-board UI (Optional)
import express from "express";
import { createBullBoard } from "@bull-board/api";
import { ExpressAdapter } from "@bull-board/express";
import { init, BullBoardAdapter } from "pg-boss-bull-board";
// Initialize pg-boss
init({ connectionString: process.env.DATABASE_URL });
// Setup bull-board
const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath("/admin/queues");
createBullBoard({
queues: [
new BullBoardAdapter("emails"),
new BullBoardAdapter("notifications"),
new BullBoardAdapter("reports"),
],
serverAdapter,
});
// Mount on Express
const app = express();
app.use("/admin/queues", serverAdapter.getRouter());
app.listen(3000, () => {
console.log("bull-board running at http://localhost:3000/admin/queues");
});
API Reference
init(config)
Initialize the library. Must be called before using any other functions.
init({
connectionString: "postgresql://...",
ssl: true, // Enable SSL (simple)
// OR pass an SSLConfig object instead (advanced):
// ssl: {
//   require: true,
//   rejectUnauthorized: true,
//   ca: "...", // CA certificate
//   cert: "...", // Client certificate
//   key: "...", // Client key
// },
archiveCompletedAfterSeconds: 604800, // 7 days (default)
deleteAfterSeconds: 1209600, // 14 days (default)
maintenanceIntervalSeconds: 30, // default
monitorStateIntervalSeconds: 10, // default
});
| Option | Type | Default | Description |
| ------------------------------ | ---------------------- | ------------ | -------------------------------------- |
| connectionString | string | required | PostgreSQL connection string |
| ssl | boolean \| SSLConfig | undefined | SSL configuration (see below) |
| archiveCompletedAfterSeconds | number | 604800 | Archive completed jobs after N seconds |
| deleteAfterSeconds | number | 1209600 | Delete archived jobs after N seconds |
| maintenanceIntervalSeconds | number | 30 | Maintenance check interval |
| monitorStateIntervalSeconds | number | 10 | State monitoring interval |
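The retention defaults in the table are plain second counts. As a small illustration of the arithmetic, the sketch below derives them from day counts. The `days` helper and the `retention` object are hypothetical names, not part of the library.

```typescript
// Hypothetical helper (not part of pg-boss-bull-board): convert whole days to seconds.
const SECONDS_PER_DAY = 24 * 60 * 60;

function days(n: number): number {
  return n * SECONDS_PER_DAY;
}

// Values matching the documented defaults.
const retention = {
  archiveCompletedAfterSeconds: days(7), // 604800
  deleteAfterSeconds: days(14), // 1209600
};
```

An object like `retention` could be spread into the `init({ ... })` call alongside `connectionString`.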
SSL Configuration
interface SSLConfig {
require?: boolean; // Require SSL connection (default: false)
rejectUnauthorized?: boolean; // Reject unauthorized certificates (default: true)
ca?: string; // CA certificate content (optional)
cert?: string; // Client certificate content (optional)
key?: string; // Client private key content (optional)
passphrase?: string; // Passphrase for the private key (optional)
}
All SSL fields are optional. Use only what you need:
Examples:
// 1. Simple SSL (require SSL but allow self-signed certificates)
// No certificates needed - just enable SSL
init({
connectionString: "postgresql://user:pass@host:5432/db",
ssl: true,
});
// 2. SSL with certificate validation (no client certs needed)
// Only validates server certificate against system CA store
init({
connectionString: "postgresql://user:pass@host:5432/db",
ssl: {
require: true,
rejectUnauthorized: true, // Validates server certificate
},
});
// 3. SSL with custom CA certificate
// Use when server uses a custom CA not in system store
import fs from "fs";
init({
connectionString: "postgresql://user:pass@host:5432/db",
ssl: {
require: true,
rejectUnauthorized: true,
ca: fs.readFileSync("/path/to/ca-certificate.crt").toString(), // Only CA needed
},
});
// 4. SSL with client certificate authentication
// Use when database requires client certificates
init({
connectionString: "postgresql://user:pass@host:5432/db",
ssl: {
require: true,
rejectUnauthorized: true,
cert: fs.readFileSync("/path/to/client-certificate.crt").toString(), // Client cert
key: fs.readFileSync("/path/to/client-key.key").toString(), // Client key
// ca is optional if using system CA store
},
});
// 5. Full SSL configuration (all certificates)
// Use when you need complete control over certificate chain
init({
connectionString: "postgresql://user:pass@host:5432/db",
ssl: {
require: true,
rejectUnauthorized: true,
ca: fs.readFileSync("/path/to/ca-certificate.crt").toString(),
cert: fs.readFileSync("/path/to/client-certificate.crt").toString(),
key: fs.readFileSync("/path/to/client-key.key").toString(),
passphrase: "your-key-passphrase", // Only if key is encrypted
},
});
// 6. SSL with certificates from environment variables
init({
connectionString: "postgresql://user:pass@host:5432/db",
ssl: {
require: true,
rejectUnauthorized: true,
...(process.env.DB_CA_CERT && { ca: process.env.DB_CA_CERT }),
...(process.env.DB_CLIENT_CERT && { cert: process.env.DB_CLIENT_CERT }),
...(process.env.DB_CLIENT_KEY && { key: process.env.DB_CLIENT_KEY }),
},
});
When to use which fields:
| Field | When Required | Description |
| -------------------------- | ---------------------------- | --------------------------------------------------------------------------- |
| require: true | When you want to enforce SSL | Forces SSL connection |
| rejectUnauthorized: true | When validating certificates | Validates server certificate (default: true) |
| ca | Custom CA certificates | Use when server uses CA not in system store |
| cert | Client certificate auth | Required only if database requires client certificates |
| key | Client certificate auth | Required only if database requires client certificates (paired with cert) |
| passphrase | Encrypted keys | Only needed if your private key is encrypted |
Common Scenarios:
- Most cloud databases (AWS RDS, Google Cloud SQL, Azure): Just ssl: true or ssl: { require: true }
- Self-signed certificates: ssl: { require: true, rejectUnauthorized: false }
- Custom CA: Add the ca field
- Client certificate authentication: Add the cert and key fields
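The scenarios above can be folded into one function that picks the SSL settings from the environment. This is a minimal sketch, not the library's own logic: `buildSslConfig`, `SslEnv`, and the `DB_SSL` and `DB_SSL_SELF_SIGNED` variable names are assumptions made for illustration, and the `SSLConfig` interface is copied locally so the block is self-contained.

```typescript
// Local copy of the SSLConfig shape documented above, so the sketch is self-contained.
interface SSLConfig {
  require?: boolean;
  rejectUnauthorized?: boolean;
  ca?: string;
  cert?: string;
  key?: string;
  passphrase?: string;
}

// Hypothetical environment variable names; adjust to your deployment.
type SslEnv = {
  DB_SSL?: string; // "true" enables SSL
  DB_SSL_SELF_SIGNED?: string; // "true" skips certificate validation
  DB_CA_CERT?: string;
  DB_CLIENT_CERT?: string;
  DB_CLIENT_KEY?: string;
};

function buildSslConfig(env: SslEnv): SSLConfig | undefined {
  if (env.DB_SSL !== "true") return undefined; // SSL not requested

  const config: SSLConfig = {
    require: true,
    rejectUnauthorized: env.DB_SSL_SELF_SIGNED !== "true",
  };
  if (env.DB_CA_CERT) config.ca = env.DB_CA_CERT; // custom CA scenario

  // cert and key are only meaningful as a pair (client certificate auth)
  if (env.DB_CLIENT_CERT && env.DB_CLIENT_KEY) {
    config.cert = env.DB_CLIENT_CERT;
    config.key = env.DB_CLIENT_KEY;
  }
  return config;
}
```

The result can then be passed as the `ssl` option, for example `init({ connectionString, ssl: buildSslConfig(process.env) })`.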
Queue
Create and manage job queues.
const queue = new Queue<MyDataType>("queue-name");
Methods
| Method | Description |
| --------------------------- | ----------------- |
| add(name, data, options?) | Add a single job |
| addBulk(jobs) | Add multiple jobs |
| getJob(id) | Get job by ID |
Job Options
interface JobOptions {
attempts?: number; // Retry attempts on failure
delay?: number; // Delay in milliseconds
backoff?: {
type: "exponential" | "fixed";
delay: number;
};
priority?: number; // Higher = processed sooner
removeOnComplete?: boolean | number; // Auto-remove after completion
jobId?: string; // Custom ID for deduplication
}
Worker
Process jobs from a queue.
const worker = new Worker<DataType, ResultType>(
"queue-name",
async (job) => {
// Process job
return result;
},
options
);
Options
| Option | Type | Default | Description |
| ------------------------ | -------- | ------- | ------------------------------------- |
| concurrency | number | 1 | Number of jobs to process in parallel |
| pollingIntervalSeconds | number | 2 | How often to check for new jobs |
Methods
| Method | Description |
| ------------- | --------------------------------------- |
| start() | Start processing (called automatically) |
| close() | Stop processing gracefully |
| pause() | Pause the worker |
| resume() | Resume the worker |
| isRunning() | Check if worker is running |
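When a process runs several workers, the graceful shutdown shown in the Quick Start needs to close each of them. A minimal sketch follows. It assumes only that `close()` can be awaited, as the Quick Start does. `Closable` and `closeAll` are hypothetical names, not library exports.

```typescript
// Minimal structural type: anything with an awaitable close(), as Worker is used in this README.
interface Closable {
  close(): Promise<void> | void;
}

// Hypothetical helper: close workers one at a time and count the ones that failed,
// so a single failing worker does not prevent the others from closing.
async function closeAll(workers: Closable[]): Promise<number> {
  let failures = 0;
  for (const worker of workers) {
    try {
      await worker.close();
    } catch {
      failures += 1;
    }
  }
  return failures;
}
```

In a SIGTERM handler this could be called as `await closeAll([emailWorker, reportWorker])` before `await shutdown()`. Closing the workers before shutting down the client is an assumption, not documented behavior.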
Job Object (in handler)
interface WorkerJob<T> {
id: string;
name: string;
data: T;
attemptsMade: number;
opts: {
attempts: number;
priority: number;
};
}
FlowProducer
Create parent-child job relationships.
import { FlowProducer } from "pg-boss-bull-board";
const flow = new FlowProducer();
const result = await flow.add({
name: "process-order",
queueName: "orders",
data: { orderId: "123" },
children: [
{
name: "charge-payment",
queueName: "payments",
data: { orderId: "123", amount: 99.99 },
},
{
name: "reserve-inventory",
queueName: "inventory",
data: { orderId: "123", items: ["SKU-001"] },
},
],
});
console.log("Parent ID:", result.parentId);
console.log("Child IDs:", result.childIds);
BullBoardAdapter
Adapter for bull-board UI integration.
import { BullBoardAdapter } from "pg-boss-bull-board";
const adapter = new BullBoardAdapter("queue-name", {
readOnlyMode: false, // Set to true to disable job actions in the UI
allowRetries: true, // Allow retry button
description: "My Queue",
});
Utility Functions
import { shutdown, isInitialized, isRunning } from "pg-boss-bull-board";
// Check if library is initialized
if (isInitialized()) {
console.log("Library is configured");
}
// Check if client is running
if (isRunning()) {
console.log("Client is connected");
}
// Graceful shutdown
await shutdown();
Migration from BullMQ
pg-boss-bull-board is designed to be a drop-in replacement for BullMQ. Here's a migration guide:
Before (BullMQ + Redis)
import { Queue, Worker } from "bullmq";
const queue = new Queue("emails", {
connection: { host: "localhost", port: 6379 },
});
const worker = new Worker(
"emails",
async (job) => {
// process job
},
{
connection: { host: "localhost", port: 6379 },
concurrency: 5,
}
);
await queue.add("send-email", { to: "user@example.com" });
After (pg-boss-bull-board + PostgreSQL)
import { init, Queue, Worker } from "pg-boss-bull-board";
// Initialize once
init({ connectionString: "postgresql://..." });
const queue = new Queue("emails");
const worker = new Worker(
"emails",
async (job) => {
// process job (same handler!)
},
{
concurrency: 5,
}
);
await queue.add("send-email", { to: "user@example.com" });
Key Differences
| Feature | BullMQ | pg-boss-bull-board |
| ------------- | ---------------------- | ------------------ |
| Backend | Redis | PostgreSQL |
| Connection | Per Queue/Worker | Global init() |
| Job Events | Event-based | Polling-based |
| Rate Limiting | Built-in | Not supported |
| Job Progress | job.updateProgress() | Not supported |
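Since rate limiting is listed as not supported, code migrating from BullMQ's limiter may need to throttle inside the job handler. The sketch below is one possible approach, not a library feature. `IntervalLimiter` is a hypothetical class that spaces out job starts within a single process. It does not coordinate across processes or machines.

```typescript
// Hypothetical per-process limiter: guarantees at least minIntervalMs between job starts.
class IntervalLimiter {
  private nextFreeAt = 0;
  private readonly minIntervalMs: number;

  constructor(minIntervalMs: number) {
    this.minIntervalMs = minIntervalMs;
  }

  // Reserves the next free slot and returns how long the caller should wait (in ms)
  // before starting work. nowMs is passed in so the logic stays deterministic.
  reserve(nowMs: number): number {
    const startAt = Math.max(nowMs, this.nextFreeAt);
    this.nextFreeAt = startAt + this.minIntervalMs;
    return startAt - nowMs;
  }
}
```

Inside a handler, the wait could be applied with `await new Promise((resolve) => setTimeout(resolve, limiter.reserve(Date.now())))` before the rate-limited call.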
Examples
Express Server with bull-board
import express from "express";
import { createBullBoard } from "@bull-board/api";
import { ExpressAdapter } from "@bull-board/express";
import { init, Queue, Worker, BullBoardAdapter } from "pg-boss-bull-board";
// Initialize
init({ connectionString: process.env.DATABASE_URL! });
// Create queues
const emailQueue = new Queue("emails");
const reportQueue = new Queue("reports");
// Create workers
new Worker(
"emails",
async (job) => {
console.log("Sending email:", job.data);
return { sent: true };
},
{ concurrency: 3 }
);
new Worker("reports", async (job) => {
console.log("Generating report:", job.data);
return { generated: true };
});
// Setup bull-board
const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath("/admin/queues");
createBullBoard({
queues: [new BullBoardAdapter("emails"), new BullBoardAdapter("reports")],
serverAdapter,
});
// Express app
const app = express();
app.use(express.json());
app.use("/admin/queues", serverAdapter.getRouter());
app.post("/api/send-email", async (req, res) => {
const jobId = await emailQueue.add("send", req.body);
res.json({ jobId });
});
app.post("/api/generate-report", async (req, res) => {
const jobId = await reportQueue.add("generate", req.body, {
priority: req.body.urgent ? 10 : 1,
});
res.json({ jobId });
});
app.listen(3000);
Delayed & Scheduled Jobs
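The daily-report example in this section calls getMillisecondsUntil9AM(), which this README does not define. The following is a minimal sketch of such a helper. It assumes "9 AM" means the next 09:00 in the process's local time zone. The implementation is an illustration, not part of the library.

```typescript
// Hypothetical helper: milliseconds from `now` until the next 9:00 AM in local time.
function getMillisecondsUntil9AM(now: Date = new Date()): number {
  const target = new Date(now.getTime());
  target.setHours(9, 0, 0, 0); // today at 09:00:00.000 local time
  if (target.getTime() <= now.getTime()) {
    // It is already 9 AM or later, so aim for tomorrow instead.
    target.setDate(target.getDate() + 1);
  }
  return target.getTime() - now.getTime();
}
```

Each call to add() with this delay schedules a single run, so a recurring report still needs something to enqueue the next job each day.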
import { Queue } from "pg-boss-bull-board";
const queue = new Queue("reminders");
// Send reminder in 1 hour
await queue.add(
"send-reminder",
{ userId: 123 },
{
delay: 60 * 60 * 1000,
}
);
// Send a daily report at 9 AM. Each add() schedules one run, so an external
// scheduler must re-add the job every day; getMillisecondsUntil9AM() is a helper you supply.
await queue.add(
"daily-report",
{ type: "sales" },
{
delay: getMillisecondsUntil9AM(),
}
);
Retries with Backoff
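The example in this section annotates an exponential backoff with delay: 1000 as 1s, 2s, 4s, 8s, 16s. The sketch below reproduces that arithmetic. It assumes the delay doubles on each retry and that fixed backoff reuses the base delay unchanged. `backoffSchedule` is a hypothetical helper for reasoning about timing, and the library's actual scheduling may differ.

```typescript
type BackoffType = "exponential" | "fixed";

// Sketch only: returns the assumed wait (in ms) before each of `count` retries.
function backoffSchedule(type: BackoffType, delayMs: number, count: number): number[] {
  const schedule: number[] = [];
  for (let i = 0; i < count; i++) {
    // exponential: delay, 2*delay, 4*delay, ...; fixed: the same delay every time
    schedule.push(type === "exponential" ? delayMs * Math.pow(2, i) : delayMs);
  }
  return schedule;
}

const exponential = backoffSchedule("exponential", 1000, 5); // [1000, 2000, 4000, 8000, 16000]
const fixed = backoffSchedule("fixed", 1000, 3); // [1000, 1000, 1000]
```

Summing the schedule gives a rough upper bound on how long a job may spend waiting between attempts, which helps when choosing attempts and delay together.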
import { Queue, Worker } from "pg-boss-bull-board";
const queue = new Queue("api-calls");
// Add job with retry configuration
await queue.add(
"fetch-data",
{ url: "https://api.example.com" },
{
attempts: 5,
backoff: {
type: "exponential",
delay: 1000, // 1s, 2s, 4s, 8s, 16s
},
}
);
// Worker will automatically retry failed jobs
new Worker("api-calls", async (job) => {
const response = await fetch(job.data.url);
if (!response.ok) {
throw new Error(`HTTP ${response.status}`);
}
return await response.json();
});
TypeScript
Full TypeScript support with generics:
interface EmailJob {
to: string;
subject: string;
body: string;
}
interface EmailResult {
messageId: string;
sentAt: Date;
}
const queue = new Queue<EmailJob>("emails");
const worker = new Worker<EmailJob, EmailResult>("emails", async (job) => {
// job.data is typed as EmailJob
const messageId = await sendEmail(
job.data.to,
job.data.subject,
job.data.body
);
return { messageId, sentAt: new Date() };
});
Requirements
- Node.js >= 18.0.0
- PostgreSQL >= 12
License
MIT © flywl
