@chenpeiyuan/batch-executor
v1.1.11
Published
批量任务执行器
Readme
@chenpeiyuan/batch-executor
批量任务执行器,支持任务的批量添加、执行、暂停、恢复和停止等操作。
安装
# 使用 npm
npm install @chenpeiyuan/batch-executor
# 使用 yarn
yarn add @chenpeiyuan/batch-executor
# 使用 pnpm
pnpm add @chenpeiyuan/batch-executor基本使用
快速开始
import { DefaultBatchExecutor, type Event, type ExecuteTask } from '@chenpeiyuan/batch-executor'
// 1. 定义任务类型
type MyTask = {
id: string
name: string
}
// 2. 定义任务执行函数
const executeTask: ExecuteTask<MyTask, string> = async (task, context) => {
const { wrapExecute } = context
console.log(`处理任务: ${task.name}`)
await wrapExecute(() => new Promise(resolve => setTimeout(resolve, 500)))
return `完成: ${task.name}`
}
// 3. 创建执行器
const executor = new DefaultBatchExecutor<MyTask, Event, string>(executeTask)
// 4. 添加任务
executor.addTask('1', { id: '1', name: '任务1' })
executor.addTask('2', { id: '2', name: '任务2' })
executor.addTask('3', { id: '3', name: '任务3' })
// 5. 监听事件
executor.addListener('start', () => console.log('执行器开始执行'))
executor.addListener('close', () => console.log('执行器执行结束'))
executor.addListener('success', (event) => {
console.log('执行器执行成功', (event as any).result)
})
executor.addListener('failure', (event) => {
console.log('执行器执行失败', (event as any).error)
})
executor.addListener('task:success', (event) => {
console.log(`任务成功: ${(event as any).context.task.name}`)
})
executor.addListener('task:failure', (event) => {
console.log(`任务失败: ${(event as any).context.task.name}`)
})
// 6. 启动执行
executor.start()基本用法
import { DefaultBatchExecutor, type ExecuteTask } from '@chenpeiyuan/batch-executor'
type MyTask = {
id: string
name: string
}
const executeTask: ExecuteTask<MyTask, string> = async (task, context) => {
console.log(`执行任务: ${task.name}`)
return `结果: ${task.name}`
}
const executor = new DefaultBatchExecutor<MyTask, any, string>(executeTask)
// 添加任务
executor.addTask('1', { id: '1', name: '任务1' })
executor.addTask('2', { id: '2', name: '任务2' })
// 启动执行
executor.start()高级功能
任务上下文 (Context)
任务执行函数的第二个参数 context 提供了丰富的控制能力:
import { DefaultBatchExecutor, type ExecuteTask, type Context } from '@chenpeiyuan/batch-executor'
type MyTask = {
id: string
name: string
}
const executeTask: ExecuteTask<MyTask, string> = async (task, context: Context) => {
// 1. wrapExecute - 包装执行,支持暂停和停止
await context.wrapExecute(async () => {
// 这里的代码会响应暂停和停止操作
await someAsyncOperation()
})
// 2. dispatch - 派发自定义事件
context.dispatch('progress', { percent: 50, taskId: task.id })
// 3. stop - 在任务内部停止执行
if (someCondition) {
context.stop()
}
// 4. wrapExecuteList - 嵌套执行子任务列表
const subTasks = [{ id: 'sub1' }, { id: 'sub2' }]
await context.wrapExecuteList(async (subTask) => {
return processSubTask(subTask)
}, subTasks, { delay: 0.1, limit: 1 })
return `完成: ${task.name}`
}嵌套任务
import { DefaultBatchExecutor, type ExecuteTask, type Context } from '@chenpeiyuan/batch-executor'
type ParentTask = {
id: string
name: string
children: ChildTask[]
}
type ChildTask = {
id: string
name: string
}
const executeParentTask: ExecuteTask<ParentTask, string> = async (task, context: Context) => {
console.log(`执行父任务: ${task.name}`)
// 使用 wrapExecuteList 执行子任务
const results: string[] = []
await context.wrapExecuteList(async (childTask) => {
console.log(`执行子任务: ${childTask.name}`)
await new Promise(resolve => setTimeout(resolve, 200))
results.push(`子任务结果: ${childTask.name}`)
return results[results.length - 1]
}, task.children, { delay: 0.1, limit: 1 })
return `父任务结果: ${task.name}, 子任务: ${results.length}个`
}
const executor = new DefaultBatchExecutor<ParentTask, any, string>(executeParentTask)
executor.addTask('1', {
id: '1',
name: '父任务1',
children: [
{ id: '1-1', name: '子任务1-1' },
{ id: '1-2', name: '子任务1-2' }
]
})
executor.start()停止执行
import { DefaultBatchExecutor, type ExecuteTask } from '@chenpeiyuan/batch-executor'
type MyTask = {
id: string
name: string
}
const executeTask: ExecuteTask<MyTask, string> = async (task, context) => {
const { wrapExecute } = context
console.log(`执行任务: ${task.name}`)
// 模拟长时间运行的任务
for (let i = 0; i < 5; i++) {
await wrapExecute(() => new Promise(resolve => setTimeout(resolve, 200)))
console.log(`任务 ${task.name} 执行中... ${i + 1}/5`)
}
return `结果: ${task.name}`
}
const executor = new DefaultBatchExecutor<MyTask, any, string>(executeTask)
executor.addTask('1', { id: '1', name: '任务1' })
executor.addTask('2', { id: '2', name: '任务2' })
executor.addTask('3', { id: '3', name: '任务3' })
executor.start()
// 3秒后停止执行
setTimeout(() => {
console.log('停止执行')
executor.stop()
}, 3000)暂停和恢复
import { DefaultBatchExecutor, type ExecuteTask } from '@chenpeiyuan/batch-executor'
type MyTask = {
id: string
name: string
}
const executeTask: ExecuteTask<MyTask, string> = async (task, context) => {
const { wrapExecute } = context
console.log(`执行任务: ${task.name}`)
for (let i = 0; i < 5; i++) {
await wrapExecute(() => new Promise(resolve => setTimeout(resolve, 200)))
console.log(`任务 ${task.name} 执行中... ${i + 1}/5`)
}
return `结果: ${task.name}`
}
const executor = new DefaultBatchExecutor<MyTask, any, string>(executeTask)
executor.addTask('1', { id: '1', name: '任务1' })
executor.addTask('2', { id: '2', name: '任务2' })
executor.start()
// 2秒后暂停
setTimeout(() => {
console.log('暂停执行')
executor.pause()
}, 2000)
// 5秒后恢复
setTimeout(() => {
console.log('恢复执行')
executor.resume()
}, 5000)配置执行间隔
import { DefaultBatchExecutor, type ExecuteTask } from '@chenpeiyuan/batch-executor'
type MyTask = {
id: string
name: string
}
const executeTask: ExecuteTask<MyTask, string> = async (task) => {
console.log(`执行任务: ${task.name}`)
return `结果: ${task.name}`
}
// 固定间隔
const executor = new DefaultBatchExecutor<MyTask, any, string>(
executeTask,
{ interval: 1 } // 每个任务之间间隔1秒
)
// 动态间隔
const executor2 = new DefaultBatchExecutor<MyTask, any, string>(
executeTask,
{
interval: () => Math.random() * 2 // 每个任务之间随机间隔0-2秒
}
)使用监视器 (Monitor)
import { DefaultBatchExecutor, DefaultMonitor, type ExecuteTask, type Event } from '@chenpeiyuan/batch-executor'
// 自定义监视器
class MyMonitor extends DefaultMonitor<Event> {
override onStart(event: Event) {
console.log('执行器启动', event)
}
override onClose(event: Event) {
console.log('执行器关闭', event)
}
override onTaskSuccess(event: Event) {
console.log('任务成功', (event as any).context.task)
}
override onTaskFailure(event: Event) {
console.log('任务失败', (event as any).error)
}
// 支持自定义事件
onTaskMessage(event: Event) {
console.log('任务消息', (event as any).message)
}
}
const executeTask: ExecuteTask<any, string> = async (task, context) => {
context.dispatch('message', { message: '处理中...' })
return 'done'
}
const executor = new DefaultBatchExecutor<any, Event, string>(executeTask)
executor.addMonitor(new MyMonitor())
executor.addTask('1', { id: '1' })
executor.start()重试执行 (RetriableExecution)
import { RetriableExecution } from '@chenpeiyuan/batch-executor'
// 带重试的执行
const execution = new RetriableExecution(
async () => {
const response = await fetch('https://api.example.com/data')
if (!response.ok) {
throw new Error('请求失败')
}
return response.json()
},
{
limit: 3, // 最多重试3次
delay: 1, // 每次重试间隔1秒
retry: (error, result) => {
// 自定义重试条件
if (error) return true // 有错误时重试
return result === null // 结果为null时重试
}
}
)
const result = await execution.execute()API 文档
DefaultBatchExecutor
构造函数
new DefaultBatchExecutor<Task, Event, Result>(
executeTask: ExecuteTask<Task, Result>,
config?: DefaultBatchExecutorConfig
)配置选项 (DefaultBatchExecutorConfig):
| 属性 | 类型 | 默认值 | 描述 |
|------|------|--------|------|
| interval | number \| (() => number) | 0 | 任务执行间隔(秒) |
| dispatcher | Dispatcher<Event> | new DefaultDispatcher() | 事件调度器 |
| taskManager | TaskManager<Task> | new DefaultTaskManager() | 任务管理器 |
| stopController | StopController | new DefaultStopController() | 停止控制器 |
| startController | StartController | new DefaultStartController() | 启动控制器 |
| pauseResumeController | PauseResumeController | new DefaultPauseResumeController() | 暂停恢复控制器 |
| runner | Runner<Task, Result> | new SequenceRunner() | 任务运行器 |
方法
| 方法 | 返回类型 | 描述 |
|------|----------|------|
| start() | boolean | 启动执行 |
| stop() | boolean | 停止执行 |
| pause() | boolean | 暂停执行 |
| resume() | boolean | 恢复执行 |
| addTask(id, task) | boolean | 添加任务 |
| delTask(id) | boolean | 删除任务 |
| altTask(id, item) | boolean | 更新任务 |
| getTask(id) | Task \| undefined | 获取单个任务 |
| getTasks() | Task[] | 获取所有任务 |
| clearTasks() | void | 清空所有任务 |
| addListener(type, listener) | void | 添加事件监听器 |
| delListener(listener) | void | 移除事件监听器 |
| addMonitor(monitor) | void | 添加监视器 |
| delMonitor(monitor) | void | 移除监视器 |
Context
任务执行上下文,提供以下方法:
| 方法 | 类型 | 描述 |
|------|------|------|
| stop() | () => void | 停止执行后续任务 |
| dispatch(type, data?) | (type: string, data?: object) => void | 派发自定义事件 |
| wrapExecute(execute) | (execute: () => Promise<T>) => Promise<void> | 包装执行,支持暂停/停止 |
| wrapExecuteList(execute, list, config?) | (execute, list, config?) => Promise<void> | 嵌套执行子任务列表 |
事件
执行器事件
| 事件 | 描述 | 事件数据 |
|------|------|----------|
| start | 执行器开始执行 | { taskCount, taskList, startTime } |
| close | 执行器执行结束 | { result, startTime, closeTime, totalTime } |
| success | 执行器执行成功 | { result, startTime, closeTime, totalTime } |
| failure | 执行器执行失败 | { error, startTime, closeTime, totalTime } |
| stop | 执行器停止 | { type: 'stop', time } |
| pause | 执行器暂停 | { type: 'pause', time } |
| resume | 执行器恢复 | { type: 'resume', time } |
任务事件
| 事件 | 描述 | 事件数据 |
|------|------|----------|
| task:start | 任务开始 | { context: { task, index }, startTime } |
| task:success | 任务成功 | { context: { task, index }, result, startTime, closeTime, totalTime } |
| task:failure | 任务失败 | { context: { task, index }, error, startTime, closeTime, totalTime } |
| task:close | 任务结束 | { context: { task, index }, startTime, closeTime, totalTime } |
任务管理事件
| 事件 | 描述 |
|------|------|
| addTask | 添加任务 |
| delTask | 删除任务 |
| altTask | 更新任务 |
导出模块
// 核心类
export { default as DefaultBatchExecutor, type DefaultBatchExecutorConfig } from './DefaultBatchExecutor'
export { default as SequenceRunner, type SequenceRunnerConfig } from './SequenceRunner'
// 控制器
export { default as DefaultStopController } from './DefaultStopController'
export { default as DefaultStartController } from './DefaultStartController'
export { default as DefaultPauseResumeController } from './DefaultPauseResumeController'
// 事件相关
export { default as DefaultDispatcher } from './DefaultDispatcher'
export { default as DefaultMonitor } from './DefaultMonitor'
// 任务管理
export { default as DefaultTaskManager } from './DefaultTaskManager'
// 执行器
export { default as DefaultExecution, type DefaultExecutionConfig } from './DefaultExecution'
export { default as SecureExecution, type SecureExecutionConfig } from './SecureExecution'
export { default as RetriableExecution, type RetriableExecutionConfig } from './RetriableExecution'
export { default as NotifiableExecution, type NotifiableExecutionConfig } from './NotifiableExecution'
// 类型导出
export type * from './types'运行测试
# 运行测试
pnpm test
# 运行测试并监听文件变化
pnpm test:watch许可证
ISC
