ChronoBatch
A reliable, Spring Batch-inspired batch processing framework for Node.js — chunk-oriented execution, automatic resume on failure, and per-chunk transactions. Built with TypeScript and PostgreSQL.
Table of Contents
- Overview
- Core Concepts
- Installation
- Configuration
- Quick Start
- Building Blocks
- Defining a Job
- Running a Job
- Fault Tolerance
- Checkpoint & Resume
- Per-Chunk Transactions
- CLI
- API Reference
Overview
Read N items → Process each → Write all N → Save checkpoint → RepeatEach chunk is wrapped in a single database transaction. If anything fails mid-job, the next run automatically resumes from the last committed checkpoint — no data is reprocessed, no data is lost.
Core Concepts
| Concept | Description |
|---|---|
| Job | A named batch process made up of one or more Steps |
| Step | One unit of work — owns a Reader, Processor, and Writer |
| Chunk | A fixed-size batch of items processed together in one transaction |
| Checkpoint | The reader's position saved after every successful chunk commit |
| JobExecution | A single run of a Job, persisted to the database with its status |
| StepExecution | A single run of a Step within a JobExecution |
Installation
npm install chronobatch

Dependencies
| Package | Purpose |
|---|---|
| pg | PostgreSQL client |
| typescript | Language |
| ts-node | Run TypeScript directly |
Configuration
ChronoBatch reads its database configuration automatically from a chronobatch.config.js file in your project root. You do not need to call initDb() or initSchema() manually.
chronobatch.config.js
Create this file in your project root:
/** @type {import('chronobatch').BatchConfig} */
module.exports = {
db: {
host: process.env.DB_HOST || 'localhost',
port: Number(process.env.DB_PORT || 5432),
database: process.env.DB_NAME || 'mydb',
user: process.env.DB_USER || 'postgres',
password: process.env.DB_PASSWORD || 'secret',
},
schema: process.env.DB_SCHEMA || 'public', // optional, defaults to 'public'
};

Schema
The schema field controls which PostgreSQL schema holds the ChronoBatch tables (batch_job, batch_job_execution, etc.). Defaults to public if omitted.
If the schema does not exist, it is created automatically on first use.
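If your project already runs through ts-node, the same configuration can live in chronobatch.config.ts instead (see the resolution order below). A minimal sketch — the docs do not state whether the .ts variant expects a default export or module.exports, so the default export here is an assumption:

// chronobatch.config.ts — picked up when no chronobatch.config.js exists (requires ts-node)
import type { BatchConfig } from 'chronobatch';

const config: BatchConfig = {
  db: {
    host: process.env.DB_HOST || 'localhost',
    port: Number(process.env.DB_PORT || 5432),
    database: process.env.DB_NAME || 'mydb',
    user: process.env.DB_USER || 'postgres',
    password: process.env.DB_PASSWORD || 'secret',
  },
  schema: process.env.DB_SCHEMA || 'public',
};

export default config; // assumption: default export, mirroring module.exports in the .js variant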
Config resolution order
The framework resolves config in this order:
- chronobatch.config.js in the project root
- chronobatch.config.ts in the project root (requires ts-node)
- Environment variables: DB_HOST, DB_PORT, DB_NAME, DB_USER, DB_PASSWORD, DB_SCHEMA
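To check which of these sources actually won, you can call the exported loadConfig() helper (documented under API Reference below) before launching anything:

import { loadConfig } from 'chronobatch';

// Resolves the config file / environment variables without opening a pool
// or creating any tables — handy for debugging configuration issues.
const { db, schema } = loadConfig();
console.log(`ChronoBatch will use ${db.host}:${db.port}/${db.database} (schema "${schema}")`);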
Quick Start
import { JobRepository, JobLauncher, closeDb } from 'chronobatch';
// Config is read automatically from chronobatch.config.js.
// The DB pool and schema tables are created on first use — nothing to bootstrap manually.
const repo = new JobRepository();
const launcher = new JobLauncher(repo);
await launcher.run({
name: 'my-job',
steps: [{
name: 'my-step',
chunkSize: 100,
reader,
processor,
writer,
}],
});
await closeDb();

Building Blocks
ItemReader
Reads one item at a time. The engine calls read() in a loop until it returns null (exhausted) or fills a chunk.
import { ItemReader } from 'chronobatch';
class MyReader implements ItemReader<MyItem> {
private lastId = 0;
async open(checkpoint: Record<string, any> | null): Promise<void> {
// checkpoint is null on first run; on resume it contains whatever
// getCheckpoint() returned after the last successful commit.
if (checkpoint?.lastId) {
this.lastId = checkpoint.lastId as number;
}
}
async read(): Promise<MyItem | null> {
// Return the next item, or null when done. (`db` here is a placeholder for
// your own single-row query helper; a concrete pg-based reader is shown
// under "Checkpoint & Resume" below.)
const row = await db.query(
'SELECT * FROM source WHERE id > $1 ORDER BY id LIMIT 1',
[this.lastId],
);
if (!row) return null;
this.lastId = row.id;
return row;
}
getCheckpoint(): Record<string, any> {
// Return the current position as a plain object.
// Saved to the database after every successful chunk commit.
return { lastId: this.lastId };
}
async close(): Promise<void> {}
}

Checkpoint contract:
- open(null) — first run, start from the beginning
- open({ lastId: 500 }) — resume, skip items already processed
- getCheckpoint() — called after every chunk write; the returned object is saved transactionally
ItemProcessor
Transforms one input item into one output item. Optional — omit it if no transformation is needed.
import { ItemProcessor } from 'chronobatch';
class MyProcessor implements ItemProcessor<SourceItem, TargetItem> {
async process(item: SourceItem): Promise<TargetItem> {
return {
id: item.id,
name: item.name.toUpperCase(),
};
}
}

If retry is configured on the step, a failing process() call is automatically retried before it is considered for skipping or failure.
ItemWriter
Writes a full chunk of processed items at once.
import { ItemWriter } from 'chronobatch';
import { PoolClient } from 'pg';
class MyWriter implements ItemWriter<TargetItem> {
async write(items: TargetItem[], txClient?: PoolClient): Promise<void> {
for (const item of items) {
// Use txClient so your INSERTs are part of the framework's
// per-chunk transaction. If the write fails, everything rolls back.
await txClient!.query(
'INSERT INTO target (id, name) VALUES ($1, $2)',
[item.id, item.name],
);
}
}
}

Important: Always use the txClient parameter for your database writes. This makes your inserts atomic with the checkpoint save — either both commit or both roll back. If you write to an external system (HTTP, S3, etc.) that cannot participate in a Postgres transaction, ensure your writes are idempotent so replays are safe.
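For Postgres targets the per-chunk transaction already prevents replays, but if you prefer belt-and-braces idempotency, an upsert-style write is one option. The sketch below is illustrative rather than part of the framework — the target table and its id conflict key carry over from the example above:

import { ItemWriter } from 'chronobatch';
import { PoolClient } from 'pg';

// Illustrative idempotent writer: if a chunk is ever replayed after a resume,
// re-inserting the same rows is a no-op thanks to ON CONFLICT DO NOTHING.
class IdempotentWriter implements ItemWriter<TargetItem> {
  async write(items: TargetItem[], txClient?: PoolClient): Promise<void> {
    for (const item of items) {
      await txClient!.query(
        'INSERT INTO target (id, name) VALUES ($1, $2) ON CONFLICT (id) DO NOTHING',
        [item.id, item.name],
      );
    }
  }
}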
Defining a Job
import { JobConfig } from 'chronobatch';
const job: JobConfig = {
name: 'user-migration', // unique name, used to track executions
parameters: { env: 'prod' }, // optional; stored with the execution record
steps: [
{
name: 'migrate-users',
chunkSize: 100, // items per chunk / transaction
reader: new UserReader(),
processor: new UserTransformer(),
writer: new UserWriter(),
retry: {
maxAttempts: 3, // retry processor + writer up to 3 times
backoffMs: 500, // 500ms, 1000ms, 1500ms ... (linear backoff)
},
skip: {
skipLimit: 10, // skip up to 10 bad items before failing
},
allowStartIfComplete: false, // set true to re-run even if already done
},
],
};

Multi-step jobs — steps run sequentially; if one fails, subsequent steps do not run:
const job: JobConfig = {
name: 'etl-pipeline',
steps: [
{ name: 'extract', chunkSize: 500, reader: extractReader, writer: stagingWriter },
{ name: 'transform', chunkSize: 200, reader: stagingReader, processor: transformer, writer: targetWriter },
{ name: 'cleanup', chunkSize: 1000, reader: stagingReader, writer: deleteWriter },
],
};

Running a Job
const launcher = new JobLauncher(repo);
// Run — if the last execution failed, resumes automatically from checkpoint
await launcher.run(job);
// Pass runtime parameters
await launcher.run(job, { parameters: { date: '2024-01-01' } });
// Force a clean run even if the last execution failed
await launcher.run(job, { onFailedExecution: 'restart' });

Automatic Resume
By default, calling run() on a job whose last execution is FAILED will:
- Load the previous execution's step records
- Skip steps that already COMPLETED
- Resume the failed step from its last saved checkpoint
- Continue with any remaining steps
First run:
[migrate-users] Chunk #1 committed — read: 100, written: 100
[migrate-users] Chunk #2 committed — read: 200, written: 200
[migrate-users] Chunk #3 → ERROR → ROLLBACK
[Job: user-migration] Failed
Second run (just call run() again):
[Job: user-migration] Previous execution #1 failed — resuming automatically
[migrate-users] Resuming from checkpoint: {"lastId": 200}
[migrate-users] Chunk #3 committed — read: 300, written: 300
...
[Job: user-migration] Resumed and completed

Fault Tolerance
Retry
Retry is applied to both the processor and the writer.
retry: {
maxAttempts: 3, // total attempts including the first
backoffMs: 1000, // delay between attempts; multiplied by attempt number
// attempt 1 fails → wait 1000ms
// attempt 2 fails → wait 2000ms
// attempt 3 fails → step fails
retryOn: (err) => err.message.includes('deadlock'), // optional: only retry specific errors
}

Skip
Skip drops individual items that fail processing instead of failing the whole step.
skip: {
skipLimit: 5, // allow up to 5 items to be skipped
// if the 6th item also fails, the step fails
skipOn: (err) => err instanceof ValidationError, // optional: only skip specific errors
}

When an item is skipped, it is counted in batch_step_execution.skip_count and a warning is logged. The chunk continues processing the remaining items.
Note: Retry and skip can be combined. Retry is attempted first; only if all retries are exhausted is the item considered for skipping.
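A sketch of a step using both policies together — transient errors are retried first, and items that still fail validation are skipped. ValidationError is a hypothetical application error class, not something ChronoBatch exports:

import { JobConfig } from 'chronobatch';

const job: JobConfig = {
  name: 'order-import', // hypothetical job for this example
  steps: [
    {
      name: 'import-orders',
      chunkSize: 100,
      reader: new MyReader(),
      processor: new MyProcessor(),
      writer: new MyWriter(),
      retry: {
        maxAttempts: 3,
        backoffMs: 500,
        retryOn: (err) => err.message.includes('deadlock'), // retry transient DB errors first
      },
      skip: {
        skipLimit: 10,
        skipOn: (err) => err instanceof ValidationError, // ValidationError: hypothetical app error class
      },
    },
  ],
};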
Checkpoint & Resume
The checkpoint is the reader's saved position. It is stored in batch_execution_context transactionally with the chunk write — they either both commit or both roll back.
Checkpoint flow:
Read chunk (ids 201–300)
↓
Process items
↓
BEGIN transaction
writer.write(items, txClient) ← your INSERTs
saveCheckpoint({ lastId: 300 }) ← position saved
UPDATE batch_step_execution SET ... ← counters updated
COMMIT

If the process crashes between commits, batch_execution_context still holds the last safe position ({ lastId: 200 }) and the next run resumes from id 201.
Implementing a resumable reader:
async open(checkpoint: Record<string, any> | null): Promise<void> {
this.lastId = (checkpoint?.lastId as number) ?? 0;
}
async read(): Promise<Row | null> {
const res = await client.query(
'SELECT * FROM source WHERE id > $1 ORDER BY id LIMIT 1',
[this.lastId],
);
if (res.rows.length === 0) return null;
this.lastId = res.rows[0].id;
return res.rows[0];
}
getCheckpoint(): Record<string, any> {
return { lastId: this.lastId };
}

Per-Chunk Transactions
Every chunk executes inside a single Postgres transaction:
BEGIN
← writer INSERTs (via txClient)
← checkpoint upsert
← step counter UPDATE
COMMIT / ROLLBACK on any error

This means:
- A partial write never leaves the database in a half-written state
- The checkpoint only advances when the write fully succeeds
- On failure, the framework retries (if configured) or marks the step FAILED — the next run() will resume from the last committed checkpoint
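A rough sketch of that behaviour from the caller's side — the docs do not state whether run() rejects or resolves with a FAILED record on failure, so the first call is wrapped in a catch that tolerates either:

import { JobRepository, JobLauncher, closeDb } from 'chronobatch';

const launcher = new JobLauncher(new JobRepository());

// First attempt: suppose a chunk fails mid-step — that chunk rolls back and
// the checkpoint stays at the last successful commit.
await launcher.run(job).catch((err) => console.error('First run failed:', err));

// Second attempt: the failed step resumes from its saved checkpoint;
// steps that already COMPLETED are not re-run.
await launcher.run(job);

await closeDb();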
CLI
ChronoBatch ships with a built-in CLI to monitor and inspect job executions.
Setup
The CLI uses the same chronobatch.config.js your jobs use — no extra setup required.
/** @type {import('chronobatch').BatchConfig} */
module.exports = {
db: {
host: process.env.DB_HOST || 'localhost',
port: Number(process.env.DB_PORT || 5432),
database: process.env.DB_NAME || 'mydb',
user: process.env.DB_USER || 'postgres',
password: process.env.DB_PASSWORD || 'secret',
},
schema: process.env.DB_SCHEMA || 'public',
};

Or pass config via environment variables directly — DB_HOST, DB_PORT, DB_NAME, DB_USER, DB_PASSWORD, DB_SCHEMA.
Commands
| Command | Description |
|---|---|
| chronobatch jobs | List all jobs and their last execution status |
| chronobatch executions <job-name> | Show execution history for a job |
| chronobatch status <execution-id> | Step-level breakdown for one execution |
| chronobatch monitor | Live auto-refreshing dashboard (press Q to quit) |
Examples
# List all jobs
chronobatch jobs
# View execution history for a specific job
chronobatch executions user-migration
# Inspect a specific execution by ID
chronobatch status 3
# Launch the live monitor dashboard
chronobatch monitor

Development (without building)
npm run cli -- jobs
npm run cli -- executions user-migration
npm run cli -- status 3
npm run cli -- monitor

API Reference
JobRepository
// Auto mode — reads chronobatch.config.js, initialises pool and schema automatically
const repo = new JobRepository();
// Manual mode — provide an explicit pool and schema
const repo = new JobRepository(pool, 'my_schema');

The framework tables are created automatically on first use (idempotent — safe to call on an existing database).
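A sketch of manual mode with an explicitly created pg pool; the pool settings are illustrative, and because you created the pool yourself this example closes it with pool.end() rather than relying on closeDb():

import { Pool } from 'pg';
import { JobRepository, JobLauncher } from 'chronobatch';

// You own this pool's lifecycle in manual mode.
const pool = new Pool({
  host: 'localhost',
  database: 'mydb',
  user: 'postgres',
  password: 'secret',
});

const repo = new JobRepository(pool, 'my_schema');
const launcher = new JobLauncher(repo);

await launcher.run(job);
await pool.end();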
JobLauncher
const launcher = new JobLauncher(repo);
await launcher.run(job: JobConfig, options?: RunOptions): Promise<JobExecutionRecord>

RunOptions
| Option | Type | Default | Description |
|---|---|---|---|
| parameters | Record<string, any> | — | Runtime parameters merged with job.parameters |
| onFailedExecution | 'resume' \| 'restart' | 'resume' | What to do when the last execution failed |
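For example, both options together:

// Force a clean restart (ignore the previous FAILED execution) and pass
// runtime parameters, which are merged with job.parameters.
await launcher.run(job, {
  parameters: { date: '2024-02-01' },
  onFailedExecution: 'restart',
});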
closeDb(): Promise<void>
Drains and closes the connection pool. Call once at the end of your process.
await closeDb();

loadConfig(): ResolvedConfig
Returns the resolved config ({ db, schema }) without initialising anything. Useful for inspecting what config will be used.
import { loadConfig } from 'chronobatch';
const { db, schema } = loadConfig();
console.log('Using schema:', schema);

initDb(config: PoolConfig): void
Manually initialise the connection pool. Only needed if you want full control over pool creation without a chronobatch.config.js.
import { initDb, closeDb } from 'chronobatch';
initDb({ host: 'localhost', database: 'mydb', user: 'postgres', password: 'secret' });
// ... run jobs ...
await closeDb();

Note: If you call initDb() manually, do not also rely on auto-init via new JobRepository() in the same process — initDb() will throw if the pool is already set.
StepConfig<I, O>
| Field | Type | Required | Description |
|---|---|---|---|
| name | string | Yes | Unique name within the job |
| chunkSize | number | Yes | Items read and written per transaction |
| reader | ItemReader<I> | Yes | Data source |
| processor | ItemProcessor<I, O> | No | Transformation (identity if omitted) |
| writer | ItemWriter<O> | Yes | Data sink |
| retry | RetryConfig | No | Retry policy for processor and writer |
| skip | SkipConfig | No | Skip policy for failed items |
| allowStartIfComplete | boolean | No | Re-run step even if it completed (default false) |
RetryConfig
| Field | Type | Default | Description |
|---|---|---|---|
| maxAttempts | number | — | Total attempts including first try |
| backoffMs | number | 1000 | Base delay; multiplied by attempt number |
| retryOn | (err: Error) => boolean | — | Predicate to filter retryable errors |
SkipConfig
| Field | Type | Default | Description |
|---|---|---|---|
| skipLimit | number | — | Max items that can be skipped before the step fails |
| skipOn | (err: Error) => boolean | — | Predicate to filter skippable errors |
Database Tables
| Table | Purpose |
|---|---|
| batch_job | One row per unique job name |
| batch_job_execution | One row per job run (status, timestamps, parameters) |
| batch_step_execution | One row per step run (read/write/skip counts) |
| batch_execution_context | Checkpoint data per step execution |
All tables are created under the configured schema (default: public).
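If you want to look at the execution history outside the CLI, a plain query against these tables works. The sketch below relies only on the table names above and deliberately avoids assuming any column names:

import { Pool } from 'pg';

// Ad-hoc inspection of the ChronoBatch metadata tables, using the same
// connection details as chronobatch.config.js (shown inline here).
const pool = new Pool({ host: 'localhost', database: 'mydb', user: 'postgres', password: 'secret' });

const executions = await pool.query('SELECT * FROM public.batch_job_execution');
console.table(executions.rows);

const steps = await pool.query('SELECT * FROM public.batch_step_execution');
console.table(steps.rows);

await pool.end();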
