async-flex-loop
v1.0.0
Flexible async array processing with dynamic concurrency control, retry logic, timeouts, and lifecycle callbacks.
Features
- 🚦 Concurrency control — limit how many tasks run simultaneously
- 🔄 Retry with backoff — automatic retries with exponential backoff
- ⏱️ Per-task timeout — abort tasks that take too long
- ⏸️ Pause / Resume — full lifecycle control at any time
- 📬 Dynamic push — add items while the queue is running
- 📊 Result collection — ordered results like Promise.allSettled()
- 🎯 Lifecycle callbacks — onError, onTaskComplete, onProgress, onIdle
- 🔑 Callback context — callbacks receive the queue instance as this
- 📦 Dual format — ships as both ESM and CJS
Installation
npm install async-flex-loop
# or
yarn add async-flex-loop
# or
bun add async-flex-loop
Quick Start
import { AsyncFlexLoop } from "async-flex-loop";
const urls = ["https://api.example.com/1", "https://api.example.com/2", "https://api.example.com/3"];
const queue = new AsyncFlexLoop(
urls,
async (url) => {
const res = await fetch(url);
return res.json();
},
{ concurrency: 2 },
);
await queue.onIdle();
const results = await queue.getResults();
console.log(results); // [data1, data2, data3] — in original order
API Reference
Constructor
new AsyncFlexLoop<InputType, ResponseType>(
initialItems: InputType[],
handler: (item: InputType, index: number) => Promise<ResponseType>,
options?: AsyncFlexLoopOptions
)
| Parameter | Type | Description |
| -------------- | ----------------------------- | ----------------------------------- |
| initialItems | InputType[] | Items to process |
| handler | (item, index) => Promise<R> | Async function called for each item |
| options | AsyncFlexLoopOptions | Optional configuration (see below) |
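To make the roles of `handler` and the `concurrency` option concrete, here is a minimal standalone sketch of a bounded worker pool that keeps results in input order. `mapWithLimit` is a hypothetical helper written for illustration; it is not part of async-flex-loop and makes no claim about how the library is implemented.

```typescript
// Hypothetical helper, not part of async-flex-loop: run `handler` over `items`
// with at most `limit` calls in flight, storing each result at its input index.
async function mapWithLimit<T, R>(
  items: T[],
  handler: (item: T, index: number) => Promise<R>,
  limit: number,
): Promise<R[]> {
  const results = new Array<R>(items.length);
  let next = 0; // index of the next unclaimed item

  // Each worker claims the next index until none remain.
  async function worker(): Promise<void> {
    while (next < items.length) {
      const index = next++;
      results[index] = await handler(items[index], index);
    }
  }

  // Never start more workers than items; always start at least one.
  const workerCount = Math.max(1, Math.min(limit, items.length));
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}

// Tasks finish out of order, but the array resolves to [6, 2, 4].
const ordered = mapWithLimit(
  [3, 1, 2],
  async (n) => {
    await new Promise<void>((resolve) => setTimeout(resolve, n * 10));
    return n * 2;
  },
  2,
);
```

Writing each result to its own index, rather than pushing in completion order, is what keeps the output aligned with the input.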
Options
| Option | Type | Default | Description |
| ---------------- | ---------- | ----------- | --------------------------------------------------------------- |
| concurrency | number | Infinity | Maximum number of concurrent tasks |
| autoStart | boolean | true | Start processing immediately on construction |
| retry | number | 0 | Number of retry attempts on failure |
| retryDelay | number | 0 | Base delay between retries (ms) |
| retryBackoff | number | 1 | Exponential backoff multiplier (retryDelay * backoff^attempt) |
| delayAfterTask | number | 0 | Delay after each task completes (ms) |
| timeout | number | undefined | Per-task timeout in ms — timed-out tasks are never retried |
| yieldLoop | boolean | true | Yield to event loop between tasks to avoid blocking |
| throwOnError | boolean | true | Reject onIdle() and stop queue on unrecoverable error |
| onError | function | undefined | Called when a task fails (after all retries) |
| onTaskComplete | function | undefined | Called after every task (success or failure) |
| onProgress | function | undefined | Called after each result is recorded |
| onIdle | function | undefined | Called when the queue becomes idle |
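The backoff formula in the `retryBackoff` row can be checked with a few lines of arithmetic. The sketch below uses a hypothetical `backoffDelay` function and assumes `attempt` is zero-based, which matches the 500 → 1000 → 2000 ms progression in the retry example; the library's own `calculateRetryDelay` may number attempts differently.

```typescript
// Hypothetical stand-in for the documented formula retryDelay * backoff^attempt.
// Assumption: `attempt` starts at 0 for the first retry.
function backoffDelay(retryDelay: number, retryBackoff: number, attempt: number): number {
  return retryDelay * Math.pow(retryBackoff, attempt);
}

// retryDelay: 500, retryBackoff: 2, three retries
const delays = [0, 1, 2].map((attempt) => backoffDelay(500, 2, attempt));
// delays is [500, 1000, 2000]

// With the default retryBackoff of 1 the delay stays constant.
const flat = [0, 1, 2].map((attempt) => backoffDelay(500, 1, attempt));
// flat is [500, 500, 500]
```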
Methods
Queue Control
| Method | Description |
| ---------- | ------------------------------------------------------------- |
| start() | Start processing. No-op if already running; resumes if paused |
| pause() | Pause after current in-flight tasks finish |
| resume() | Resume from a paused state |
| clear() | Remove all pending (unstarted) items from the queue |
Adding Items
const indices = queue.push(item1, item2, item3);
// Returns the assigned index for each item
- Auto-resumes from Idle state when new items are pushed
- Respects Paused state — items are queued but not processed
Waiting & Results
| Method | Returns | Description |
| ---------------------------- | ---------------------------------------- | ----------------------------------------------------------- |
| onIdle() | Promise<void> | Resolves when queue is empty and all tasks have completed |
| getResults() | Promise<(ResponseType \| undefined)[]> | Ordered results; undefined for failed tasks |
| getRawResults() | Promise<TaskResult[]> | Full results with error details (like Promise.allSettled) |
| getCompletedResults() | ResponseType[] | Successful values only (does not wait for idle) |
| getCompletedResultsAsync() | Promise<ResponseType[]> | Successful values only (waits for idle) |
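The relationship between the raw, ordered, and successful-only views in the table can be sketched as plain data transformations. The `SketchResult` shape below is an assumption modeled on `Promise.allSettled`; the library's actual `TaskResult` type may carry additional fields.

```typescript
// Assumed shape, modeled on Promise.allSettled; not the library's TaskResult.
type SketchResult<R> =
  | { status: "fulfilled"; value: R; index: number }
  | { status: "rejected"; reason: unknown; index: number };

// Ordered view: one slot per task, undefined where the task failed.
function toOrderedValues<R>(raw: SketchResult<R>[]): (R | undefined)[] {
  const ordered: (R | undefined)[] = new Array(raw.length).fill(undefined);
  for (const entry of raw) {
    if (entry.status === "fulfilled") ordered[entry.index] = entry.value;
  }
  return ordered;
}

// Successful-only view: failed tasks are dropped, so positions may shift.
function toSuccesses<R>(raw: SketchResult<R>[]): R[] {
  const values: R[] = [];
  for (const entry of raw) {
    if (entry.status === "fulfilled") values.push(entry.value);
  }
  return values;
}

const sampleRaw: SketchResult<number>[] = [
  { status: "fulfilled", value: 2, index: 0 },
  { status: "rejected", reason: new Error("failed"), index: 1 },
  { status: "fulfilled", value: 6, index: 2 },
];
const orderedValues = toOrderedValues(sampleRaw); // [2, undefined, 6]
const successValues = toSuccesses(sampleRaw); // [2, 6]
```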
Introspection
| Method | Returns | Description |
| --------------------- | ------------------------ | ------------------------------------- |
| getState() | QueueState | Current state enum value |
| isRunning() | boolean | true if state is Processing |
| getPendingCount() | number | Items remaining in the queue |
| getProcessedCount() | number | Items completed (success or failure) |
| getNextItem() | InputType \| undefined | Peek at the next item to be processed |
| getStats() | QueueStats | Processing statistics |
Queue States
import { QueueState } from "async-flex-loop";
| State | Description |
| ------------ | ---------------------------------------------- |
| Pending | Created but not yet started |
| Processing | Actively processing items |
| Paused | Paused — in-flight tasks finish, new ones wait |
| Idle | Queue empty and all tasks completed |
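The states in the table, together with the documented behavior of `start()`, `pause()`, `resume()`, and `push()`, can be modeled as a small transition map. This is a hypothetical model for reasoning about the lifecycle, not the library's `QueueState` enum. The `drain` action (queue empties and all tasks finish) is a name invented here, and treating unlisted transitions as no-ops is an assumption.

```typescript
// Hypothetical model of the documented lifecycle; not the library's QueueState.
type State = "Pending" | "Processing" | "Paused" | "Idle";
type Action = "start" | "pause" | "resume" | "drain" | "push";

const transitions: Record<State, Partial<Record<Action, State>>> = {
  Pending: { start: "Processing" },
  Processing: { pause: "Paused", drain: "Idle" },
  Paused: { resume: "Processing", start: "Processing" }, // start() resumes if paused
  Idle: { push: "Processing" }, // pushing items auto-resumes from Idle
};

// Assumption: any action not listed for a state leaves the state unchanged.
function step(state: State, action: Action): State {
  return transitions[state][action] ?? state;
}

const lifecycle: State[] = [];
let current: State = "Pending";
for (const action of ["start", "pause", "push", "resume", "drain", "push"] as Action[]) {
  current = step(current, action);
  lifecycle.push(current);
}
// lifecycle: Processing, Paused, Paused, Processing, Idle, Processing
```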
Pending → Processing → Idle
              ↕
            Paused
Examples
Concurrency Control
const queue = new AsyncFlexLoop(
Array.from({ length: 100 }, (_, i) => i),
async (item) => {
await fetch(`https://api.example.com/items/${item}`);
return item;
},
{ concurrency: 5 }, // max 5 requests at a time
);
await queue.onIdle();
Retry with Exponential Backoff
const queue = new AsyncFlexLoop(
["task1", "task2", "task3"],
async (task) => {
const res = await fetch(`https://flaky-api.com/${task}`);
if (!res.ok) throw new Error(`HTTP ${res.status}`);
return res.json();
},
{
retry: 3, // up to 3 retries
retryDelay: 500, // start with 500ms delay
retryBackoff: 2, // double delay each attempt: 500 → 1000 → 2000ms
},
);
Per-Task Timeout
import { TimeoutError } from "async-flex-loop";
const queue = new AsyncFlexLoop(items, async (item) => slowOperation(item), {
timeout: 3000, // 3s per task
throwOnError: false, // continue on timeout instead of stopping
onError(error, item, index) {
if (error instanceof TimeoutError) {
console.warn(`Item ${index} timed out`);
}
},
});
Pause & Resume
const queue = new AsyncFlexLoop(largeDataset, processItem, { concurrency: 3 });
// Pause after 2 seconds
setTimeout(() => queue.pause(), 2000);
// Resume after another 2 seconds
setTimeout(() => queue.resume(), 4000);
await queue.onIdle();
Dynamic Push
const queue = new AsyncFlexLoop([], processItem, { autoStart: false });
// Add items later
queue.push("a", "b", "c");
queue.start();
// Add more while running
setTimeout(() => queue.push("d", "e"), 500);
await queue.onIdle();
Callbacks with Queue Access
All callbacks receive the AsyncFlexLoop instance as this, giving access to queue methods:
const queue = new AsyncFlexLoop(items, processItem, {
throwOnError: false,
onError(error, item, index) {
console.error(`[${index}] Failed: ${error.message}`);
console.log(`${this.getPendingCount()} items remaining`);
// Pause if too many errors
const stats = this.getStats();
if (stats.totalFailed > 5) {
this.pause();
}
},
onIdle() {
const stats = this.getStats();
console.log(`Done! Success rate: ${(stats.successRate * 100).toFixed(1)}%`);
},
});
⚠️ Use regular function syntax (not arrow functions) to access this.
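The reason regular function syntax matters is that a caller can bind `this` for a regular function, while an arrow function ignores that binding and keeps the `this` of the scope where it was defined. The sketch below shows the mechanism with a stand-in object; `QueueLike`, `fakeQueue`, and `invoke` are hypothetical names, and `invoke` only imitates the kind of call the library presumably makes.

```typescript
// Stand-in for the queue instance; only the method used below is modeled.
interface QueueLike {
  getPendingCount(): number;
}

// A callback type that declares what `this` will be when it is called.
type OnErrorSketch = (this: QueueLike, error: Error) => string;

const fakeQueue: QueueLike = { getPendingCount: () => 3 };

// Hypothetical caller: binds `this` to the queue when invoking the callback.
function invoke(callback: OnErrorSketch, error: Error): string {
  return callback.call(fakeQueue, error);
}

// A regular function expression picks up the bound `this`.
const message = invoke(function (error) {
  return `${error.message}: ${this.getPendingCount()} pending`;
}, new Error("boom"));
// message is "boom: 3 pending"
```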
Collect Results
const queue = new AsyncFlexLoop([1, 2, 3, 4, 5], async (n) => n * 2, {
throwOnError: false,
});
// Wait and get all results (undefined for failures)
const results = await queue.getResults();
// [2, 4, 6, 8, 10]
// Get raw results with status (like Promise.allSettled)
const raw = await queue.getRawResults();
// Entry shapes (the rejected entry is illustrative; this handler never fails):
// [
//   { status: "fulfilled", value: 2, index: 0 },
//   { status: "rejected", reason: Error, item: 2, index: 1 },
//   ...
// ]
// Get only successful values (no waiting)
const successes = queue.getCompletedResults();
Statistics
await queue.onIdle();
const stats = queue.getStats();
console.log(stats);
// {
// totalProcessed: 100,
// totalSuccess: 97,
// totalFailed: 3,
// avgProcessingTime: 142, // ms
// successRate: 0.97,
// }
Error Types
import { TimeoutError, MaxRetryError, QueueAbortError } from "async-flex-loop";
| Error | When thrown |
| ----------------- | ------------------------------------------------------------------------------------ |
| TimeoutError | Task exceeds timeout ms. Has .item, .index, .timeoutMs |
| MaxRetryError | Task fails after all retries. Has .item, .index, .retryCount, .originalError |
| QueueAbortError | Queue is manually aborted |
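To illustrate how an error carrying `.item`, `.index`, and `.timeoutMs` might be produced, here is a minimal sketch that races a task against a timer. `SketchTimeoutError` and `raceWithTimeout` are hypothetical names; they are not the library's `TimeoutError` or `withTimeout`, whose internals are not documented here.

```typescript
// Hypothetical error class mirroring the fields listed for TimeoutError.
class SketchTimeoutError<T> extends Error {
  constructor(
    public item: T,
    public index: number,
    public timeoutMs: number,
  ) {
    super(`Item ${index} timed out after ${timeoutMs}ms`);
    this.name = "SketchTimeoutError";
  }
}

// Reject with SketchTimeoutError if `task` has not settled within `timeoutMs`.
// Note: this only stops waiting; it does not cancel the underlying work.
function raceWithTimeout<T, R>(
  task: Promise<R>,
  timeoutMs: number,
  item: T,
  index: number,
): Promise<R> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new SketchTimeoutError(item, index, timeoutMs)), timeoutMs);
  });
  // Clear the timer on either outcome so it does not keep the process alive.
  return Promise.race([task, timeout]).then(
    (value) => {
      clearTimeout(timer);
      return value;
    },
    (error) => {
      clearTimeout(timer);
      throw error;
    },
  );
}
```

A task that settles before the deadline passes its value or error through unchanged.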
Advanced Utilities
These are exported for advanced use cases:
import { delay, yieldToEventLoop, calculateRetryDelay, withTimeout } from "async-flex-loop";
// Sleep for N milliseconds
await delay(500);
// Yield control back to the event loop
await yieldToEventLoop();
// Calculate backoff delay for a given attempt
const ms = calculateRetryDelay(baseDelay, backoff, attemptNumber);
// Wrap a promise with a timeout
const result = await withTimeout(myPromise, 3000, item, index);
TypeScript
async-flex-loop is written in TypeScript and ships with full type declarations.
import { AsyncFlexLoop } from "async-flex-loop";
import type { AsyncFlexLoopOptions, TaskResult, QueueItem, QueueStats } from "async-flex-loop";
// Fully typed
const queue = new AsyncFlexLoop<string, { id: number }>(
["a", "b", "c"],
async (item): Promise<{ id: number }> => ({ id: item.charCodeAt(0) }),
{ concurrency: 2 },
);
License
MIT © TanMacDuc
