# @fideus-labs/worker-pool
A Web Worker pool with bounded concurrency, plus a companion `@fideus-labs/fizarrita` package that accelerates zarrita codec operations on Web Workers.
## Features
- **Bounded concurrency** — at most `poolSize` workers run simultaneously.
- **Worker recycling** — workers are reused (LIFO) across tasks instead of being re-created.
- **ChunkQueue interface** — `add()` + `onIdle()`, compatible with zarrita.js and p-queue patterns.
- **Batch interface** — `runTasks()` with progress reporting and cancellation.
- **Zero runtime dependencies.**
## Installation

```sh
pnpm add @fideus-labs/worker-pool
```

## Usage
### Task function contract
Every task function receives an available `Worker` (or `null` when the pool needs a new worker to be created) and must return an object containing the worker to recycle and the result:
```ts
type WorkerPoolTask<T> = (
  worker: Worker | null
) => Promise<{ worker: Worker; result: T }>
```

### ChunkQueue interface (add / onIdle)
```ts
import { WorkerPool } from '@fideus-labs/worker-pool'

const workerUrl = new URL('./my-worker.js', import.meta.url).href

function createTask(input: number) {
  return (worker: Worker | null) => {
    const w = worker ?? new Worker(workerUrl, { type: 'module' })
    return new Promise<{ worker: Worker; result: number }>((resolve) => {
      w.onmessage = (e) => resolve({ worker: w, result: e.data })
      w.postMessage(input)
    })
  }
}

const pool = new WorkerPool(4) // 4 concurrent workers

pool.add(createTask(1))
pool.add(createTask(2))
pool.add(createTask(3))

const results = await pool.onIdle<number>()
// results: [result1, result2, result3] — in add() order

pool.terminateWorkers()
```
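The example posts a message to `./my-worker.js` and expects exactly one reply per task. A minimal sketch of such a worker script (hypothetical, not shipped with the package) could look like:

```ts
// my-worker.ts: hypothetical worker for the example above.
// Receives a number, does some work, and posts exactly one reply,
// which resolves the task's promise on the main thread.
self.onmessage = (e: MessageEvent<number>) => {
  const result = e.data * 2 // stand-in for real CPU-bound work
  self.postMessage(result)
}
```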
### Batch interface (runTasks)

Submit an array of tasks at once with optional progress reporting and cancellation:
```ts
const pool = new WorkerPool(2)
const tasks = inputs.map((input) => createTask(input))

const { promise, runId } = pool.runTasks(tasks, (completed, total) => {
  console.log(`${completed}/${total}`)
})

// Cancel if needed:
// pool.cancel(runId)

const results = await promise
pool.terminateWorkers()
```

## API
### `new WorkerPool(poolSize: number)`

Create a pool with at most `poolSize` concurrent workers.
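A common sizing heuristic (not prescribed by the library) is to derive the pool size from the host's core count:

```ts
// Hypothetical sizing: match the hardware, with a small cap so a many-core
// machine doesn't spawn more workers than the workload can keep busy.
const poolSize = Math.min(navigator.hardwareConcurrency ?? 4, 8)
const pool = new WorkerPool(poolSize)
```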
### `pool.add<T>(fn: WorkerPoolTask<T>): void`

Enqueue a task. Tasks are started when `onIdle()` is called.
### `pool.onIdle<T>(): Promise<T[]>`

Execute all enqueued tasks and wait for completion. Returns results in the order tasks were added.
### `pool.runTasks<T>(taskFns, progressCallback?): { promise, runId }`

Submit a batch of tasks. The promise resolves with ordered results. The optional `progressCallback` is invoked as `(completedTasks: number, totalTasks: number) => void` after each task completes.
### `pool.cancel(runId: number): void`

Cancel a pending `runTasks` batch. The promise rejects with `'Remaining tasks canceled'`.
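Because the batch promise rejects on cancellation, callers should be prepared to catch it. A minimal sketch (the cancel button is a hypothetical trigger):

```ts
const { promise, runId } = pool.runTasks(tasks)

// e.g. wired to a UI "Cancel" button (hypothetical element)
cancelButton.onclick = () => pool.cancel(runId)

try {
  const results = await promise
  console.log('batch finished:', results.length)
} catch (err) {
  // Rejects with 'Remaining tasks canceled' when cancel() is called first
  console.warn(err)
}
```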
### `pool.terminateWorkers(): void`

Terminate all idle workers. The pool can still be used after this — new workers will be created as needed.
## zarrita.js Integration
The `@fideus-labs/fizarrita` package provides `getWorker` and `setWorker` as drop-in replacements for zarrita's `get` and `set`, offloading codec encode/decode to Web Workers via the worker pool.
### Installation

```sh
pnpm add @fideus-labs/fizarrita @fideus-labs/worker-pool zarrita
```

### Basic usage
```ts
import { WorkerPool } from '@fideus-labs/worker-pool'
import { getWorker, setWorker } from '@fideus-labs/fizarrita'
import * as zarr from 'zarrita'

const pool = new WorkerPool(4)

// Open an array
const store = new zarr.FetchStore('https://example.com/data.zarr')
const arr = await zarr.open(store, { kind: 'array' })

// Read with codec decode offloaded to workers
const chunk = await getWorker(arr, null, { pool })

// Write with codec encode offloaded to workers
await setWorker(arr, null, chunk, { pool })

pool.terminateWorkers()
```
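Since `getWorker` mirrors zarrita's `get`, region reads presumably accept the same zarrita-style selections; a sketch, assuming that selection syntax carries over:

```ts
// Read only the first 64 rows of a 2-D array; null keeps a dimension whole.
// (Assumes getWorker takes the same selection argument as zarrita's get.)
const region = await getWorker(arr, [zarr.slice(0, 64), null], { pool })
```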
### SharedArrayBuffer support

Both `getWorker` and `setWorker` support a `useSharedArrayBuffer` option for additional performance:
```ts
// Read — output allocated on SharedArrayBuffer, workers decode directly
// into shared memory (eliminates one transfer + one copy per chunk)
const chunk = await getWorker(arr, null, {
  pool,
  useSharedArrayBuffer: true,
})
// chunk.data.buffer instanceof SharedArrayBuffer === true
// The chunk can be shared with other workers without copying.

// Write — intermediate buffers use SharedArrayBuffer for zero-transfer
// sharing between main thread and codec workers
await setWorker(arr, null, chunk, {
  pool,
  useSharedArrayBuffer: true,
})
```

**`getWorker` with SAB:**
- Output TypedArray is backed by `SharedArrayBuffer`
- Codec workers decode chunks AND write directly into the shared output buffer via the `decode_into` message protocol
- Eliminates 1 ArrayBuffer transfer (worker to main) and 1 main-thread `set_from_chunk` copy per chunk
- Fill-value chunks are still handled on the main thread
**`setWorker` with SAB:**
- Intermediate chunk buffers for partial updates use `SharedArrayBuffer`
- Reduces ArrayBuffer transfers between main thread and codec workers during the decode-modify-encode cycle (see the sketch below)
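For example, a sub-region write touches only part of each chunk, forcing a decode-modify-encode round trip per chunk; a sketch, again assuming zarrita-style selections:

```ts
// Hypothetical partial update: patch a 16×16 corner of the array.
// Each chunk overlapping the selection is decoded, modified, and re-encoded;
// with SAB the intermediate buffers are shared rather than transferred.
const patch = await getWorker(arr, [zarr.slice(0, 16), zarr.slice(0, 16)], { pool })
await setWorker(arr, [zarr.slice(0, 16), zarr.slice(0, 16)], patch, {
  pool,
  useSharedArrayBuffer: true,
})
```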
### COOP/COEP headers
SharedArrayBuffer requires the page to be served with these HTTP headers:
```
Cross-Origin-Opener-Policy: same-origin
Cross-Origin-Embedder-Policy: require-corp
```

If these headers are missing, `useSharedArrayBuffer: true` will throw with a descriptive error message.
Vite example:
```ts
// vite.config.ts
import { defineConfig } from 'vite'

export default defineConfig({
  server: {
    headers: {
      'Cross-Origin-Opener-Policy': 'same-origin',
      'Cross-Origin-Embedder-Policy': 'require-corp',
    },
  },
})
```
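At runtime you can also feature-detect instead of letting the call throw; the global `crossOriginIsolated` flag reflects whether the headers took effect. A sketch:

```ts
// Fall back to plain transfers when the page is not cross-origin isolated
// (i.e. the COOP/COEP headers above are missing).
const canUseSAB =
  typeof SharedArrayBuffer !== 'undefined' && globalThis.crossOriginIsolated === true

const chunk = await getWorker(arr, null, { pool, useSharedArrayBuffer: canUseSAB })
```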
### getWorker options

| Option | Type | Description |
|---|---|---|
| `pool` | `WorkerPool` | Required. The worker pool to use. |
| `workerUrl` | `string \| URL` | URL of the codec worker script. Uses built-in default if omitted. |
| `opts` | `StoreOpts` | Pass-through options for the store's `get` method (e.g., `RequestInit`). |
| `useSharedArrayBuffer` | `boolean` | Allocate output on `SharedArrayBuffer` with decode-into-shared optimization. |
### setWorker options

| Option | Type | Description |
|---|---|---|
| `pool` | `WorkerPool` | Required. The worker pool to use. |
| `workerUrl` | `string \| URL` | URL of the codec worker script. Uses built-in default if omitted. |
| `useSharedArrayBuffer` | `boolean` | Use `SharedArrayBuffer` for intermediate chunk buffers during partial updates. |
### Worker message protocol

The codec worker handles four message types:

| Request | Response | Description |
|---|---|---|
| `init` | `init_ok` | Register codec metadata (sent once per worker per unique array config) |
| `decode` | `decoded` | Decode raw bytes, transfer decoded ArrayBuffer back |
| `decode_into` | `decode_into_ok` | Decode raw bytes and write directly into SharedArrayBuffer output |
| `encode` | `encoded` | Encode chunk data, transfer encoded bytes back |
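As a rough mental model, the protocol can be pictured as a discriminated union keyed on the names above; the payload comments are illustrative, not fizarrita's actual wire format:

```ts
// Illustrative only: message names come from the table above; payloads are
// hypothetical placeholders, not the package's real message shapes.
type CodecRequest =
  | { type: 'init' }        // codec metadata for one array config
  | { type: 'decode' }      // raw chunk bytes
  | { type: 'decode_into' } // raw bytes plus SharedArrayBuffer output
  | { type: 'encode' }      // chunk data to compress

type CodecResponse =
  | { type: 'init_ok' }
  | { type: 'decoded' }        // decoded ArrayBuffer (transferred)
  | { type: 'decode_into_ok' } // data already in shared memory
  | { type: 'encoded' }        // encoded bytes (transferred)
```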
## Benchmark

The repository includes a benchmark app that compares vanilla zarrita `get`/`set` with `getWorker`/`setWorker` (with and without SharedArrayBuffer):

```sh
pnpm bench
# Opens at http://localhost:5174
```

The benchmark supports both synthetic in-memory arrays and remote OME-Zarr datasets from AWS S3.
## Development

```sh
pnpm install
pnpm dev      # Start test app dev server (port 5173)
pnpm bench    # Start benchmark app (port 5174)
pnpm test     # Run Playwright browser tests
pnpm test:ui  # Interactive Playwright UI
```