ChronoBatch

A reliable batch processing framework for Node.js — chunk-oriented execution, automatic resume on failure, and per-chunk transactions. Built with TypeScript and PostgreSQL.


Overview

chronobatch processes large datasets reliably using a chunk-oriented model:

Read N items → Process each → Write all N → Save checkpoint → Repeat

Each chunk is wrapped in a single database transaction. If anything fails mid-job, the next run automatically resumes from the last committed checkpoint — no data is reprocessed, no data is lost.


Core Concepts

| Concept | Description |
|---|---|
| Job | A named batch process made up of one or more Steps |
| Step | One unit of work — owns a Reader, Processor, and Writer |
| Chunk | A fixed-size batch of items processed together in one transaction |
| Checkpoint | The reader's position saved after every successful chunk commit |
| JobExecution | A single run of a Job, persisted to the database with its status |
| StepExecution | A single run of a Step within a JobExecution |


Installation

npm install chronobatch

Dependencies

| Package | Purpose |
|---|---|
| pg | PostgreSQL client |
| typescript | Language |
| ts-node | Run TypeScript directly |


Configuration

ChronoBatch reads its database configuration automatically from a chronobatch.config.js file in your project root. You do not need to call initDb() or initSchema() manually.

chronobatch.config.js

Create this file in your project root:

/** @type {import('chronobatch').BatchConfig} */
module.exports = {
  db: {
    host:     process.env.DB_HOST     || 'localhost',
    port:     Number(process.env.DB_PORT || 5432),
    database: process.env.DB_NAME     || 'mydb',
    user:     process.env.DB_USER     || 'postgres',
    password: process.env.DB_PASSWORD || 'secret',
  },
  schema: process.env.DB_SCHEMA || 'public',  // optional, defaults to 'public'
};

Schema

The schema field controls which PostgreSQL schema holds the ChronoBatch tables (batch_job, batch_job_execution, etc.). Defaults to public if omitted.

If the schema does not exist, it is created automatically on first use.

Config resolution order

The framework resolves config in this order:

  1. chronobatch.config.js in the project root
  2. chronobatch.config.ts in the project root (requires ts-node; a sketch is shown below)
  3. Environment variables: DB_HOST, DB_PORT, DB_NAME, DB_USER, DB_PASSWORD, DB_SCHEMA
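
If you keep the config in TypeScript (option 2 above), chronobatch.config.ts can mirror the JS example. The following is only a sketch assuming the loader accepts a default export; if your setup resolves the file as CommonJS, replace the last line with module.exports = config.

import type { BatchConfig } from 'chronobatch';

const config: BatchConfig = {
  db: {
    host:     process.env.DB_HOST     || 'localhost',
    port:     Number(process.env.DB_PORT || 5432),
    database: process.env.DB_NAME     || 'mydb',
    user:     process.env.DB_USER     || 'postgres',
    password: process.env.DB_PASSWORD || 'secret',
  },
  schema: process.env.DB_SCHEMA || 'public',
};

export default config;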

Quick Start

import { JobRepository, JobLauncher, closeDb } from 'chronobatch';

// Config is read automatically from chronobatch.config.js.
// The DB pool and schema tables are created on first use — nothing to bootstrap manually.

const repo = new JobRepository();
const launcher = new JobLauncher(repo);

await launcher.run({
  name: 'my-job',
  steps: [{
    name:      'my-step',
    chunkSize: 100,
    reader,
    processor,
    writer,
  }],
});

await closeDb();

Building Blocks

ItemReader

Reads one item at a time. The engine calls read() in a loop until a chunk is full or read() returns null (source exhausted).

import { ItemReader } from 'chronobatch';

class MyReader implements ItemReader<MyItem> {
  private lastId = 0;

  async open(checkpoint: Record<string, any> | null): Promise<void> {
    // checkpoint is null on first run; on resume it contains whatever
    // getCheckpoint() returned after the last successful commit.
    if (checkpoint?.lastId) {
      this.lastId = checkpoint.lastId as number;
    }
  }

  async read(): Promise<MyItem | null> {
    // Return the next item, or null when done.
    const row = await db.query(
      'SELECT * FROM source WHERE id > $1 ORDER BY id LIMIT 1',
      [this.lastId],
    );
    if (!row) return null;
    this.lastId = row.id;
    return row;
  }

  getCheckpoint(): Record<string, any> {
    // Return the current position as a plain object.
    // Saved to the database after every successful chunk commit.
    return { lastId: this.lastId };
  }

  async close(): Promise<void> {}
}

Checkpoint contract:

  • open(null) — first run, start from the beginning
  • open({ lastId: 500 }) — resume, skip items already processed
  • getCheckpoint() — called after every chunk write; the returned object is saved transactionally

ItemProcessor

Transforms one input item into one output item. Optional — omit it if no transformation is needed.

import { ItemProcessor } from 'chronobatch';

class MyProcessor implements ItemProcessor<SourceItem, TargetItem> {
  async process(item: SourceItem): Promise<TargetItem> {
    return {
      id:   item.id,
      name: item.name.toUpperCase(),
    };
  }
}

If retry is configured on the step, a failing process() call is retried automatically before the item is considered for skipping or the step is failed.


ItemWriter

Writes a full chunk of processed items at once.

import { ItemWriter } from 'chronobatch';
import { PoolClient } from 'pg';

class MyWriter implements ItemWriter<TargetItem> {
  async write(items: TargetItem[], txClient?: PoolClient): Promise<void> {
    for (const item of items) {
      // Use txClient so your INSERTs are part of the framework's
      // per-chunk transaction. If the write fails, everything rolls back.
      await txClient!.query(
        'INSERT INTO target (id, name) VALUES ($1, $2)',
        [item.id, item.name],
      );
    }
  }
}

Important: Always use the txClient parameter for your database writes. This makes your inserts atomic with the checkpoint save — either both commit or both roll back. If you write to an external system (HTTP, S3, etc.) that cannot participate in a Postgres transaction, ensure your writes are idempotent so replays are safe.
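
One way to make an external write idempotent is to key it on the item's id, so that replaying a chunk after a crash rewrites the same records with the same values. A minimal sketch, assuming a hypothetical REST API where PUT /items/:id behaves as an upsert:

import { ItemWriter } from 'chronobatch';

class HttpUpsertWriter implements ItemWriter<TargetItem> {
  // The HTTP service cannot join the Postgres transaction, so txClient is
  // not used here; safety comes from the upsert being idempotent instead.
  async write(items: TargetItem[]): Promise<void> {
    for (const item of items) {
      // Hypothetical endpoint: PUT is an upsert keyed by item.id, so a
      // replayed chunk simply overwrites the same records.
      const res = await fetch(`https://api.example.com/items/${item.id}`, {
        method: 'PUT',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify(item),
      });
      // Throwing lets the framework's retry/skip/failure handling take over.
      if (!res.ok) throw new Error(`Upsert of item ${item.id} failed: ${res.status}`);
    }
  }
}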


Defining a Job

import { JobConfig } from 'chronobatch';

const job: JobConfig = {
  name: 'user-migration',          // unique name, used to track executions
  parameters: { env: 'prod' },     // optional; stored with the execution record

  steps: [
    {
      name:      'migrate-users',
      chunkSize: 100,               // items per chunk / transaction
      reader:    new UserReader(),
      processor: new UserTransformer(),
      writer:    new UserWriter(),

      retry: {
        maxAttempts: 3,             // retry processor + writer up to 3 times
        backoffMs:   500,           // 500ms, 1000ms, 1500ms ... (linear backoff)
      },

      skip: {
        skipLimit: 10,              // skip up to 10 bad items before failing
      },

      allowStartIfComplete: false,  // set true to re-run even if already done
    },
  ],
};

Multi-step jobs — steps run sequentially; if one fails, subsequent steps do not run:

const job: JobConfig = {
  name: 'etl-pipeline',
  steps: [
    { name: 'extract',   chunkSize: 500, reader: extractReader,  writer: stagingWriter },
    { name: 'transform', chunkSize: 200, reader: stagingReader,  processor: transformer, writer: targetWriter },
    { name: 'cleanup',   chunkSize: 1000, reader: stagingReader, writer: deleteWriter },
  ],
};

Running a Job

const launcher = new JobLauncher(repo);

// Run — if the last execution failed, resumes automatically from checkpoint
await launcher.run(job);

// Pass runtime parameters
await launcher.run(job, { parameters: { date: '2024-01-01' } });

// Force a clean run even if the last execution failed
await launcher.run(job, { onFailedExecution: 'restart' });

Automatic Resume

By default, calling run() on a job whose last execution is FAILED will:

  1. Load the previous execution's step records
  2. Skip steps that already COMPLETED
  3. Resume the failed step from its last saved checkpoint
  4. Continue with any remaining steps

First run:
  [migrate-users] Chunk #1 committed — read: 100, written: 100
  [migrate-users] Chunk #2 committed — read: 200, written: 200
  [migrate-users] Chunk #3 → ERROR  → ROLLBACK
  [Job: user-migration] Failed

Second run (just call run() again):
  [Job: user-migration] Previous execution #1 failed — resuming automatically
  [migrate-users] Resuming from checkpoint: {"lastId": 200}
  [migrate-users] Chunk #3 committed — read: 300, written: 300
  ...
  [Job: user-migration] Resumed and completed

Fault Tolerance

Retry

Retry is applied to both the processor and the writer.

retry: {
  maxAttempts: 3,       // total attempts including the first
  backoffMs:   1000,    // delay between attempts; multiplied by attempt number
                        // attempt 1 fails → wait 1000ms
                        // attempt 2 fails → wait 2000ms
                        // attempt 3 fails → step fails

  retryOn: (err) => err.message.includes('deadlock'), // optional: only retry specific errors
}

Skip

Skip drops individual items that fail processing instead of failing the whole step.

skip: {
  skipLimit: 5,         // allow up to 5 items to be skipped
                        // if the 6th item also fails, the step fails

  skipOn: (err) => err instanceof ValidationError, // optional: only skip specific errors
}

When an item is skipped it is counted in batch_step_execution.skip_count and a warning is logged. The chunk continues processing the remaining items.

Note: Retry and skip can be combined. Retry is attempted first; only if all retries are exhausted is the item considered for skipping.
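
For example, a step might retry transient database errors (such as deadlocks) but skip items whose data is simply bad. A sketch, assuming hypothetical OrderReader/OrderValidator/OrderWriter building blocks and a ValidationError thrown by the processor:

const step = {
  name:      'import-orders',
  chunkSize: 100,
  reader:    new OrderReader(),
  processor: new OrderValidator(),   // assumed to throw ValidationError on bad rows
  writer:    new OrderWriter(),

  retry: {
    maxAttempts: 3,
    backoffMs:   1000,
    retryOn: (err: Error) => err.message.includes('deadlock'),   // retry transient errors only
  },

  skip: {
    skipLimit: 10,
    skipOn: (err: Error) => err instanceof ValidationError,      // drop bad rows, keep going
  },
};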


Checkpoint & Resume

The checkpoint is the reader's saved position. It is stored in batch_execution_context transactionally with the chunk write — they either both commit or both roll back.

Checkpoint flow:

Read chunk (ids 201–300)
  ↓
Process items
  ↓
BEGIN transaction
  writer.write(items, txClient)          ← your INSERTs
  saveCheckpoint({ lastId: 300 })        ← position saved
  UPDATE batch_step_execution SET ...    ← counters updated
COMMIT

If the process crashes mid-chunk (before that COMMIT), batch_execution_context still holds the last safe position ({ lastId: 200 }) and the next run resumes from id 201.

Implementing a resumable reader:

async open(checkpoint: Record<string, any> | null): Promise<void> {
  this.lastId = (checkpoint?.lastId as number) ?? 0;
}

async read(): Promise<Row | null> {
  const res = await client.query(
    'SELECT * FROM source WHERE id > $1 ORDER BY id LIMIT 1',
    [this.lastId],
  );
  if (res.rows.length === 0) return null;
  this.lastId = res.rows[0].id;
  return res.rows[0];
}

getCheckpoint(): Record<string, any> {
  return { lastId: this.lastId };
}

Per-Chunk Transactions

Every chunk executes inside a single Postgres transaction:

BEGIN
  ← writer INSERTs (via txClient)
  ← checkpoint upsert
  ← step counter UPDATE
COMMIT  /  ROLLBACK on any error

This means:

  • A partial write never leaves the database in a half-written state
  • The checkpoint only advances when the write fully succeeds
  • On failure, the framework retries (if configured) or marks the step FAILED — the next run() will resume from the last committed checkpoint

CLI

ChronoBatch ships with a built-in CLI to monitor and inspect job executions.

Setup

The CLI uses the same chronobatch.config.js your jobs use — no extra setup required.

/** @type {import('chronobatch').BatchConfig} */
module.exports = {
  db: {
    host:     process.env.DB_HOST     || 'localhost',
    port:     Number(process.env.DB_PORT || 5432),
    database: process.env.DB_NAME     || 'mydb',
    user:     process.env.DB_USER     || 'postgres',
    password: process.env.DB_PASSWORD || 'secret',
  },
  schema: process.env.DB_SCHEMA || 'public',
};

Or pass config via environment variables directly — DB_HOST, DB_PORT, DB_NAME, DB_USER, DB_PASSWORD, DB_SCHEMA.

Commands

| Command | Description |
|---|---|
| chronobatch jobs | List all jobs and their last execution status |
| chronobatch executions <job-name> | Show execution history for a job |
| chronobatch status <execution-id> | Step-level breakdown for one execution |
| chronobatch monitor | Live auto-refreshing dashboard (press Q to quit) |

Examples

# List all jobs
chronobatch jobs

# View execution history for a specific job
chronobatch executions user-migration

# Inspect a specific execution by ID
chronobatch status 3

# Launch the live monitor dashboard
chronobatch monitor

Development (without building)

npm run cli -- jobs
npm run cli -- executions user-migration
npm run cli -- status 3
npm run cli -- monitor

API Reference

JobRepository

import { JobRepository } from 'chronobatch';
import { Pool } from 'pg';

// Auto mode — reads chronobatch.config.js, initialises pool and schema automatically
const repo = new JobRepository();

// Manual mode — provide an explicit pg Pool and schema
const pool = new Pool({ /* connection options */ });
const manualRepo = new JobRepository(pool, 'my_schema');

The framework tables are created automatically on first use (idempotent — safe to call on an existing database).


JobLauncher

const launcher = new JobLauncher(repo);

launcher.run(job: JobConfig, options?: RunOptions): Promise<JobExecutionRecord>

RunOptions

| Option | Type | Default | Description |
|---|---|---|---|
| parameters | Record<string, any> | — | Runtime parameters merged with job.parameters |
| onFailedExecution | 'resume' \| 'restart' | 'resume' | What to do when the last execution failed |


closeDb(): Promise<void>

Drains and closes the connection pool. Call once at the end of your process.

await closeDb();

loadConfig(): ResolvedConfig

Returns the resolved config ({ db, schema }) without initialising anything. Useful for inspecting what config will be used.

import { loadConfig } from 'chronobatch';

const { db, schema } = loadConfig();
console.log('Using schema:', schema);

initDb(config: PoolConfig): void

Manually initialise the connection pool. Only needed if you want full control over pool creation without a chronobatch.config.js.

import { initDb, closeDb } from 'chronobatch';

initDb({ host: 'localhost', database: 'mydb', user: 'postgres', password: 'secret' });
// ... run jobs ...
await closeDb();

Note: If you call initDb() manually, do not also rely on auto-init via new JobRepository() in the same process — initDb() will throw if the pool is already set.


StepConfig<I, O>

| Field | Type | Required | Description |
|---|---|---|---|
| name | string | Yes | Unique name within the job |
| chunkSize | number | Yes | Items read and written per transaction |
| reader | ItemReader<I> | Yes | Data source |
| processor | ItemProcessor<I, O> | No | Transformation (identity if omitted) |
| writer | ItemWriter<O> | Yes | Data sink |
| retry | RetryConfig | No | Retry policy for processor and writer |
| skip | SkipConfig | No | Skip policy for failed items |
| allowStartIfComplete | boolean | No | Re-run step even if it completed (default false) |


RetryConfig

| Field | Type | Default | Description |
|---|---|---|---|
| maxAttempts | number | — | Total attempts including first try |
| backoffMs | number | 1000 | Base delay; multiplied by attempt number |
| retryOn | (err: Error) => boolean | — | Predicate to filter retryable errors |


SkipConfig

| Field | Type | Default | Description |
|---|---|---|---|
| skipLimit | number | — | Max items that can be skipped before the step fails |
| skipOn | (err: Error) => boolean | — | Predicate to filter skippable errors |


Database Tables

| Table | Purpose |
|---|---|
| batch_job | One row per unique job name |
| batch_job_execution | One row per job run (status, timestamps, parameters) |
| batch_step_execution | One row per step run (read/write/skip counts) |
| batch_execution_context | Checkpoint data per step execution |

All tables are created under the configured schema (default: public).
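
To peek at these tables outside the CLI, a plain pg query works. A sketch assuming the default public schema (the exact column layout is easiest to check with \d batch_job_execution in psql):

import { Pool } from 'pg';

const pool = new Pool({ host: 'localhost', database: 'mydb', user: 'postgres', password: 'secret' });

// Recent job runs; select * since the exact columns are an internal detail of the framework.
const { rows } = await pool.query('SELECT * FROM batch_job_execution LIMIT 10');
console.table(rows);

await pool.end();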