
@ananth_1725/cluster-hub v2.0.0 (Published)

Node.js multi-core processing library for CPU-bound tasks, batch jobs, ETL pipelines, and high-performance parallel data processing.

Downloads: 39

Cluster Hub


Node.js multi-core processing library for CPU-bound tasks, batch jobs, ETL pipelines, and high-performance parallel data processing.



Installation

npm install @ananth_1725/cluster-hub

Quick Start

import cluster from "@ananth_1725/cluster-hub"
import path from "path"
import { fileURLToPath } from "url"

const __dirname = path.dirname(fileURLToPath(import.meta.url))

const jobs = [1, 2, 3, 4, 5, 6, 7, 8]
const workerPath = path.join(__dirname, "worker.js")

const results = await cluster(jobs, workerPath, { exitOnComplete: false })
console.log(results)

Worker file (worker.js):

export const childCoreProcessFunc = ({ data, cpuIndex }) => {
	return data.map(item => item * 2)
}

Features

  • Orchestration — Error handling, chunk balancing, graceful worker termination, ordered result aggregation
  • 🔀 Smart chunk splitting across CPU cores
  • 🧵 Process-based isolation (safe parallelism)
  • 📦 ESM native, zero dependencies
  • 🛡 Error propagation and worker cleanup on failure (no zombie processes)
  • ⚙️ Configurable errorMode: fail-fast (atomic) or continue (partial results)
  • 📊 Predictable chunk-order result aggregation
  • 🧠 Simple worker contract
  • ⏱ Optional timeout and observability hooks (onWorkerStart, onProgress, etc.)
  • 🔄 Worker pool mode — Reuse workers across runs; eliminate startup overhead

Why Cluster Hub

Cluster Hub is a lightweight Node.js parallel processing library for:

  • Multi-core processing — Use all CPU cores instead of one
  • CPU-bound task distribution — Hashing, crypto, image processing, parsing
  • Batch job execution — Nightly reports, migrations, exports
  • ETL pipelines — Transform large datasets in parallel
  • Worker process orchestration — Master–worker pattern with IPC
  • High-performance data processing — Split arrays, process chunks, aggregate results
  • Parallel array processing — Same function on many items, distributed across cores

Architecture

Cluster Hub uses a master–worker pattern. The main process orchestrates; worker processes do the work.

┌─────────────────────────────────────────────────────────────────────────────┐
│                           MAIN PROCESS (Orchestrator)                         │
│                                                                              │
│  1. Validate input                                                          │
│  2. Split data into chunks (splitToChunks)                                   │
│  3. Fork N worker processes (N = min(cpuCores, chunkCount))                  │
│  4. Send task to each worker via IPC                                         │
│  5. Collect results / propagate errors                                       │
│  6. Return aggregated results (or exit)                                      │
└─────────────────────────────────────────────────────────────────────────────┘
          │                    │                    │                    │
          │ fork()              │ fork()              │ fork()              │ fork()
          ▼                    ▼                    ▼                    ▼
┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐
│   WORKER 0      │  │   WORKER 1      │  │   WORKER 2      │  │   WORKER N      │
│                 │  │                 │  │                 │  │                 │
│ • Receive chunk │  │ • Receive chunk │  │ • Receive chunk │  │ • Receive chunk │
│ • Import user   │  │ • Import user   │  │ • Import user   │  │ • Import user   │
│   module        │  │   module        │  │   module        │  │   module        │
│ • Run           │  │ • Run           │  │ • Run           │  │ • Run           │
│   childCore     │  │   childCore     │  │   childCore     │  │   childCore     │
│   ProcessFunc   │  │   ProcessFunc   │  │   ProcessFunc   │  │   ProcessFunc   │
│ • Send result   │  │ • Send result   │  │ • Send result   │  │ • Send result   │
│   or error      │  │   or error      │  │   or error      │  │   or error      │
└─────────────────┘  └─────────────────┘  └─────────────────┘  └─────────────────┘
          │                    │                    │                    │
          └────────────────────┴────────────────────┴────────────────────┘
                                        │
                              IPC (process.send / message)
                                        │
                                        ▼
┌─────────────────────────────────────────────────────────────────────────────┐
│  Results aggregated in chunk order → [result0, result1, result2, ...]        │
└─────────────────────────────────────────────────────────────────────────────┘
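The split step above (step 2) can be sketched roughly as follows. This is a hypothetical reimplementation for illustration, not the library's actual splitToChunks source; contiguous slices keep chunk order aligned with input order, which is what makes ordered aggregation possible:

```javascript
// Hypothetical sketch of chunk splitting: cut the input into up to
// maxChunks contiguous, near-equal slices (never an empty chunk).
function splitToChunks(items, maxChunks) {
	const chunkCount = Math.min(maxChunks, items.length)
	const size = Math.ceil(items.length / chunkCount)
	const chunks = []
	for (let i = 0; i < items.length; i += size) {
		chunks.push(items.slice(i, i + size))
	}
	return chunks
}

console.log(splitToChunks([1, 2, 3, 4, 5, 6, 7, 8], 3))
// → [[1, 2, 3], [4, 5, 6], [7, 8]]
```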

Component Overview

| Component | Responsibility |
| ------------------ | -------------- |
| Main Process | Validates input, chunks data, spawns workers, collects results, handles errors |
| Worker Process | Receives chunk, dynamically imports your module, runs childCoreProcessFunc, sends result/error |
| IPC Channel | Bidirectional messaging between main and workers (worker.send() / process.send()) |
| User Worker Module | Your business logic — must export childCoreProcessFunc |


Flow Diagram

flowchart TD
    subgraph Main["Main Process"]
        A["cluster(data, workerPath, options)"] --> B{"Validate input"}
        B -->|Invalid| C["Throw TypeError"]
        B -->|Valid| D["Split array into N chunks"]
        D --> E["workerCount = min(maxWorkers, chunks.length)"]
        E --> F["Fork worker for each chunk"]
        F --> G["Send task via IPC (chunk, index, workerPath)"]
        G --> H["Wait for all workers"]
        H --> I{"All succeeded"}
        I -->|Yes| J["Build results array in chunk order"]
        I -->|No| K["Reject with first error"]
        J --> L{"exitOnComplete"}
        L -->|true| M["process.exit(0)"]
        L -->|false| N["Return results"]
    end

    subgraph Worker["Worker Process"]
        W1["Receive message"] --> W2{"type == task"}
        W2 -->|No| W3["Ignore"]
        W2 -->|Yes| W4["Dynamic import worker module"]
        W4 --> W5{"childCoreProcessFunc exists"}
        W5 -->|No| W6["Send error via IPC"]
        W5 -->|Yes| W7["Await childCoreProcessFunc"]
        W7 --> W8{"Success"}
        W8 -->|Yes| W9["Send result via IPC"]
        W8 -->|No| W6
        W9 --> W10["Disconnect"]
        W6 --> W10
    end

    G -.->|IPC| W1
    W9 -.->|IPC| H
    W6 -.->|IPC| K

Message Flow (Sequence)

sequenceDiagram
    participant M as Main Process
    participant W0 as Worker 0
    participant W1 as Worker 1
    participant WN as Worker N

    M->>M: splitToChunks(data)
    M->>W0: fork() + send({ type: "task", inputData: chunk0, index: 0 })
    M->>W1: fork() + send({ type: "task", inputData: chunk1, index: 1 })
    M->>WN: fork() + send({ type: "task", inputData: chunkN, index: N })

    par Parallel execution
        W0->>W0: import(workerPath)
        W0->>W0: childCoreProcessFunc(chunk0)
        W0->>M: send({ type: "result", result })
    and
        W1->>W1: import(workerPath)
        W1->>W1: childCoreProcessFunc(chunk1)
        W1->>M: send({ type: "result", result })
    and
        WN->>WN: import(workerPath)
        WN->>WN: childCoreProcessFunc(chunkN)
        WN->>M: send({ type: "result", result })
    end

    M->>M: Aggregate results by index
    M->>M: Return [result0, result1, ..., resultN]

Performance Benchmarks

Test machine: 8-core CPU, Node.js 18+
Task: 1,000 items × 2,000 chained SHA256 hashes (heavy CPU work per item)

| Mode | Time Taken |
| ------------------- | ---------- |
| Sequential | 4.8 sec |
| Promise.all() | 4.7 sec |
| Cluster Hub (8 CPU) | 0.82 sec |

Result: ~5.8x speed improvement using all CPU cores.

Note: Cluster Hub has ~1–2 sec process startup overhead. It shines when per-item work is heavy (e.g. 5–50ms+). With trivial work (single fast hash), overhead dominates and sequential is faster.
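That trade-off can be sketched as a back-of-envelope model (the numbers below are illustrative assumptions, not measurements from the benchmark above):

```javascript
// Rough model: a forked run pays a fixed startup cost, then divides the work
// across cores. Numbers here are illustrative assumptions only.
const cores = 8
const startupOverheadMs = 1500 // per the ~1–2 sec note above

function parallelTimeMs(sequentialMs) {
	return startupOverheadMs + sequentialMs / cores
}

// Heavy job (4.8 s sequential): parallel wins
console.log(parallelTimeMs(4800) < 4800) // true
// Trivial job (0.2 s sequential): overhead dominates, sequential wins
console.log(parallelTimeMs(200) < 200) // false
```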

Run the Included Benchmark

npm run benchmark

Custom Benchmark Code

import cluster from "@ananth_1725/cluster-hub"
import { performance } from "perf_hooks"

const data = Array.from({ length: 10000 }, (_, i) => ({ id: i }))

const start = performance.now()
await cluster(data, "./crypto-worker.js", { exitOnComplete: false })
const end = performance.now()

console.log(`Cluster Hub: ${((end - start) / 1000).toFixed(2)}s`)

Usage

Standalone Script (exits when done)

Use when this is your main entry point (e.g. cron job, CLI):

import cluster from "@ananth_1725/cluster-hub"

const data = [
	/* ... */
]
const workerPath = "/absolute/path/to/worker.js"

await cluster(data, workerPath, { exitOnComplete: true })
// Process exits with code 0 on success, 1 on error

As a Library (returns results)

Use when embedding in a larger app (API server, script with more logic):

import cluster from "@ananth_1725/cluster-hub"

const data = [
	/* ... */
]
const workerPath = "/absolute/path/to/worker.js"

const results = await cluster(data, workerPath, { exitOnComplete: false })
// results: [chunk0Result, chunk1Result, ...]

Worker Module Contract

Your worker file must export childCoreProcessFunc (or default.childCoreProcessFunc):

export const childCoreProcessFunc = ({ data, cpuIndex }) => {
	// data: your chunk of the input array
	// cpuIndex: 0-based worker index (0, 1, 2, ...)
	return data.map(/* process each item */)
}

| Parameter | Type | Description |
| ---------- | -------- | ------------------------------------------------ |
| data | T[] | Chunk of the input array assigned to this worker |
| cpuIndex | number | Zero-based index of this worker |

Return: Optional. Returned values are aggregated and passed back when exitOnComplete: false. Sync and async functions are supported.

Worker path: Must be a local file path (absolute or relative to cwd). URLs are not allowed.
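A sketch of how a worker might resolve that contract, accepting either export shape. This is hypothetical illustration code, not the library's source:

```javascript
// Hypothetical sketch: accept the contract as a named export or under
// default.childCoreProcessFunc, mirroring the rule stated above.
function resolveWorkerFunc(mod) {
	const fn = mod.childCoreProcessFunc ?? mod.default?.childCoreProcessFunc
	if (typeof fn !== "function") {
		throw new TypeError("Worker module must export childCoreProcessFunc")
	}
	return fn
}

// Both export shapes resolve to the same callable:
const named = { childCoreProcessFunc: ({ data }) => data.map(x => x + 1) }
const viaDefault = { default: { childCoreProcessFunc: ({ data }) => data } }
console.log(resolveWorkerFunc(named)({ data: [1, 2] })) // [2, 3]
console.log(typeof resolveWorkerFunc(viaDefault)) // "function"
```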

Worker Pool

For repeated workloads (API servers, multiple batch runs), use a worker pool to reuse workers and eliminate startup overhead:

import { createClusterPool } from "@ananth_1725/cluster-hub"

const pool = createClusterPool("./worker.js", { maxWorkers: 4 })

// First run: workers are forked and started
const results1 = await pool.run(data1)

// Second run: workers reused, no fork overhead
const results2 = await pool.run(data2)

pool.destroy() // cleanup when done

Observability Hooks

await cluster(data, workerPath, {
	exitOnComplete: false,
	timeout: 30000,
	onWorkerStart: i => console.log(`Worker ${i} started`),
	onWorkerComplete: i => console.log(`Worker ${i} done`),
	onWorkerError: (i, err) => console.error(`Worker ${i} failed:`, err),
	onProgress: (completed, total) => console.log(`${completed}/${total} chunks done`)
})

Real-World Examples

1. Batch Image Resizing

// main.js
import cluster from "@ananth_1725/cluster-hub"
import path from "path"
import { fileURLToPath } from "url"
import { readdirSync } from "fs"

const __dirname = path.dirname(fileURLToPath(import.meta.url))
const imagePaths = readdirSync("./uploads").map(f => path.join("./uploads", f))

const results = await cluster(imagePaths, path.join(__dirname, "resize-worker.js"), {
	exitOnComplete: false,
	maxWorkers: 4
})
// resize-worker.js
import sharp from "sharp"
import path from "path"

export const childCoreProcessFunc = async ({ data }) => {
	const results = []
	for (const imagePath of data) {
		const outputPath = path.join("output", path.basename(imagePath))
		await sharp(imagePath).resize(800).toFile(outputPath)
		results.push(outputPath)
	}
	return results
}

2. Parallel API / Database Calls

// main.js
const userIds = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
const results = await cluster(userIds, "./fetch-user-worker.js", {
	exitOnComplete: false,
	maxWorkers: 4
})
const flatResults = results.flat()
// fetch-user-worker.js
export const childCoreProcessFunc = async ({ data }) => {
	const users = await Promise.all(
		data.map(id => fetch(`https://api.example.com/users/${id}`).then(r => r.json()))
	)
	return users
}

3. Data ETL / Report Generation

// main.js
const rawRecords = await loadFromDatabase() // e.g. 10,000 rows
const processed = await cluster(rawRecords, "./transform-worker.js", {
	exitOnComplete: false
})
const report = generateReport(processed.flat())
await saveReport(report)
// transform-worker.js
export const childCoreProcessFunc = ({ data }) => {
	return data.map(record => ({
		id: record.id,
		normalized: normalize(record),
		computed: computeMetrics(record)
	}))
}

4. CPU-Intensive Calculations

// crypto-worker.js
import { createHash } from "crypto"

export const childCoreProcessFunc = ({ data }) => {
	return data.map(item => {
		const hash = createHash("sha256")
		hash.update(JSON.stringify(item))
		return hash.digest("hex")
	})
}

5. File Processing Pipeline

// main.js
import { readdirSync } from "fs"
import path from "path"

const files = readdirSync("./data").map(f => path.join("./data", f))
const parsed = await cluster(files, "./parse-worker.js", { exitOnComplete: false })
const allRecords = parsed.flat()
// parse-worker.js
import { readFileSync } from "fs"

export const childCoreProcessFunc = ({ data }) => {
	return data.map(filePath => {
		const content = readFileSync(filePath, "utf-8")
		return JSON.parse(content)
	})
}

API Reference

cluster(inputData, workerLocation, options?)

| Parameter | Type | Required | Description |
| ---------------- | ---------------- | -------- | ---------------------------------------------------- |
| inputData | T[] | Yes | Array of items to process. Split into chunks. |
| workerLocation | string | Yes | Path to worker module (absolute or relative to cwd). |
| options | ClusterOptions | No | Configuration. |

ClusterOptions

| Option | Type | Default | Description |
| ------------------ | -------------------------------------------- | --------------- | ---------------------------------------------- |
| exitOnComplete | boolean | false | If true, calls process.exit(0) on success. |
| maxWorkers | number | cpus().length | Maximum number of worker processes. |
| timeout | number | — | Max ms per worker; rejects if exceeded. |
| onWorkerStart | (cpuIndex: number) => void | — | Called when a worker starts. |
| onWorkerComplete | (cpuIndex: number) => void | — | Called when a worker completes successfully. |
| onWorkerError | (cpuIndex: number, error) => void | — | Called when a worker fails. |
| onProgress | (completed: number, total: number) => void | — | Called when each worker completes. |
| errorMode | "fail-fast" or "continue" | "fail-fast" | See Error Modes. |

Returns

  • fail-fast (default): Promise<R[]> — Array of results, one per chunk. Rejects on first worker error.
  • continue: Promise<{ success, results, errors }> — Resolves with partial results; errors lists failed chunks.

Resolves to an empty array when inputData is empty. When exitOnComplete: true, the process exits before the promise resolves; the default is false so the function is safe to embed in a larger application.
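The chunk-order guarantee can be illustrated with a small standalone sketch (hypothetical, not library code): each result is stored at its chunk index, so completion order never affects output order:

```javascript
// Sketch: store each result at its chunk index so output order matches
// input order, even when workers finish out of order.
async function aggregateInOrder(taskCount, runChunk) {
	const results = new Array(taskCount)
	await Promise.all(
		Array.from({ length: taskCount }, (_, index) =>
			runChunk(index).then(result => {
				results[index] = result
			})
		)
	)
	return results
}

// Chunk 0 finishes last, but still comes first in the output:
const out = await aggregateInOrder(3, i =>
	new Promise(res => setTimeout(() => res(`chunk${i}`), (3 - i) * 10))
)
console.log(out) // ["chunk0", "chunk1", "chunk2"]
```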

Error Modes

| Mode | On worker failure | Use case |
| ----------- | ------------------------------------ | ---------------------------------- |
| fail-fast | Kill all workers, reject immediately | Atomic workloads (ETL, migrations) |
| continue | Record error, let others finish | Best-effort (images, scraping) |

// fail-fast (default) — reject on first error
const results = await cluster(data, workerPath, { exitOnComplete: false })

// continue — get partial results + errors
const { success, results, errors } = await cluster(data, workerPath, {
	exitOnComplete: false,
	errorMode: "continue"
})
if (!success) {
	console.log(
		"Failed chunks:",
		errors.map(e => e.chunkIndex)
	)
}

Throws

  • TypeError — If inputData is not an array, workerLocation is invalid, maxWorkers is not a positive integer, or errorMode is not "fail-fast" or "continue".
  • Error — If any worker throws, exits with a non-zero code, is killed (e.g. SIGKILL), or times out.

createClusterPool(workerLocation, options?)

| Parameter | Type | Required | Description |
| ---------------- | ------------- | -------- | ------------------------ |
| workerLocation | string | Yes | Path to worker module. |
| options | PoolOptions | No | { maxWorkers: number } |

Returns: { run(data), destroy(), workerCount } — Use pool.run(data) for each batch; call pool.destroy() when done.


When to Use

| Use Cluster Hub when… | Example |
| ------------------------------------------------- | --------------------------------------------------- |
| Processing large arrays of CPU-bound work | Hashing files, image resizing, JSON parsing, crypto |
| Batch jobs (nightly reports, ETL, migrations) | Process 50,000 DB records, generate reports |
| Need to use all CPU cores | Single-threaded loop leaves other cores idle |
| Each item is independent | No shared state or ordering requirements |


When Not to Use

| Situation | Better alternative |
| ------------------------------------------------- | -------------------------------- |
| Small arrays (< 100 items) | Promise.all() or a simple loop |
| Mostly I/O, few items | Promise.all() with async/await |
| Need shared memory or threads | Node.js worker_threads |
| Need job queue (retries, scheduling, persistence) | Bull, Agenda, or similar |


Comparison

| Feature | Cluster Hub | worker_threads | Piscina | Bull |
| -------------------- | ----------- | -------------- | ------- | ---- |
| Multi-core CPU usage | ✅ | ✅ | ✅ | ❌ |
| Process isolation | ✅ | ❌ | ❌ | ❌ |
| Shared memory | ❌ | ✅ | ✅ | ❌ |
| Job persistence | ❌ | ❌ | ❌ | ✅ |
| Redis required | ❌ | ❌ | ❌ | ✅ |
| Ideal for ETL | ✅ | ⚠️ | ✅ | ⚠️ |
| Setup complexity | Very Low | Medium | Medium | High |

Cluster Hub

  • Best for CPU-bound batch jobs
  • Simple mental model
  • Zero dependencies
  • Process-level crash isolation

worker_threads

  • Lower-level API; requires manual thread lifecycle and message passing
  • Good for shared memory scenarios
  • Cluster Hub abstracts orchestration into a single function

Piscina

  • Thread pool manager for worker_threads
  • More performant for heavy reuse
  • Higher complexity

Bull

  • Persistent Redis-backed job queue
  • Great for background tasks
  • Not meant for CPU parallelism

Why Not worker_threads?

Cluster Hub uses process-based parallelism instead of threads.

Advantages of process-based:

  • Full process isolation
  • Safer memory boundaries
  • Better crash containment
  • Simpler mental model

Choose based on your needs:

  • Use worker_threads if you need shared memory or lower overhead.
  • Use Cluster Hub if you need isolated parallel execution with minimal setup.
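For contrast, here is the Quick Start's "double each item" job written directly against worker_threads, using an inline eval worker for brevity (real usage would point at a separate worker file). It shows the manual lifecycle and message plumbing that Cluster Hub wraps into one call:

```javascript
import { Worker } from "node:worker_threads"

// The same "double each item" job, on the lower-level worker_threads API.
// The worker body is inlined as a CommonJS string via the eval option.
const workerCode = `
	const { parentPort, workerData } = require("node:worker_threads")
	parentPort.postMessage(workerData.map(item => item * 2))
`

const result = await new Promise((resolve, reject) => {
	const worker = new Worker(workerCode, { eval: true, workerData: [1, 2, 3, 4] })
	worker.once("message", resolve)
	worker.once("error", reject)
})
console.log(result) // [2, 4, 6, 8]
```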

Production Use Cases

Cluster Hub is suitable for:

  • Large-scale batch jobs
  • Scheduled cron ETL tasks
  • Data migration pipelines
  • CPU-heavy cryptographic workloads
  • File transformation pipelines
  • Report generation from large datasets
  • Image/video processing at scale

Requirements

  • Node.js 18.0.0+
  • ESM: "type": "module" in your package.json

TypeScript

TypeScript types are included. Import as usual:

import cluster from "@ananth_1725/cluster-hub"

const results = await cluster<number, number>(data, workerPath, { exitOnComplete: false })

Scripts

| Command | Description |
| ------------------- | ------------------------- |
| npm test | Run unit tests |
| npm run benchmark | Run performance benchmark |


Considerations

| Scenario | Recommendation |
| ------------------------------------------------------ | -------------- |
| Running inside a clustered app (PM2, Node cluster) | Use maxWorkers: Math.floor(cpus().length / 2) to avoid oversubscribing cores |
| Containers with CPU limits | Set maxWorkers explicitly; cpus().length may not reflect limits |
| Light per-item work | Process overhead (~1–2 sec) dominates; use when each item takes 5–50ms+ |
| Large input arrays | Data is serialized and copied via IPC; avoid extremely large object graphs per chunk |
| Large items (>1MB each) | Consider passing file paths instead of raw objects; read in worker |
| Worker failure (fail-fast) | When one worker fails, all others are terminated (SIGTERM). Use errorMode: "continue" for partial results. |
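A sketch of the container recommendation above. The explicit limit (passed here as envLimit) is a hypothetical deploy-time setting, since os.cpus().length may report the host's cores rather than the container's quota:

```javascript
import os from "node:os"

// Sketch: prefer an explicit deploy-time CPU limit over os.cpus().length,
// which can report the host's cores instead of a container's quota.
// envLimit is a hypothetical setting, e.g. read from an environment variable.
function pickMaxWorkers(envLimit, detectedCores = os.cpus().length) {
	const limit = Number(envLimit)
	const cap = Number.isInteger(limit) && limit > 0 ? limit : detectedCores
	return Math.max(1, Math.min(cap, detectedCores))
}

console.log(pickMaxWorkers("2", 16)) // 2  (container limited to 2 CPUs)
console.log(pickMaxWorkers(undefined, 16)) // 16 (no explicit limit set)
```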


Documentation

| Document | Description |
| -------------------------------- | --------------------------------------------------------- |
| docs/README.md | Documentation index |
| docs/AUDIT.md | Security, performance, reliability, and OS-level behavior |


Contributing

Contributions are welcome!

  1. Fork the repo
  2. Create a feature branch
  3. Submit a pull request

Please ensure changes are tested and documented.


License

MIT