tasklane

v0.1.1

Background jobs that feel like normal async functions, powered by BullMQ and Redis.

No central job registry. No manual registration. You wrap a function with job(), and calling it enqueues it. That's it.

import { job } from "tasklane";

export const sendSms = job(async function sendSms(to: string, message: string) {
  await smsProvider.send({ to, message });
});

// Enqueues in the background — returns immediately
await sendSms("Mushud", "Your OTP is 1234");

Requirements

  • Node.js 18+
  • Redis 6+

Installation

npm install tasklane
# or
pnpm add tasklane
# or
yarn add tasklane

BullMQ (and its Redis client, ioredis) are included as dependencies, so there is nothing extra to install. You just need a running Redis server, which you point tasklane at by passing a connection URL to initJobs():

initJobs({ redis: "redis://127.0.0.1:6379" });     // local, no auth
initJobs({ redis: "redis://:password@host:6379" }); // with password
initJobs({ redis: "redis://host:6379/2" });         // database 2
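To make the three URL shapes above easier to read, here is a small sketch that splits a Redis URL into its parts with the standard URL class. The helper name describeRedisUrl is hypothetical and is not part of tasklane; it only illustrates where the password and database number live in the URL.

```typescript
// Illustrative only: shows which part of a Redis URL carries which setting.
// `describeRedisUrl` is a hypothetical helper, not a tasklane API.
interface RedisTarget {
  host: string;
  port: number;
  password: string | null;
  db: number;
}

function describeRedisUrl(raw: string): RedisTarget {
  const url = new URL(raw);
  return {
    host: url.hostname,
    port: url.port ? Number(url.port) : 6379, // 6379 is the default Redis port
    password: url.password ? decodeURIComponent(url.password) : null,
    // The path segment, if present, selects the database number (0 by default)
    db: url.pathname.length > 1 ? Number(url.pathname.slice(1)) : 0,
  };
}

console.log(describeRedisUrl("redis://127.0.0.1:6379"));
console.log(describeRedisUrl("redis://:password@host:6379"));
console.log(describeRedisUrl("redis://host:6379/2"));
```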

Quick Start

1. Define a job anywhere

// jobs/sms.ts
import { job } from "tasklane";

export const sendSms = job(async function sendSms(to: string, message: string) {
  await smsProvider.send({ to, message });
});

2. Use it anywhere in your app

// routes/booking.ts
import { sendSms } from "../jobs/sms";

router.post("/book", async (req, res) => {
  await createBooking(req.body);
  await sendSms(req.body.phone, "Booking confirmed!");
  res.json({ ok: true });
});

Importing sendSms is enough — the handler self-registers the moment the module loads. No separate registration step.
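The self-registration idea can be pictured with a toy, in-memory model. This is a sketch under assumptions, not tasklane's actual implementation: defineJob, handlers and pending are hypothetical names, and nothing here talks to Redis.

```typescript
// A toy model of "defining a job registers its handler".
// `defineJob`, `handlers` and `pending` are hypothetical, for illustration only.
type AnyHandler = (...args: any[]) => Promise<unknown>;

const handlers = new Map<string, AnyHandler>(); // job name -> handler
const pending: { name: string; args: unknown[] }[] = []; // stand-in for the queue

function defineJob<A extends unknown[], R>(fn: (...args: A) => Promise<R>) {
  if (!fn.name) throw new Error("job functions must be named");
  handlers.set(fn.name, fn as AnyHandler); // registration happens at definition time
  const enqueue = async (...args: A): Promise<void> => {
    pending.push({ name: fn.name, args }); // calling the wrapper only records the call
  };
  return Object.assign(enqueue, { run: fn }); // .run() reaches the original function
}

// Evaluating this statement (for example, when the module is imported)
// is what puts "sendSms" into the handler map.
const sendSms = defineJob(async function sendSms(to: string, message: string) {
  return `sent "${message}" to ${to}`;
});

void sendSms("Mushud", "Your OTP is 1234");
console.log(handlers.has("sendSms"), pending.length);
```

In this model the handler is known as soon as the defining module has been evaluated, which is why the import order relative to starting a worker matters.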

3. Initialize and start the worker once at app startup

// app.ts
import { initJobs, startWorker, onJobFailed } from "tasklane";
import "./routes"; // your routes import job functions, which registers their handlers

initJobs({ redis: process.env.REDIS_URL! });
await startWorker();

onJobFailed((event) => {
  console.error(`Job "${event.name}" failed:`, event.error.message);
  if (event.isFinalFailure) {
    console.error("All retries exhausted.");
  }
});

The only rule: call startWorker() after your app's modules have been imported — so all handlers are registered before the worker starts picking up jobs.


API Reference

initJobs

Initializes the jobs runtime. Call this once at application startup before dispatching or starting any workers.

initJobs(config: JobsConfig): void

initJobs({
  redis: "redis://127.0.0.1:6379", // required
  queue: "default",                 // optional, default: "default"
  attempts: 3,                      // optional, default: 3
});

| Option | Type | Default | Description |
|---|---|---|---|
| redis | string | none (required) | Redis connection URL |
| queue | string | "default" | Queue name for all jobs |
| attempts | number | 3 | Default retry attempts for all jobs |
| backoff | BackoffStrategy | { type: "exponential", delay: 1000 } | Default backoff strategy for all jobs |


job

Wraps an async function as a background job. The returned function has the same call signature as the original, but calling it enqueues the job instead of running it inline.

job(fn: AsyncFunction, opts?: JobFnOptions): JobFn

export const sendSms = job(async function sendSms(to: string, message: string) {
  await smsProvider.send({ to, message });
});

The function must be named — arrow functions are not allowed because the function name is used as the job identifier.

// ✅ correct
const sendSms = job(async function sendSms(to: string) { ... });

// ❌ throws — no name to use as job id
const sendSms = job(async (to: string) => { ... });
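The reason an inline arrow function has no usable identifier comes from how JavaScript assigns the name property of a function. The sketch below uses a hypothetical nameOf helper (not a tasklane API) that reads fn.name the way a wrapper receiving the function as an argument would see it.

```typescript
// `nameOf` is a hypothetical stand-in for any wrapper that inspects fn.name.
const nameOf = (fn: (...args: any[]) => unknown): string => fn.name;

// A named function expression carries its own name.
const fromNamedFunction = nameOf(async function sendSms(to: string) {
  return to;
});

// An arrow function passed directly as an argument has an empty name.
const fromInlineArrow = nameOf(async (to: string) => to);

// An arrow function first assigned to a variable picks up the variable's name.
const assignedArrow = async (to: string) => to;
const fromAssignedArrow = nameOf(assignedArrow);

console.log(JSON.stringify([fromNamedFunction, fromInlineArrow, fromAssignedArrow]));
```

Note that in the failing example above the variable sendSms exists, but the arrow function is created inside the job(...) call, so the wrapper receives it before any variable name could be attached.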

Per-job options:

export const sendSms = job(
  async function sendSms(to: string, message: string) { ... },
  {
    attempts: 5,
    backoff: { type: "fixed", delay: 2000 },
  }
);

| Option | Type | Default | Description |
|---|---|---|---|
| attempts | number | global attempts | Retry attempts for this job |
| backoff | BackoffStrategy | global backoff | Backoff strategy for this job (overrides global) |


startWorker

Starts consuming jobs from the queue. Call after initJobs() and after importing all job definition files.

startWorker(): Promise<void>

Retry backoff is configured via initJobs() or per-job options. See Backoff Strategies.


stopWorker

Gracefully closes the worker and all Redis connections.

stopWorker(): Promise<void>

onJobFailed

Registers a listener that fires on every failed job attempt. Returns an unsubscribe function.

onJobFailed(listener: (event: FailedEvent) => void): () => void

const unsub = onJobFailed((event) => {
  console.error(event.name, event.error.message);

  if (event.isFinalFailure) {
    // All retries exhausted — alert, log to DB, etc.
    alerting.send(`Job ${event.name} permanently failed`);
  }
});

// Stop listening later
unsub();

FailedEvent shape:

| Field | Type | Description |
|---|---|---|
| jobId | string | BullMQ job ID |
| name | string | Job function name |
| args | unknown[] | Arguments the job was called with |
| error | Error | The error that was thrown |
| attemptsMade | number | How many attempts have been made |
| isFinalFailure | boolean | true when all retries are exhausted |
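The "register a listener, get back an unsubscribe function" pattern used by onJobFailed can be sketched in a few lines. This is a generic illustration, not tasklane's code; createEmitter and its methods are hypothetical names.

```typescript
// Generic listener registry whose `on` returns an unsubscribe function.
// `createEmitter` is a hypothetical helper for illustration.
type Listener<E> = (event: E) => void;

function createEmitter<E>() {
  const listeners = new Set<Listener<E>>();
  return {
    on(listener: Listener<E>): () => void {
      listeners.add(listener);
      return () => {
        listeners.delete(listener); // the returned closure removes this listener only
      };
    },
    emit(event: E): void {
      listeners.forEach((listener) => listener(event));
    },
  };
}

const failures = createEmitter<{ name: string; isFinalFailure: boolean }>();
const seen: string[] = [];
const unsub = failures.on((event) => seen.push(event.name));

failures.emit({ name: "sendSms", isFinalFailure: false }); // delivered
unsub();
failures.emit({ name: "sendSms", isFinalFailure: true }); // no longer delivered
console.log(seen.length);
```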


onJobCompleted

Registers a listener that fires when a job completes successfully. Returns an unsubscribe function.

onJobCompleted(listener: (event: CompletedEvent) => void): () => void

const unsub = onJobCompleted((event) => {
  console.log(`Job "${event.name}" completed`, event.result);
});

// Stop listening later
unsub();

CompletedEvent shape:

| Field | Type | Description |
|---|---|---|
| jobId | string | BullMQ job ID |
| name | string | Job function name |
| args | unknown[] | Arguments the job was called with |
| result | unknown | Return value of the handler |


flow

Dispatches a job flow — a parent job that runs only after all its children complete. Supports unlimited nesting depth.

flow(node: FlowNode): Promise<void>

import { flow } from "tasklane";

await flow({
  job: processOrder,
  args: ["ord_123"],
  children: [
    { job: chargePayment, args: ["ord_123"] },
    { job: reserveInventory, args: ["ord_123"] },
  ],
});

Children run in parallel. The parent runs only after every child (and their children) completes. If any child fails permanently, the parent is not executed.

See Job Flows for detailed examples.


getFailed

Returns a list of permanently failed jobs (all retries exhausted). Useful for building an admin dashboard or alerting pipeline.

getFailed(start?: number, limit?: number): Promise<FailedJob[]>

import { getFailed } from "tasklane";

const jobs = await getFailed();        // first 50
const next = await getFailed(50, 50);  // next page

for (const job of jobs) {
  console.log(job.jobId, job.name, job.failedReason, job.attemptsMade);
}

FailedJob shape:

| Field | Type | Description |
|---|---|---|
| jobId | string | Pass this to retryJob() to re-queue |
| name | string | Job function name |
| args | unknown[] | Arguments the job was originally called with |
| failedReason | string | Last error message |
| attemptsMade | number | Total attempts made before giving up |
| timestamp | number | Unix ms when the job was first created |
| finishedOn | number \| undefined | Unix ms when the job finally failed |
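For more than one page of failed jobs, a paging loop over a (start, limit) function is one option. The sketch below is self-contained and uses a fake data source; collectAll and fakeGetFailed are hypothetical names, and passing tasklane's getFailed in place of the fake is an assumption based only on the signature shown above.

```typescript
// Pages through any function with a (start, limit) => Promise<T[]> shape.
type FetchPage<T> = (start: number, limit: number) => Promise<T[]>;

async function collectAll<T>(fetchPage: FetchPage<T>, pageSize = 50): Promise<T[]> {
  const all: T[] = [];
  for (let start = 0; ; start += pageSize) {
    const page = await fetchPage(start, pageSize);
    all.push(...page);
    if (page.length < pageSize) return all; // a short page means nothing is left
  }
}

// Stand-in data source with 120 fake entries (hypothetical, for illustration).
const fakeFailed = Array.from({ length: 120 }, (_, i) => ({ jobId: `job_${i}` }));
const fakeGetFailed: FetchPage<{ jobId: string }> = async (start, limit) =>
  fakeFailed.slice(start, start + limit);

collectAll(fakeGetFailed).then((jobs) => console.log(jobs.length));
```

This assumes the failed set does not change while paging; if jobs are retried or added concurrently, offsets can shift between calls.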


retryJob

Re-queues a permanently failed job by its ID. The job is picked up by a worker and retried from scratch — attempt count resets to zero.

retryJob(jobId: string): Promise<void>

import { getFailed, retryJob } from "tasklane";

// Retry all permanently failed jobs
const failed = await getFailed();
await Promise.all(failed.map((job) => retryJob(job.jobId)));

// Or retry a single job by known ID
await retryJob("job_12345");

Dispatching Jobs

Every job created with job() supports five dispatch modes.

Immediate

Enqueues the job to run as soon as a worker is available.

await sendSms("Mushud", "Your OTP is 1234");

Delayed

Enqueues the job to run after a delay in milliseconds.

// Run after 1 hour
await sendSms.delay(60 * 60 * 1000)("Mushud", "Just checking in");

One-time scheduled

Enqueues the job to run once at a specific Date.

await sendSms.at(new Date("2026-04-13T09:00:00Z"))("Mushud", "Good morning");

Throws if the date is in the past.
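Scheduling at a Date can be thought of as a delay computed from the current time. The following sketch shows that conversion, including the past-date check; delayUntil is a hypothetical helper, and whether tasklane computes the delay this way internally is an assumption.

```typescript
// Converts a target Date into a delay in milliseconds, rejecting past dates.
// `delayUntil` is a hypothetical helper, not a tasklane API.
function delayUntil(target: Date, now: Date = new Date()): number {
  const ms = target.getTime() - now.getTime();
  if (ms < 0) {
    throw new Error(`Scheduled time ${target.toISOString()} is in the past`);
  }
  return ms;
}

// A fixed "now" keeps the example deterministic.
const fixedNow = new Date("2026-04-13T08:00:00Z");
console.log(delayUntil(new Date("2026-04-13T09:00:00Z"), fixedNow)); // 3600000
```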

Recurring cron

Registers a repeating schedule using a cron expression. Uses BullMQ's job scheduler internally — safe to call on every deploy since it is an idempotent upsert.

// Every day at 9:00 AM
await sendSms.cron("0 9 * * *")("Mushud", "Daily reminder");

// Every Monday at 8:00 AM
await sendSms.cron("0 8 * * 1")("Mushud", "Weekly report");

// Every hour
await sendSms.cron("0 * * * *")("Mushud", "Hourly ping");

Cron expression format:

┌─ minute (0-59)
│ ┌─ hour (0-23)
│ │ ┌─ day of month (1-31)
│ │ │ ┌─ month (1-12)
│ │ │ │ ┌─ day of week (0-6, Sunday = 0)
│ │ │ │ │
* * * * *
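As a reading aid for the five-field format above, this sketch labels each field of an expression. labelCronFields is a hypothetical helper that only splits the string; it does not validate ranges and is not how BullMQ parses cron expressions.

```typescript
// Labels the five whitespace-separated fields of a cron expression.
// `labelCronFields` is a hypothetical helper for illustration only.
interface CronFields {
  minute: string;
  hour: string;
  dayOfMonth: string;
  month: string;
  dayOfWeek: string;
}

function labelCronFields(expression: string): CronFields {
  const parts = expression.trim().split(/\s+/);
  if (parts.length !== 5) {
    throw new Error(`expected 5 fields, got ${parts.length}`);
  }
  const [minute, hour, dayOfMonth, month, dayOfWeek] = parts;
  return { minute, hour, dayOfMonth, month, dayOfWeek };
}

// "0 8 * * 1": minute 0, hour 8, any day of month, any month, Monday
console.log(labelCronFields("0 8 * * 1"));
```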

Run directly without queue

Calls the original handler function immediately, bypassing Redis and the queue entirely. Returns the handler's actual return value.

// No Redis needed — runs inline like a normal async function
await sendSms.run("Mushud", "Hello");

Useful for:

  • Testing handlers without a Redis connection
  • Running a job inline when you need the result right away
  • CLI scripts and one-off tasks

Backoff Strategies

Backoff controls how long BullMQ waits before retrying a failed job. You can configure it globally, per job definition, or leave it at the default.

Default: exponential backoff starting at 1 second.

Exponential backoff

Doubles the wait on each retry: 1s → 2s → 4s → 8s → ...

initJobs({
  redis: process.env.REDIS_URL!,
  backoff: { type: "exponential", delay: 1000 },
});

Fixed backoff

Waits the same amount of time between every retry.

initJobs({
  redis: process.env.REDIS_URL!,
  backoff: { type: "fixed", delay: 5000 }, // always 5s
});
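To compare the two strategies above numerically, the sketch below computes the wait before each retry. It assumes the doubling rule described for exponential backoff (base delay times 2 to the power of the retry index); retryDelay and schedule are hypothetical helpers, not tasklane APIs.

```typescript
// Computes retry waits for the two strategies, assuming exponential = delay * 2^(retry - 1).
type BackoffStrategy = { type: "exponential" | "fixed"; delay: number };

// `retry` is 1 for the first retry, 2 for the second, and so on.
function retryDelay(strategy: BackoffStrategy, retry: number): number {
  return strategy.type === "fixed"
    ? strategy.delay
    : strategy.delay * 2 ** (retry - 1);
}

function schedule(strategy: BackoffStrategy, retries: number): number[] {
  return Array.from({ length: retries }, (_, i) => retryDelay(strategy, i + 1));
}

console.log(schedule({ type: "exponential", delay: 1000 }, 4)); // [1000, 2000, 4000, 8000]
console.log(schedule({ type: "fixed", delay: 5000 }, 3)); // [5000, 5000, 5000]
```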

Per-job backoff

Override the global default for a specific job. Per-job settings always win over the global setting.

export const chargePayment = job(
  async function chargePayment(orderId: string) { ... },
  { backoff: { type: "fixed", delay: 2000 }, attempts: 5 }
);

BackoffStrategy shape:

| Field | Type | Description |
|---|---|---|
| type | "exponential" \| "fixed" | Retry timing pattern |
| delay | number | Base delay in milliseconds |

Priority order: per-job backoff → global initJobs backoff → default { type: "exponential", delay: 1000 }
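The priority order can be expressed as a simple fallback chain. This is a sketch of the rule as stated, not tasklane's source; resolveBackoff and DEFAULT_BACKOFF are hypothetical names.

```typescript
// Picks the effective backoff: per-job first, then global, then the documented default.
type BackoffStrategy = { type: "exponential" | "fixed"; delay: number };

const DEFAULT_BACKOFF: BackoffStrategy = { type: "exponential", delay: 1000 };

function resolveBackoff(
  perJob?: BackoffStrategy,
  globalSetting?: BackoffStrategy,
): BackoffStrategy {
  return perJob ?? globalSetting ?? DEFAULT_BACKOFF;
}

const globalBackoff: BackoffStrategy = { type: "fixed", delay: 5000 };
const jobBackoff: BackoffStrategy = { type: "fixed", delay: 2000 };

console.log(resolveBackoff(jobBackoff, globalBackoff).delay); // 2000, per-job wins
console.log(resolveBackoff(undefined, globalBackoff).delay); // 5000, global applies
console.log(resolveBackoff().type); // "exponential", the default
```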


Job Flows

A flow is a group of jobs with explicit dependencies. Children always run before their parent, so you can model multi-step pipelines without polling or callbacks.

import { flow } from "tasklane";

await flow({
  job: processOrder,
  args: ["ord_123"],
  children: [
    { job: chargePayment, args: ["ord_123"] },
    { job: reserveInventory, args: ["ord_123"] },
  ],
});

chargePayment and reserveInventory run in parallel. processOrder runs only after both complete.

Nested flows

Children can have their own children, to any depth:

await flow({
  job: processOrder,
  args: ["ord_123"],
  children: [
    { job: chargePayment, args: ["ord_123"] },
    {
      job: prepareShipment,
      args: ["ord_123"],
      children: [
        { job: validateAddress, args: ["ord_123"] },
        { job: pickInventory, args: ["ord_123"] },
      ],
    },
  ],
});

Execution order: validateAddress + pickInventory → prepareShipment + chargePayment → processOrder.
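The "children before parent" rule corresponds to a post-order walk of the flow tree. The sketch below lists one valid completion order for the nested example; in practice siblings may run in parallel, so this is only one of several possible interleavings. FlowSketchNode and completionOrder are hypothetical names that use job names as plain strings instead of real job functions.

```typescript
// Post-order traversal: every node appears after all of its descendants.
interface FlowSketchNode {
  name: string;
  children?: FlowSketchNode[];
}

function completionOrder(node: FlowSketchNode): string[] {
  const order: string[] = [];
  for (const child of node.children ?? []) {
    order.push(...completionOrder(child)); // finish each subtree first
  }
  order.push(node.name); // the parent comes last
  return order;
}

const nestedFlow: FlowSketchNode = {
  name: "processOrder",
  children: [
    { name: "chargePayment" },
    {
      name: "prepareShipment",
      children: [{ name: "validateAddress" }, { name: "pickInventory" }],
    },
  ],
};

console.log(completionOrder(nestedFlow).join(" -> "));
```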

Flow options

Each node in the flow accepts attempts and backoff overrides:

await flow({
  job: processOrder,
  args: ["ord_123"],
  children: [
    {
      job: chargePayment,
      args: ["ord_123"],
      attempts: 5,
      backoff: { type: "fixed", delay: 3000 },
    },
  ],
});

Failure behaviour

If any child fails permanently (all retries exhausted), the parent job is never executed. The parent stays in a waiting state in Redis and can be inspected or cleaned up via the BullMQ dashboard.

Completed events

onJobCompleted fires for every job in the flow — children and parent alike.

onJobCompleted((event) => {
  console.log(`${event.name} done`);
});

await flow({
  job: processOrder,
  args: ["ord_123"],
  children: [{ job: chargePayment, args: ["ord_123"] }],
});
// fires: "chargePayment done", then "processOrder done"

Failure Handling

How retries work

onJobFailed fires on every failed attempt — not just the final one. Use isFinalFailure to tell them apart.

onJobFailed((event) => {
  if (!event.isFinalFailure) {
    // This attempt failed but BullMQ will retry automatically
    console.warn(`Job "${event.name}" failed (attempt ${event.attemptsMade}), retrying...`);
  } else {
    // All retries exhausted — job is now permanently failed
    console.error(`Job "${event.name}" permanently failed:`, event.error.message);
    // Alert, write to DB, notify a human, etc.
  }
});

Jobs automatically retry with exponential backoff when they throw. Default is 3 attempts. Configure attempts and backoff per job or globally — see Backoff Strategies.

Once isFinalFailure is true, the job sits in the failed set and will not run again unless you manually retry it via retryJob.
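The relationship between attempts, attemptsMade and isFinalFailure can be simulated without a queue. The sketch assumes that attempts counts the first try as well as the retries and that attemptsMade starts at 1; simulateAttempts is a hypothetical helper, not a tasklane API.

```typescript
// Simulates a job that fails `failures` times in a row, capped at `attempts` tries.
// Assumption: `attempts` is the total number of tries, including the first one.
interface AttemptEvent {
  attemptsMade: number;
  failed: boolean;
  isFinalFailure: boolean;
}

function simulateAttempts(attempts: number, failures: number): AttemptEvent[] {
  const events: AttemptEvent[] = [];
  for (let attemptsMade = 1; attemptsMade <= attempts; attemptsMade++) {
    const failed = attemptsMade <= failures;
    events.push({
      attemptsMade,
      failed,
      isFinalFailure: failed && attemptsMade === attempts,
    });
    if (!failed) break; // success ends the sequence
  }
  return events;
}

// Always failing with attempts = 3: two retryable failures, then a final one.
console.log(simulateAttempts(3, 5));
// Failing once, then succeeding: no final failure is ever reported.
console.log(simulateAttempts(3, 1));
```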

Common pattern — retry on deploy:

// scripts/retry-failed.ts
import { initJobs, getFailed, retryJob } from "tasklane";

initJobs({ redis: process.env.REDIS_URL! });

const failed = await getFailed();
console.log(`Retrying ${failed.length} failed jobs...`);
await Promise.all(failed.map((j) => retryJob(j.jobId)));
console.log("Done.");
process.exit(0);

Multiple Processes

Running multiple processes (PM2, cluster, Docker replicas) works out of the box. Each process has its own in-memory state so a few rules apply.

Every process must:

  1. Call initJobs() independently
  2. Have all job modules in its import graph before calling startWorker()
  3. Call startWorker() if it should process jobs

In most apps this is automatic — your worker entry point imports your routes or services, which import job functions, which register their handlers:

// worker.ts
import { initJobs, startWorker } from "tasklane";
import "./routes"; // transitively imports all job definitions

initJobs({ redis: process.env.REDIS_URL! });
await startWorker();

If you run a dedicated worker process that doesn't import routes, explicitly import the job files that process handles:

// worker.ts — dedicated worker without routes
import { initJobs, startWorker } from "tasklane";
import "./jobs/sms";
import "./jobs/email";

initJobs({ redis: process.env.REDIS_URL! });
await startWorker();

Cron schedules: registering a schedule with .cron() from every process is safe, because upsertJobScheduler is idempotent (it upserts by a stable key in Redis), but it is redundant. The cleanest approach is to call .cron() from only one process per deploy:

// PM2 / cluster: only the primary process sets up cron schedules
if (process.env.pm_id === "0") {
  await sendSms.cron("0 9 * * *")("Mushud", "Daily reminder");
  await generateReport.cron("0 0 * * *")();
}

await startWorker(); // all processes run this

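With 4 worker processes, each job is still claimed by only one worker at a time: BullMQ moves jobs between states with atomic Redis operations, so two workers do not pick up the same job simultaneously. Note that BullMQ's delivery guarantee is at-least-once in the worst case; a job whose worker crashes or stalls mid-run can be picked up again, so handlers with side effects are best kept idempotent.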


TypeScript

job() is fully generic. The return type and parameter types of the original function flow through automatically.

export const sendSms = job(async function sendSms(to: string, message: string) {
  return { sent: true };
});

// ✅ TypeScript knows these are (string, string)
await sendSms("Mushud", "Hello");

// ✅ .run() preserves the return type
const result = await sendSms.run("Mushud", "Hello");
// result: { sent: boolean }

// ❌ TypeScript error — wrong argument types
await sendSms(123, "Hello");
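How parameter and return types can flow through a wrapper is sketched below using the built-in Parameters utility type. This is an illustration of the typing technique only; wrap, AsyncFn and Enqueuable are hypothetical names, and tasklane's real JobFn type may be defined differently.

```typescript
// A wrapper type that keeps the original parameter list and exposes the original via .run.
type AsyncFn = (...args: any[]) => Promise<unknown>;
type Enqueuable<F extends AsyncFn> =
  ((...args: Parameters<F>) => Promise<void>) & { run: F };

function wrap<F extends AsyncFn>(fn: F): Enqueuable<F> {
  const enqueue = async (..._args: Parameters<F>): Promise<void> => {
    // A real implementation would push the arguments to a queue here.
  };
  return Object.assign(enqueue, { run: fn });
}

const typedSendSms = wrap(async function sendSms(to: string, message: string) {
  return { sent: true, to, message };
});

// Parameters are inferred as (string, string); the call resolves to void.
void typedSendSms("Mushud", "Hello");

// .run keeps the original return type, so `result.sent` is known to be a boolean.
typedSendSms.run("Mushud", "Hello").then((result) => console.log(result.sent));

// typedSendSms(123, "Hello"); // would be rejected at compile time
```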

Available types:

import type {
  JobsConfig,
  JobFnOptions,
  JobFn,
  BackoffStrategy,
  FlowNode,
  FailedEvent,
  CompletedEvent,
  FailedJob,
} from "tasklane";

Contributing

Contributions are welcome. Please open an issue before submitting a pull request for large changes.

Setup:

git clone https://github.com/mushud/tasklane
cd tasklane
pnpm install
pnpm build

Project structure:

src/
  index.ts      — public API exports
  job.ts        — job() factory
  registry.ts   — global singleton (BullMQ Queue + Worker + handler map)
  types.ts      — TypeScript interfaces

Before submitting a PR:

pnpm typecheck
pnpm build

License

MIT — see LICENSE.