@pico-brief/task-manager

v0.1.0

task-manager

A TypeScript library that makes sure a piece of expensive work only runs once at a time, even when many servers or processes try to start it simultaneously. When the work finishes it caches the outcome, so any later caller gets the result instantly without re-running the job.


The problem this solves

Imagine your application needs to generate a large PDF report. Ten users click "Generate Report" at the same moment. Without coordination, ten requests start generating the same PDF simultaneously, wasting resources and potentially producing ten identical files. With task-manager, only the first request actually does the work. The other nine wait and, when the first one finishes, they all receive the same result.

This pattern is called task deduplication. It is useful any time a job is:

  • Expensive — slow, CPU-heavy, or costly to call (e.g. an AI API)
  • Idempotent — running it once or ten times should produce the same result
  • Shared — many callers might trigger it at the same time

How it works (plain English)

When you call manager.run(taskId, work):

  1. One caller wins the lock. The first caller to arrive claims an exclusive lock on taskId and starts running work. Think of this like grabbing a "Currently working on it" sign and putting it on the door.

  2. Other callers become waiters. Any other process that calls run() with the same taskId while the first is still working sees the lock is taken and starts polling every 500 ms to check whether the job has finished.

  3. The owner keeps the lock alive with a heartbeat. Every 3 seconds the owner refreshes the lock's expiry. This is how waiters know the owner is still alive and making progress.

  4. If the owner dies, a waiter takes over. If the lock stops being refreshed for 30 seconds (the takeoverAfterMs threshold), a waiting process assumes the owner crashed. It acquires the lock itself and re-runs the work from scratch.

  5. The result is cached. Success or failure is written to the store so that any caller arriving after the job has already finished gets the result immediately — the work function is never called again for that taskId.
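
The five steps above can be condensed into the decision a waiter makes each time it polls. The sketch below is illustrative only and is not the library's source: the `Snapshot` shape, the `decide` function, the decision names, and the order in which the timeouts are checked are all assumptions made for the example. The default values are the ones listed under Options.

```typescript
// Hypothetical view of a task as seen by a caller on one poll.
type Snapshot = {
  status: 'running' | 'completed' | 'failed' | null; // null: never started
  lastHeartbeatAt: number; // ms timestamp of the owner's last lock renewal
};

type Decision =
  | 'use-cached-result'
  | 'claim-lock'
  | 'keep-polling'
  | 'take-over'
  | 'give-up';

function decide(
  snap: Snapshot,
  now: number,
  startedWaitingAt: number,
  takeoverAfterMs: number = 30_000,
  giveUpAfterMs: number = 120_000,
): Decision {
  // Step 5: a finished task (success or failure) is served from the store.
  if (snap.status === 'completed' || snap.status === 'failed') return 'use-cached-result';
  // Step 1: nobody has started the task, so this caller tries to become the owner.
  if (snap.status === null) return 'claim-lock';
  // Hard timeout for waiters.
  if (now - startedWaitingAt >= giveUpAfterMs) return 'give-up';
  // Step 4: the owner has been silent for too long.
  if (now - snap.lastHeartbeatAt >= takeoverAfterMs) return 'take-over';
  // Steps 2 and 3: the owner looks alive, so check again after the poll interval.
  return 'keep-polling';
}

console.log(decide({ status: 'running', lastHeartbeatAt: 0 }, 2_000, 0)); // keep-polling
console.log(decide({ status: 'running', lastHeartbeatAt: 0 }, 31_000, 0)); // take-over
```

Passing `now` in as an argument keeps the function pure, which makes the timing rules easy to test without real timers.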


Installation

npm install @pico-brief/task-manager
# or with yarn
yarn add @pico-brief/task-manager

Requirements: Node.js 18+, TypeScript 5+ (if using TypeScript).


Quick start

import { TaskManager, InMemoryTaskStore } from '@pico-brief/task-manager';

// Create a store (use InMemoryTaskStore for a single process,
// RedisTaskStore for multiple servers).
const store = new InMemoryTaskStore();
const manager = new TaskManager(store);

// Run a task. The second argument is the actual work to do.
const succeeded = await manager.run('my-first-task', async (reportProgress) => {
  console.log('Doing the work...');

  // Optionally report how far along you are (a number from 0 to 1).
  await reportProgress(0.5); // 50% done

  console.log('Work complete!');
});

if (succeeded) {
  console.log('Task finished successfully.');
} else {
  const error = await manager.getError('my-first-task');
  console.log('Task failed:', error);
}

run() resolves to a boolean (a waiter that hits giveUpAfterMs throws instead; see Options):

  • true — the work completed without throwing
  • false — the work threw an error (or exhausted all retries)

Choosing a store

The store is where the library keeps track of which tasks are running, their progress, and their outcomes.

InMemoryTaskStore

Keeps everything in the current process's memory. Fast and zero-config.

import { InMemoryTaskStore } from '@pico-brief/task-manager';
const store = new InMemoryTaskStore();

Use this when: running a single Node.js process, writing tests, or prototyping.

Do not use this when: you have multiple servers, because each server has its own memory — they cannot coordinate with each other.

RedisTaskStore

Keeps everything in a shared Redis instance that all your servers can see.

import { RedisTaskStore } from '@pico-brief/task-manager';
import { createClient } from 'redis';

const redis = createClient({ socket: { host: 'localhost', port: 6379 } });
await redis.connect();

const store = new RedisTaskStore(redis);

Use this when: you have multiple servers that need to coordinate, which is the typical production setup.


Common use cases

1. Preventing duplicate API calls

You have an endpoint that fetches and caches a slow third-party API response. Many users might hit it at the same time.

import { TaskManager, RedisTaskStore } from '@pico-brief/task-manager';
import Redis from 'ioredis';
import { fetchWeatherData, saveToCache, getFromCache, type WeatherData } from './weather';

const store = new RedisTaskStore(new Redis());
const manager = new TaskManager(store);

async function getWeatherReport(city: string): Promise<WeatherData> {
  const taskId = `weather:${city}`;

  // If the report is already cached, return it immediately.
  const cached = await getFromCache(taskId);
  if (cached) return cached;

  // Otherwise, run the fetch. If ten requests come in at once for the same
  // city, only one HTTP call is made — the others wait.
  const succeeded = await manager.run(taskId, async () => {
    const data = await fetchWeatherData(city);
    await saveToCache(taskId, data);
  });

  if (!succeeded) {
    throw new Error(`Failed to fetch weather for ${city}`);
  }

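  // Read back what the winning caller stored. Await first: a non-null
  // assertion on the pending promise would not check the cached value.
  const data = await getFromCache(taskId);
  if (!data) throw new Error(`Weather for ${city} is missing from the cache`);
  return data;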
}

2. Generating a file on demand

A user requests a PDF export. You want to avoid generating the same PDF multiple times if they click "Export" impatiently.

async function exportUserReport(userId: string): Promise<string> {
  const taskId = `report:pdf:${userId}`;

  const succeeded = await manager.run(taskId, async (reportProgress) => {
    await reportProgress(0.1);

    const data = await db.fetchUserData(userId);
    await reportProgress(0.4);

    const pdf = await generatePdf(data);
    await reportProgress(0.8);

    await s3.upload(`reports/${userId}.pdf`, pdf);
    await reportProgress(1.0);
  });

  if (!succeeded) {
    const error = await manager.getError(taskId);
    throw new Error(`Report generation failed: ${error}`);
  }

  return `reports/${userId}.pdf`;
}

While the PDF is generating you can poll its progress from a separate request:

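async function getExportProgress(userId: string): Promise<number> {
  // getProgress() resolves to a number between 0 and 1, or null if nothing
  // has been reported yet, so await it before applying the fallback.
  const progress = await manager.getProgress(`report:pdf:${userId}`);
  return progress ?? 0;
}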

3. Expensive AI or ML jobs

Generating an embedding, running inference, or calling a paid AI API that you never want to call twice for the same input.

async function getEmbedding(text: string): Promise<number[]> {
  // Use a stable hash of the input as the task ID.
  const taskId = `embedding:${hashText(text)}`;

  const succeeded = await manager.run(taskId, async () => {
    const embedding = await openai.createEmbedding(text);
    await vectorDb.store(taskId, embedding);
  });

  if (!succeeded) throw new Error('Embedding failed');

  return vectorDb.fetch(taskId);
}
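
The example above calls `hashText` without defining it. One possible shape for such a helper, assuming Node's built-in `node:crypto` module and SHA-256 as the hash, is sketched below. Both `hashText` and `embeddingTaskId` are hypothetical names for illustration, not part of the library.

```typescript
import { createHash } from 'node:crypto';

// Hypothetical helper: maps any input text to a fixed-length hex string.
function hashText(text: string): string {
  // SHA-256 is deterministic, so equal inputs always produce the same digest.
  return createHash('sha256').update(text, 'utf8').digest('hex');
}

// Builds the task ID used for deduplication.
function embeddingTaskId(text: string): string {
  return `embedding:${hashText(text)}`;
}

console.log(embeddingTaskId('hello world') === embeddingTaskId('hello world')); // true
console.log(embeddingTaskId('hello world') === embeddingTaskId('hello world!')); // false
```

Hashing keeps task IDs short and free of characters that might be awkward in store keys, while still letting two callers with the same input land on the same `taskId`.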

4. Scheduled or background jobs

A job runs on a cron schedule but can also be triggered manually. You want to make sure it never runs more than once at a time, even if two cron workers fire simultaneously.

// This runs on every server in your cluster. Only one will actually
// execute the work; the others will wait and receive the same result.
async function runNightlySync() {
  const taskId = `nightly-sync:${todayDateString()}`;

  const succeeded = await manager.run(taskId, async (progress) => {
    await syncUsers();
    await progress(0.33);

    await syncOrders();
    await progress(0.66);

    await syncInventory();
    await progress(1.0);
  });

  return succeeded;
}
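
`todayDateString` is not defined in the example above. A minimal sketch, assuming the date should be taken in UTC so that servers in different time zones derive the same `taskId`, could look like this (the helper and its optional `now` parameter are hypothetical):

```typescript
// Hypothetical helper: returns the UTC calendar date as YYYY-MM-DD.
function todayDateString(now: Date = new Date()): string {
  // toISOString() always reports UTC, e.g. "2024-03-05T23:59:59.000Z",
  // so the first ten characters are the UTC date.
  return now.toISOString().slice(0, 10);
}

console.log(todayDateString(new Date('2024-03-05T23:59:59Z'))); // 2024-03-05
```

If the job is meant to follow a local business day rather than the UTC day, the date would need to be derived in that time zone instead.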

5. Re-running a failed task

By default, once a task has failed, all subsequent callers see the cached failure immediately (so they don't pile up retrying at the same time). To explicitly reset a task and let it run again:

// Clear the cached state so the next run() starts fresh.
await manager.clearTask('report:pdf:user-123');

// Now re-run normally.
const succeeded = await manager.run('report:pdf:user-123', generateReport);

Pipelines

A pipeline is a sequence of steps that run one after another. Each step is itself a deduplicated task. This is useful for multi-stage jobs where you want:

  • Steps to run in a fixed order
  • Each step to be individually deduplicated (two processes running the same pipeline coordinate step-by-step)
  • Partial resume — if the pipeline crashes halfway through, it picks up from where it left off rather than starting over

Basic pipeline

import { TaskManager, InMemoryTaskStore, type PipelineStep } from '@pico-brief/task-manager';

const manager = new TaskManager(new InMemoryTaskStore());

const steps: PipelineStep[] = [
  {
    taskId: 'etl:extract',
    run: async (ctx, reportProgress) => {
      console.log(`Running step ${ctx.stepIndex + 1} of ${ctx.totalSteps}`);
      await extractDataFromSource();
      await reportProgress(1.0);
    },
  },
  {
    taskId: 'etl:transform',
    run: async (ctx, reportProgress) => {
      await reportProgress(0.0);
      await transformData();
      await reportProgress(1.0);
    },
  },
  {
    taskId: 'etl:load',
    run: async (ctx, reportProgress) => {
      await loadIntoWarehouse();
      await reportProgress(1.0);
    },
  },
];

const pipeline = manager.pipeline('etl-job', steps);
const succeeded = await pipeline.run();

if (succeeded) {
  console.log('ETL complete!');
} else {
  const status = await pipeline.getStatus();
  console.log(`Failed at step "${status.failedTaskId}": ${status.error}`);
}

Checking pipeline progress

getStatus() gives you a snapshot of where the pipeline is right now. You can call it from a separate process or a polling endpoint while the pipeline is running.

const status = await pipeline.getStatus();

console.log(status.pipelineId);      // 'etl-job'
console.log(status.currentStep);     // 1  (zero-based index of active step)
console.log(status.totalSteps);      // 3
console.log(status.overallProgress); // 0.38  (a number from 0 to 1)
console.log(status.status);          // 'running' | 'completed' | 'failed'

overallProgress weights every step equally. If there are 4 steps and the step at index 1 (the second step) is 50% done, overallProgress is (1 completed + 0.5 in progress) / 4 = 0.375.
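
The arithmetic can be written as a small function. This is a sketch of the formula as described here, not the library's code; the function name and the clamping of out-of-range inputs are assumptions.

```typescript
// Equal-weight progress: finished steps count as 1 each, the active step
// contributes its own 0-to-1 progress, and the sum is divided by the step count.
function overallProgress(
  completedSteps: number,
  currentStepProgress: number,
  totalSteps: number,
): number {
  if (totalSteps <= 0) return 0;
  // Assumption: keep the active step's progress inside [0, 1].
  const clamped = Math.min(1, Math.max(0, currentStepProgress));
  return Math.min(1, (completedSteps + clamped) / totalSteps);
}

console.log(overallProgress(1, 0.5, 4)); // 0.375
```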

Partial resume after a crash

By default (resumeFromLastCompleted: true), if the pipeline is interrupted and you run it again, it skips any steps that already reached completed and continues from the first incomplete step.

// First run — crashes halfway through step 2.
await pipeline.run(); // returns false

// Second run — step 1 is already completed in the store, so it is skipped.
// Only steps 2 and 3 run.
await pipeline.run(); // picks up from step 2

To disable this and always run every step from scratch:

const pipeline = manager.pipeline('etl-job', steps, {
  resumeFromLastCompleted: false,
});
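
To make the resume rule concrete, the sketch below computes which step indexes a re-run would execute, given the stored status of each step. It models only the rule stated in this section (continue from the first step that is not completed); `stepsToRun` and `StepStatus` are hypothetical names, and the library's internals may differ.

```typescript
type StepStatus = 'running' | 'completed' | 'failed' | null; // null: not started

// Returns the zero-based indexes of the steps a re-run would execute.
function stepsToRun(statuses: StepStatus[], resumeFromLastCompleted: boolean = true): number[] {
  const all = statuses.map((_, index) => index);
  // With resume disabled, every step runs from scratch.
  if (!resumeFromLastCompleted) return all;
  // Otherwise continue from the first step that has not completed.
  const firstIncomplete = statuses.findIndex((status) => status !== 'completed');
  return firstIncomplete === -1 ? [] : all.slice(firstIncomplete);
}

console.log(stepsToRun(['completed', 'failed', null]).join(',')); // 1,2
console.log(stepsToRun(['completed', 'failed', null], false).join(',')); // 0,1,2
```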

Advanced usage

Retrying on failure

By default, a task that throws is marked as failed with no retries. You can configure automatic retries with a backoff delay.

const succeeded = await manager.run('flaky-api-call', async () => {
  await callUnreliableExternalApi();
}, {
  maxRetries: 4,                              // 1 initial attempt + 4 retries = 5 total tries
  backoff: {
    type: 'exponential',
    initialDelayMs: 500,                      // wait 500 ms before the first retry
    multiplier: 2,                            // double the wait each time
    maxDelayMs: 15_000,                       // never wait more than 15 seconds
    jitter: true,                             // add randomness to avoid thundering herd
  },
});

Fixed backoff

Always waits the same amount of time between retries. Simpler, but can cause all servers to retry at the same moment.

backoff: { type: 'fixed', delayMs: 2000 }   // always wait 2 seconds

Exponential backoff

Waits longer and longer between retries. With jitter: true (the default) the wait is randomised within [0, cappedDelay], which prevents a thundering herd — the situation where every server retries at exactly the same instant and overloads the downstream service.

backoff: {
  type: 'exponential',
  initialDelayMs: 200,   // retry 1: up to   200 ms
  multiplier: 3,         // retry 2: up to   600 ms, retry 3: up to 1 800 ms
  maxDelayMs: 10_000,    // retry 4: up to 5 400 ms, retry 5+: capped at 10 s
}

Conditional retries

Use shouldRetry to inspect the error and decide whether a retry makes sense. This is useful when some errors are permanent (e.g. "resource not found") and others are transient (e.g. "connection timeout").

const succeeded = await manager.run('delete-resource', async () => {
  await callApi('/resource/123', 'DELETE');
}, {
  maxRetries: 5,
  backoff: { type: 'exponential', initialDelayMs: 1_000 },
  shouldRetry: (errorMessage) => {
    // Do not retry if the resource was not found — it will never succeed.
    if (errorMessage.includes('404')) return false;
    // Do not retry authentication failures.
    if (errorMessage.includes('401')) return false;
    // Retry everything else (timeouts, 5xx errors, etc.).
    return true;
  },
});
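
How maxRetries and shouldRetry interact can be modelled without any real I/O. The following sketch counts attempts for a scripted sequence of outcomes; it is a simplified model under the rules described in this section (one initial attempt, up to maxRetries retries, shouldRetry consulted before each retry), and `simulateRetries` and `Outcome` are hypothetical names, not the library's retry loop.

```typescript
type Outcome = string | null; // an error message, or null when the attempt succeeds

function simulateRetries(
  attempt: (n: number) => Outcome, // n is the zero-based attempt number
  maxRetries: number,
  shouldRetry: (errorMessage: string) => boolean = () => true,
): { attempts: number; succeeded: boolean } {
  for (let n = 0; n <= maxRetries; n++) {
    const error = attempt(n);
    if (error === null) return { attempts: n + 1, succeeded: true };
    // Stop when the retry budget is spent or the error is judged permanent.
    if (n === maxRetries || !shouldRetry(error)) {
      return { attempts: n + 1, succeeded: false };
    }
  }
  // Only reached when maxRetries is negative.
  return { attempts: 0, succeeded: false };
}

const retryable = (msg: string) => !msg.includes('404') && !msg.includes('401');

console.log(simulateRetries(() => 'connection timeout', 5, retryable).attempts); // 6
console.log(simulateRetries(() => 'HTTP 404', 5, retryable).attempts); // 1
console.log(simulateRetries((n) => (n < 2 ? 'timeout' : null), 5, retryable).attempts); // 3
```

A permanent error stops after a single attempt, while a transient one uses the full budget of 1 + maxRetries attempts.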

Default options for all tasks

Pass options to the TaskManager constructor to apply them as defaults to every run() and pipeline() call. Individual calls can still override them.

const manager = new TaskManager(store, {
  // Every task gets 3 retries by default...
  maxRetries: 3,
  backoff: { type: 'exponential', initialDelayMs: 500 },
  // ...and a 2-minute overall timeout for waiters.
  giveUpAfterMs: 120_000,
});

// This call inherits maxRetries: 3 from the defaults above,
// but overrides the timeout to 5 minutes.
await manager.run('long-job', work, { giveUpAfterMs: 300_000 });

// This call gets all defaults as-is.
await manager.run('quick-job', work);
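
The precedence described here (per-call options over constructor defaults over built-in defaults) can be sketched as follows. This is an illustration limited to three of the documented options; `resolveOptions`, `SketchOptions`, and the field-by-field fallback are assumptions, and the library may merge options differently.

```typescript
type SketchOptions = {
  maxRetries?: number;
  giveUpAfterMs?: number;
  pollIntervalMs?: number;
};

// Built-in defaults as listed under Options.
const builtIn: Required<SketchOptions> = {
  maxRetries: 0,
  giveUpAfterMs: 120_000,
  pollIntervalMs: 500,
};

function resolveOptions(
  managerDefaults: SketchOptions,
  perCall: SketchOptions = {},
): Required<SketchOptions> {
  // Most specific value wins: per-call, then manager defaults, then built-in.
  return {
    maxRetries: perCall.maxRetries ?? managerDefaults.maxRetries ?? builtIn.maxRetries,
    giveUpAfterMs: perCall.giveUpAfterMs ?? managerDefaults.giveUpAfterMs ?? builtIn.giveUpAfterMs,
    pollIntervalMs: perCall.pollIntervalMs ?? managerDefaults.pollIntervalMs ?? builtIn.pollIntervalMs,
  };
}

const opts = resolveOptions({ maxRetries: 3, giveUpAfterMs: 120_000 }, { giveUpAfterMs: 300_000 });
console.log(opts.maxRetries, opts.giveUpAfterMs, opts.pollIntervalMs); // 3 300000 500
```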

Per-step options in pipelines

Each step in a pipeline can have its own options that override the pipeline-level defaults. This lets you tune each step individually.

const pipeline = manager.pipeline('data-import', [
  {
    taskId: 'import:download',
    run: downloadFromS3,
    options: {
      // Download can take a long time — give it more headroom.
      giveUpAfterMs: 600_000,   // 10 minutes
    },
  },
  {
    taskId: 'import:validate',
    run: validateSchema,
    // Uses the pipeline-level defaults.
  },
  {
    taskId: 'import:insert',
    run: insertIntoDatabase,
    options: {
      // Database writes are fragile — retry aggressively.
      maxRetries: 10,
      backoff: { type: 'exponential', initialDelayMs: 200, maxDelayMs: 5_000 },
      shouldRetry: (err) => err.includes('deadlock'),
    },
  },
], {
  // Pipeline-level defaults applied to every step.
  maxRetries: 2,
  backoff: { type: 'fixed', delayMs: 1_000 },
});

Tuning lock and heartbeat timing

The defaults work well for most jobs, but you can tune them for very short or very long tasks.

await manager.run('quick-check', work, {
  // Lock only needs to live for a short job.
  lockTtlMs: 3_000,           // lock expires after 3 s if not renewed
  heartbeatIntervalMs: 1_000, // renew every 1 s (must be less than lockTtlMs)

  // Waiters check for completion frequently.
  pollIntervalMs: 100,        // check every 100 ms

  // Waiters attempt a takeover quickly if the owner goes silent.
  takeoverAfterMs: 5_000,     // take over after 5 s of silence
  giveUpAfterMs: 15_000,      // give up entirely after 15 s
});

Rule of thumb: lockTtlMs must be comfortably larger than heartbeatIntervalMs. If the heartbeat fires every 3 seconds, the lock TTL should be at least 6–10 seconds, so a single missed heartbeat does not cause the lock to expire prematurely.
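
A quick sanity check for this rule of thumb might look like the sketch below. The `checkTiming` function and the 2x threshold are assumptions derived from the "at least 6–10 seconds for a 3 second heartbeat" guidance; the library itself may validate these values differently or not at all.

```typescript
type Timing = { lockTtlMs: number; heartbeatIntervalMs: number };

// Returns a list of human-readable problems; an empty list means the timing looks sane.
function checkTiming({ lockTtlMs, heartbeatIntervalMs }: Timing): string[] {
  const problems: string[] = [];
  if (heartbeatIntervalMs >= lockTtlMs) {
    // The lock would expire before the first renewal.
    problems.push('heartbeatIntervalMs must be less than lockTtlMs');
  } else if (lockTtlMs < 2 * heartbeatIntervalMs) {
    // Assumed threshold: leave room for at least one missed heartbeat.
    problems.push('lockTtlMs is under 2x heartbeatIntervalMs; one missed heartbeat may expire the lock');
  }
  return problems;
}

console.log(checkTiming({ lockTtlMs: 10_000, heartbeatIntervalMs: 3_000 }).length); // 0
console.log(checkTiming({ lockTtlMs: 5_000, heartbeatIntervalMs: 3_000 }).length); // 1
```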

Reading and checking results

// Was the task completed or failed?
const status = await store.getStatus('my-task');
// 'running' | 'completed' | 'failed' | null (not started)

// What went wrong?
const error = await manager.getError('my-task');
// The error message string, or null if the task has not failed.

// How far along is it?
const progress = await manager.getProgress('my-task');
// A number from 0 to 1, or null if no progress has been reported yet.

// Reset the task entirely so it can be run again.
await manager.clearTask('my-task');

Building a progress polling endpoint

A common pattern is to kick off a long-running task and let the client poll a separate endpoint for progress.

// POST /reports/:id — kicks off the job
app.post('/reports/:id', async (req, res) => {
  const taskId = `report:${req.params.id}`;

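  // Fire and forget: do not await, so the HTTP response returns immediately.
  // Attach a catch handler so an unexpected rejection is logged, not unhandled.
  manager.run(taskId, async (reportProgress) => {
    await buildReport(req.params.id, reportProgress);
  }).catch((err) => console.error(`Task ${taskId} rejected:`, err));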

  res.json({ taskId, message: 'Report generation started.' });
});

// GET /reports/:id/progress — client polls this
app.get('/reports/:id/progress', async (req, res) => {
  const taskId = `report:${req.params.id}`;

  const [progress, error] = await Promise.all([
    manager.getProgress(taskId),
    manager.getError(taskId),
  ]);

  const status = await store.getStatus(taskId);

  res.json({
    status:   status ?? 'pending',
    progress: progress ?? 0,
    error:    error ?? null,
  });
});

API reference

TaskManager

new TaskManager(store: TaskStore, defaultOptions?: TaskOptions)

The main entry point. Creates a task manager backed by the given store. defaultOptions are merged into every run() and pipeline() call and can be overridden per-call.


manager.run(taskId, work, options?)

run(
  taskId:  string,
  work:    (reportProgress: ProgressReporter) => Promise<void>,
  options?: TaskOptions,
): Promise<boolean>

Runs work under a distributed lock identified by taskId.

| Parameter | Description |
|---|---|
| taskId | A unique string that identifies this unit of work. Two callers with the same taskId will be deduplicated. |
| work | The async function to run. Receives a reportProgress(n) callback you can call with a number from 0 to 1. |
| options | Optional overrides for this specific call (see Options below). |

Returns true if the work completed successfully, false if it failed.


manager.pipeline(pipelineId, steps, options?)

pipeline(
  pipelineId: string,
  steps:      PipelineStep[],
  options?:   PipelineOptions,
): Pipeline

Creates a Pipeline object. Call .run() on the returned object to execute it.


manager.getProgress(taskId)

getProgress(taskId: string): Promise<number | null>

Returns the most recently reported progress value (0–1), or null if no progress has been reported yet.


manager.getError(taskId)

getError(taskId: string): Promise<string | null>

Returns the cached error message for a failed task, or null if the task has not failed (or the error TTL has expired).


manager.clearTask(taskId)

clearTask(taskId: string): Promise<void>

Deletes all stored state for the task (lock, status, progress, error). The next run() call will execute the work function again from scratch.


Pipeline

Created by manager.pipeline(...). Never instantiated directly.


pipeline.run()

run(): Promise<boolean>

Executes the pipeline step by step. Returns true if every step completed, false if any step failed. On failure, remaining steps are not run.


pipeline.getStatus()

getStatus(): Promise<PipelineStatus>

Returns a snapshot of the pipeline's current state. Safe to call from any process at any time, including while the pipeline is running.

interface PipelineStatus {
  pipelineId:      string;
  currentStep:     number;          // zero-based index of the active step
  totalSteps:      number;
  overallProgress: number;          // 0–1, weighted across all steps
  status:          'running' | 'completed' | 'failed';
  failedTaskId?:   string;          // taskId of the step that failed
  error?:          string;          // error message from the failed step
}

Options

All options are optional. Values shown are the defaults.

interface TaskOptions {
  // --- Locking -----------------------------------------------------------------

  /** How long the lock lives without a heartbeat renewal, in ms. */
  lockTtlMs?: number;              // default: 10_000 (10 seconds)

  /** How often the lock owner sends a heartbeat to keep the lock alive, in ms. */
  heartbeatIntervalMs?: number;    // default: 3_000  (3 seconds)

  // --- Waiting -----------------------------------------------------------------

  /** How often a waiting process checks whether the task has finished, in ms. */
  pollIntervalMs?: number;         // default: 500    (0.5 seconds)

  /**
   * How long a waiter will wait before assuming the owner has died
   * and attempting to take over the task, in ms.
   */
  takeoverAfterMs?: number;        // default: 30_000 (30 seconds)

  /**
   * Hard timeout for waiters, in ms. If the task is still not done after
   * this long, the waiter throws an error.
   */
  giveUpAfterMs?: number;          // default: 120_000 (2 minutes)

  // --- Retries -----------------------------------------------------------------

  /** Maximum number of retries after the initial attempt fails. */
  maxRetries?: number;             // default: 0 (no retries)

  /** Strategy that controls how long to wait between retries. */
  backoff?: BackoffStrategy;       // default: { type: 'fixed', delayMs: 1_000 }

  /**
   * Called before each retry with the error message from the last attempt.
   * Return false to stop retrying early.
   */
  shouldRetry?: (errorMessage: string) => boolean;  // default: () => true

  // --- Error caching -----------------------------------------------------------

  /** How long the error message is kept in the store after a failure, in ms. */
  errorTtlMs?: number;             // default: 60_000 (1 minute)
}

PipelineOptions extends TaskOptions with one additional field:

interface PipelineOptions extends TaskOptions {
  /**
   * When true (the default), steps already marked 'completed' in the store
   * are skipped on re-run, enabling partial resume after a crash.
   */
  resumeFromLastCompleted?: boolean;  // default: true
}

Backoff strategies

Fixed

{ type: 'fixed'; delayMs: number }

Waits exactly delayMs milliseconds before every retry.

Exponential

{
  type:           'exponential';
  initialDelayMs: number;        // base delay for the first retry
  multiplier?:    number;        // how much to multiply each time  (default: 2)
  maxDelayMs?:    number;        // ceiling — never wait longer than this (default: 30_000)
  jitter?:        boolean;       // randomise within [0, delay] to spread retries (default: true)
}

With multiplier: 2, initialDelayMs: 500, and the default maxDelayMs of 30 000, the delays before retries are:

| Retry | No jitter | With jitter |
|---|---|---|
| 1st | 500 ms | 0–500 ms |
| 2nd | 1 000 ms | 0–1 000 ms |
| 3rd | 2 000 ms | 0–2 000 ms |
| 4th | 4 000 ms | 0–4 000 ms |
| 5th | 8 000 ms | 0–8 000 ms |
| 6th | 16 000 ms | 0–16 000 ms |
| 7th+ | 30 000 ms (capped at maxDelayMs) | 0–30 000 ms |
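
The delay rule described in this section can be sketched as a single function. This is an illustration of the stated formula (multiply, cap, then optionally randomise within [0, delay]), not the library's source; `retryDelayMs` and the injectable `random` parameter are assumptions added to make the jitter testable.

```typescript
type ExponentialBackoff = {
  initialDelayMs: number;
  multiplier?: number; // default 2
  maxDelayMs?: number; // default 30_000
  jitter?: boolean;    // default true
};

// retry is 1-based: 1 is the first retry after the initial attempt.
function retryDelayMs(
  retry: number,
  backoff: ExponentialBackoff,
  random: () => number = Math.random,
): number {
  const multiplier = backoff.multiplier ?? 2;
  const maxDelayMs = backoff.maxDelayMs ?? 30_000;
  // Grow geometrically, then apply the ceiling.
  const uncapped = backoff.initialDelayMs * multiplier ** (retry - 1);
  const capped = Math.min(uncapped, maxDelayMs);
  // With jitter, pick a random point between 0 and the capped delay.
  return (backoff.jitter ?? true) ? random() * capped : capped;
}

// Example with a deliberately low ceiling of 5 seconds and jitter turned off.
const delays = [1, 2, 3, 4, 5].map((retry) =>
  retryDelayMs(retry, { initialDelayMs: 500, maxDelayMs: 5_000, jitter: false }),
);
console.log(delays.join(', ')); // 500, 1000, 2000, 4000, 5000
```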


Implementing a custom store

InMemoryTaskStore and RedisTaskStore are the two built-in implementations, but you can write your own for any backing store (PostgreSQL, DynamoDB, etc.) by implementing the TaskStore interface.

import type { TaskStore, TaskStatus } from '@pico-brief/task-manager';

class PostgresTaskStore implements TaskStore {
  async acquireLock(taskId, token, ttlMs) { /* ... */ }
  async renewLock(taskId, token, ttlMs)   { /* ... */ }
  async releaseLock(taskId, token)        { /* ... */ }

  async getStatus(taskId)                 { /* ... */ }
  async setStatus(taskId, status)         { /* ... */ }

  async getProgress(taskId)               { /* ... */ }
  async setProgress(taskId, progress)     { /* ... */ }

  async getError(taskId)                  { /* ... */ }
  async setError(taskId, error, ttlMs)    { /* ... */ }

  async clearTask(taskId)                 { /* ... */ }
}

The only method with strict requirements is acquireLock: it must be atomic. It should succeed (return true) only if no lock currently exists for taskId, and it must set the new lock in the same atomic operation — there must be no gap between checking and writing. In Redis this is done with a Lua script or SET NX. In PostgreSQL it can be done with INSERT ... ON CONFLICT DO NOTHING.

All other methods have no atomicity requirements.
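
To illustrate the check-and-set requirement, here is a minimal in-memory lock table. It is a sketch under stated assumptions, not the built-in InMemoryTaskStore: the class name, the synchronous methods, the boolean return from `renewLock`, and the injectable clock are all hypothetical. The point it shows is that the existence check and the write happen in one uninterrupted step, with no `await` between them.

```typescript
type Lock = { token: string; expiresAt: number };

class SketchLockTable {
  private locks = new Map<string, Lock>();
  private now: () => number;

  // The clock is injectable so expiry can be tested without waiting.
  constructor(now: () => number = Date.now) {
    this.now = now;
  }

  // Succeeds only if no live lock exists; check and write are one synchronous step.
  acquireLock(taskId: string, token: string, ttlMs: number): boolean {
    const t = this.now();
    const existing = this.locks.get(taskId);
    if (existing && existing.expiresAt > t) return false;
    this.locks.set(taskId, { token, expiresAt: t + ttlMs });
    return true;
  }

  // Extends the lock only for the caller that still holds it.
  renewLock(taskId: string, token: string, ttlMs: number): boolean {
    const t = this.now();
    const existing = this.locks.get(taskId);
    if (!existing || existing.token !== token || existing.expiresAt <= t) return false;
    existing.expiresAt = t + ttlMs;
    return true;
  }
}

let fakeNow = 0;
const table = new SketchLockTable(() => fakeNow);
console.log(table.acquireLock('job', 'owner-a', 10_000)); // true
console.log(table.acquireLock('job', 'owner-b', 10_000)); // false
fakeNow = 10_001; // owner-a never renewed, so its lock has expired
console.log(table.acquireLock('job', 'owner-b', 10_000)); // true
console.log(table.renewLock('job', 'owner-a', 10_000)); // false
```

In a shared store the same guarantee has to come from the store itself, which is why the text above points to SET NX or a Lua script in Redis and INSERT ... ON CONFLICT DO NOTHING in PostgreSQL.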


TypeScript types

All types are exported from the package root:

import type {
  TaskStore,          // the interface you implement for a custom store
  TaskStatus,         // 'running' | 'completed' | 'failed'
  TaskOptions,        // options accepted by run() and pipeline()
  ResolvedTaskOptions,// options with all defaults filled in
  BackoffStrategy,    // fixed | exponential
  WorkFn,             // (reportProgress: ProgressReporter) => Promise<void>
  ProgressReporter,   // (progress: number) => Promise<void>
  PipelineStep,       // { taskId, run, options? }
  PipelineOptions,    // TaskOptions + resumeFromLastCompleted
  PipelineContext,    // { pipelineId, stepIndex, totalSteps }
  PipelineStatus,     // what getStatus() returns
} from '@pico-brief/task-manager';

Development

# Install dependencies
npm install

# Build (emits to dist/)
npm run build

# Watch mode
npm run dev

# Run tests
npm test

# Run tests in watch mode
npm run test:watch