tasklane
v0.1.1
Background jobs that feel like normal async functions, powered by BullMQ and Redis.
No central job registry. No manual registration. You wrap a function with job(), and calling it enqueues it. That's it.
import { job } from "tasklane";
export const sendSms = job(async function sendSms(to: string, message: string) {
await smsProvider.send({ to, message });
});
// Enqueues in the background — returns immediately
await sendSms("Mushud", "Your OTP is 1234");
Table of Contents
- Requirements
- Installation
- Quick Start
- API Reference
- Dispatching Jobs
- Backoff Strategies
- Job Flows
- Failure Handling
- Multiple Processes
- TypeScript
- Contributing
- License
Requirements
- Node.js 18+
- Redis 6+
Installation
npm install tasklane
# or
pnpm add tasklane
# or
yarn add tasklane
BullMQ (and its Redis client ioredis) are included as dependencies — nothing extra to install. You just need a running Redis server, which you point initJobs at with a Redis connection URL:
initJobs({ redis: "redis://127.0.0.1:6379" }); // local, no auth
initJobs({ redis: "redis://:password@host:6379" }); // with password
initJobs({ redis: "redis://host:6379/2" }); // database 2
Quick Start
1. Define a job anywhere
// jobs/sms.ts
import { job } from "tasklane";
export const sendSms = job(async function sendSms(to: string, message: string) {
await smsProvider.send({ to, message });
});
2. Use it anywhere in your app
// routes/booking.ts
import { sendSms } from "../jobs/sms";
router.post("/book", async (req, res) => {
await createBooking(req.body);
await sendSms(req.body.phone, "Booking confirmed!");
res.json({ ok: true });
});
Importing sendSms is enough — the handler self-registers the moment the module loads. No separate registration step.
3. Initialize and start the worker once at app startup
// app.ts
import { initJobs, startWorker, onJobFailed } from "tasklane";
import "./routes"; // your routes import job functions, which registers their handlers
initJobs({ redis: process.env.REDIS_URL! });
await startWorker();
onJobFailed((event) => {
console.error(`Job "${event.name}" failed:`, event.error.message);
if (event.isFinalFailure) {
console.error("All retries exhausted.");
}
});
The only rule: call startWorker() after your app's modules have been imported — so all handlers are registered before the worker starts picking up jobs.
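Why import order matters becomes clear from the registration pattern itself. The README's project structure mentions a global handler map in src/registry.ts; the toy sketch below imitates that pattern for illustration only (register and handlers are hypothetical names, not tasklane's real internals):

```typescript
// Toy sketch of self-registration (illustrative, not tasklane's real code):
// wrapping a function stores it in a module-level map keyed by fn.name.
// A worker can only dispatch to handlers that were registered before it
// started — which is why every job module must be imported first.
const handlers = new Map<string, (...args: any[]) => Promise<unknown>>();

function register(fn: (...args: any[]) => Promise<unknown>): typeof fn {
  if (!fn.name) throw new Error("job() requires a named function");
  handlers.set(fn.name, fn);
  return fn;
}

const sendSms = register(async function sendSms(to: string) {
  return `sent to ${to}`;
});

// The worker side can now look the handler up by the name stored in Redis.
handlers.has("sendSms"); // true
```

The same mechanics explain why anonymous arrow functions are rejected: an inline arrow has an empty fn.name, so there is no key to register under.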
API Reference
initJobs
Initializes the jobs runtime. Call this once at application startup before dispatching or starting any workers.
initJobs(config: JobsConfig): void
initJobs({
redis: "redis://127.0.0.1:6379", // required
queue: "default", // optional, default: "default"
attempts: 3, // optional, default: 3
});
| Option | Type | Default | Description |
|---|---|---|---|
| redis | string | — | Redis connection URL |
| queue | string | "default" | Queue name for all jobs |
| attempts | number | 3 | Default retry attempts for all jobs |
| backoff | BackoffStrategy | { type: "exponential", delay: 1000 } | Default backoff strategy for all jobs |
job
Wraps an async function as a background job. The returned function has the same call signature as the original, but calling it enqueues the job instead of running it inline.
job(fn: AsyncFunction, opts?: JobFnOptions): JobFn
export const sendSms = job(async function sendSms(to: string, message: string) {
await smsProvider.send({ to, message });
});
The function must be named — arrow functions are not allowed because the function name is used as the job identifier.
// ✅ correct
const sendSms = job(async function sendSms(to: string) { ... });
// ❌ throws — no name to use as job id
const sendSms = job(async (to: string) => { ... });
Per-job options:
export const sendSms = job(
async function sendSms(to: string, message: string) { ... },
{
attempts: 5,
backoff: { type: "fixed", delay: 2000 },
}
);
| Option | Type | Default | Description |
|---|---|---|---|
| attempts | number | global attempts | Retry attempts for this job |
| backoff | BackoffStrategy | global backoff | Backoff strategy for this job (overrides global) |
startWorker
Starts consuming jobs from the queue. Call after initJobs() and after importing all job definition files.
await startWorker(): Promise<void>
Retry backoff is configured via initJobs() or per-job options. See Backoff Strategies.
stopWorker
Gracefully closes the worker and all Redis connections.
await stopWorker(): Promise<void>
onJobFailed
Registers a listener that fires on every failed job attempt. Returns an unsubscribe function.
onJobFailed(listener: (event: FailedEvent) => void): () => void
const unsub = onJobFailed((event) => {
console.error(event.name, event.error.message);
if (event.isFinalFailure) {
// All retries exhausted — alert, log to DB, etc.
alerting.send(`Job ${event.name} permanently failed`);
}
});
// Stop listening later
unsub();
FailedEvent shape:
| Field | Type | Description |
|---|---|---|
| jobId | string | BullMQ job ID |
| name | string | Job function name |
| args | unknown[] | Arguments the job was called with |
| error | Error | The error that was thrown |
| attemptsMade | number | How many attempts have been made |
| isFinalFailure | boolean | true when all retries are exhausted |
onJobCompleted
Registers a listener that fires when a job completes successfully. Returns an unsubscribe function.
onJobCompleted(listener: (event: CompletedEvent) => void): () => void
const unsub = onJobCompleted((event) => {
console.log(`Job "${event.name}" completed`, event.result);
});
// Stop listening later
unsub();
CompletedEvent shape:
| Field | Type | Description |
|---|---|---|
| jobId | string | BullMQ job ID |
| name | string | Job function name |
| args | unknown[] | Arguments the job was called with |
| result | unknown | Return value of the handler |
flow
Dispatches a job flow — a parent job that runs only after all its children complete. Supports unlimited nesting depth.
flow(node: FlowNode): Promise<void>
import { flow } from "tasklane";
await flow({
job: processOrder,
args: ["ord_123"],
children: [
{ job: chargePayment, args: ["ord_123"] },
{ job: reserveInventory, args: ["ord_123"] },
],
});
Children run in parallel. The parent runs only after every child (and their children) completes. If any child fails permanently, the parent is not executed.
See Job Flows for detailed examples.
getFailed
Returns a list of permanently failed jobs (all retries exhausted). Useful for building an admin dashboard or alerting pipeline.
getFailed(start?: number, limit?: number): Promise<FailedJob[]>
import { getFailed } from "tasklane";
const jobs = await getFailed(); // first 50
const next = await getFailed(50, 50); // next page
for (const job of jobs) {
console.log(job.jobId, job.name, job.failedReason, job.attemptsMade);
}
FailedJob shape:
| Field | Type | Description |
|---|---|---|
| jobId | string | Pass this to retryJob() to re-queue |
| name | string | Job function name |
| args | unknown[] | Arguments the job was originally called with |
| failedReason | string | Last error message |
| attemptsMade | number | Total attempts made before giving up |
| timestamp | number | Unix ms when the job was first created |
| finishedOn | number \| undefined | Unix ms when the job finally failed |
retryJob
Re-queues a permanently failed job by its ID. The job is picked up by a worker and retried from scratch — attempt count resets to zero.
retryJob(jobId: string): Promise<void>
import { getFailed, retryJob } from "tasklane";
// Retry all permanently failed jobs
const failed = await getFailed();
await Promise.all(failed.map((job) => retryJob(job.jobId)));
// Or retry a single job by known ID
await retryJob("job_12345");
Dispatching Jobs
Every job created with job() supports five dispatch modes.
Immediate
Enqueues the job to run as soon as a worker is available.
await sendSms("Mushud", "Your OTP is 1234");
Delayed
Enqueues the job to run after a delay in milliseconds.
// Run after 1 hour
await sendSms.delay(60 * 60 * 1000)("Mushud", "Just checking in");
One-time scheduled
Enqueues the job to run once at a specific Date.
await sendSms.at(new Date("2026-04-13T09:00:00Z"))("Mushud", "Good morning");
Throws if the date is in the past.
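The past-date rule is easy to reason about: the target Date is compared with the current time, and the remaining milliseconds become the queue delay. A minimal sketch of that logic (illustrative only — delayUntil is not part of tasklane's API):

```typescript
// Illustrative: turn a target Date into a queue delay in ms,
// rejecting past dates the way .at() is documented to.
function delayUntil(date: Date, now: number = Date.now()): number {
  const ms = date.getTime() - now;
  if (ms < 0) {
    throw new Error(`Scheduled date ${date.toISOString()} is in the past`);
  }
  return ms;
}
```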
Recurring cron
Registers a repeating schedule using a cron expression. Uses BullMQ's job scheduler internally — safe to call on every deploy since it is an idempotent upsert.
// Every day at 9:00 AM
await sendSms.cron("0 9 * * *")("Mushud", "Daily reminder");
// Every Monday at 8:00 AM
await sendSms.cron("0 8 * * 1")("Mushud", "Weekly report");
// Every hour
await sendSms.cron("0 * * * *")("Mushud", "Hourly ping");
Cron expression format:
┌─ minute (0-59)
│ ┌─ hour (0-23)
│ │ ┌─ day of month (1-31)
│ │ │ ┌─ month (1-12)
│ │ │ │ ┌─ day of week (0-6, Sunday = 0)
│ │ │ │ │
* * * * *
Run directly without queue
Calls the original handler function immediately, bypassing Redis and the queue entirely. Returns the handler's actual return value.
// No Redis needed — runs inline like a normal async function
await sendSms.run("Mushud", "Hello");
Useful for:
- Testing handlers without a Redis connection
- Running a job inline when you need the result right away
- CLI scripts and one-off tasks
Backoff Strategies
Backoff controls how long BullMQ waits before retrying a failed job. You can configure it globally, per job definition, or leave it at the default.
Default: exponential backoff starting at 1 second.
Exponential backoff
Doubles the wait on each retry: 1s → 2s → 4s → 8s → ...
initJobs({
redis: process.env.REDIS_URL!,
backoff: { type: "exponential", delay: 1000 },
});
Fixed backoff
Waits the same amount of time between every retry.
initJobs({
redis: process.env.REDIS_URL!,
backoff: { type: "fixed", delay: 5000 }, // always 5s
});
Per-job backoff
Override the global default for a specific job. Per-job settings always win over the global setting.
export const chargePayment = job(
async function chargePayment(orderId: string) { ... },
{ backoff: { type: "fixed", delay: 2000 }, attempts: 5 }
);
BackoffStrategy shape:
| Field | Type | Description |
|---|---|---|
| type | "exponential" \| "fixed" | Retry timing pattern |
| delay | number | Base delay in milliseconds |
Priority order: per-job backoff → global initJobs backoff → default { type: "exponential", delay: 1000 }
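That resolution order, and the resulting wait times, can be sketched as plain functions. This mirrors the documented behavior; resolveBackoff and retryDelay are illustrative helpers, not part of tasklane's API:

```typescript
// Illustrative helpers — not tasklane's API.
type BackoffStrategy = { type: "exponential" | "fixed"; delay: number };

const DEFAULT_BACKOFF: BackoffStrategy = { type: "exponential", delay: 1000 };

// Per-job backoff wins over the global setting, which wins over the default.
function resolveBackoff(
  perJob?: BackoffStrategy,
  global?: BackoffStrategy
): BackoffStrategy {
  return perJob ?? global ?? DEFAULT_BACKOFF;
}

// Delay before retry number `attempt` (1 = first retry).
function retryDelay(strategy: BackoffStrategy, attempt: number): number {
  return strategy.type === "fixed"
    ? strategy.delay
    : strategy.delay * 2 ** (attempt - 1);
}

// Default exponential, base 1s: waits before retries 1–3.
[1, 2, 3].map((n) => retryDelay(DEFAULT_BACKOFF, n)); // → [1000, 2000, 4000]
```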
Job Flows
A flow is a group of jobs with explicit dependencies. Children always run before their parent, so you can model multi-step pipelines without polling or callbacks.
import { flow } from "tasklane";
await flow({
job: processOrder,
args: ["ord_123"],
children: [
{ job: chargePayment, args: ["ord_123"] },
{ job: reserveInventory, args: ["ord_123"] },
],
});
chargePayment and reserveInventory run in parallel. processOrder runs only after both complete.
Nested flows
Children can have their own children, to any depth:
await flow({
job: processOrder,
args: ["ord_123"],
children: [
{ job: chargePayment, args: ["ord_123"] },
{
job: prepareShipment,
args: ["ord_123"],
children: [
{ job: validateAddress, args: ["ord_123"] },
{ job: pickInventory, args: ["ord_123"] },
],
},
],
});
Execution order: validateAddress + pickInventory → prepareShipment + chargePayment → processOrder.
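The level-by-level reading above can be sketched as a small tree walk. This is illustrative only, not tasklane's scheduler (FlowStep and executionWaves are made-up names) — and note that in practice a childless node like chargePayment may start as soon as a worker is free, since nothing blocks it:

```typescript
// Illustrative: group flow nodes by tree level. Deeper levels become
// runnable first; the root always runs last.
type FlowStep = { name: string; children?: FlowStep[] };

function executionWaves(root: FlowStep): string[][] {
  const byLevel: string[][] = [];
  const visit = (node: FlowStep, level: number) => {
    (byLevel[level] ??= []).push(node.name);
    for (const child of node.children ?? []) visit(child, level + 1);
  };
  visit(root, 0);
  return byLevel.reverse(); // deepest level first, root last
}

const order = executionWaves({
  name: "processOrder",
  children: [
    { name: "chargePayment" },
    {
      name: "prepareShipment",
      children: [{ name: "validateAddress" }, { name: "pickInventory" }],
    },
  ],
});
// order[0] → ["validateAddress", "pickInventory"]; last wave → ["processOrder"]
```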
Flow options
Each node in the flow accepts attempts and backoff overrides:
await flow({
job: processOrder,
args: ["ord_123"],
children: [
{
job: chargePayment,
args: ["ord_123"],
attempts: 5,
backoff: { type: "fixed", delay: 3000 },
},
],
});
Failure behaviour
If any child fails permanently (all retries exhausted), the parent job is never executed. The parent stays in a waiting state in Redis and can be inspected or cleaned up via the BullMQ dashboard.
Completed events
onJobCompleted fires for every job in the flow — children and parent alike.
onJobCompleted((event) => {
console.log(`${event.name} done`);
});
await flow({
job: processOrder,
args: ["ord_123"],
children: [{ job: chargePayment, args: ["ord_123"] }],
});
// fires: "chargePayment done", then "processOrder done"
Failure Handling
How retries work
onJobFailed fires on every failed attempt — not just the final one. Use isFinalFailure to tell them apart.
onJobFailed((event) => {
if (!event.isFinalFailure) {
// This attempt failed but BullMQ will retry automatically
console.warn(`Job "${event.name}" failed (attempt ${event.attemptsMade}), retrying...`);
} else {
// All retries exhausted — job is now permanently failed
console.error(`Job "${event.name}" permanently failed:`, event.error.message);
// Alert, write to DB, notify a human, etc.
}
});
Jobs automatically retry with exponential backoff when they throw. Default is 3 attempts. Configure attempts and backoff per job or globally — see Backoff Strategies.
Once isFinalFailure is true, the job sits in the failed set and will not run again unless you manually retry it via retryJob.
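How attempts, backoff, and the final failure fit together can be sketched with one small helper (illustrative only — attemptPlan is not a tasklane API, and this assumes isFinalFailure simply means the last configured attempt has failed):

```typescript
// Illustrative: for a job with `attempts` total attempts and exponential
// backoff, list each attempt, the wait before it, and whether a failure
// on that attempt would be final.
function attemptPlan(attempts: number, baseDelay: number) {
  return Array.from({ length: attempts }, (_, i) => ({
    attempt: i + 1,
    // delay waited before this attempt (0 for the first run)
    delayBeforeMs: i === 0 ? 0 : baseDelay * 2 ** (i - 1),
    isFinalFailure: i + 1 === attempts,
  }));
}

// Defaults (3 attempts, 1s base): run now, retry after 1s, retry after 2s.
// Only a failure on the third attempt is final.
attemptPlan(3, 1000);
```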
Common pattern — retry on deploy:
// scripts/retry-failed.ts
import { initJobs, getFailed, retryJob } from "tasklane";
initJobs({ redis: process.env.REDIS_URL! });
const failed = await getFailed();
console.log(`Retrying ${failed.length} failed jobs...`);
await Promise.all(failed.map((j) => retryJob(j.jobId)));
console.log("Done.");
process.exit(0);
Multiple Processes
Running multiple processes (PM2, cluster, Docker replicas) works out of the box. Each process has its own in-memory state, so a few rules apply.
Every process must:
- Call initJobs() independently
- Have all job modules in its import graph before calling startWorker()
- Call startWorker() if it should process jobs
In most apps this is automatic — your worker entry point imports your routes or services, which import job functions, which register their handlers:
// worker.ts
import { initJobs, startWorker } from "tasklane";
import "./routes"; // transitively imports all job definitions
initJobs({ redis: process.env.REDIS_URL! });
await startWorker();
If you run a dedicated worker process that doesn't import routes, explicitly import the job files that process handles:
// worker.ts — dedicated worker without routes
import { initJobs, startWorker } from "tasklane";
import "./jobs/sms";
import "./jobs/email";
initJobs({ redis: process.env.REDIS_URL! });
await startWorker();
Cron schedules — call .cron() from only one process per deploy. Since upsertJobScheduler is idempotent (it upserts by a stable key in Redis), calling it from multiple processes is safe but unnecessary. The cleanest approach:
// PM2 / cluster: only the primary process sets up cron schedules
if (process.env.pm_id === "0") {
await sendSms.cron("0 9 * * *")("Mushud", "Daily reminder");
await generateReport.cron("0 0 * * *")();
}
await startWorker(); // all processes run this
With 4 worker processes, each job is still executed exactly once — BullMQ uses atomic Redis operations to ensure no duplicate processing.
TypeScript
job() is fully generic. The return type and parameter types of the original function flow through automatically.
export const sendSms = job(async function sendSms(to: string, message: string) {
return { sent: true };
});
// ✅ TypeScript knows these are (string, string)
await sendSms("Mushud", "Hello");
// ✅ .run() preserves the return type
const result = await sendSms.run("Mushud", "Hello");
// result: { sent: boolean }
// ❌ TypeScript error — wrong argument types
await sendSms(123, "Hello");
Available types:
import type {
JobsConfig,
JobFnOptions,
JobFn,
BackoffStrategy,
FlowNode,
FailedEvent,
CompletedEvent,
FailedJob,
} from "tasklane";
Contributing
Contributions are welcome. Please open an issue before submitting a pull request for large changes.
Setup:
git clone https://github.com/mushud/tasklane
cd tasklane
pnpm install
pnpm build
Project structure:
src/
index.ts — public API exports
job.ts — job() factory
registry.ts — global singleton (BullMQ Queue + Worker + handler map)
types.ts — TypeScript interfaces
Before submitting a PR:
pnpm typecheck
pnpm build
License
MIT — see LICENSE.
