
@batchactions/core v0.0.6

TypeScript batch job engine for ETL and bulk data processing with state management, retries, pause/resume, and lifecycle events.
@batchactions/core

Core batch processing engine for the @batchactions ecosystem.

Use this package when you need low-level control of batch orchestration, state transitions, events, and infrastructure adapters.

Install

npm install @batchactions/core

What You Get

  • BatchEngine for streaming batch execution
  • Job lifecycle control: start, pause, resume, abort, process chunk
  • Domain events for job, batch, and record progress
  • Built-in sources: BufferSource, FilePathSource, StreamSource, UrlSource
  • Built-in state stores: InMemoryStateStore, FileStateStore
  • Distributed contracts: DistributedStateStore, BatchReservation, helpers
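
The domain events listed above are not shown in use anywhere in this README. As a rough picture of the publish/subscribe pattern they imply (the event names and payload shapes below are hypothetical, not the library's real EventBus API), a minimal flow looks like:

```typescript
// Hypothetical event names; the real EventBus API may differ.
type EngineEvent = 'job:started' | 'batch:completed' | 'record:processed' | 'job:completed';

class MiniEventBus {
  private handlers = new Map<EngineEvent, Array<(payload: unknown) => void>>();

  on(event: EngineEvent, handler: (payload: unknown) => void): void {
    const list = this.handlers.get(event) ?? [];
    list.push(handler);
    this.handlers.set(event, list);
  }

  emit(event: EngineEvent, payload: unknown): void {
    for (const handler of this.handlers.get(event) ?? []) handler(payload);
  }
}

const bus = new MiniEventBus();
const seen: string[] = [];
bus.on('batch:completed', (p) => seen.push(`batch ${(p as { index: number }).index} done`));
bus.emit('batch:completed', { index: 0 });
// seen: ['batch 0 done']
```

In the real engine you would subscribe to progress events like these to drive logging or UI updates while a job runs.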

Quick Start

import { BatchEngine, BufferSource, InMemoryStateStore } from '@batchactions/core';

const engine = new BatchEngine({
  stateStore: new InMemoryStateStore(),
  batchSize: 100,
  maxConcurrentBatches: 2,
  continueOnError: true,
});

engine.from(new BufferSource(JSON.stringify([{ id: 1 }, { id: 2 }])), {
  async *parse(chunk) {
    const rows = JSON.parse(chunk.toString()) as Array<Record<string, unknown>>;
    for (const row of rows) {
      yield row;
    }
  },
});

await engine.start(async (record) => {
  await processRecord(record);
});
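
With `batchSize: 100`, the two parsed records above land in a single batch before the processor runs. The grouping step (presumably what the exported BatchSplitter handles; its real API is not shown in this README) can be sketched as:

```typescript
// Self-contained sketch of batch grouping; the real BatchSplitter's
// API and behavior are assumptions, not verified against the library.
function* splitIntoBatches<T>(records: Iterable<T>, batchSize: number): Generator<T[]> {
  let batch: T[] = [];
  for (const record of records) {
    batch.push(record);
    if (batch.length === batchSize) {
      yield batch;
      batch = [];
    }
  }
  if (batch.length > 0) yield batch; // emit the final partial batch
}

const batches = [...splitIntoBatches([1, 2, 3, 4, 5], 2)];
// batches: [[1, 2], [3, 4], [5]]
```

The final batch may be smaller than `batchSize`, which matters later for the batch-boundary semantics of chunked processing.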

In-Memory Records

When your data is already in memory (database results, API responses, etc.), skip the source/parser pipeline entirely:

import { BatchEngine } from '@batchactions/core';

const accounts = [
  { id: 'acc-1', channel: 'email', status: 'active' },
  { id: 'acc-2', channel: 'sms', status: 'active' },
];

const engine = new BatchEngine({
  batchSize: 50,
  maxConcurrentBatches: 4,
  continueOnError: true,
  maxRetries: 2,
});

engine.fromRecords(accounts);

await engine.start(async (record) => {
  await healthCheck(record);
});

Also works with async generators for lazy evaluation:

async function* fetchPages(): AsyncIterable<Record<string, unknown>> {
  let page = 1;
  while (true) {
    const results = await api.getPage(page++);
    if (results.length === 0) break;
    for (const item of results) yield item;
  }
}

engine.fromRecords(fetchPages());
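
The `maxRetries` and `continueOnError` options used above interact per record: retries run first, and only once they are exhausted does `continueOnError` decide whether the job keeps going or aborts. A synchronous sketch of those assumed semantics (the engine's real implementation is async and may differ):

```typescript
// Hedged sketch of per-record retry semantics implied by maxRetries
// and continueOnError; illustrative only.
function runRecord<T>(
  record: T,
  process: (r: T) => void,
  maxRetries: number,
  continueOnError: boolean,
): boolean {
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      process(record);
      return true; // success
    } catch (err) {
      if (attempt === maxRetries) {
        if (continueOnError) return false; // record marked failed, job continues
        throw err; // abort the whole job
      }
      // otherwise fall through and retry
    }
  }
  return false;
}

let attempts = 0;
const ok = runRecord('acc-1', () => {
  attempts += 1;
  if (attempts < 3) throw new Error('transient');
}, 2, true);
// ok === true, attempts === 3 (1 initial try + 2 retries)
```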

Serverless / Chunked Processing

Use processChunk() with restore() to process records incrementally across multiple invocations. This is ideal for serverless environments with execution time limits (AWS Lambda, Vercel Functions, Cloudflare Workers).

Basic Pattern

import { BatchEngine, InMemoryStateStore } from '@batchactions/core';

const stateStore = new InMemoryStateStore(); // or FileStateStore, SequelizeStateStore, etc.

export async function handler(event: { jobId?: string }) {
  let engine: BatchEngine;

  if (event.jobId) {
    // Subsequent invocation: restore from persisted state
    const restored = await BatchEngine.restore(event.jobId, { stateStore, batchSize: 100 });
    if (!restored) throw new Error('Job not found');
    engine = restored;
  } else {
    // First invocation: create a new engine
    engine = new BatchEngine({ stateStore, batchSize: 100, continueOnError: true });
  }

  // The source must be re-fed on every invocation (it will be re-streamed)
  engine.from(source, parser);

  const result = await engine.processChunk(
    async (record) => { await processRecord(record); },
    { maxDurationMs: 25_000 },  // stop before Lambda's 30s timeout
  );

  if (!result.done) {
    // Schedule the next invocation with the job ID
    await scheduleNext({ jobId: result.jobId });
  }

  return { done: result.done, processed: result.totalProcessed };
}

maxRecords vs maxDurationMs

| Option | When to use | Boundary behavior |
|---|---|---|
| maxRecords | Predictable chunk sizes (e.g., "process 500 records per invocation") | Stops after completing the batch that crosses the record limit |
| maxDurationMs | Time-limited environments (e.g., "stop before Lambda timeout") | Checks at batch boundaries; actual duration may exceed the limit by one batch |

Both options can be combined. The chunk stops when either limit is reached.
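
A self-contained sketch of that combined check, with illustrative names rather than the library's actual internals:

```typescript
// Sketch of the batch-boundary stop check described above.
interface ChunkLimits {
  maxRecords?: number;
  maxDurationMs?: number;
}

function shouldStop(processed: number, startedAt: number, limits: ChunkLimits, now: number): boolean {
  if (limits.maxRecords !== undefined && processed >= limits.maxRecords) return true;
  if (limits.maxDurationMs !== undefined && now - startedAt >= limits.maxDurationMs) return true;
  return false;
}

// The check runs only between batches, so a batch always completes
// before the chunk stops.
function processChunkSketch(batches: number[][], limits: ChunkLimits): { processed: number; done: boolean } {
  const startedAt = Date.now();
  let processed = 0;
  for (const batch of batches) {
    processed += batch.length; // the whole batch runs first
    if (shouldStop(processed, startedAt, limits, Date.now())) {
      return { processed, done: false };
    }
  }
  return { processed, done: true };
}

const result = processChunkSketch([[1, 2], [3, 4], [5]], { maxRecords: 3 });
// result.processed === 4, result.done === false:
// the chunk stops after the batch that crosses the record limit
```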

Important Notes

  • Re-streaming requirement: The source must be re-fed on every invocation via from() or fromRecords(). The engine re-streams from the beginning and skips completed batches automatically.
  • Batch-level boundaries: Chunk limits are checked between batches, not between records. A batch always completes before the chunk stops. Use a smaller batchSize for finer-grained control.
  • Use a persistent StateStore: InMemoryStateStore loses state between process restarts. For production serverless use, pair with @batchactions/state-sequelize or @batchactions/state-prisma.
  • done flag: The ChunkResult.done property reliably indicates whether all records have been processed. Progress metrics (totalRecords, percentage) may show partial totals until the source has been fully consumed.
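
The StateStore contract those notes rely on can be pictured as a small save/load interface keyed by job ID. The shape below is hypothetical (the real contract lives in the package's typed exports and may differ):

```typescript
// Hypothetical StateStore shape; illustrative only.
interface JobState {
  jobId: string;
  completedBatches: number[];
  status: 'running' | 'paused' | 'completed';
}

interface StateStore {
  save(state: JobState): Promise<void>;
  load(jobId: string): Promise<JobState | undefined>;
}

// In-memory implementation: fine for tests, but it loses everything on
// process restart, which is exactly why production serverless use needs
// a SQL-backed store instead.
class MemoryStore implements StateStore {
  private states = new Map<string, JobState>();

  async save(state: JobState): Promise<void> {
    this.states.set(state.jobId, state);
  }

  async load(jobId: string): Promise<JobState | undefined> {
    return this.states.get(jobId);
  }
}
```

A SQL-backed store would implement the same interface against a table keyed by `jobId`, so restore() can find the job from any worker or invocation.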

Main Exports

  • BatchEngine
  • BatchSplitter
  • EventBus, JobContext
  • JobStatus, BatchStatus
  • BufferSource, FilePathSource, StreamSource, UrlSource
  • InMemoryStateStore, FileStateStore
  • isDistributedStateStore

For full typed exports, see packages/core/src/index.ts.

Compatibility

  • Node.js >= 20.0.0

Related Packages

  • @batchactions/import: high-level import facade
  • @batchactions/distributed: multi-worker orchestration
  • @batchactions/state-sequelize: SQL persistence adapter (Sequelize)
  • @batchactions/state-prisma: SQL persistence adapter (Prisma v6/v7)

Links

  • Repository: https://github.com/vgpastor/batchactions/tree/main/packages/core
  • Issues: https://github.com/vgpastor/batchactions/issues
  • Contributing guide: https://github.com/vgpastor/batchactions/blob/main/CONTRIBUTING.md

License

MIT