@wisemen/pgboss-nestjs-job
v2.1.1
Published
Make sure that the env variable `DATABASE_URI` is defined.
Maintainers
Keywords
Readme
Config
Make sure that the env variable DATABASE_URI is defined.
Usage
- Create an entrypoint that creates an NestJs application context instance that contains the
PgBossWorkerModule.
@Module({
imports: [
AppModule.forRoot(),
PgBossWorkerModule.forRoot({
queueName, // The name of the queue to process
concurrency, // The number of jobs to process concurrently
batchSize, // The number of jobs to fetch
fetchRefreshThreshold, // Refresh threshold to fetch jobs
pollInterval, // The interval (in milliseconds) to poll for new jobs
bouncerModule // An optional bouncer which will prevent jobs from being fetched
})
]
})
class WorkerModule {}
class Worker extends WorkerContainer {
async bootstrap (): Promise<INestApplicationContext> {
return await NestFactory.createApplicationContext(WorkerModule)
}
}
const _worker = new Worker()- Create a type to define the data your job needs
export interface MyJobData extends BaseJobData {
uuid: string
// other data here
}- Create a job definition
@PgBossJob('queue-name')
export class MyJob extends BaseJob<MyJobData> {}- Create a job handler (make sure to provide it)
@Injectable()
@PgBossJobHandler(MyJob)
export class MyJobHandler extends JobHandler<MyJob> {
public async run (data: MyJobData): Promise<void> {
// Do stuff
}
}QueueBouncer
Some workers / queues only need to run when some external service is online. The QueueBouncer base class is used by workers to determine wether they should poll for jobs or not by calling the canProceed method on the bouncer. This method typically performs the health check on an external service.
The queuebouncer is provided to the worker by creating and exportin a provider for the QueueBouncer class. An example module can be:
@Module({
imports: [CuoptClientModule],
providers: [{
provide: QueueBouncer,
useClass: CuoptWorkerBouncer
}],
exports: [QueueBouncer]
})
export class CuoptWorkerBouncerModule {}
When no bouncer is set, the package will default to AllowBouncer which never blocks a worker / queue from polling for jobs.
An example of a bouncer for an external cuopt system.
@Injectable()
export class CuoptWorkerBouncer extends QueueBouncer {
private isCuoptRunning: boolean
private lastPolledAt: Date
private pollPromise: Promise<boolean> | undefined
constructor (
private cuopt: CuoptClient
) {
super()
}
async canProceed (): Promise<boolean> {
if (dayjs().diff(this.lastPolledAt, 'seconds') > 2) {
await this.pollCuopt()
}
return this.isCuoptRunning
}
private async pollCuopt () {
if (this.pollPromise !== undefined) {
await this.pollPromise
return
}
this.pollPromise = this.cuopt.isReady()
try {
this.isCuoptRunning = await this.pollPromise
} catch {
this.isCuoptRunning = false
} finally {
this.lastPolledAt = new Date()
}
}
}
