@maroonedsoftware/jobbroker
v1.4.1
A flexible background job processing library with support for scheduled and on-demand jobs. Currently ships with a pg-boss implementation for PostgreSQL-backed job queues.
Features
- Abstract interfaces for easy testing and alternative implementations
- Dependency injection support via injectkit
- Scheduled jobs using cron expressions
- On-demand jobs for immediate execution
- PostgreSQL backing for reliability and transactional guarantees
Installation
pnpm add @maroonedsoftware/jobbroker injectkit pg-boss reflect-metadata
Note: InjectKit requires reflect-metadata to be imported at your application entry point, and TypeScript must be configured with experimentalDecorators: true and emitDecoratorMetadata: true.
Quick Start
1. Define a Job
Create a job by extending the Job base class:
import { Injectable } from 'injectkit';
import { Job } from '@maroonedsoftware/jobbroker';
// EmailService stands for your application's own injectable service; the import path is illustrative.
import { EmailService } from './email-service';
interface EmailPayload {
  to: string;
  subject: string;
  body: string;
}
@Injectable()
export class SendEmailJob extends Job<EmailPayload> {
  constructor(private readonly emailService: EmailService) {
    super();
  }
  async run(payload: EmailPayload): Promise<void> {
    await this.emailService.send(payload.to, payload.subject, payload.body);
  }
}
2. Register Jobs
Create a registry and register your jobs:
import { PgBossJobRegistryMap } from '@maroonedsoftware/jobbroker';
const registry = new PgBossJobRegistryMap();
// On-demand job (triggered manually)
registry.set('send-email', SendEmailJob);
// Scheduled job (runs on a cron schedule)
registry.set('daily-report', {
  job: DailyReportJob,
  cron: '0 9 * * *', // Every day at 9 AM
});
3. Set Up the Broker and Runner
import 'reflect-metadata';
import { PgBoss } from 'pg-boss';
import { InjectKitRegistry } from 'injectkit';
import { ConsoleLogger, Logger } from '@maroonedsoftware/logger';
import { PgBossJobBroker, PgBossJobRunner, PgBossJobRegistryMap, JobBroker, JobRunner } from '@maroonedsoftware/jobbroker';
// Initialize pg-boss
const pgboss = new PgBoss('postgres://user:pass@localhost/mydb');
// Set up dependency injection registry
const diRegistry = new InjectKitRegistry();
diRegistry.register(PgBossJobRegistryMap).useInstance(registry);
diRegistry.register(PgBoss).useInstance(pgboss);
diRegistry.register(Logger).useClass(ConsoleLogger).asSingleton();
diRegistry.register(JobBroker).useClass(PgBossJobBroker).asSingleton();
diRegistry.register(JobRunner).useClass(PgBossJobRunner).asSingleton();
// Build the container
const container = diRegistry.build();
// Start the job runner (this also calls pgboss.start() internally)
const runner = container.get(JobRunner);
await runner.start();
4. Send Jobs
Use the broker to queue jobs for processing:
const broker = container.get(JobBroker);
// Send an immediate job
await broker.send('send-email', {
  to: 'user@example.com',
  subject: 'Welcome!',
  body: 'Thanks for signing up.',
});
// Schedule a recurring job programmatically
await broker.schedule('cleanup', '0 0 * * *', { olderThan: 30 });
// Remove a schedule
await broker.unschedule('cleanup');
API Reference
Job<Payload>
Abstract base class for job handlers.
| Method | Description |
| -------------------------------------- | -------------------------------------- |
| run(payload: Payload): Promise<void> | Execute the job with the given payload |
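Because a job exposes a single run method, it can be exercised in a unit test without a queue or database. The following is a minimal sketch under stated assumptions: the Job class here is a local stand-in that mirrors only the run signature from the table, so the snippet runs without the package installed, and EmailService and FakeEmailService are hypothetical application types, not part of the library.

```typescript
// Local stand-in for the Job base class from @maroonedsoftware/jobbroker.
// Only the documented run() signature is assumed.
abstract class Job<Payload> {
  abstract run(payload: Payload): Promise<void>;
}

interface EmailPayload {
  to: string;
  subject: string;
  body: string;
}

// Hypothetical application service; only its shape matters for the test.
interface EmailService {
  send(to: string, subject: string, body: string): Promise<void>;
}

class SendEmailJob extends Job<EmailPayload> {
  private readonly emailService: EmailService;

  constructor(emailService: EmailService) {
    super();
    this.emailService = emailService;
  }

  async run(payload: EmailPayload): Promise<void> {
    await this.emailService.send(payload.to, payload.subject, payload.body);
  }
}

// Test double that records calls instead of sending mail.
class FakeEmailService implements EmailService {
  readonly calls: Array<[string, string, string]> = [];

  async send(to: string, subject: string, body: string): Promise<void> {
    this.calls.push([to, subject, body]);
  }
}

const fakeEmail = new FakeEmailService();
const emailJob = new SendEmailJob(fakeEmail);
```

In a test, call emailJob.run(...) with a sample payload and then inspect fakeEmail.calls to confirm the job forwarded the fields in the expected order.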
JobBroker
Abstract base class for sending jobs to the queue.
| Method | Description |
| --------------------------------------------------------------------- | ------------------------------------ |
| send<P>(name: string, payload: P): Promise<void> | Queue a job for immediate processing |
| schedule<P>(name: string, cron: string, payload?: P): Promise<void> | Create a recurring job schedule |
| unschedule(name: string): Promise<void> | Remove a recurring job schedule |
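Since application code can depend on the abstract JobBroker rather than on pg-boss, a test can swap in an in-memory implementation. The sketch below is one possible shape for such a fake, not something the package ships: JobBroker is redeclared locally from the three methods in the table so the snippet is self-contained, and InMemoryJobBroker and registerUser are hypothetical names.

```typescript
// Local stand-in mirroring the three JobBroker methods documented above.
abstract class JobBroker {
  abstract send<P>(name: string, payload: P): Promise<void>;
  abstract schedule<P>(name: string, cron: string, payload?: P): Promise<void>;
  abstract unschedule(name: string): Promise<void>;
}

// Hypothetical test double: keeps everything in memory instead of queueing it.
class InMemoryJobBroker extends JobBroker {
  readonly sent: Array<{ name: string; payload: unknown }> = [];
  readonly schedules = new Map<string, { cron: string; payload: unknown }>();

  async send<P>(name: string, payload: P): Promise<void> {
    this.sent.push({ name, payload });
  }

  async schedule<P>(name: string, cron: string, payload?: P): Promise<void> {
    this.schedules.set(name, { cron, payload });
  }

  async unschedule(name: string): Promise<void> {
    this.schedules.delete(name);
  }
}

// Example code under test: it only knows about the abstract JobBroker.
async function registerUser(broker: JobBroker, email: string): Promise<void> {
  await broker.send('send-email', {
    to: email,
    subject: 'Welcome!',
    body: 'Thanks for signing up.',
  });
}
```

A test can pass an InMemoryJobBroker to registerUser and assert on its sent array, which avoids needing a PostgreSQL instance for code that merely enqueues work.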
JobRunner
Abstract base class for processing jobs from the queue.
| Method | Description |
| ------------------------ | -------------------------- |
| start(): Promise<void> | Start processing jobs |
| stop(): Promise<void> | Gracefully stop processing |
PgBossJobRegistryMap
A Map<string, Identifier<Job> | PgBossJobRegistration> for registering jobs.
Entries can be either:
- A job class identifier (for on-demand jobs)
- A PgBossJobRegistration object with job and cron properties (for scheduled jobs)
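The two entry shapes can be told apart at runtime: a class identifier is a function, while a registration is a plain object carrying a cron property. The sketch below illustrates that distinction with a type guard. It is an assumption about how one might inspect the map, not a description of the library's internals; Identifier, JobRegistration, RegistryEntry, and isScheduled are simplified local stand-ins.

```typescript
// Simplified local stand-ins for the library types described above.
abstract class Job<Payload = unknown> {
  abstract run(payload: Payload): Promise<void>;
}
type Identifier<T> = new (...args: any[]) => T;

interface JobRegistration {
  job: Identifier<Job>;
  cron: string;
}
type RegistryEntry = Identifier<Job> | JobRegistration;

// A class identifier is a function at runtime; a registration is an object.
function isScheduled(entry: RegistryEntry): entry is JobRegistration {
  return typeof entry !== 'function' && 'cron' in entry;
}

class SendEmailJob extends Job {
  async run(): Promise<void> {
    /* no-op for the sketch */
  }
}

class DailyReportJob extends Job {
  async run(): Promise<void> {
    /* no-op for the sketch */
  }
}

const registry = new Map<string, RegistryEntry>();
registry.set('send-email', SendEmailJob); // on-demand
registry.set('daily-report', { job: DailyReportJob, cron: '0 9 * * *' }); // scheduled

// Collect the names of entries that carry a cron schedule.
const scheduledNames = Array.from(registry.entries())
  .filter(([, entry]) => isScheduled(entry))
  .map(([name]) => name);
// scheduledNames is ['daily-report']
```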
PgBossJobRegistration
Configuration object for a scheduled job.
| Property | Type | Description |
| -------- | ------------------ | ---------------------------------------------------------- |
| job | Identifier<Job> | The job class identifier to instantiate when the job runs. |
| cron | string | A cron expression defining when the job should run. |
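For readers less familiar with cron syntax, the sketch below splits an expression such as '0 9 * * *' into its named fields. It assumes the common five-field format (minute, hour, day of month, month, day of week); the syntax actually accepted is determined by pg-boss, and describeCron is a hypothetical helper, not part of this package.

```typescript
interface CronParts {
  minute: string;
  hour: string;
  dayOfMonth: string;
  month: string;
  dayOfWeek: string;
}

// Split a five-field cron expression into named parts.
// Throws if the expression does not have exactly five fields.
function describeCron(expression: string): CronParts {
  const parts = expression.trim().split(/\s+/);
  if (parts.length !== 5) {
    throw new Error(`Expected 5 cron fields, got ${parts.length}: "${expression}"`);
  }
  // Length was checked above, so the tuple cast is safe.
  const [minute, hour, dayOfMonth, month, dayOfWeek] = parts as [string, string, string, string, string];
  return { minute, hour, dayOfMonth, month, dayOfWeek };
}

const dailyAtNine = describeCron('0 9 * * *');
// dailyAtNine is { minute: '0', hour: '9', dayOfMonth: '*', month: '*', dayOfWeek: '*' }
```

Read left to right, '0 9 * * *' means minute 0 of hour 9 on every day of every month, regardless of weekday.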
PgBossJobBroker
Concrete JobBroker implementation backed by pg-boss. Constructor signature: new PgBossJobBroker(registrations: PgBossJobRegistryMap, pgboss: PgBoss). Typically resolved through the DI container rather than instantiated directly.
PgBossJobRunner
Concrete JobRunner implementation backed by pg-boss. Constructor signature: new PgBossJobRunner(container: Container, registrations: PgBossJobRegistryMap, pgboss: PgBoss, logger: Logger). Calls pgboss.start() during start() and pgboss.stop() during stop(). Job instances are resolved from the DI container on each invocation. Typically resolved through the DI container rather than instantiated directly.
Graceful Shutdown
Ensure you stop the runner during application shutdown:
process.on('SIGTERM', async () => {
  await runner.stop(); // Stops pg-boss internally
  process.exit(0);
});
Peer Dependencies
- pg-boss ^12.5.4 - PostgreSQL-based job queue
- reflect-metadata - Required by InjectKit for decorator metadata
License
MIT
