@gantryland/task-combinators
v0.5.0
Task.pipe operator factories for @gantryland/task
Composable operators for Task.pipe(...) pipelines.
All combinators preserve plain async signatures: (...args) => Promise<T>.
Installation
npm install @gantryland/task @gantryland/task-combinators
Quick Start
import { Task } from "@gantryland/task";
import { map, retry, timeout } from "@gantryland/task-combinators";
type User = { id: string; active: boolean };
const usersTask = new Task<User[]>(() =>
  fetch("/api/users").then((r) => r.json()),
).pipe(
  map((users) => users.filter((u) => u.active)),
  retry(2),
  timeout(5_000),
);
await usersTask.run();
Exports
| Export | Kind | What it does |
| --- | --- | --- |
| TimeoutError | Error class | Timeout failure type used by timeout. |
| map | Value transform | Maps successful values. |
| flatMap | Async transform | Chains async transforms from successful values. |
| tap | Side effect | Runs side effects on success and returns original value. |
| tapError | Error side effect | Runs side effects for non-abort errors, then rethrows. |
| tapAbort | Abort side effect | Runs side effects only for AbortError, then rethrows. |
| mapError | Error transform | Maps non-abort errors to a new Error. |
| catchError | Recovery | Recovers from non-abort failures with fallback value or async fallback. |
| retry | Retry policy | Retries non-abort failures for a fixed retry count. |
| retryWhen | Retry policy | Retries while a predicate allows it. |
| backoff | Retry policy | Retries with fixed or computed delay strategy. |
| timeout | Timeout policy | Rejects with TimeoutError after the configured deadline. |
| timeoutWith | Timeout fallback | Runs fallback only on timeout failures. |
| zip | Coordination | Runs task functions in parallel and resolves tuple results. |
| race | Coordination | Settles with the first result or error. |
| sequence | Coordination | Runs task functions sequentially and resolves tuple results. |
| debounce | Scheduling | Runs only the latest call in a debounce window. |
| throttle | Scheduling | Reuses first in-window in-flight promise. |
| queue | Scheduling | Limits concurrent executions with optional concurrency. |
API Reference
TaskFn<T, Args> represents (...args: Args) => Promise<T>.
These combinators are designed to plug into Task.pipe(...).
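
To make the shape concrete, the sketch below models a combinator as a function that takes one TaskFn and returns another, then composes two of them by hand. The TaskFn alias follows the definition above; mapSketch, tapSketch, and the manual composition are illustrative assumptions, not the package's implementation.

```typescript
// Assumed shape, following the TaskFn<T, Args> definition above.
type TaskFn<T, Args extends unknown[] = []> = (...args: Args) => Promise<T>;

// Hypothetical stand-in for map: transforms the resolved value and leaves rejections untouched.
const mapSketch =
  <T, U>(fn: (value: T) => U) =>
  <Args extends unknown[]>(taskFn: TaskFn<T, Args>): TaskFn<U, Args> =>
  (...args: Args) =>
    taskFn(...args).then(fn);

// Hypothetical stand-in for tap: runs a side effect, then passes the original value through.
const tapSketch =
  <T>(fn: (value: T) => void) =>
  <Args extends unknown[]>(taskFn: TaskFn<T, Args>): TaskFn<T, Args> =>
  (...args: Args) =>
    taskFn(...args).then((value) => {
      fn(value);
      return value;
    });

const seen: number[] = [];
const loadNumbers: TaskFn<number[], [number]> = async (n) => [n, n + 1, n + 2];

// Each combinator wraps the previous function; the result is still (...args) => Promise<T>.
const doubled = tapSketch<number[]>((xs) => {
  seen.push(...xs);
})(mapSketch((xs: number[]) => xs.map((x) => x * 2))(loadNumbers));
```

Because every layer keeps the plain async signature, the composed function can be called directly, for example `doubled(1)`, without any wrapper object.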
Core Composition
| Export | Signature | Description |
| --- | --- | --- |
| map | map(fn) | Maps successful values. |
| flatMap | flatMap(fn) | Chains sync or async value transforms. |
| tap | tap(fn) | Runs success side effects and returns original value. |
Error Handling
| Export | Signature | Description |
| --- | --- | --- |
| tapError | tapError(fn) | Runs side effects for non-abort errors, then rethrows. |
| tapAbort | tapAbort(fn) | Runs side effects for AbortError, then rethrows. |
| mapError | mapError(fn) | Maps non-abort errors to a new Error. |
| catchError | catchError(fallback) | Recovers from non-abort errors with value or async fallback. |
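
The abort rule shared by these operators can be sketched as follows. This is a minimal illustration, assuming that an abort is recognized by the error name "AbortError" (the convention used by fetch and AbortController); isAbortError and catchErrorSketch are hypothetical names, and the package may detect aborts differently.

```typescript
type TaskFn<T, Args extends unknown[] = []> = (...args: Args) => Promise<T>;
type Fallback<T> = T | ((err: unknown) => T | Promise<T>);

// Assumption: aborts are identified by name, as with fetch and AbortController.
const isAbortError = (err: unknown): boolean =>
  err instanceof Error && err.name === "AbortError";

// Hypothetical stand-in for catchError: recovers from failures but rethrows aborts.
const catchErrorSketch =
  <T>(fallback: Fallback<T>) =>
  <Args extends unknown[]>(taskFn: TaskFn<T, Args>): TaskFn<T, Args> =>
  async (...args: Args): Promise<T> => {
    try {
      return await taskFn(...args);
    } catch (err) {
      if (isAbortError(err)) throw err; // cancellation is never swallowed
      return typeof fallback === "function"
        ? (fallback as (err: unknown) => T | Promise<T>)(err)
        : (fallback as T);
    }
  };

const failing: TaskFn<string> = async () => {
  throw new Error("boom");
};
const aborted: TaskFn<string> = async () => {
  throw Object.assign(new Error("aborted"), { name: "AbortError" });
};

const safeFailing = catchErrorSketch<string>("fallback")(failing); // resolves with the fallback
const safeAborted = catchErrorSketch<string>("fallback")(aborted); // still rejects
```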
Retry And Timeouts
| Export | Signature | Description |
| --- | --- | --- |
| retry | retry(attempts, options?) | Retries on non-abort errors; attempts is the number of retries after the initial call. |
| retryWhen | retryWhen(shouldRetry, options?) | Retries while predicate returns true. |
| backoff | backoff(options) | Retry helper using fixed or computed delay. |
| timeout | timeout(ms) | Rejects with TimeoutError after ms. |
| timeoutWith | timeoutWith(ms, fallback) | Uses fallback only when timeout occurs. |
| TimeoutError | new TimeoutError(message?) | Error type thrown by timeout. |
retry options:
{ onRetry?: (err: unknown, attempt: number) => void }
attempts must be a non-negative finite integer.
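
A retry loop with these semantics might look like the sketch below. It is not the package's source: retrySketch is a hypothetical name, the RangeError for invalid input and the 1-based attempt number passed to onRetry are assumptions, and the normalization of non-Error failures is left out.

```typescript
type TaskFn<T, Args extends unknown[] = []> = (...args: Args) => Promise<T>;
type RetryOptions = { onRetry?: (err: unknown, attempt: number) => void };

const isAbortError = (err: unknown): boolean =>
  err instanceof Error && err.name === "AbortError";

// `attempts` counts retries, so the wrapped function runs at most attempts + 1 times.
function retrySketch(attempts: number, options: RetryOptions = {}) {
  if (!Number.isInteger(attempts) || attempts < 0) {
    throw new RangeError("attempts must be a non-negative finite integer");
  }
  return <T, Args extends unknown[]>(taskFn: TaskFn<T, Args>): TaskFn<T, Args> =>
    async (...args: Args): Promise<T> => {
      for (let attempt = 0; ; attempt++) {
        try {
          return await taskFn(...args);
        } catch (err) {
          // Aborts and exhausted budgets end the loop with the original error.
          if (isAbortError(err) || attempt >= attempts) throw err;
          options.onRetry?.(err, attempt + 1); // assumed 1-based retry number
        }
      }
    };
}

let calls = 0;
const retried: number[] = [];
const flaky: TaskFn<string> = async () => {
  calls++;
  if (calls < 3) throw new Error(`failure ${calls}`);
  return "ok";
};

const resilient = retrySketch(2, {
  onRetry: (_err, attempt) => {
    retried.push(attempt);
  },
})(flaky);
```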
retryWhen options:
{
  maxAttempts?: number;
  delayMs?: (attempt: number, err: unknown) => number;
  onRetry?: (err: unknown, attempt: number) => void;
}
- maxAttempts must be a non-negative finite integer when provided.
- delayMs return values must be non-negative finite numbers.
backoff options:
{
  attempts: number;
  delayMs: number | ((attempt: number, err: unknown) => number);
  shouldRetry?: (err: unknown) => boolean;
}
- attempts must be a non-negative finite integer.
- delayMs values must be non-negative finite numbers.
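
The two forms of delayMs can be illustrated with a small helper that resolves either a fixed number or a computed value and enforces the constraint above. resolveDelay and exponential are hypothetical names, the RangeError is an assumption, and the sketch assumes attempts are numbered from 1, which the options above do not state.

```typescript
type DelayMs = number | ((attempt: number, err: unknown) => number);

// Resolves a fixed or computed delay and enforces the non-negative finite rule.
function resolveDelay(delayMs: DelayMs, attempt: number, err: unknown): number {
  const value = typeof delayMs === "function" ? delayMs(attempt, err) : delayMs;
  if (!Number.isFinite(value) || value < 0) {
    throw new RangeError("delayMs must be a non-negative finite number");
  }
  return value;
}

// Hypothetical exponential strategy: 100 ms, 200 ms, 400 ms, ... capped at 1 second.
const exponential = (attempt: number): number =>
  Math.min(1_000, 100 * 2 ** (attempt - 1));

// An options object in the documented shape could then be written as:
const backoffOptions = { attempts: 3, delayMs: exponential };
```

A computed strategy like this keeps early retries fast while bounding the worst-case wait; a fixed number is simpler when the failure mode is a short, predictable outage.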
timeout constraints:
ms must be a non-negative finite number.
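
One way to picture a timeout that bounds only the wrapper promise is the sketch below. TimeoutErrorSketch is a local stand-in for the package's TimeoutError export, and timeoutSketch is a hypothetical implementation. The point of the example is that the underlying work keeps running after the wrapper has rejected.

```typescript
type TaskFn<T, Args extends unknown[] = []> = (...args: Args) => Promise<T>;

// Local stand-in for the exported TimeoutError class.
class TimeoutErrorSketch extends Error {
  constructor(message = "Task timed out") {
    super(message);
    this.name = "TimeoutError";
  }
}

function timeoutSketch(ms: number) {
  if (!Number.isFinite(ms) || ms < 0) {
    throw new RangeError("ms must be a non-negative finite number");
  }
  return <T, Args extends unknown[]>(taskFn: TaskFn<T, Args>): TaskFn<T, Args> =>
    (...args: Args) =>
      new Promise<T>((resolve, reject) => {
        // Whichever settles first wins; later settlements are ignored by the promise.
        const timer = setTimeout(() => reject(new TimeoutErrorSketch()), ms);
        taskFn(...args)
          .then(resolve, reject)
          .finally(() => clearTimeout(timer));
      });
}

let finished = false;
const slow: TaskFn<string> = () =>
  new Promise<string>((resolve) =>
    setTimeout(() => {
      finished = true; // still runs after the wrapper has rejected
      resolve("late");
    }, 50),
  );

const guarded = timeoutSketch(10)(slow);
```

If the wrapped work must actually stop, it needs its own cancellation path, for example an AbortSignal passed to fetch; the timeout wrapper alone does not provide one.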
Coordination And Scheduling
| Export | Signature | Description |
| --- | --- | --- |
| zip | zip(...taskFns) | Runs task functions in parallel and resolves tuple results. |
| race | race(...taskFns) | Settles with first result or error. |
| sequence | sequence(...taskFns) | Runs task functions sequentially and resolves tuple results. |
| debounce | debounce({ waitMs }) | Runs only the latest call in a debounce window. |
| throttle | throttle({ windowMs }) | Reuses first in-window in-flight promise. |
| queue | queue({ concurrency? }) | Limits concurrent executions, default 1. |
debounce and throttle constraints:
- waitMs must be a non-negative finite number.
- windowMs must be a non-negative finite number.
queue concurrency must be a positive finite integer.
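
A concurrency limit of this kind can be sketched with a counter and a list of waiting callers. queueSketch is a hypothetical implementation that mirrors the documented call shape queue<T, Args>({ concurrency? })(taskFn). Keeping the limit per wrapped function and throwing RangeError on invalid input are assumptions.

```typescript
type TaskFn<T, Args extends unknown[] = []> = (...args: Args) => Promise<T>;

function queueSketch<T, Args extends unknown[]>(options: { concurrency?: number } = {}) {
  const concurrency = options.concurrency ?? 1; // documented default
  if (!Number.isInteger(concurrency) || concurrency < 1) {
    throw new RangeError("concurrency must be a positive finite integer");
  }
  return (taskFn: TaskFn<T, Args>): TaskFn<T, Args> => {
    let active = 0;
    const waiting: Array<() => void> = [];

    const release = () => {
      const next = waiting.shift();
      if (next) next(); // hand the slot straight to the next waiter
      else active--;
    };

    return async (...args: Args): Promise<T> => {
      if (active >= concurrency) {
        // Wait until a finishing call hands over its slot.
        await new Promise<void>((resolve) => waiting.push(resolve));
      } else {
        active++;
      }
      try {
        return await taskFn(...args);
      } finally {
        release();
      }
    };
  };
}

let running = 0;
let peak = 0;
const work = queueSketch<number, [number]>({ concurrency: 2 })(async (n) => {
  running++;
  peak = Math.max(peak, running);
  await new Promise((resolve) => setTimeout(resolve, 10));
  running--;
  return n * 2;
});
```

Handing the slot directly to the next waiter, rather than decrementing and re-checking, avoids a window in which a newly arriving call could slip past the limit.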
Practical Use Cases
Example: Harden a Network Request
import { Task } from "@gantryland/task";
import { retry, timeout } from "@gantryland/task-combinators";
const getUsers = new Task(() => fetch("/api/users").then((r) => r.json())).pipe(
  retry(2),
  timeout(4_000),
);
Example: Debounced Search
import { debounce } from "@gantryland/task-combinators";
const searchUsers = debounce<{ id: string }[], [string]>({ waitMs: 250 })(
  (q) => fetch(`/api/users?q=${encodeURIComponent(q)}`).then((r) => r.json()),
);
Example: Controlled Background Work
import { queue } from "@gantryland/task-combinators";
const syncItem = queue<void, [string]>({ concurrency: 2 })(async (id) => {
  await fetch(`/api/sync/${id}`, { method: "POST" });
});
Runtime Semantics
- AbortError is treated as cancellation and is never swallowed.
- timeout controls only the wrapper promise boundary and does not abort transport.
- timeoutWith runs fallback only for timeout failures.
- retry and retryWhen normalize terminal non-Error failures.
- debounce, throttle, and queue are promise-level schedulers.
- For throttle, in-window calls reuse the active in-flight promise and do not apply newer args.
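
The throttle rule in the last item can be pictured with the sketch below. throttleSketch is hypothetical, and it assumes that the window opens at the first call and that the stored promise is reused for the whole window; the package may bound reuse differently, for instance by ending it when the promise settles.

```typescript
type TaskFn<T, Args extends unknown[] = []> = (...args: Args) => Promise<T>;

function throttleSketch<T, Args extends unknown[]>({ windowMs }: { windowMs: number }) {
  if (!Number.isFinite(windowMs) || windowMs < 0) {
    throw new RangeError("windowMs must be a non-negative finite number");
  }
  return (taskFn: TaskFn<T, Args>): TaskFn<T, Args> => {
    let windowStart = 0;
    let shared: Promise<T> | undefined;
    return (...args: Args) => {
      const now = Date.now();
      // Inside the window: return the first call's promise and ignore the newer args.
      if (shared && now - windowStart < windowMs) return shared;
      windowStart = now;
      shared = taskFn(...args);
      return shared;
    };
  };
}

const queries: string[] = [];
const search = throttleSketch<string, [string]>({ windowMs: 1_000 })(async (q) => {
  queries.push(q);
  return `results for ${q}`;
});

const first = search("a");
const second = search("ab"); // same promise as `first`; "ab" is never sent
```

This is the main practical difference from debounce in the table above: throttle favors the first call in a window, while debounce runs only the latest.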
