@ananth_1725/cluster-hub
v2.0.0
Cluster Hub
Node.js multi-core processing library for CPU-bound tasks, batch jobs, ETL pipelines, and high-performance parallel data processing.
Table of Contents
- Installation
- Quick Start
- Features
- Why Cluster Hub
- Architecture
- Flow Diagram
- Performance Benchmarks
- Usage
- Worker Pool
- Real-World Examples
- API Reference
- Error Modes
- When to Use
- When Not to Use
- Comparison
- Why Not worker_threads
- Production Use Cases
- Requirements
- TypeScript
- Scripts
- Considerations
- Documentation
- Contributing
- License
Installation
npm install @ananth_1725/cluster-hub
Quick Start
import cluster from "@ananth_1725/cluster-hub"
import path from "path"
import { fileURLToPath } from "url"
const __dirname = path.dirname(fileURLToPath(import.meta.url))
const jobs = [1, 2, 3, 4, 5, 6, 7, 8]
const workerPath = path.join(__dirname, "worker.js")
const results = await cluster(jobs, workerPath, { exitOnComplete: false })
console.log(results)
Worker file (worker.js):
export const childCoreProcessFunc = ({ data, cpuIndex }) => {
return data.map(item => item * 2)
}
Features
- ⚡ Orchestration — Error handling, chunk balancing, graceful worker termination, ordered result aggregation
- 🔀 Smart chunk splitting across CPU cores
- 🧵 Process-based isolation (safe parallelism)
- 📦 ESM native, zero dependencies
- 🛡 Error propagation and worker cleanup on failure (no zombie processes)
- ⚙️ Configurable errorMode: fail-fast (atomic) or continue (partial results)
- 📊 Predictable chunk-order result aggregation
- 🧠 Simple worker contract
- ⏱ Optional timeout and observability hooks (onWorkerStart, onProgress, etc.)
- 🔄 Worker pool mode — Reuse workers across runs; eliminate startup overhead
Why Cluster Hub
Cluster Hub is a lightweight Node.js parallel processing library for:
- Multi-core processing — Use all CPU cores instead of one
- CPU-bound task distribution — Hashing, crypto, image processing, parsing
- Batch job execution — Nightly reports, migrations, exports
- ETL pipelines — Transform large datasets in parallel
- Worker process orchestration — Master–worker pattern with IPC
- High-performance data processing — Split arrays, process chunks, aggregate results
- Parallel array processing — Same function on many items, distributed across cores
Architecture
Cluster Hub uses a master–worker pattern. The main process orchestrates; worker processes do the work.
┌─────────────────────────────────────────────────────────────────────────────┐
│ MAIN PROCESS (Orchestrator) │
│ │
│ 1. Validate input │
│ 2. Split data into chunks (splitToChunks) │
│ 3. Fork N worker processes (N = min(cpuCores, chunkCount)) │
│ 4. Send task to each worker via IPC │
│ 5. Collect results / propagate errors │
│ 6. Return aggregated results (or exit) │
└─────────────────────────────────────────────────────────────────────────────┘
│ │ │ │
│ fork() │ fork() │ fork() │ fork()
▼ ▼ ▼ ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ WORKER 0 │ │ WORKER 1 │ │ WORKER 2 │ │ WORKER N │
│ │ │ │ │ │ │ │
│ • Receive chunk │ │ • Receive chunk │ │ • Receive chunk │ │ • Receive chunk │
│ • Import user │ │ • Import user │ │ • Import user │ │ • Import user │
│ module │ │ module │ │ module │ │ module │
│ • Run │ │ • Run │ │ • Run │ │ • Run │
│ childCore │ │ childCore │ │ childCore │ │ childCore │
│ ProcessFunc │ │ ProcessFunc │ │ ProcessFunc │ │ ProcessFunc │
│ • Send result │ │ • Send result │ │ • Send result │ │ • Send result │
│ or error │ │ or error │ │ or error │ │ or error │
└─────────────────┘ └─────────────────┘ └─────────────────┘ └─────────────────┘
│ │ │ │
└────────────────────┴────────────────────┴────────────────────┘
│
IPC (process.send / message)
│
▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ Results aggregated in chunk order → [result0, result1, result2, ...] │
└─────────────────────────────────────────────────────────────────────────────┘
Component Overview
| Component | Responsibility |
| ---------------------- | ------------------------------------------------------------------------------------------------ |
| Main Process | Validates input, chunks data, spawns workers, collects results, handles errors |
| Worker Process | Receives chunk, dynamically imports your module, runs childCoreProcessFunc, sends result/error |
| IPC Channel | Bidirectional messaging between main and workers (worker.send() / process.send()) |
| User Worker Module | Your business logic—must export childCoreProcessFunc |
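The chunking step in the table above can be pictured with a small sketch. splitToChunks is internal to the library, so this contiguous-slice version is an illustrative assumption, not the actual implementation:

```javascript
// Illustrative chunking: contiguous slices, at most `maxWorkers` chunks.
// Assumption: the real splitToChunks may balance chunks differently.
function splitToChunks(items, maxWorkers) {
  const count = Math.min(maxWorkers, items.length)
  const size = Math.ceil(items.length / count)
  const chunks = []
  for (let i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size))
  }
  return chunks
}

// 8 items across 4 workers → 4 chunks of 2
console.log(splitToChunks([1, 2, 3, 4, 5, 6, 7, 8], 4))
// → [ [1, 2], [3, 4], [5, 6], [7, 8] ]
```

Whatever the exact balancing, the ordering guarantee is the part that matters: chunk i always maps to results[i].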
Flow Diagram
flowchart TD
subgraph Main["Main Process"]
A["cluster(data, workerPath, options)"] --> B{"Validate input"}
B -->|Invalid| C["Throw TypeError"]
B -->|Valid| D["Split array into N chunks"]
D --> E["workerCount = min(maxWorkers, chunks.length)"]
E --> F["Fork worker for each chunk"]
F --> G["Send task via IPC (chunk, index, workerPath)"]
G --> H["Wait for all workers"]
H --> I{"All succeeded"}
I -->|Yes| J["Build results array in chunk order"]
I -->|No| K["Reject with first error"]
J --> L{"exitOnComplete"}
L -->|true| M["process.exit(0)"]
L -->|false| N["Return results"]
end
subgraph Worker["Worker Process"]
W1["Receive message"] --> W2{"type == task"}
W2 -->|No| W3["Ignore"]
W2 -->|Yes| W4["Dynamic import worker module"]
W4 --> W5{"childCoreProcessFunc exists"}
W5 -->|No| W6["Send error via IPC"]
W5 -->|Yes| W7["Await childCoreProcessFunc"]
W7 --> W8{"Success"}
W8 -->|Yes| W9["Send result via IPC"]
W8 -->|No| W6
W9 --> W10["Disconnect"]
W6 --> W10
end
G -.->|IPC| W1
W9 -.->|IPC| H
W6 -.->|IPC| K
Message Flow (Sequence)
sequenceDiagram
participant M as Main Process
participant W0 as Worker 0
participant W1 as Worker 1
participant WN as Worker N
M->>M: splitToChunks(data)
M->>W0: fork() + send({ type: "task", inputData: chunk0, index: 0 })
M->>W1: fork() + send({ type: "task", inputData: chunk1, index: 1 })
M->>WN: fork() + send({ type: "task", inputData: chunkN, index: N })
par Parallel execution
W0->>W0: import(workerPath)
W0->>W0: childCoreProcessFunc(chunk0)
W0->>M: send({ type: "result", result })
and
W1->>W1: import(workerPath)
W1->>W1: childCoreProcessFunc(chunk1)
W1->>M: send({ type: "result", result })
and
WN->>WN: import(workerPath)
WN->>WN: childCoreProcessFunc(chunkN)
WN->>M: send({ type: "result", result })
end
M->>M: Aggregate results by index
M->>M: Return [result0, result1, ..., resultN]
Performance Benchmarks
Test machine: 8-core CPU, Node.js 18+
Task: 1,000 items × 2,000 chained SHA256 hashes (heavy CPU work per item)
| Mode                | Time Taken |
| ------------------- | ---------- |
| Sequential          | 4.8 sec    |
| Promise.all()       | 4.7 sec    |
| Cluster Hub (8 CPU) | 0.82 sec   |
Result: ~5.8x speed improvement using all CPU cores.
Note: Cluster Hub has ~1–2 sec process startup overhead. It shines when per-item work is heavy (e.g. 5–50ms+). With trivial work (single fast hash), overhead dominates and sequential is faster.
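The overhead note above can be turned into a rough break-even estimate. The numbers here are illustrative stand-ins, not measurements; benchmark your own workload:

```javascript
// Rough model: parallel pays off when (per-item cost × items) / cores
// plus startup overhead beats the sequential time.
const overheadMs = 1500 // assumed startup cost (~1–2 s per the note above)
const perItemMs = 10    // assumed heavy per-item work
const items = 1000
const cores = 8

const sequential = items * perItemMs                      // 10000 ms
const parallel = overheadMs + (items * perItemMs) / cores // 2750 ms
console.log({ sequential, parallel, worthIt: parallel < sequential })
```

With per-item work of 0.1 ms instead of 10 ms, the same arithmetic flips: overhead dominates and sequential wins.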
Run the Included Benchmark
npm run benchmark
Custom Benchmark Code
import cluster from "@ananth_1725/cluster-hub"
import { performance } from "perf_hooks"
const data = Array.from({ length: 10000 }, (_, i) => ({ id: i }))
const start = performance.now()
await cluster(data, "./crypto-worker.js", { exitOnComplete: false })
const end = performance.now()
console.log(`Cluster Hub: ${((end - start) / 1000).toFixed(2)}s`)
Usage
Standalone Script (exits when done)
Use when this is your main entry point (e.g. cron job, CLI):
import cluster from "@ananth_1725/cluster-hub"
const data = [
/* ... */
]
const workerPath = "/absolute/path/to/worker.js"
await cluster(data, workerPath, { exitOnComplete: true })
// Process exits with code 0 on success, 1 on error
As a Library (returns results)
Use when embedding in a larger app (API server, script with more logic):
import cluster from "@ananth_1725/cluster-hub"
const data = [
/* ... */
]
const workerPath = "/absolute/path/to/worker.js"
const results = await cluster(data, workerPath, { exitOnComplete: false })
// results: [chunk0Result, chunk1Result, ...]Worker Module Contract
Your worker file must export childCoreProcessFunc (or default.childCoreProcessFunc):
export const childCoreProcessFunc = ({ data, cpuIndex }) => {
// data: your chunk of the input array
// cpuIndex: 0-based worker index (0, 1, 2, ...)
return data.map(/* process each item */)
}| Parameter | Type | Description |
| ---------- | -------- | ------------------------------------------------ |
| data | T[] | Chunk of the input array assigned to this worker |
| cpuIndex | number | Zero-based index of this worker |
Return: Optional. Returned values are aggregated and passed back when exitOnComplete: false. Sync and async functions are supported.
Worker path: Must be a local file path (absolute or relative to cwd). URLs are not allowed.
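Since sync and async functions are both supported, an async worker satisfies the same contract. A hypothetical async variant (the delay helper is illustrative, not part of the library):

```javascript
// worker.js — async variant of the contract; the worker process awaits
// the returned promise before sending the result back over IPC.
const delay = ms => new Promise(resolve => setTimeout(resolve, ms))

export const childCoreProcessFunc = async ({ data, cpuIndex }) => {
  const out = []
  for (const item of data) {
    await delay(1) // stand-in for real async work per item
    out.push({ item, processedBy: cpuIndex })
  }
  return out
}
```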
Worker Pool
For repeated workloads (API servers, multiple batch runs), use a worker pool to reuse workers and eliminate startup overhead:
import { createClusterPool } from "@ananth_1725/cluster-hub"
const pool = createClusterPool("./worker.js", { maxWorkers: 4 })
// First run: workers start (no per-call fork overhead)
const results1 = await pool.run(data1)
// Second run: workers reused — instant
const results2 = await pool.run(data2)
pool.destroy() // cleanup when done
Observability Hooks
await cluster(data, workerPath, {
exitOnComplete: false,
timeout: 30000,
onWorkerStart: i => console.log(`Worker ${i} started`),
onWorkerComplete: i => console.log(`Worker ${i} done`),
onWorkerError: (i, err) => console.error(`Worker ${i} failed:`, err),
onProgress: (completed, total) => console.log(`${completed}/${total} chunks done`)
})
Real-World Examples
1. Batch Image Resizing
// main.js
import cluster from "@ananth_1725/cluster-hub"
import path from "path"
import { fileURLToPath } from "url"
import { readdirSync } from "fs"
const __dirname = path.dirname(fileURLToPath(import.meta.url))
const imagePaths = readdirSync("./uploads").map(f => path.join("./uploads", f))
const results = await cluster(imagePaths, path.join(__dirname, "resize-worker.js"), {
exitOnComplete: false,
maxWorkers: 4
})
// resize-worker.js
import sharp from "sharp"
import path from "path"
export const childCoreProcessFunc = async ({ data }) => {
const results = []
for (const imagePath of data) {
const outputPath = path.join("output", path.basename(imagePath))
await sharp(imagePath).resize(800).toFile(outputPath)
results.push(outputPath)
}
return results
}
2. Parallel API / Database Calls
// main.js
const userIds = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
const results = await cluster(userIds, "./fetch-user-worker.js", {
exitOnComplete: false,
maxWorkers: 4
})
const flatResults = results.flat()
// fetch-user-worker.js
export const childCoreProcessFunc = async ({ data }) => {
const users = await Promise.all(
data.map(id => fetch(`https://api.example.com/users/${id}`).then(r => r.json()))
)
return users
}
3. Data ETL / Report Generation
// main.js
const rawRecords = await loadFromDatabase() // e.g. 10,000 rows
const processed = await cluster(rawRecords, "./transform-worker.js", {
exitOnComplete: false
})
const report = generateReport(processed.flat())
await saveReport(report)
// transform-worker.js
export const childCoreProcessFunc = ({ data }) => {
return data.map(record => ({
id: record.id,
normalized: normalize(record),
computed: computeMetrics(record)
}))
}
4. CPU-Intensive Calculations
// crypto-worker.js
import { createHash } from "crypto"
export const childCoreProcessFunc = ({ data }) => {
return data.map(item => {
const hash = createHash("sha256")
hash.update(JSON.stringify(item))
return hash.digest("hex")
})
}
5. File Processing Pipeline
// main.js
import { readdirSync } from "fs"
import path from "path"
const files = readdirSync("./data").map(f => path.join("./data", f))
const parsed = await cluster(files, "./parse-worker.js", { exitOnComplete: false })
const allRecords = parsed.flat()
// parse-worker.js
import { readFileSync } from "fs"
export const childCoreProcessFunc = ({ data }) => {
return data.map(filePath => {
const content = readFileSync(filePath, "utf-8")
return JSON.parse(content)
})
}
API Reference
cluster(inputData, workerLocation, options?)
| Parameter | Type | Required | Description |
| ---------------- | ---------------- | -------- | ---------------------------------------------------- |
| inputData | T[] | Yes | Array of items to process. Split into chunks. |
| workerLocation | string | Yes | Path to worker module (absolute or relative to cwd). |
| options | ClusterOptions | No | Configuration. |
ClusterOptions
| Option | Type | Default | Description |
| ------------------ | -------------------------------------------- | --------------- | ---------------------------------------------- |
| exitOnComplete | boolean | false | If true, calls process.exit(0) on success. |
| maxWorkers | number | cpus().length | Maximum number of worker processes. |
| timeout | number | — | Max ms per worker; rejects if exceeded. |
| onWorkerStart | (cpuIndex: number) => void | — | Called when a worker starts. |
| onWorkerComplete | (cpuIndex: number) => void | — | Called when a worker completes successfully. |
| onWorkerError | (cpuIndex: number, error) => void | — | Called when a worker fails. |
| onProgress | (completed: number, total: number) => void | — | Called when each worker completes. |
| errorMode          | "fail-fast" \| "continue"                    | "fail-fast"     | See Error Modes.                               |
Returns
- fail-fast (default): Promise<R[]> — Array of results, one per chunk. Rejects on first worker error.
- continue: Promise<{ success, results, errors }> — Resolves with partial results; errors lists failed chunks.
Returns an empty array when inputData is empty. When exitOnComplete: true, the process exits before resolving; the default is false for library safety.
Error Modes
| Mode | On worker failure | Use case |
| ----------- | ------------------------------------ | ---------------------------------- |
| fail-fast | Kill all workers, reject immediately | Atomic workloads (ETL, migrations) |
| continue | Record error, let others finish | Best-effort (images, scraping) |
// fail-fast (default) — reject on first error
const results = await cluster(data, workerPath, { exitOnComplete: false })
// continue — get partial results + errors
const { success, results, errors } = await cluster(data, workerPath, {
exitOnComplete: false,
errorMode: "continue"
})
if (!success) {
console.log(
"Failed chunks:",
errors.map(e => e.chunkIndex)
)
}
Throws
- TypeError — If inputData is not an array, workerLocation is invalid, maxWorkers is not a positive integer, or errorMode is not "fail-fast" or "continue".
- Error — If any worker throws, exits with a non-zero code, is killed (e.g. SIGKILL), or times out.
createClusterPool(workerLocation, options?)
| Parameter | Type | Required | Description |
| ---------------- | ------------- | -------- | ------------------------ |
| workerLocation | string | Yes | Path to worker module. |
| options | PoolOptions | No | { maxWorkers: number } |
Returns: { run(data), destroy(), workerCount } — Use pool.run(data) for each batch; call pool.destroy() when done.
When to Use
| Use Cluster Hub when…                         | Example                                             |
| --------------------------------------------- | --------------------------------------------------- |
| Processing large arrays of CPU-bound work     | Hashing files, image resizing, JSON parsing, crypto |
| Batch jobs (nightly reports, ETL, migrations) | Process 50,000 DB records, generate reports         |
| Need to use all CPU cores                     | Single-threaded loop leaves other cores idle        |
| Each item is independent                      | No shared state or ordering requirements            |
When Not to Use
| Situation | Better alternative |
| ------------------------------------------------- | -------------------------------- |
| Small arrays (< 100 items) | Promise.all() or a simple loop |
| Mostly I/O, few items | Promise.all() with async/await |
| Need shared memory or threads | Node.js worker_threads |
| Need job queue (retries, scheduling, persistence) | Bull, Agenda, or similar |
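For the first two rows of the table, the simpler alternative looks like this (processOne is a hypothetical stand-in for your per-item work):

```javascript
// Small or I/O-bound workloads: plain async concurrency,
// no process startup cost at all.
const items = [1, 2, 3, 4]
const processOne = async n => n * 2 // hypothetical per-item work
const results = await Promise.all(items.map(processOne))
console.log(results) // → [2, 4, 6, 8]
```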
Comparison
| Feature              | Cluster Hub | worker_threads | Piscina | Bull |
| -------------------- | ----------- | -------------- | ------- | ---- |
| Multi-core CPU usage | ✅          | ✅             | ✅      | ❌   |
| Process isolation    | ✅          | ❌             | ❌      | ❌   |
| Shared memory        | ❌          | ✅             | ✅      | ❌   |
| Job persistence      | ❌          | ❌             | ❌      | ✅   |
| Redis required       | ❌          | ❌             | ❌      | ✅   |
| Ideal for ETL        | ✅          | ⚠️             | ✅      | ⚠️   |
| Setup complexity     | Very Low    | Medium         | Medium  | High |
Cluster Hub
- Best for CPU-bound batch jobs
- Simple mental model
- Zero dependencies
- Process-level crash isolation
worker_threads
- Lower-level API; requires manual thread lifecycle and message passing
- Good for shared memory scenarios
- Cluster Hub abstracts orchestration into a single function
Piscina
- Thread pool manager for worker_threads
- More performant for heavy reuse
- Higher complexity
Bull
- Persistent Redis-backed job queue
- Great for background tasks
- Not meant for CPU parallelism
Why Not worker_threads?
Cluster Hub uses process-based parallelism instead of threads.
Advantages of process-based:
- Full process isolation
- Safer memory boundaries
- Better crash containment
- Simpler mental model
Choose based on your needs:
- Use worker_threads if you need shared memory or lower overhead.
- Use Cluster Hub if you need isolated parallel execution with minimal setup.
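For contrast, the same fan-out written directly against worker_threads shows the lifecycle you would otherwise manage by hand. This is a sketch: eval: true inlines the worker body for brevity, and a real setup would use a separate worker file plus error and exit handling per thread.

```javascript
import { Worker } from "worker_threads"

// Manual fan-out: spawn a thread per chunk, collect results by index.
const runChunk = (chunk, index) =>
  new Promise((resolve, reject) => {
    const worker = new Worker(
      `const { parentPort, workerData } = require("worker_threads")
       parentPort.postMessage(workerData.map(n => n * 2))`,
      { eval: true, workerData: chunk }
    )
    worker.once("message", result => resolve({ index, result }))
    worker.once("error", reject)
  })

const chunks = [[1, 2], [3, 4]]
const settled = await Promise.all(chunks.map((chunk, i) => runChunk(chunk, i)))
const results = settled
  .sort((a, b) => a.index - b.index) // explicit chunk-order aggregation
  .map(entry => entry.result)
console.log(results) // → [[2, 4], [6, 8]]
```

Cluster Hub collapses the spawn/send/collect/sort choreography above into a single call, trading shared memory for process isolation.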
Production Use Cases
Cluster Hub is suitable for:
- Large-scale batch jobs
- Scheduled cron ETL tasks
- Data migration pipelines
- CPU-heavy cryptographic workloads
- File transformation pipelines
- Report generation from large datasets
- Image/video processing at scale
Requirements
- Node.js 18.0.0+
- ESM — "type": "module" in your package.json
TypeScript
TypeScript types are included. Import as usual:
import cluster from "@ananth_1725/cluster-hub"
const results = await cluster<number, number>(data, workerPath, { exitOnComplete: false })
Scripts
| Command | Description |
| ------------------- | ------------------------- |
| npm test | Run unit tests |
| npm run benchmark | Run performance benchmark |
Considerations
| Scenario | Recommendation |
| ------------------------------------------------------ | ------------------------------------------------------------------------------------------------------------ |
| Running inside a clustered app (PM2, Node cluster) | Use maxWorkers: Math.floor(cpus().length / 2) to avoid oversubscribing cores |
| Containers with CPU limits | Set maxWorkers explicitly; cpus().length may not reflect limits |
| Light per-item work | Process overhead (~1–2 sec) dominates; use when each item takes 5–50ms+ |
| Large input arrays | Data is serialized and copied via IPC; avoid extremely large object graphs per chunk |
| Large items (>1MB each) | Consider passing file paths instead of raw objects; read in worker |
| Worker failure (fail-fast) | When one worker fails, all others are terminated (SIGTERM). Use errorMode: "continue" for partial results. |
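The first two rows of the table can be handled by sizing maxWorkers from what the runtime reports instead of hard-coding it. A sketch, assuming Node 18.14+ for os.availableParallelism():

```javascript
import os from "os"

// Derive maxWorkers from reported parallelism instead of hard-coding it.
// os.availableParallelism() (Node 18.14+) may reflect container CPU limits
// better than os.cpus().length; halve the result when co-located with a
// clustered app (PM2, Node cluster) to avoid oversubscribing cores.
const cores = os.availableParallelism ? os.availableParallelism() : os.cpus().length
const maxWorkers = Math.max(1, Math.floor(cores / 2))
console.log({ cores, maxWorkers })
```

Pass the computed value as `{ maxWorkers }` in ClusterOptions.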
Documentation
| Document       | Description                                               |
| -------------- | --------------------------------------------------------- |
| docs/README.md | Documentation index                                       |
| docs/AUDIT.md  | Security, performance, reliability, and OS-level behavior |
Contributing
Contributions are welcome!
- Fork the repo
- Create a feature branch
- Submit a pull request
Please ensure changes are tested and documented.
License
MIT
