wtask
v1.0.1
Published
Lightweight Node.js worker_threads task runner with a simple start API
Downloads
11
Maintainers
Readme
wtask
轻量级 Node.js worker_threads 任务执行器,提供简洁的 start API 与线程池调度。
特性
- 简单 API:
start({ path, name, payload, timeout }) - 线程池:自动选择空闲 Worker,支持渐进式等待
- 零拷贝:通过
move()传递 Transferable 对象(ArrayBuffer、MessagePort) - 回调代理:子线程可通过回调函数向主线程上报进度
安装
npm i wtask
# 或
yarn add wtask快速上手
import { start, move } from 'wtask';
// 1) 在项目中编写任务文件(构建后应为 .js)
// tasks/calc.js
export default function (payload) {
const { a, b } = payload;
return a + b;
}
// 2) 在主线程发起任务
const result = await start({
path: './tasks/calc.js',
payload: { a: 1, b: 2 },
timeout: 3000,
});传输大数据(零拷贝)
const buffer = new ArrayBuffer(1024 * 1024);
const result = await start({
path: './tasks/data-processor.js',
payload: move({ buffer }, [buffer]),
});回调进度上报
// 任务文件 tasks/progress.js
export default async function (payload, report) {
for (let i = 1; i <= 3; i++) {
await new Promise(r => setTimeout(r, 200));
await report({ step: i });
}
return 'done';
}
// 主线程
const res = await start({
path: './tasks/progress.js',
payload: {},
callback: (p) => console.log('progress:', p),
callbackTs: 10_000,
});使用示例(更完整)
- start:基础、move、回调
import { start, move } from 'wtask';
// 基础示例
const basic = await start({
path: './tasks/calc.js',
name: 'add',
payload: { a: 2, b: 3 },
timeout: 3000,
});
// 使用 move 传输 ArrayBuffer(零拷贝)
const buf = new ArrayBuffer(2 ** 20);
const zeroCopy = await start({
path: './tasks/data-processor.js',
payload: move({ buf }, [buf]),
});
// 回调进度(子线程可多次上报)
const withProgress = await start({
path: './tasks/progress.js',
payload: { total: 100 },
callback: (p) => console.log('progress:', p),
callbackTs: 60_000,
});- startParent:在 Worker 内调用主线程(或线程池)执行任务
// 子线程任务文件(例如 tasks/child.js)
import { startParent, move } from 'wtask';
export default async function (payload) {
// 在 Worker 内部,通过主线程执行一个 CPU 密集任务
const tmp = new ArrayBuffer(1024);
const result = await startParent({
path: './tasks/heavy.js',
payload: move({ tmp }, [tmp]),
timeout: 10_000,
});
return { ok: true, result };
}- WorkerPool:自定义线程池与调度
import { WorkerPool, move } from 'wtask';
// 自定义池大小
const pool = new WorkerPool({ maxWorkers: 4, minWorkers: 2 });
// 并发启动多个任务
const buffers = Array.from({ length: 4 }, () => new ArrayBuffer(1 << 20));
const tasks = buffers.map((buffer, i) => pool.start({
path: './tasks/data-processor.js',
name: 'process',
payload: move({ id: i, buffer }, [buffer]),
timeout: 8000,
}));
const results = await Promise.all(tasks);
console.log('results:', results);
// 关闭线程池
pool.destroy();任务文件示例
// tasks/calc.js
export function add({ a, b }) {
return a + b;
}
// tasks/data-processor.js
export function process({ id, buffer }) {
// 使用 buffer.byteLength 做一些处理
return { id, size: buffer.byteLength };
}
// tasks/heavy.js
export default function ({ tmp }) {
// 模拟 CPU 密集计算
let s = 0;
for (let i = 0; i < 5e6; i++) s += i;
return { sum: s, size: tmp.byteLength };
}API 概览
start(option: TaskStartOptions): Promise<any>path: string任务文件路径(构建后 .js)name?: string导出函数名,默认defaultpayload?: any | TaskPayload参数;使用move()可传可转移对象timeout?: number主调用超时(毫秒);0 表示不超时callback?: Function子线程可通过第二个参数调用此回调callbackTs?: number回调代理超时(毫秒),默认 5 分钟
startParent(option: TaskStartOptions): Promise<any>- 与
start相同的参数语义,用于 Worker 环境内通过主线程执行任务
- 与
move<T>(data: T, transferList?: Transferable[]): TaskPayload<T>- 显式传入
transferList或让库递归自动检测(ArrayBuffer、MessagePort) - 注意:被转移的 ArrayBuffer 在发送端会变为 detached
- 显式传入
WorkerPoolnew WorkerPool(opts?: { maxWorkers?: number; minWorkers?: number })start(option: TaskStartOptions): Promise<any>在池内选择空闲 Worker 执行waitFreeNode(opts?): Promise<WorkerNode | null>等待可用节点pickFreeNode(opts?): WorkerNode | null同步挑选节点destroy(): void释放所有 Worker
Timeout 语义
- 范围:从消息送达选定 Worker 起,到收到 Resolve/Reject 回执止
- 不包含:等待空闲节点/扩容的耗时(waitFreeNode 阶段)
- 超时后仅拒绝当前等待,不中断 Worker 内部执行;迟到回执会被忽略
最佳实践
- 任务文件建议使用绝对路径或在构建后指向
.js - 大数据用
move(),避免复制 - 对可预估耗时的任务设置合理
timeout,对长尾/流式任务可设为 0 并在上层控制取消
类型定义(简要)
type TaskStartOptions = {
path: string;
name?: string;
payload?: any | TaskPayload<any>;
timeout?: number;
callback?: Function;
callbackTs?: number;
};许可 ISC
