playcluster
v1.1.0
High-performance Playwright browser cluster with worker threads, auto-scaling, auto-healing, and built-in metrics
playcluster
A high-performance browser manager that runs Playwright browsers across isolated worker threads with auto-scaling, auto-healing, resource-aware scheduling, and built-in metrics.
Installation
npm install playcluster
Requires Node.js 18+. playwright-core is included as a dependency.
You also need a Playwright-compatible browser installed. For example:
npx playwright install firefox
Quick Start
import { Cluster } from "playcluster";
const cluster = await Cluster.launch({
concurrency: "incognito",
maxConcurrency: 4,
browserType: "firefox",
monitor: true,
});
// Register a default task
await cluster.task(async ({ page, data }) => {
await page.goto(data.url);
return await page.title();
});
// Execute and get the result
const title = await cluster.execute({ url: "https://example.com" });
console.log("Page title:", title);
await cluster.close();
API Reference
Cluster.launch(options)
Static factory method. Creates and initializes a cluster instance.
const cluster = await Cluster.launch({
concurrency: "incognito",
maxConcurrency: 8,
timeout: 30_000,
});
Returns Promise<Cluster>.
Options
| Option | Type | Default | Description |
| ------------------------ | ------------------ | -------------- | ----------------------------------------------------------------------------------------------------------------------- |
| concurrency | string \| number | "incognito" | Concurrency mode: "page", "context", or "incognito" |
| maxConcurrency | number | cpus - 1 | Maximum number of worker threads |
| initialWorkers | number | 2 | Workers spawned at launch |
| timeout | number | 60000 | Per-job timeout in ms |
| retryLimit | number | 2 | Max retry attempts per failed job |
| retryDelay | number | 250 | Delay in ms between retries |
| maxQueueSize | number | 10000 | Max queued jobs before rejection |
| maxQueueWaitMs | number | 60000 | Max time a job can wait in queue |
| workerIdleTimeoutMs | number | 60000 | Idle time before a scaled-up worker is terminated |
| maxWorkersStarting | number | min(4, cpus) | Max workers booting simultaneously |
| workerRestartDelay | number | 500 | Delay before restarting a crashed worker |
| stallTimeoutPadding | number | 15000 | Extra time after timeout before a job is considered stalled |
| respectSystemResources | boolean | true | Throttle dispatch based on CPU/memory |
| maxCpuPercent | number | 80 | CPU threshold for throttling |
| maxMemoryPercent | number | 80 | Memory threshold for throttling |
| resourceCheckInterval | number | 750 | How often to re-check resources when throttled |
| pressureThresholds | number[] | [0.75, 0.9] | Queue pressure levels that trigger pressure events |
| monitor | boolean | false | Print periodic status to console |
| monitorInterval | number | 5000 | Interval for monitor output in ms |
| browserType | string | "firefox" | Playwright browser: "firefox", "chromium", or "webkit" |
| playwrightOptions | object | {} | Passed directly to browser.launch() |
| pageOptions | object | {} | Passed to browser.newContext() |
| workerModule | string | null | Absolute path to a JS module loaded in each worker thread. Its exports are available as helpers in the task function. |
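How the computed defaults and the timing options combine can be sketched as follows. This is only an illustration built from the table above: `resolveOptions`, `stallAfterMs`, and `worstCaseJobMs` are hypothetical names, the `cpuCount` argument stands in for the machine's CPU count, and the lower bound of 1 worker is an assumption.

```javascript
// Hypothetical sketch of option resolution; not the library's actual code.
// `cpuCount` would normally come from os.cpus().length.
function resolveOptions(userOptions, cpuCount) {
  const defaults = {
    maxConcurrency: Math.max(1, cpuCount - 1), // "cpus - 1" (floor of 1 is assumed)
    maxWorkersStarting: Math.min(4, cpuCount), // "min(4, cpus)"
    timeout: 60000,
    stallTimeoutPadding: 15000,
    retryLimit: 2,
    retryDelay: 250,
  };
  const options = { ...defaults, ...userOptions };
  // A job would count as stalled only after the timeout plus the padding.
  options.stallAfterMs = options.timeout + options.stallTimeoutPadding;
  // Assumed worst case: one initial attempt plus retryLimit retries,
  // each running into the timeout, with retryDelay between attempts.
  options.worstCaseJobMs =
    (options.retryLimit + 1) * options.timeout +
    options.retryLimit * options.retryDelay;
  return options;
}

const resolved = resolveOptions({ timeout: 30000 }, 8);
console.log(resolved.maxConcurrency, resolved.stallAfterMs, resolved.worstCaseJobMs);
```

Under these assumptions, lowering `timeout` shortens both the stall deadline and the longest time a single job can occupy a worker across retries.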
Concurrency Modes
| Mode | Constant | Behavior |
| ------------- | ------------------------------- | -------------------------------------------------------- |
| "page" | Cluster.CONCURRENCY_PAGE | One browser, one context, reuses the same page |
| "context" | Cluster.CONCURRENCY_CONTEXT | One browser, one context, new page per job |
| "incognito" | Cluster.CONCURRENCY_INCOGNITO | One browser, new context + page per job (full isolation) |
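The difference between the three modes comes down to which Playwright objects are reused between jobs. A minimal sketch, assuming a per-worker `state` object and a hypothetical `acquirePage` helper (neither is part of the playcluster API); only `browser.newContext()`, `context.newPage()`, and `close()` are real Playwright calls:

```javascript
// Hypothetical helper: `state` caches whatever a worker keeps between jobs.
async function acquirePage(mode, browser, state, pageOptions = {}) {
  if (mode === "incognito") {
    // Fresh context and page for every job: nothing is shared.
    const context = await browser.newContext(pageOptions);
    const page = await context.newPage();
    return { page, release: () => context.close() };
  }
  // "page" and "context" modes share one long-lived context per worker.
  if (!state.context) state.context = await browser.newContext(pageOptions);
  if (mode === "context") {
    // New page per job, but cookies and storage live in the shared context.
    const page = await state.context.newPage();
    return { page, release: () => page.close() };
  }
  if (mode === "page") {
    // The same page object is handed to every job.
    if (!state.page) state.page = await state.context.newPage();
    return { page: state.page, release: async () => {} };
  }
  throw new Error(`Unknown concurrency mode: ${mode}`);
}
```

The trade-off follows directly: the more that is reused, the less setup work each job pays for, and the more state can leak from one job into the next.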
cluster.task(fn)
Register a default task function. This function is serialized and executed inside worker threads.
await cluster.task(async ({ page, data, worker, helpers }) => {
await page.goto(data.url);
const html = await page.content();
return { url: data.url, length: html.length };
});
Parameters received by the task function:
| Param | Description |
| --------- | --------------------------------------------------------------------------- |
| page | Playwright Page instance |
| data | The data object passed to execute() or queue() |
| worker | { id, thread_id } — worker identity |
| helpers | Exports from the workerModule (empty {} if no workerModule specified) |
Using workerModule
Since task functions are serialized and run in isolated worker threads, they cannot access imports or variables from the main thread scope. The workerModule option solves this by loading a module inside each worker thread and injecting its exports as helpers.
// helpers.js — this runs inside the worker thread
import { ofetch } from "ofetch";
export async function blockResources({ page }) {
await page.route("**/*", async (route) => {
const resourceType = route.request().resourceType();
if (["stylesheet", "font", "image"].includes(resourceType)) {
return route.abort();
}
return route.continue();
});
}
export const sleep = (ms) => new Promise((r) => setTimeout(r, ms));
// index.js (main thread)
import { fileURLToPath } from "node:url";
import { Cluster } from "playcluster";
const cluster = await Cluster.launch({
concurrency: "incognito",
maxConcurrency: 4,
// fileURLToPath yields a valid absolute path on every platform, including Windows
workerModule: fileURLToPath(new URL("./helpers.js", import.meta.url)),
});
await cluster.task(async ({ page, data, helpers }) => {
const { blockResources, sleep } = helpers;
await blockResources({ page });
await page.goto(data.url);
await sleep(1000);
return await page.title();
});
Important: Task functions are serialized to strings and evaluated in worker threads. They cannot reference closures, imports, or variables from the outer scope. Use helpers (via workerModule) to access shared utilities, and { page, data, worker } for per-job context.
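Why closures do not survive can be seen with plain JavaScript, without any browser. The sketch below is a stand-in for the thread boundary, not playcluster's actual transport: `roundTrip` is a hypothetical helper that turns a function into source text and rebuilds it, which is roughly what serializing a task implies.

```javascript
// Hypothetical stand-in for sending a task to a worker thread:
// only the source text travels, the surrounding scope does not.
function roundTrip(fn) {
  const source = fn.toString();
  return new Function(`return (${source});`)();
}

function makeTasks() {
  const prefix = "title: "; // lives only in makeTasks' closure
  return {
    selfContained: ({ data }) => "title: " + data.name,
    usesClosure: ({ data }) => prefix + data.name,
  };
}

const tasks = makeTasks();

// A self-contained task behaves the same after the round trip.
const ok = roundTrip(tasks.selfContained)({ data: { name: "home" } });

// A task that reads an outer variable fails once it is rebuilt elsewhere.
let closureError = null;
try {
  roundTrip(tasks.usesClosure)({ data: { name: "home" } });
} catch (err) {
  closureError = err;
}

console.log(ok, closureError && closureError.name);
```

Anything the task needs therefore has to arrive through `data` (per job) or `helpers` (per worker).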
cluster.execute(data, [taskFunction], [options])
Submit a job and wait for the result. Returns a Promise that resolves with the task's return value.
// Using the default task
const result = await cluster.execute({ url: "https://example.com" });
// With an inline task override
const title = await cluster.execute(
{ url: "https://example.com" },
async ({ page, data }) => {
await page.goto(data.url);
return page.title();
},
);
// With high priority (jumps to front of queue)
const urgent = await cluster.execute({ url: "https://critical.com" }, null, {
priority: "high",
});
Parameters:
| Param | Type | Description |
| ------------------ | -------------------- | ---------------------------------------------------- |
| data | any | Serializable data passed to the task |
| taskFunction | function \| null | Optional per-job task override |
| options.priority | "high" \| "normal" | "high" inserts at queue front. Default: "normal" |
Throws if the cluster is closed, draining, queue is full, or the job fails after all retries.
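The priority and queue-full behaviour described above can be pictured with a small array-backed queue. `SketchQueue` is a hypothetical class written for illustration; the library's internal JobQueue may be implemented differently.

```javascript
// Hypothetical sketch: a bounded FIFO queue where "high" priority jumps the line.
class SketchQueue {
  constructor(maxSize) {
    this.maxSize = maxSize;
    this.items = [];
  }

  push(job, { priority = "normal" } = {}) {
    if (this.items.length >= this.maxSize) {
      // Mirrors the documented rejection when the queue is full.
      throw new Error("queue is full");
    }
    if (priority === "high") {
      this.items.unshift(job); // front of the queue
    } else {
      this.items.push(job); // back of the queue
    }
  }

  shift() {
    return this.items.shift();
  }
}

const q = new SketchQueue(3);
q.push("a");
q.push("b");
q.push("urgent", { priority: "high" });
const order = [q.shift(), q.shift(), q.shift()];
console.log(order);
```

One consequence of front insertion in a sketch like this is that several high-priority jobs submitted back to back would run in reverse submission order.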
cluster.queue(data, [taskFunction], [options])
Fire-and-forget version of execute(). Errors are emitted as taskerror events instead of throwing.
// Enqueue 1000 URLs without awaiting each one
for (const url of urls) {
await cluster.queue({ url });
}
// Wait for everything to finish
await cluster.idle();
cluster.idle()
Returns a Promise that resolves when the queue is empty and all in-flight jobs have completed.
// Enqueue work
for (const item of items) {
await cluster.queue(item);
}
// Wait for all work to complete
await cluster.idle();
console.log("All done!");
cluster.drain()
Graceful shutdown — stops accepting new jobs, waits for all queued and in-flight jobs to complete, then closes the cluster.
// Stop accepting and finish remaining work
await cluster.drain();
console.log("Drained and closed.");
cluster.close()
Hard shutdown — immediately rejects all queued and in-flight jobs, then terminates all workers.
await cluster.close();
cluster.pressure()
Returns the current queue pressure as a number between 0 and 1.
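A plausible reading, consistent with the status sample in this README (150 queued jobs with the default maxQueueSize of 10000 gives 0.015), is that pressure is the queue length divided by maxQueueSize. The sketch below assumes exactly that and also shows one way threshold crossings could be detected; `queuePressure` and `crossedThresholds` are hypothetical names.

```javascript
// Assumed definition: fraction of the queue capacity currently in use.
function queuePressure(inQueue, maxQueueSize) {
  return Math.min(1, inQueue / maxQueueSize);
}

// Thresholds that were crossed upward between two consecutive readings.
function crossedThresholds(previous, current, thresholds = [0.75, 0.9]) {
  return thresholds.filter((t) => previous < t && current >= t);
}

const ratio = queuePressure(150, 10000);
const crossed = crossedThresholds(0.7, 0.92);
console.log(ratio, crossed);
```

Comparing against the previous reading rather than the current value alone keeps a threshold from firing repeatedly while the queue stays above it.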
const p = cluster.pressure();
if (p > 0.9) {
console.warn("Queue almost full!");
}
cluster.status()
Returns a snapshot of the cluster's current state including rolling metrics.
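For readers unfamiliar with the metric names, here is one common way such figures are derived from a window of recent job durations. This is a sketch under assumptions, not the library's RollingMetrics code: it uses the nearest-rank method for p95 and takes error_rate as errors divided by total jobs, which matches the sample values in this README (12 / 5000 = 0.0024). `percentile` and `summarize` are hypothetical names.

```javascript
// Nearest-rank percentile over a list of numbers.
function percentile(values, p) {
  if (values.length === 0) return 0;
  const sorted = [...values].sort((a, b) => a - b);
  const rank = Math.ceil((p / 100) * sorted.length);
  return sorted[Math.max(0, rank - 1)];
}

// Summary over a window of job durations (ms) plus error counters.
function summarize(durationsMs, errorCount, totalCount) {
  const sum = durationsMs.reduce((acc, d) => acc + d, 0);
  return {
    avg_job_duration_ms: durationsMs.length ? sum / durationsMs.length : 0,
    p95_job_duration_ms: percentile(durationsMs, 95),
    error_rate: totalCount ? errorCount / totalCount : 0,
  };
}

const summary = summarize([1000, 2000, 3000, 4000, 10000], 12, 5000);
console.log(summary);
```

The example shows why p95 is worth watching next to the average: a single slow job moves p95 to 10000 ms while the average stays at 4000 ms.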
const s = cluster.status();
console.log(s);
Returns:
{
start_time: "2026-03-05T12:00:00.000Z",
now: "2026-03-05T12:05:00.000Z",
uptime_ms: 300000,
all_target_count: 5000,
done_targets: 4800,
in_queue: 150,
in_flight: 4,
error_count: 12,
worker_count: 4,
workers_starting: 0,
idle_workers: 0,
cpu_usage: 62.3,
memory_usage: 45.1,
max_cpu_percent: 80,
max_memory_percent: 80,
pressure: 0.015,
draining: false,
metrics: {
jobs_per_second: 18.5,
avg_job_duration_ms: 3200,
p95_job_duration_ms: 8500,
error_rate: 0.0024,
},
workers: [
{
id: 0,
busy: true,
current_job_id: 4801,
total_jobs_completed: 1205,
consecutive_errors: 0,
restarts: 1,
},
// ...
],
}
cluster.monitor()
Prints a one-line status summary to the console. Called automatically when monitor: true.
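The fields in that summary map directly onto the status() snapshot. If you want the same line in your own logger, a formatter along these lines should reproduce it; `formatStatusLine` is a hypothetical helper, and the field mapping is inferred from the sample values in this README rather than taken from the library source.

```javascript
// Hypothetical formatter that builds the monitor line from a status() snapshot.
function formatStatusLine(s) {
  return (
    `[cluster] done=${s.done_targets}/${s.all_target_count} ` +
    `queue=${s.in_queue} in_flight=${s.in_flight} ` +
    `workers=${s.worker_count}(idle=${s.idle_workers}) ` +
    `cpu=${s.cpu_usage}% mem=${s.memory_usage}% ` +
    `jps=${s.metrics.jobs_per_second} p95=${s.metrics.p95_job_duration_ms}ms ` +
    `errors=${s.error_count}`
  );
}

// Values copied from the status() sample in this README.
const line = formatStatusLine({
  done_targets: 4800,
  all_target_count: 5000,
  in_queue: 150,
  in_flight: 4,
  worker_count: 4,
  idle_workers: 0,
  cpu_usage: 62.3,
  memory_usage: 45.1,
  error_count: 12,
  metrics: { jobs_per_second: 18.5, p95_job_duration_ms: 8500 },
});
console.log(line);
```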
[cluster] done=4800/5000 queue=150 in_flight=4 workers=4(idle=0) cpu=62.3% mem=45.1% jps=18.5 p95=8500ms errors=12
cluster.waitForOne()
Returns a Promise that resolves with the data of the next job that enters the queue.
const nextData = await cluster.waitForOne();
console.log("Next queued item:", nextData);
Events
The cluster extends EventEmitter and emits the following events:
| Event | Payload | Description |
| ----------- | -------------------------- | --------------------------------------------- |
| taskerror | (error, data, willRetry) | Fired on every job failure (before retry) |
| queue | (data, taskFunction) | Fired when a new job enters the queue |
| idle | — | Fired when queue + in-flight reach zero |
| pressure | (ratio, threshold) | Fired when queue pressure crosses a threshold |
cluster.on("taskerror", (error, data, willRetry) => {
if (!willRetry) {
console.error(`Job failed permanently:`, error.message, data);
}
});
cluster.on("pressure", (ratio, threshold) => {
console.warn(
`Queue pressure at ${(ratio * 100).toFixed(0)}% (threshold: ${(threshold * 100).toFixed(0)}%)`,
);
});
cluster.on("idle", () => {
console.log("All work completed.");
});
Usage Examples
Express Server with Scraping Endpoint
import express from "express";
import { Cluster } from "playcluster";
const app = express();
app.use(express.json());
const cluster = await Cluster.launch({
concurrency: "incognito",
maxConcurrency: 6,
timeout: 30_000,
monitor: true,
browserType: "firefox",
playwrightOptions: {
headless: true,
},
});
await cluster.task(async ({ page, data }) => {
await page.goto(data.url, { waitUntil: "domcontentloaded" });
const title = await page.title();
const text = await page.locator("body").innerText();
return { url: data.url, title, text_length: text.length };
});
app.post("/scrape", async (req, res) => {
try {
const result = await cluster.execute({ url: req.body.url });
res.json({ success: true, data: result });
} catch (error) {
res.status(500).json({ success: false, error: error.message });
}
});
app.get("/status", (req, res) => {
res.json(cluster.status());
});
app.listen(3000, () => console.log("Server running on :3000"));
// Graceful shutdown on SIGTERM
process.on("SIGTERM", async () => {
await cluster.drain();
process.exit(0);
});
Batch Processing with Progress Tracking
import { Cluster } from "playcluster";
const cluster = await Cluster.launch({
concurrency: "incognito",
maxConcurrency: 8,
timeout: 45_000,
retryLimit: 3,
monitor: true,
});
const urls = [
"https://example.com/page1",
"https://example.com/page2",
"https://example.com/page3",
// ... thousands of URLs
];
const errors = [];
cluster.on("taskerror", (error, data, willRetry) => {
if (!willRetry) {
errors.push({ url: data.url, error: error.message });
}
});
// Task functions run in worker threads and cannot push into main-thread arrays,
// so the task returns its result and the main thread collects it.
await cluster.task(async ({ page, data }) => {
await page.goto(data.url, { waitUntil: "networkidle" });
const title = await page.title();
return { url: data.url, title };
});
// Submit all URLs and wait for every job to settle
const settled = await Promise.allSettled(
urls.map((url) => cluster.execute({ url })),
);
const results = settled
.filter((s) => s.status === "fulfilled")
.map((s) => s.value);
console.log(
`Completed: ${results.length} | Failed: ${settled.length - results.length}`,
);
await cluster.close();
High-Priority Jobs
import { Cluster } from "playcluster";
const cluster = await Cluster.launch({
maxConcurrency: 4,
maxQueueSize: 5000,
});
await cluster.task(async ({ page, data }) => {
await page.goto(data.url);
return page.title();
});
// Normal priority — queued at the back
cluster.queue({ url: "https://example.com/low-priority" });
// High priority — jumps to the front of the queue
const urgentResult = await cluster.execute(
{ url: "https://example.com/urgent" },
null,
{ priority: "high" },
);
console.log("Urgent result:", urgentResult);
await cluster.close();
Backpressure Handling
import { Cluster } from "playcluster";
const cluster = await Cluster.launch({
maxConcurrency: 4,
maxQueueSize: 1000,
pressureThresholds: [0.5, 0.75, 0.9],
});
await cluster.task(async ({ page, data }) => {
await page.goto(data.url);
return page.title();
});
cluster.on("pressure", (ratio, threshold) => {
if (threshold >= 0.9) {
console.error(
`CRITICAL: Queue at ${(ratio * 100).toFixed(0)}%! Pausing ingestion.`,
);
} else if (threshold >= 0.75) {
console.warn(`WARNING: Queue at ${(ratio * 100).toFixed(0)}%`);
} else {
console.log(`Queue pressure: ${(ratio * 100).toFixed(0)}%`);
}
});
// Check pressure before enqueuing
function enqueueIfSafe(data) {
if (cluster.pressure() >= 0.95) {
console.log("Queue near full, skipping:", data.url);
return false;
}
cluster.queue(data);
return true;
}
enqueueIfSafe({ url: "https://example.com" });
await cluster.idle();
await cluster.close();
Custom Playwright Options (Proxy, Stealth, Viewport)
import { Cluster } from "playcluster";
const cluster = await Cluster.launch({
concurrency: "incognito",
maxConcurrency: 4,
browserType: "firefox",
playwrightOptions: {
headless: true,
firefoxUserPrefs: {
"media.peerconnection.enabled": false,
},
},
pageOptions: {
viewport: { width: 1920, height: 1080 },
userAgent: "Mozilla/5.0 (Windows NT 10.0; Win64; x64) ...",
proxy: {
server: "http://proxy.example.com:8080",
username: "user",
password: "pass",
},
},
});
await cluster.task(async ({ page, data }) => {
await page.goto(data.url);
return page.title();
});
const title = await cluster.execute({ url: "https://example.com" });
console.log(title);
await cluster.close();
Using Concurrency Modes
import { Cluster } from "playcluster";
// PAGE mode — fastest, least isolation (shares one page)
const pageCluster = await Cluster.launch({
concurrency: Cluster.CONCURRENCY_PAGE,
maxConcurrency: 2,
});
// CONTEXT mode — new page per job, shares browser context (cookies shared)
const contextCluster = await Cluster.launch({
concurrency: Cluster.CONCURRENCY_CONTEXT,
maxConcurrency: 4,
});
// INCOGNITO mode — full isolation, new context + page per job (recommended)
const incognitoCluster = await Cluster.launch({
concurrency: Cluster.CONCURRENCY_INCOGNITO,
maxConcurrency: 8,
});
Graceful Drain on Shutdown
import { Cluster } from "playcluster";
const cluster = await Cluster.launch({ maxConcurrency: 4 });
await cluster.task(async ({ page, data }) => {
await page.goto(data.url);
return page.title();
});
// Enqueue work
for (let i = 0; i < 100; i++) {
cluster.queue({ url: `https://example.com/page/${i}` });
}
// drain() stops accepting new work, finishes everything in-flight, then closes
await cluster.drain();
console.log("All jobs finished, cluster shut down gracefully.");
Monitoring & Metrics in Production
import { Cluster } from "playcluster";
const cluster = await Cluster.launch({
maxConcurrency: 8,
monitor: true, // Auto-prints status every monitorInterval
monitorInterval: 10_000,
});
// Or manually poll status for your own monitoring system
setInterval(() => {
const s = cluster.status();
// Send to your metrics backend
metrics.gauge("cluster.queue_size", s.in_queue);
metrics.gauge("cluster.in_flight", s.in_flight);
metrics.gauge("cluster.workers", s.worker_count);
metrics.gauge("cluster.jps", s.metrics.jobs_per_second);
metrics.gauge("cluster.p95_ms", s.metrics.p95_job_duration_ms);
metrics.gauge("cluster.error_rate", s.metrics.error_rate);
metrics.gauge("cluster.cpu", s.cpu_usage);
metrics.gauge("cluster.memory", s.memory_usage);
}, 15_000);
Architecture
Main Thread                          Worker Thread (×N)
┌──────────────────────┐             ┌─────────────────────────┐
│ Cluster              │             │ threadWorker.js         │
│ ├─ JobQueue (FIFO)   │   message   │ ├─ Playwright Browser   │
│ ├─ Workers Map       │  ────────►  │ ├─ Context / Page pool  │
│ ├─ Idle Set          │  ◄────────  │ ├─ Task executor        │
│ ├─ InFlight Map      │   result    │ └─ Auto-restart on crash│
│ ├─ SystemMonitor     │             └─────────────────────────┘
│ ├─ RollingMetrics    │
│ └─ Auto-scaler       │
└──────────────────────┘
- Task serialization: Task functions are converted to strings via .toString() and eval()-ed inside worker threads. This means task functions must be self-contained.
- Auto-scaling: Workers scale up when queue depth exceeds idle capacity, and scale down when idle longer than workerIdleTimeoutMs.
- Auto-healing: Crashed browsers are automatically restarted. Failed jobs are retried up to retryLimit times.
- Resource awareness: CPU and memory are monitored; dispatch is throttled when thresholds are exceeded.
- Health-based routing: Jobs are routed to the healthiest available worker (fewest consecutive errors, most completions).
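The health-based routing rule can be sketched as a simple comparison over the per-worker fields that status() reports. `pickWorker` is a hypothetical function; the tie-breaking order (errors first, then completions) follows the description above, and everything else is an assumption.

```javascript
// Hypothetical sketch: choose the healthiest idle worker.
// Fewest consecutive errors wins; ties go to the worker with more completed jobs.
function pickWorker(workers) {
  const idle = workers.filter((w) => !w.busy);
  if (idle.length === 0) return null; // nothing available, job stays queued
  return idle.reduce((best, w) => {
    if (w.consecutive_errors !== best.consecutive_errors) {
      return w.consecutive_errors < best.consecutive_errors ? w : best;
    }
    return w.total_jobs_completed > best.total_jobs_completed ? w : best;
  });
}

const chosen = pickWorker([
  { id: 0, busy: true, consecutive_errors: 0, total_jobs_completed: 1205 },
  { id: 1, busy: false, consecutive_errors: 2, total_jobs_completed: 1300 },
  { id: 2, busy: false, consecutive_errors: 0, total_jobs_completed: 900 },
  { id: 3, busy: false, consecutive_errors: 0, total_jobs_completed: 1100 },
]);
console.log(chosen.id);
```

Here worker 1 has the most completions but is skipped because of its recent errors, and worker 3 beats worker 2 on completions.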
License
MIT
