queuert
v0.6.0
Published
Core package for Queuert job queue system
Downloads
705
Maintainers
Readme
queuert
Core package for Queuert - a TypeScript library for database-backed job queues.
What is Queuert?
Queuert is a type-safe job queue library that stores jobs in your database. It brings the familiar Promise chain pattern to distributed job processing:
// Just like Promise chains...
fetch(url).then(process).then(format);
// Queuert chains work the same way, but persist across restarts
startJobChain({ typeName: "fetch", input: { url } });
// .continueWith('process')
// .continueWith('format')Key features:
- Type-safe - Full TypeScript support with compile-time validation
- Database-backed - Jobs survive restarts; no separate queue server needed
- Distributed - Multiple workers can process jobs with proper locking
- Flexible - Linear chains, branching, loops, job dependencies (blockers)
Requirements
- Node.js 22 or later
- TypeScript 5.0+ (recommended)
Installation
npm install queuertThis is the core package. You also need a state adapter to store jobs:
@queuert/postgres- PostgreSQL (recommended for production)@queuert/sqlite- SQLite (experimental)
Optional adapters:
@queuert/redis- Redis notify adapter (recommended for production)@queuert/nats- NATS notify adapter (experimental)@queuert/otel- OpenTelemetry observability (metrics and tracing)
Quick Start
import {
createClient,
createInProcessWorker,
defineJobTypeRegistry,
createJobTypeProcessorRegistry,
withTransactionHooks,
} from "queuert";
import { createSqliteStateAdapter } from "@queuert/sqlite";
// Define your job types with full type safety
const jobTypeRegistry = defineJobTypeRegistry<{
"send-email": {
entry: true;
input: { to: string; subject: string };
output: { sent: true };
};
}>();
// Create client and adapters
const stateAdapter = await createSqliteStateAdapter({ stateProvider: myProvider });
const client = await createClient({
stateAdapter,
registry: jobTypeRegistry,
});
// Create a worker
const worker = await createInProcessWorker({
client,
workerId: "worker-1",
processorRegistry: createJobTypeProcessorRegistry(client, jobTypeRegistry, {
"send-email": {
attemptHandler: async ({ job, complete }) => {
await sendEmail(job.input.to, job.input.subject);
return complete(async () => ({ sent: true }));
},
},
}),
});
// Start a job chain (within your database transaction)
// Use your database client's transaction mechanism and pass the context
await withTransactionHooks(async (transactionHooks) =>
db.transaction(async (tx) =>
client.startJobChain({
tx, // Transaction context - matches your stateProvider's TTxContext
transactionHooks,
typeName: "send-email",
input: { to: "[email protected]", subject: "Hello!" },
}),
),
);Worker Configuration
const worker = await createInProcessWorker({
client,
workerId: "worker-1", // Unique worker identifier (optional)
concurrency: 10, // Number of jobs to process in parallel (default: 1)
// Worker loop recovery backoff (separate from per-job backoff below)
backoffConfig: {
initialDelayMs: 1_000,
multiplier: 2.0,
maxDelayMs: 30_000,
},
processDefaults: {
pollIntervalMs: 60_000, // How often to poll for new jobs (default: 60s)
// Backoff configuration for failed job attempts
backoffConfig: {
initialDelayMs: 10_000, // Initial retry delay (default: 10s)
multiplier: 2.0, // Exponential backoff multiplier
maxDelayMs: 300_000, // Maximum retry delay (default: 5min)
},
// Lease configuration for job ownership
leaseConfig: {
leaseMs: 60_000, // How long a worker holds a job (default: 60s)
renewIntervalMs: 30_000, // How often to renew the lease (default: 30s)
},
// Middlewares that wrap each job attempt (e.g., for contextual logging)
attemptMiddlewares: [
async ({ job, workerId }, next) => {
// Setup context before job processing
return await next();
// Cleanup after job processing
},
],
},
processorRegistry: createJobTypeProcessorRegistry(client, jobTypeRegistry, {
// ... job type processors
}),
});Per-job-type configuration:
const worker = await createInProcessWorker({
client,
processorRegistry: createJobTypeProcessorRegistry(client, jobTypeRegistry, {
'long-running-job': {
backoffConfig: { initialDelayMs: 30_000, multiplier: 2.0, maxDelayMs: 600_000 },
leaseConfig: { leaseMs: 300_000, renewIntervalMs: 60_000 },
attemptHandler: async ({ job, complete }) => { ... },
},
}),
});API Reference
For the full API reference with types and signatures, see the queuert reference.
Documentation
For full documentation and examples, see the Queuert documentation.
