
@lokalise/background-jobs-common

v14.2.0


Common background jobs library

This library provides a basic abstraction over BullMQ-powered background jobs. There are two types available:

  • Persisted background jobs, built around AbstractBackgroundJobProcessor: a base class for running jobs that provides instrumentation and logger integration, plus a basic API for enqueuing jobs.

  • In-memory periodic jobs, built around AbstractPeriodicJob (covered at the end of this document).

Getting Started

Install all dependencies:

npm install

Start Docker containers:

docker compose up -d

Run all tests:

npm run test

Deprecation notice

AbstractBackgroundJobProcessor is deprecated and will be removed in a future release. Please use AbstractBackgroundJobProcessorNew instead. The major difference between the two is that AbstractBackgroundJobProcessorNew does not manage queues itself, so you need a separate QueueManager instance to manage the queue.

Persisted background jobs

Usage

See the test implementations in the ./test/processors folder. Extend AbstractBackgroundJobProcessorNew and QueueManager and implement the required methods.

const supportedQueues = [
  {
    queueId: 'queue1',
    jobPayloadSchema: z.object({
      id: z.string(),
      value: z.string(),
      metadata: z.object({
        correlationId: z.string(),
      }),
    }),
  }
] as const satisfies QueueConfiguration[]

const queueManager = new QueueManager(supportedQueues, {
  redisConfig: config.getRedisConfig(),
  isTest: false,
  lazyInitEnabled: false,
})
await queueManager.start()

const processor = new FakeBackgroundJobProcessorNew<typeof supportedQueues, 'queue1'>(
  deps,
  'queue1',
)
await processor.start()

const jobId = await queueManager.schedule('queue1', {
  id: randomUUID(),
  value: 'test',
  metadata: { correlationId: randomUUID() },
})

You can also start only specific queues by passing an array of queue names to the start method.

await queueManager.start(['queue1'])

Queue Configuration

To set up a queue configuration, you need to define a list of objects containing the following properties:

  • queueId: The unique identifier for the queue.
  • queueOptions: Options for the queue. Refer to the BullMQ documentation for more details.
  • bullDashboardGrouping: Optional array of strings to control how queues are grouped in the bull-dashboard, using the queues grouping feature. Array elements are applied left to right: the first string is the top-level group, and each subsequent string defines a nested subgroup. The delimiter is QUEUE_GROUP_DELIMITER. You can use the helper commonBullDashboardGroupingBuilder(serviceId, moduleId) to generate a grouping like ['serviceId', 'moduleId'].
  • jobPayloadSchema: A Zod schema that defines the structure of the job payload for this queue.
  • jobOptions: Default options for jobs in this queue. Can be a function that will be resolved when a job is scheduled. See BullMQ documentation.
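Putting these properties together, a configuration entry might look like the sketch below. The local DemoQueueConfiguration type and the specific option values are illustrative assumptions, not the library's exact types; in real usage jobPayloadSchema is a Zod schema and the object satisfies the library's QueueConfiguration type, as in the earlier example.

```typescript
// Illustrative stand-in for the library's QueueConfiguration type.
type DemoQueueConfiguration = {
  queueId: string
  queueOptions?: Record<string, unknown>
  bullDashboardGrouping?: string[]
  jobPayloadSchema?: unknown // a Zod schema in real usage
  jobOptions?: Record<string, unknown>
}

const emailQueueConfig: DemoQueueConfiguration = {
  queueId: 'email-notifications', // unique identifier for the queue
  queueOptions: {
    defaultJobOptions: { removeOnComplete: true }, // forwarded to BullMQ
  },
  // grouped as my-service -> notifications in the bull-dashboard
  bullDashboardGrouping: ['my-service', 'notifications'],
  jobOptions: {
    attempts: 3, // default retry count for jobs in this queue
    backoff: { type: 'exponential', delay: 1000 },
  },
}
```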

Job Deduplication

To enable job deduplication, following the BullMQ docs you would define a deduplication.id for the job within jobOptions. However, this approach can be inflexible. Therefore, QueueConfiguration allows you to specify a deduplication.idBuilder: a callback that accepts the job payload and returns a unique string used as the deduplication ID.

const supportedQueues = [
  {
    queueId: 'queue_valid',
    jobPayloadSchema: z.object({
      id: z.string(),
      value: z.string(),
      metadata: z.object({ correlationId: z.string() }),
    }),
    jobOptions: {
      deduplication: {
        idBuilder: (jobData: any) => `${jobData.id}:${jobData.value}`,
      },
    },
  },
] as const satisfies QueueConfiguration[]

Do not report UnrecoverableError

By default, unrecoverable errors (BullMQ) are passed to the error reporting system. If you want to disable this behavior, throw a MutedUnrecoverableError instead. This prevents the error from being reported, and the job will not be retried.

import { MutedUnrecoverableError } from '@lokalise/background-jobs-common'

export class Processor extends AbstractBackgroundJobProcessorNew<Data> {
  constructor(dependencies: BackgroundJobProcessorNewDependencies<Data>) {
    super(dependencies)
  }

  protected async process(
    _job: Job<JobPayloadForQueue<QueueConfiguration, QueueId>>,
    _requestContext: RequestContext,
  ): Promise<void> {
    doSomeProcessing();

    throw new MutedUnrecoverableError('Do not retry the job, and do not report the error')
  }
}

Common jobs

For this type of job, you need to extend AbstractBackgroundJobProcessorNew and implement a processInternal method, which is called when a job is dequeued. Processing logic is automatically wrapped in New Relic instrumentation and basic logger calls, so you only need to add your domain logic.

By default, the worker is automatically started when you instantiate the processor. There is a default configuration which you can override by passing workerOptions params to the constructor.

Similarly, queues are automatically started when you instantiate a queue manager providing a list of queues.

Use dispose() to correctly stop processing any new messages and wait for the current ones to finish.
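A typical place to call dispose() is in your service's shutdown hook. The sketch below is a minimal, self-contained illustration: shutdown() simply awaits dispose() on each passed-in object, and processor / queueManager stand in for the instances created earlier (the assumption that both expose dispose() follows the description above, not a verified API surface).

```typescript
// Anything with a dispose() can be drained on shutdown.
interface DisposableLike {
  dispose: () => Promise<void>
}

async function shutdown(disposables: DisposableLike[]): Promise<void> {
  for (const d of disposables) {
    // Stops accepting new messages and waits for in-flight jobs to finish.
    await d.dispose()
  }
}

// e.g. process.once('SIGTERM', () => shutdown([processor, queueManager]))
```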

Spies

Testing asynchronous code can be challenging. To tackle this, we've developed an integrated spy feature for both jobs and queue managers. This functionality enables you to monitor a job as it transitions to a specific state.

Additionally, the spy instance is shared between the processor and the queue manager, allowing you to use either component to verify the status of a job.

Example Usage

const scheduledJobIds = await queueManager.scheduleBulk(queueId, [
  {
    id: randomUUID(),
    value: 'first',
    metadata: { correlationId: generateMonotonicUuid() },
  },
  {
    id: randomUUID(),
    value: 'second',
    metadata: { correlationId: randomUUID() },
  },
]);

const firstScheduledJob = await queueManager.getSpy(queueId).waitForJobWithId(scheduledJobIds[0], 'scheduled');
// or using processor spy
// const firstScheduledJob = await processor.spy.waitForJobWithId(scheduledJobIds[0], 'scheduled');


const firstJob = await processor.spy.waitForJobWithId(scheduledJobIds[0], 'completed');
// or using queue manager spy
// const firstJob = await queueManager.getSpy(queueId).waitForJobWithId(scheduledJobIds[0], 'completed');
const secondJob = await processor.spy.waitForJob(
  (data) => data.value === 'second',
  'completed'
);

expect(firstScheduledJob.data.value).toBe('first');
expect(firstJob.data.value).toBe('first');
expect(secondJob.data.value).toBe('second');

Spy Methods

  • processor.spy.waitForJobWithId(jobId, status), queueManager.getSpy(queueId).waitForJobWithId(jobId, status):

    • Waits for a job with a specific ID to reach the specified status.
    • Returns the job instance when the status is achieved.
  • processor.spy.waitForJob(predicate, status), queueManager.getSpy(queueId).waitForJob(predicate, status):

    • Waits for any job that matches the custom predicate to reach the specified status.
    • Returns the matching job instance when the status is achieved.

Awaitable Job States

Spies can await jobs in the following states:

  • scheduled: The job is scheduled but not yet processed.
  • failed: The job is processed but failed.
  • completed: The job is processed successfully.

Important Notes

  • Spies do not need to be invoked before the job is processed, accommodating the unpredictability of asynchronous operations.
  • Even if you call await processor.spy.waitForJobWithId(jobId, status) after the job has already been scheduled or processed, the spy can still resolve the job state for you.
  • Spies are disabled in production.
    • To enable them, set the isTest option of BackgroundJobProcessorConfig to true in your processor configuration.

By utilizing these spy functions, you can more effectively manage and test the behavior of asynchronous jobs within your system.

Barriers

If you want to conditionally delay execution of a job (e.g., until some data necessary for processing arrives, or until the number of jobs in a subsequent step drops below a threshold), you can use the barrier parameter, which delays execution of the job until a specified condition passes.

A barrier looks like this:

const barrier = async (_job: Job<JobData>) => {
  if (barrierConditionIsPassing) {
    return {
      isPassing: true,
    }
  }

  return {
    isPassing: false,
    delayAmountInMs: 30000, // retry in 30 seconds
  }
}

You pass it as part of the AbstractBackgroundJobProcessor config.

You can also pass over some dependencies from the processor to the barrier:

class MyJobProcessor extends AbstractBackgroundJobProcessor<Generics> {
  protected override resolveExecutionContext(): ExecutionContext {
    return {
      userService: this.userService,
    }
  }
}

This will be passed to the barrier:

const barrier = async (job: Job<JobData>, context: ExecutionContext) => {
  if (await context.userService.userExists(job.data.userId)) {
    return {
      isPassing: true,
    }
  }

  return {
    isPassing: false,
    delayAmountInMs: 30000, // retry in 30 seconds
  }
}

Available prebuilt barriers

@lokalise/background-jobs-common provides one barrier out of the box: JobQueueSizeThrottlingBarrier, which is used to control the number of jobs being spawned by a job processor (into a different queue).

Here is an example usage:

import { createJobQueueSizeThrottlingBarrier } from '@lokalise/background-jobs-common'    

const processor = new MyJobProcessor(dependencies, {
  // ... the rest of the config
  barrier: createJobQueueSizeThrottlingBarrier({
    maxQueueJobsInclusive: 2, // optimistic limit; if exceeded, the job with the barrier is delayed
    retryPeriodInMsecs: 30000, // retry in 30 seconds if the throttled queue is still too full
  }),
})
await processor.start()

Note that throttling is based on an optimistic check (checking the count and executing the job that uses the barrier is not an atomic operation), so it is possible to exceed the limit in a highly concurrent system. For this reason, it is recommended to set the limit with a buffer for possible overflow.

This barrier depends on defining the following ExecutionContext:

import type { JobQueueSizeThrottlingBarrierContext } from '@lokalise/background-jobs-common'

class MyJobProcessor extends AbstractBackgroundJobProcessor<Generics> {
  protected override resolveExecutionContext(): JobQueueSizeThrottlingBarrierContext {
    return {
      throttledQueueJobProcessor: this.throttledQueueJobProcessor, // AbstractBackgroundJobProcessor
    }
  }
}

Redis Connection Resilience

The library provides utilities to enrich Redis configuration for better connection resilience during failover scenarios.

enrichRedisConfig

Automatically adds a reconnectOnError hook to Redis connections if not already present. This is particularly important when Redis switches from master to replica during failover events.

How it works:

The enrichRedisConfig function is automatically applied to all Redis connections (QueueManager, AbstractBackgroundJobProcessor, etc.) through the internal connection setup. It adds a default reconnectOnError hook that:

  • Returns true for READONLY errors (triggering reconnection)
  • Returns false for other errors
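Based on the description above, the default hook can be pictured as the following sketch (an assumption about the internals, not the library's verbatim code):

```typescript
// Reconnect only when Redis reports a READONLY error, which happens when
// writes hit a node that became a read-only replica during failover.
const defaultReconnectOnError = (err: Error): boolean =>
  err.message.includes('READONLY')
```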

Default Behavior:

// reconnectOnError is automatically added by enrichRedisConfig
const queueManager = new QueueManager(new CommonBullmqFactory(), supportedQueues, {
  redisConfig: {
    host: 'localhost',
    port: 6379,
    useTls: false,
  },
  isTest: false,
  lazyInitEnabled: false,
})

Custom Reconnection Logic:

If you need custom reconnection logic, provide your own reconnectOnError function - it will be preserved:

const queueManager = new QueueManager(new CommonBullmqFactory(), supportedQueues, {
  redisConfig: {
    host: 'localhost',
    port: 6379,
    useTls: false,
    reconnectOnError: (err) => {
      // Custom reconnection logic
      if (err.message.includes('READONLY')) return true
      if (err.message.includes('CUSTOM_ERROR')) return true
      return false
    },
  },
  isTest: false,
  lazyInitEnabled: false,
})

The reconnectOnError function can return:

  • true or 1: Reconnect immediately
  • 2: Reconnect and resend the failed command
  • false: Do not reconnect
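For example, a hook that resends the failed command after a failover-induced READONLY error could look like this sketch (using ioredis's reconnectOnError return contract):

```typescript
const resendOnReadonly = (err: Error): boolean | 1 | 2 => {
  if (err.message.includes('READONLY')) {
    return 2 // reconnect AND resend the command that failed
  }
  return false // any other error: do not reconnect
}
```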

enrichRedisConfigOptimizedForCloud

For managed Redis instances (AWS ElastiCache, GCP Memorystore, etc.), use this enhanced version that adds both reconnectOnError and optimized DNS lookup.

Why DNS lookup matters:

Managed Redis services often use DNS-based failover. When a failover occurs, the DNS record is updated to point to the new master. The dnsLookup configuration ensures DNS resolution uses IPv4 and doesn't cache stale records.
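The kind of dnsLookup override being described might look like the sketch below (assuming ioredis's (hostname, callback) dnsLookup signature; the library's actual implementation is not shown here):

```typescript
import { lookup } from 'node:dns'

// Force IPv4 and hit the resolver on every call, so a failover that
// repoints the DNS record is picked up on reconnection.
const dnsLookup = (
  hostname: string,
  callback: (err: NodeJS.ErrnoException | null, address: string, family: number) => void,
): void => {
  lookup(hostname, { family: 4 }, callback)
}
```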

Usage:

import { enrichRedisConfigOptimizedForCloud, sanitizeRedisConfig } from '@lokalise/background-jobs-common'

// In QueueRegistry or processor setup, manually apply before sanitizing:
const enrichedConfig = enrichRedisConfigOptimizedForCloud(redisConfig)
const finalConfig = sanitizeRedisConfig(enrichedConfig)

Note: Currently, you need to manually apply enrichRedisConfigOptimizedForCloud if you want the cloud-optimized DNS behavior. The standard enrichRedisConfig is applied automatically by the library.

What it adds:

  1. reconnectOnError hook (same as enrichRedisConfig)
  2. dnsLookup - Optimized DNS resolution for cloud environments:
    • Forces IPv4 resolution
    • Ensures fresh DNS lookups on reconnection
    • Helps with faster failover detection in managed Redis services

Queue events

The library optimizes the default event stream settings to save memory: it sets the default maximum length of the BullMQ queue events stream to 0 (see the BullMQ docs). This means the event stream stores no events by default, greatly reducing memory usage.

If you need to store more events in the stream, you can configure the maximum length via the queueOptions parameter when creating the processor.

export class Processor extends AbstractBackgroundJobProcessor<Data> {
    constructor(dependencies: BackgroundJobProcessorDependencies<Data>) {
        super(dependencies, {
            queueId: 'queue',
            ownerName: 'example owner',
            queueOptions: {
                streams: {events:{maxLen: 1000}},
            }
        })
    }
    // ...
}

In-memory periodic jobs

Usage

export class MyJob extends AbstractPeriodicJob {
  public static JOB_NAME = 'MyJob'

  constructor(
    dependencies: PeriodicJobDependencies,
    config?: { intervalInMs?: number; shouldLogExecution?: boolean },
  ) {
    super(
      {
        jobId: MyJob.JOB_NAME,
        schedule: {
          intervalInMs: config?.intervalInMs ?? 15000,
        },
        shouldLogExecution: config?.shouldLogExecution ?? false,
        singleConsumerMode: {
          enabled: false, // if true, only one job instance across all cluster nodes can run at the same time
        },
        runImmediately: true, // if true, the job is triggered immediately after registration
      },
      dependencies,
    )
  }

  protected processInternal(executionUuid: JobExecutionContext): Promise<unknown> {
    // implement job execution logic
  }
}

const job = new MyJob(dependencies)
await job.asyncRegister() // registers the job with the scheduler; if runImmediately is true, also triggers execution immediately and awaits its completion