async-worker-queue-runner
v1.0.5
Async Queue Runner
A lightweight async queue runner with concurrency control, retries, and optional [worker_threads](https://nodejs.org/api/worker_threads.html) support. It suits tasks that can fail transiently and should be retried automatically with a backoff delay.
✨ Features
- ✅ Concurrency limit
- 🔁 Retry with backoff
- 👷 Optional worker thread execution
- 🧵 Main thread fallback
- 📦 Type-safe & written in TypeScript
📦 Installation
pnpm add async-worker-queue-runner
Or with npm:
npm install async-worker-queue-runner
🔰 Simple Usage (Main Thread)
import { AsyncQueueRunner } from "async-worker-queue-runner";
const queue = new AsyncQueueRunner<{ input: number }, number>(
async ({ input }) => {
if (Math.random() < 0.5) throw new Error("Random failure");
return input * 10;
},
2, // concurrency
{
useWorker: false,
maxRetries: 3,
backoffMs: (attempt) => attempt * 500, // 500ms, 1000ms, 1500ms
}
);
for (let i = 1; i <= 5; i++) {
queue
.add({ input: i })
.then((res) => console.log(`✅ [Main] ${i}: ${res}`))
.catch((err) => console.error(`❌ [Main] ${i}:`, err.message));
}
🧠 Advanced Usage (Worker Threads)
🛠️ Create a worker file:
src/taskWorker.ts
// src/taskWorker.ts
import { parentPort, workerData } from "worker_threads";
async function run() {
const { input } = workerData;
// Simulate failure for testing retries
if (Math.random() < 0.5) throw new Error("Worker failed randomly");
const result = input * 10;
parentPort?.postMessage(result);
}
run().catch((err) => {
throw err;
});
🚀 Run from examples/worker-thread.ts
import path, { dirname } from "path";
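import { fileURLToPath } from "url";
import { AsyncQueueRunner } from "async-worker-queue-runner";
const __filename = fileURLToPath(import.meta.url);
const __dirname = dirname(__filename);
// Absolute filesystem path to the compiled worker (.js, not .ts).
// Using path.resolve directly avoids the malformed "/C:/..." paths and
// percent-encoding that pathToFileURL(...).pathname produces on some systems.
const workerPath = path.resolve(__dirname, "../dist/src/taskWorker.js");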
const queue = new AsyncQueueRunner<{ input: number }, number>(workerPath, 2, {
useWorker: true,
maxRetries: 3,
backoffMs: (attempt) => attempt * 500,
});
for (let i = 1; i <= 5; i++) {
queue
.add({ input: i })
.then((res) => console.log(`✅ [Worker] ${i}: ${res}`))
.catch((err) => console.error(`❌ [Worker] ${i}:`, err.message));
}
🧩 API
new AsyncQueueRunner(handler, concurrency, options?)
| Parameter | Type | Description |
| ----------- | ------------------------------------------------- | ----------------------------------------------------------------------------------------------- |
| handler | `string \| ((data: TInput) => Promise<TResult>)` | The task handler function to process each task, or a path to a worker script when useWorker is true. |
| concurrency | `number` | Maximum number of tasks to run concurrently. |
| options | `TaskOptions` (optional) | Configuration object (see below). |
TaskOptions
| Option | Type | Default | Description |
| ----------- | ---------------------------------- | ------- | ------------------------------------------------------------------------------------------- |
| useWorker | `boolean` | false | Whether to run tasks in worker threads. When true, handler must be a path to a worker script. |
| maxRetries | `number` | 3 | Number of times to retry a failed task before rejecting. |
| backoffMs | `number \| ((attempt: number) => number)` | 1000 | Delay in milliseconds between retries, or a function that returns the delay for a given attempt count. |
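To make the interaction between maxRetries and backoffMs concrete, here is a minimal sketch of a retry loop that accepts either form of backoffMs. It is not the library's implementation: the names `resolveBackoff`, `exponentialBackoff`, and `withRetries` are hypothetical, and it assumes that the attempt counter starts at 1 and that maxRetries counts retries after the first try.

```typescript
// Hypothetical helpers for illustration; not part of async-worker-queue-runner's API.
type Backoff = number | ((attempt: number) => number);

// Turn either form of `backoffMs` into a concrete delay for a given attempt.
function resolveBackoff(backoffMs: Backoff, attempt: number): number {
  const delay = typeof backoffMs === "function" ? backoffMs(attempt) : backoffMs;
  return Math.max(0, delay); // never return a negative delay
}

// Exponential backoff capped at `maxMs`: base, 2*base, 4*base, ...
// Assumption: the attempt counter starts at 1.
function exponentialBackoff(baseMs: number, maxMs: number): (attempt: number) => number {
  return (attempt) => Math.min(maxMs, baseMs * 2 ** (attempt - 1));
}

const sleep = (ms: number): Promise<void> =>
  new Promise((resolve) => setTimeout(resolve, ms));

// Run `task`, retrying up to `maxRetries` times after the first failure.
// Assumption: maxRetries = 3 means at most 4 attempts in total.
async function withRetries<T>(
  task: () => Promise<T>,
  maxRetries: number,
  backoffMs: Backoff
): Promise<T> {
  for (let attempt = 1; ; attempt++) {
    try {
      return await task();
    } catch (err) {
      if (attempt > maxRetries) throw err; // retries exhausted: reject
      await sleep(resolveBackoff(backoffMs, attempt));
    }
  }
}
```

A function such as `exponentialBackoff(200, 5000)` could be passed as backoffMs in place of the linear functions used elsewhere in this README, if capped exponential delays fit the workload better.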
🎯 Methods
.add(taskData: TInput): Promise<TResult>
Adds a task to the queue. Returns a promise that resolves with the task result or rejects after retries fail.
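Because each .add call returns its own promise, a batch of tasks can be collected without one rejection hiding the other results. The sketch below assumes only the .add signature shown above; `QueueLike`, `Outcome`, and `addAll` are hypothetical names introduced for illustration.

```typescript
// Stand-in for the queue: only `add` is taken from this README.
interface QueueLike<TInput, TResult> {
  add(taskData: TInput): Promise<TResult>;
}

// Tagged result so callers can tell successes from exhausted retries.
type Outcome<TInput, TResult> =
  | { ok: true; input: TInput; result: TResult }
  | { ok: false; input: TInput; reason: unknown };

// Add every input and wait for all of them to settle.
// The returned promise never rejects; failures are reported per input.
async function addAll<TInput, TResult>(
  queue: QueueLike<TInput, TResult>,
  inputs: TInput[]
): Promise<Outcome<TInput, TResult>[]> {
  return Promise.all(
    inputs.map((input) =>
      queue.add(input).then(
        (result): Outcome<TInput, TResult> => ({ ok: true, input, result }),
        (reason: unknown): Outcome<TInput, TResult> => ({ ok: false, input, reason })
      )
    )
  );
}
```

Passing an AsyncQueueRunner instance as `queue` should work as long as its .add method matches the signature above; the outcomes come back in the same order as `inputs`.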
🛠️ Example Options Usage
const queue = new AsyncQueueRunner(
async (data) => {
// task processing logic
},
4,
{
useWorker: false,
maxRetries: 5,
backoffMs: (attempt) => attempt * 200, // linear backoff in ms
}
);
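The second constructor argument (4 above) bounds how many handlers run at the same time. As a rough illustration of that idea, and not the library's actual implementation, a concurrency limiter can be sketched as follows; `createLimiter` is a hypothetical name.

```typescript
// Hypothetical helper: returns a `run` function that executes at most
// `limit` tasks at once and queues the rest in FIFO order.
function createLimiter(limit: number) {
  let active = 0; // number of slots currently in use
  const waiting: Array<() => void> = []; // resolvers for queued callers

  const release = (): void => {
    const next = waiting.shift();
    if (next) {
      next(); // hand the slot directly to the next waiter
    } else {
      active--; // nobody waiting: free the slot
    }
  };

  return async function run<T>(task: () => Promise<T>): Promise<T> {
    if (active >= limit) {
      // All slots busy: wait until a finishing task hands over its slot.
      await new Promise<void>((resolve) => waiting.push(resolve));
    } else {
      active++;
    }
    try {
      return await task();
    } finally {
      release();
    }
  };
}
```

Handing the slot straight to the next waiter, instead of decrementing and re-incrementing the counter, keeps a newly submitted task from slipping in ahead of queued ones and briefly exceeding the limit.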