@chenpeiyuan/batch-executor

v1.1.11

Batch task executor

A batch task executor that supports adding tasks in bulk and starting, pausing, resuming, and stopping their execution.

Installation

# With npm
npm install @chenpeiyuan/batch-executor

# With yarn
yarn add @chenpeiyuan/batch-executor

# With pnpm
pnpm add @chenpeiyuan/batch-executor

Usage

Quick Start

import { DefaultBatchExecutor, type Event, type ExecuteTask } from '@chenpeiyuan/batch-executor'

// 1. Define the task type
type MyTask = {
  id: string
  name: string
}

// 2. Define the task execution function
const executeTask: ExecuteTask<MyTask, string> = async (task, context) => {
  const { wrapExecute } = context
  console.log(`Processing task: ${task.name}`)
  await wrapExecute(() => new Promise(resolve => setTimeout(resolve, 500)))
  return `Done: ${task.name}`
}

// 3. Create the executor
const executor = new DefaultBatchExecutor<MyTask, Event, string>(executeTask)

// 4. Add tasks
executor.addTask('1', { id: '1', name: 'Task 1' })
executor.addTask('2', { id: '2', name: 'Task 2' })
executor.addTask('3', { id: '3', name: 'Task 3' })

// 5. Listen for events
executor.addListener('start', () => console.log('Executor started'))
executor.addListener('close', () => console.log('Executor finished'))
executor.addListener('success', (event) => {
  console.log('Executor succeeded', (event as any).result)
})
executor.addListener('failure', (event) => {
  console.log('Executor failed', (event as any).error)
})
executor.addListener('task:success', (event) => {
  console.log(`Task succeeded: ${(event as any).context.task.name}`)
})
executor.addListener('task:failure', (event) => {
  console.log(`Task failed: ${(event as any).context.task.name}`)
})

// 6. Start execution
executor.start()

Basic Usage

import { DefaultBatchExecutor, type ExecuteTask } from '@chenpeiyuan/batch-executor'

type MyTask = {
  id: string
  name: string
}

const executeTask: ExecuteTask<MyTask, string> = async (task, context) => {
  console.log(`Running task: ${task.name}`)
  return `Result: ${task.name}`
}

const executor = new DefaultBatchExecutor<MyTask, any, string>(executeTask)

// Add tasks
executor.addTask('1', { id: '1', name: 'Task 1' })
executor.addTask('2', { id: '2', name: 'Task 2' })

// Start execution
executor.start()

Advanced Features

Task Context (Context)

The second argument of the task execution function, context, gives the task control over execution:

import { DefaultBatchExecutor, type ExecuteTask, type Context } from '@chenpeiyuan/batch-executor'

type MyTask = {
  id: string
  name: string
}

// someAsyncOperation, someCondition and processSubTask are placeholders for your own code.
const executeTask: ExecuteTask<MyTask, string> = async (task, context: Context) => {
  // 1. wrapExecute - wrap a step so that it supports pause and stop
  await context.wrapExecute(async () => {
    // Code in here responds to pause and stop operations
    await someAsyncOperation()
  })

  // 2. dispatch - dispatch a custom event
  context.dispatch('progress', { percent: 50, taskId: task.id })

  // 3. stop - stop execution from inside a task
  if (someCondition) {
    context.stop()
  }

  // 4. wrapExecuteList - run a nested list of subtasks
  const subTasks = [{ id: 'sub1' }, { id: 'sub2' }]
  await context.wrapExecuteList(async (subTask) => {
    return processSubTask(subTask)
  }, subTasks, { delay: 0.1, limit: 1 })

  return `Done: ${task.name}`
}
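
To make the idea of a pause-aware wrapper more concrete, here is a minimal, self-contained sketch of how a wrapExecute-style helper could honour pause and stop requests. It is not the library's implementation: the PauseStopGate class, its wrap method, and the 'stopped' error message are all hypothetical names chosen for illustration.

```typescript
// Hypothetical sketch: none of these names come from @chenpeiyuan/batch-executor.
class PauseStopGate {
  private paused = false
  private stopped = false
  private waiters: Array<() => void> = []

  pause(): void {
    this.paused = true
  }

  resume(): void {
    this.paused = false
    this.release()
  }

  stop(): void {
    this.stopped = true
    this.release()
  }

  // Wake every caller currently parked in checkpoint().
  private release(): void {
    const pending = this.waiters
    this.waiters = []
    pending.forEach(wake => wake())
  }

  // Park while paused; reject once stopped.
  private async checkpoint(): Promise<void> {
    while (this.paused && !this.stopped) {
      await new Promise<void>(resolve => this.waiters.push(resolve))
    }
    if (this.stopped) throw new Error('stopped')
  }

  // Run one step, checking for pause/stop before and after it.
  async wrap<T>(execute: () => Promise<T>): Promise<T> {
    await this.checkpoint()
    const result = await execute()
    await this.checkpoint()
    return result
  }
}
```

In this sketch a step that is already running is never interrupted; pause and stop only take effect at the checkpoints around it, which is why long tasks are usually split into several wrapped steps.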

Nested Tasks

import { DefaultBatchExecutor, type ExecuteTask, type Context } from '@chenpeiyuan/batch-executor'

type ParentTask = {
  id: string
  name: string
  children: ChildTask[]
}

type ChildTask = {
  id: string
  name: string
}

const executeParentTask: ExecuteTask<ParentTask, string> = async (task, context: Context) => {
  console.log(`Running parent task: ${task.name}`)

  // Use wrapExecuteList to run the child tasks
  const results: string[] = []
  await context.wrapExecuteList(async (childTask) => {
    console.log(`Running child task: ${childTask.name}`)
    await new Promise(resolve => setTimeout(resolve, 200))
    results.push(`Child task result: ${childTask.name}`)
    return results[results.length - 1]
  }, task.children, { delay: 0.1, limit: 1 })

  return `Parent task result: ${task.name}, child tasks: ${results.length}`
}

const executor = new DefaultBatchExecutor<ParentTask, any, string>(executeParentTask)

executor.addTask('1', {
  id: '1',
  name: 'Parent task 1',
  children: [
    { id: '1-1', name: 'Child task 1-1' },
    { id: '1-2', name: 'Child task 1-2' }
  ]
})

executor.start()

Stopping Execution

import { DefaultBatchExecutor, type ExecuteTask } from '@chenpeiyuan/batch-executor'

type MyTask = {
  id: string
  name: string
}

const executeTask: ExecuteTask<MyTask, string> = async (task, context) => {
  const { wrapExecute } = context
  console.log(`Running task: ${task.name}`)

  // Simulate a long-running task
  for (let i = 0; i < 5; i++) {
    await wrapExecute(() => new Promise(resolve => setTimeout(resolve, 200)))
    console.log(`Task ${task.name} in progress... ${i + 1}/5`)
  }

  return `Result: ${task.name}`
}

const executor = new DefaultBatchExecutor<MyTask, any, string>(executeTask)

executor.addTask('1', { id: '1', name: 'Task 1' })
executor.addTask('2', { id: '2', name: 'Task 2' })
executor.addTask('3', { id: '3', name: 'Task 3' })

executor.start()

// Stop execution after 3 seconds
setTimeout(() => {
  console.log('Stopping execution')
  executor.stop()
}, 3000)

Pause and Resume

import { DefaultBatchExecutor, type ExecuteTask } from '@chenpeiyuan/batch-executor'

type MyTask = {
  id: string
  name: string
}

const executeTask: ExecuteTask<MyTask, string> = async (task, context) => {
  const { wrapExecute } = context
  console.log(`Running task: ${task.name}`)

  for (let i = 0; i < 5; i++) {
    await wrapExecute(() => new Promise(resolve => setTimeout(resolve, 200)))
    console.log(`Task ${task.name} in progress... ${i + 1}/5`)
  }

  return `Result: ${task.name}`
}

const executor = new DefaultBatchExecutor<MyTask, any, string>(executeTask)

executor.addTask('1', { id: '1', name: 'Task 1' })
executor.addTask('2', { id: '2', name: 'Task 2' })

executor.start()

// Pause after 2 seconds
setTimeout(() => {
  console.log('Pausing execution')
  executor.pause()
}, 2000)

// Resume after 5 seconds
setTimeout(() => {
  console.log('Resuming execution')
  executor.resume()
}, 5000)

Configuring the Execution Interval

import { DefaultBatchExecutor, type ExecuteTask } from '@chenpeiyuan/batch-executor'

type MyTask = {
  id: string
  name: string
}

const executeTask: ExecuteTask<MyTask, string> = async (task) => {
  console.log(`Running task: ${task.name}`)
  return `Result: ${task.name}`
}

// Fixed interval
const executor = new DefaultBatchExecutor<MyTask, any, string>(
  executeTask,
  { interval: 1 } // wait 1 second between tasks
)

// Dynamic interval
const executor2 = new DefaultBatchExecutor<MyTask, any, string>(
  executeTask,
  {
    interval: () => Math.random() * 2 // wait a random 0-2 seconds between tasks
  }
)
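
Because interval accepts either a number or a function, and is expressed in seconds rather than milliseconds, it can help to see how such an option might be normalised before being handed to setTimeout. The helper below is a hypothetical sketch, not code from the package; the name resolveIntervalMs and the clamping of invalid values to 0 are assumptions.

```typescript
// Hypothetical helper, not part of @chenpeiyuan/batch-executor.
type Interval = number | (() => number)

// Resolve an interval option (in seconds) to a delay in milliseconds.
function resolveIntervalMs(interval: Interval = 0): number {
  const seconds = typeof interval === 'function' ? interval() : interval
  // Assumption: negative or non-finite values are treated as "no delay".
  return Number.isFinite(seconds) && seconds > 0 ? seconds * 1000 : 0
}
```

A function-valued interval is evaluated each time the helper is called, so a callback such as `() => Math.random() * 2` would yield a fresh delay for every gap between tasks.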

Using a Monitor (Monitor)

import { DefaultBatchExecutor, DefaultMonitor, type ExecuteTask, type Event } from '@chenpeiyuan/batch-executor'

// Custom monitor
class MyMonitor extends DefaultMonitor<Event> {
  override onStart(event: Event) {
    console.log('Executor started', event)
  }

  override onClose(event: Event) {
    console.log('Executor closed', event)
  }

  override onTaskSuccess(event: Event) {
    console.log('Task succeeded', (event as any).context.task)
  }

  override onTaskFailure(event: Event) {
    console.log('Task failed', (event as any).error)
  }

  // Custom events are supported too
  onTaskMessage(event: Event) {
    console.log('Task message', (event as any).message)
  }
}

const executeTask: ExecuteTask<any, string> = async (task, context) => {
  context.dispatch('message', { message: 'Processing...' })
  return 'done'
}

const executor = new DefaultBatchExecutor<any, Event, string>(executeTask)
executor.addMonitor(new MyMonitor())

executor.addTask('1', { id: '1' })
executor.start()
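
In the example above, an event dispatched from a task as 'message' is received by onTaskMessage, which suggests that handlers are looked up by a name derived from the event type (with task-level events namespaced as task:<type>). The sketch below shows one way such a lookup could work. It is an inference from the example, not the library's documented behaviour, and handlerName and notify are hypothetical names.

```typescript
// Hypothetical sketch; the real DefaultMonitor may resolve handlers differently.
type Handler = (event: unknown) => void

// 'start' -> 'onStart', 'task:success' -> 'onTaskSuccess'
function handlerName(eventType: string): string {
  const parts = eventType.split(':').filter(part => part.length > 0)
  return 'on' + parts.map(part => part.charAt(0).toUpperCase() + part.slice(1)).join('')
}

// Call the matching handler if the monitor defines one; report whether it did.
function notify(monitor: object, eventType: string, event: unknown): boolean {
  const handler = (monitor as Record<string, unknown>)[handlerName(eventType)]
  if (typeof handler !== 'function') return false
  ;(handler as Handler).call(monitor, event)
  return true
}
```

With a convention like this, supporting a new custom event only requires adding a method with the matching name to the monitor class.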

Retrying Execution (RetriableExecution)

import { RetriableExecution } from '@chenpeiyuan/batch-executor'

// Execution with retries
const execution = new RetriableExecution(
  async () => {
    const response = await fetch('https://api.example.com/data')
    if (!response.ok) {
      throw new Error('Request failed')
    }
    return response.json()
  },
  {
    limit: 3,        // retry at most 3 times
    delay: 1,        // wait 1 second between retries
    retry: (error, result) => {
      // Custom retry condition
      if (error) return true  // retry on error
      return result === null  // retry when the result is null
    }
  }
)

const result = await execution.execute()
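
For readers who want to see how the limit, delay, and retry options interact, here is a small self-contained sketch of a retry loop with the same three options. It illustrates the general technique only and is not RetriableExecution's implementation; executeWithRetry, RetryConfig, and sleep are hypothetical names, and reading limit as "retries after the first attempt" is an assumption based on the comment in the example above.

```typescript
// Hypothetical sketch; not the library's RetriableExecution implementation.
type RetryConfig<R> = {
  limit: number // assumed: maximum number of retries after the first attempt
  delay: number // seconds to wait before each retry
  retry: (error: unknown, result: R | undefined) => boolean
}

const sleep = (seconds: number) =>
  new Promise<void>(resolve => setTimeout(resolve, seconds * 1000))

async function executeWithRetry<R>(run: () => Promise<R>, config: RetryConfig<R>): Promise<R> {
  for (let retries = 0; ; retries++) {
    let failed = false
    let error: unknown
    let result: R | undefined
    try {
      result = await run()
    } catch (caught) {
      failed = true
      error = caught
    }
    const retryWanted = config.retry(error, result)
    if (!retryWanted || retries >= config.limit) {
      // Either the outcome is acceptable or the retries are used up: surface it as-is.
      if (failed) throw error
      return result as R
    }
    await sleep(config.delay)
  }
}
```

Passing both the error and the result to the retry predicate lets the caller retry on "successful" responses that are still unusable, such as a null payload.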

API Documentation

DefaultBatchExecutor

Constructor

new DefaultBatchExecutor<Task, Event, Result>(
  executeTask: ExecuteTask<Task, Result>,
  config?: DefaultBatchExecutorConfig
)

Configuration options (DefaultBatchExecutorConfig):

| Property | Type | Default | Description |
|----------|------|---------|-------------|
| interval | number \| (() => number) | 0 | Interval between task executions (seconds) |
| dispatcher | Dispatcher<Event> | new DefaultDispatcher() | Event dispatcher |
| taskManager | TaskManager<Task> | new DefaultTaskManager() | Task manager |
| stopController | StopController | new DefaultStopController() | Stop controller |
| startController | StartController | new DefaultStartController() | Start controller |
| pauseResumeController | PauseResumeController | new DefaultPauseResumeController() | Pause/resume controller |
| runner | Runner<Task, Result> | new SequenceRunner() | Task runner |

Methods

| Method | Return type | Description |
|--------|-------------|-------------|
| start() | boolean | Start execution |
| stop() | boolean | Stop execution |
| pause() | boolean | Pause execution |
| resume() | boolean | Resume execution |
| addTask(id, task) | boolean | Add a task |
| delTask(id) | boolean | Delete a task |
| altTask(id, item) | boolean | Update a task |
| getTask(id) | Task \| undefined | Get a single task |
| getTasks() | Task[] | Get all tasks |
| clearTasks() | void | Remove all tasks |
| addListener(type, listener) | void | Add an event listener |
| delListener(listener) | void | Remove an event listener |
| addMonitor(monitor) | void | Add a monitor |
| delMonitor(monitor) | void | Remove a monitor |

Context

The task execution context. It provides the following methods:

| Method | Type | Description |
|--------|------|-------------|
| stop() | () => void | Stop executing the remaining tasks |
| dispatch(type, data?) | (type: string, data?: object) => void | Dispatch a custom event |
| wrapExecute(execute) | (execute: () => Promise<T>) => Promise<void> | Wrap a step so that it supports pause/stop |
| wrapExecuteList(execute, list, config?) | (execute, list, config?) => Promise<void> | Run a nested list of subtasks |

Events

Executor events

| Event | Description | Event data |
|-------|-------------|------------|
| start | Executor starts running | { taskCount, taskList, startTime } |
| close | Executor finishes running | { result, startTime, closeTime, totalTime } |
| success | Executor run succeeded | { result, startTime, closeTime, totalTime } |
| failure | Executor run failed | { error, startTime, closeTime, totalTime } |
| stop | Executor stopped | { type: 'stop', time } |
| pause | Executor paused | { type: 'pause', time } |
| resume | Executor resumed | { type: 'resume', time } |

Task events

| Event | Description | Event data |
|-------|-------------|------------|
| task:start | Task started | { context: { task, index }, startTime } |
| task:success | Task succeeded | { context: { task, index }, result, startTime, closeTime, totalTime } |
| task:failure | Task failed | { context: { task, index }, error, startTime, closeTime, totalTime } |
| task:close | Task finished | { context: { task, index }, startTime, closeTime, totalTime } |
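
The usage examples read event fields through `(event as any)`. As an alternative, the payload shapes listed for task:success and task:failure can be written down as types and checked with type guards. The sketch below does that under stated assumptions: the type names, the guards, and the numeric type of the time fields are hypothetical and are not exported by the package.

```typescript
// Assumed shapes, derived from the task event table; the field types are guesses.
type Timing = { startTime: number; closeTime: number; totalTime: number }
type TaskRef<T> = { context: { task: T; index: number } }

type TaskSuccessEvent<T, R> = TaskRef<T> & Timing & { result: R }
type TaskFailureEvent<T> = TaskRef<T> & Timing & { error: unknown }

// Narrow an untyped event by checking which payload fields are present.
function isTaskSuccess<T, R>(event: object): event is TaskSuccessEvent<T, R> {
  return 'context' in event && 'result' in event
}

function isTaskFailure<T>(event: object): event is TaskFailureEvent<T> {
  return 'context' in event && 'error' in event
}

// Build a one-line summary without resorting to `any`.
function summarize(event: object): string {
  if (isTaskSuccess<{ name: string }, string>(event)) {
    return `${event.context.task.name} succeeded: ${event.result}`
  }
  if (isTaskFailure<{ name: string }>(event)) {
    return `${event.context.task.name} failed: ${String(event.error)}`
  }
  return 'not a task result event'
}
```

The guards only check that the fields exist, so they are a convenience for listeners rather than a validation of the payload.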

Task management events

| Event | Description |
|-------|-------------|
| addTask | A task was added |
| delTask | A task was deleted |
| altTask | A task was updated |
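
All of the events above are delivered through addListener(type, listener) and removed with delListener(listener). To illustrate how a dispatcher with those two signatures can work, here is a minimal self-contained sketch. TinyDispatcher is a hypothetical class written for this example, and its behaviour (for instance, removing a listener from every event type at once) is an assumption rather than a description of DefaultDispatcher.

```typescript
// Hypothetical sketch; DefaultDispatcher's real behaviour may differ.
type Listener<E> = (event: E) => void

class TinyDispatcher<E> {
  private listeners = new Map<string, Set<Listener<E>>>()

  addListener(type: string, listener: Listener<E>): void {
    const set = this.listeners.get(type) ?? new Set<Listener<E>>()
    set.add(listener)
    this.listeners.set(type, set)
  }

  // There is no type argument, so remove the listener from every event type.
  delListener(listener: Listener<E>): void {
    this.listeners.forEach(set => set.delete(listener))
  }

  // Call every listener registered for this type; return how many were called.
  dispatch(type: string, event: E): number {
    const set = this.listeners.get(type)
    if (!set) return 0
    set.forEach(listener => listener(event))
    return set.size
  }
}
```

Keeping one Set per event type means registering the same function twice for one type has no extra effect, and removal stays cheap.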

Exported Modules

// Core classes
export { default as DefaultBatchExecutor, type DefaultBatchExecutorConfig } from './DefaultBatchExecutor'
export { default as SequenceRunner, type SequenceRunnerConfig } from './SequenceRunner'

// Controllers
export { default as DefaultStopController } from './DefaultStopController'
export { default as DefaultStartController } from './DefaultStartController'
export { default as DefaultPauseResumeController } from './DefaultPauseResumeController'

// Events
export { default as DefaultDispatcher } from './DefaultDispatcher'
export { default as DefaultMonitor } from './DefaultMonitor'

// Task management
export { default as DefaultTaskManager } from './DefaultTaskManager'

// Executions
export { default as DefaultExecution, type DefaultExecutionConfig } from './DefaultExecution'
export { default as SecureExecution, type SecureExecutionConfig } from './SecureExecution'
export { default as RetriableExecution, type RetriableExecutionConfig } from './RetriableExecution'
export { default as NotifiableExecution, type NotifiableExecutionConfig } from './NotifiableExecution'

// Type exports
export type * from './types'

Running Tests

# Run the tests
pnpm test

# Run the tests and watch for file changes
pnpm test:watch

License

ISC