@davo/wrkr
A simple and efficient library for managing Web Workers and Shared Workers in JavaScript/TypeScript environments, optimized for Bun but usable elsewhere.
Features
WorkerPool: Manages a pool of dedicated workers for CPU-intensive tasks.
- Concurrent task execution with automatic load balancing
- Automatic worker management (creation, reuse, termination)
- Enhanced error handling with structured clone detection
- Support for transferable objects
- Broadcasting messages from workers to the main thread
- RPC mode for complex data serialization (Date, Map, Set, RegExp, BigInt)
SharedWorkerPool: Manages connections to a single SharedWorker.
- Facilitates communication between multiple browser tabs/windows and a single background worker instance
- Symbol-based connection management for unique identification
- Broadcasting messages from the main thread to all connected contexts
- Automatic port cleanup on disconnect
What's New in v0.0.4
- Bun-Specific Features: Module preloading (preload), worker 'open' event (onOpen), environment data sharing
- Enhanced Security: URL validation for preload imports to prevent code injection
- Runtime Detection: Improved Bun/Node.js/Browser detection in runtime abstraction layer
- New Tests: 12 additional tests covering Bun-specific features
What's New in v0.0.3
- RPC Layer: Complete devalue-based serialization for complex types (Date, Map, Set, RegExp, BigInt)
- Enhanced Error Handling: Automatic structuredClone detection with JSON fallback mechanisms
- MLXR Examples: Comprehensive Three.js + OffscreenCanvas multi-canvas rendering demonstrations
- Architecture Improvements: Modular example organization with performance optimizations
- Documentation: Added CHANGELOG.md, enhanced AGENTS.md, and publishing workflow guides
- Better Test Coverage: Comprehensive test suite maintaining >94% coverage with RPC integration tests
Installation
pnpm add @davo/wrkr
# or
bun add @davo/wrkr
# or
npm install @davo/wrkr
# or
yarn add @davo/wrkr
Usage
Quick Start
import { WorkerPool } from '@davo/wrkr';
// Create a pool with a simple task
const pool = new WorkerPool(async (num) => num * 2);
// Process data
const result = await pool.enqueue(21);
console.log(result); // 42
// Clean up
pool.terminate();
WorkerPool
Manages a pool of workers to execute tasks concurrently.
import { WorkerPool } from '@davo/wrkr';
// 1. Define your worker task function
const taskFunction = async (data) => {
// Perform CPU-intensive work
await new Promise(resolve => setTimeout(resolve, data * 10)); // Simulate work
const result = data * 2;
// Optionally broadcast messages back
broadcast('progress', result / 2);
return result;
};
// 2. Create a pool
const pool = new WorkerPool(taskFunction, {
size: 4, // Number of workers in the pool (defaults to navigator.hardwareConcurrency)
onBroadcast: (type, payload) => {
console.log(`Broadcast received: ${type} - ${payload}`);
}
});
// 3. Enqueue tasks
const inputs = [10, 20, 5, 15, 25];
// Using async generator
async function processTasksGenerator() {
console.log('Processing with generator...');
const results = [];
for await (const result of pool.enqueueGenerator(inputs)) {
console.log(`Generator Result: ${result}`);
results.push(result);
}
console.log('Generator Final Results:', results); // e.g., [20, 40, 10, 30, 50] (order matches input)
}
// Or using individual promises
async function processTasksPromises() {
console.log('\nProcessing with promises...');
const promises = inputs.map(input => pool.enqueue(input));
const results = await Promise.all(promises);
console.log('Promise Results:', results); // [20, 40, 10, 30, 50] (Promise.all preserves input order, even if tasks finish out of order)
}
// Run examples
await processTasksGenerator();
await processTasksPromises();
// 4. Handle errors with enhanced error detection
try {
const errorPool = new WorkerPool(() => {
throw new Error('Worker failed!');
}, { size: 1 });
await errorPool.enqueue('fail');
} catch (error) {
console.error('\nCaught worker error:', error.message); // Caught worker error: Worker failed!
}
// 5. Handle structured clone errors gracefully
const complexPool = new WorkerPool(async (data) => {
// Library automatically detects if data can't be cloned
// and provides helpful error messages
return data;
});
try {
// Functions can't be cloned - library will detect this
await complexPool.enqueue(() => console.log('test'));
} catch (error) {
console.error('Clone error:', error.message);
}
// 6. Terminate the pools when done
pool.terminate();
complexPool.terminate();
console.log('\nPools terminated.');
SharedWorkerPool
Manages connections to a SharedWorker for inter-context communication.
my-shared-worker.js (Example Shared Worker Script):
// my-shared-worker.js
const ports = new Map(); // Map connectionId (Symbol) to port
self.onconnect = function (e) {
const port = e.ports[0];
const connectionId = Symbol('connection-' + Math.random().toString(36).substring(7));
ports.set(connectionId, port);
console.log(`SharedWorker: Port connected, assigned ID: ${connectionId.toString()}`);
port.onmessage = async function (event) {
const [channel, id, name, args] = event.data;
console.log(`SharedWorker received [${channel}]: ID=${id}, Name=${name}, Args=`, args);
try {
// Initial channel setup confirmation
if (id === 0 && channel === name) {
// Send back [channel, id, connectionId] to confirm connection and provide ID.
// Symbols can't be structured-cloned across postMessage, so send the
// description string; the pool exposes a Symbol-based ID on the main thread.
port.postMessage([channel, id, connectionId.description]);
console.log(`SharedWorker: Sent connection confirmation for ${connectionId.toString()}`);
return;
}
// Handle specific commands (e.g., 'processData')
if (name === 'processData') {
const result = args[0] * 10; // Example processing
port.postMessage([channel, id, result]); // Send result back [channel, id, result]
console.log(`SharedWorker: Processed data for ${connectionId.toString()}, result: ${result}`);
}
// Handle broadcasts from main thread (e.g., 'broadcastToAll')
else if (name === 'broadcastToAll') {
console.log(`SharedWorker: Received broadcast command: ${args[0]}`);
// Echo broadcast back to all ports (optional)
ports.forEach(p => {
if (p !== port) { // Don't send back to originator
p.postMessage([channel, 0, 'messageFromWorker', `Broadcast received: ${args[0]}`]);
}
});
}
else {
// Echo back unknown messages for simplicity in this example
port.postMessage([channel, id, args]);
}
} catch (error) {
console.error(`SharedWorker error processing message for ${connectionId.toString()}:`, error);
port.postMessage([channel, id, { error: error.message }]);
}
};
// Note: MessagePort 'close' support varies by runtime; this cleanup may not always fire.
port.onclose = () => {
console.log(`SharedWorker: Port disconnected: ${connectionId.toString()}`);
ports.delete(connectionId);
};
port.start(); // Required for MessageChannel ports
};
console.log("SharedWorker script loaded.");Main Thread Usage:
import { SharedWorkerPool } from '@davo/wrkr';
// Ensure SharedWorker is available
if (typeof SharedWorker !== 'undefined') {
let connectionId1;
let connectionId2;
const receivedBroadcasts = [];
// Create a pool instance connected to your shared worker script
const pool = new SharedWorkerPool('./my-shared-worker.js', {
onConnect: (id) => {
console.log(`MainThread: Connected to SharedWorker, Connection ID: ${id.toString()}`);
// Assign connection IDs as they come in
if (!connectionId1) connectionId1 = id;
else if (!connectionId2) connectionId2 = id;
},
onCast: (type, payload) => {
console.log(`MainThread: Received broadcast from SharedWorker: ${type} - ${payload}`);
receivedBroadcasts.push({ type, payload });
},
onError: (error) => {
console.error("MainThread: SharedWorkerPool Error:", error);
}
});
// Wait a bit for connections to establish (in real apps, manage connection state)
await new Promise(resolve => setTimeout(resolve, 200));
if (connectionId1) {
console.log(`\nMainThread: Enqueuing task for connection ${connectionId1.toString()}`);
const result1 = await pool.enqueue(connectionId1, 'processData', 5);
console.log(`MainThread: Result 1 from SharedWorker: ${result1}`); // Expected: 50
// Example of sending a simple message (echoed back by example worker)
const echoResult = await pool.enqueue(connectionId1, 'echoMessage', 'hello');
console.log(`MainThread: Echo result from SharedWorker: ${echoResult}`); // Expected: ['hello']
} else {
console.error("MainThread: Connection 1 failed to establish.");
}
// Simulate another connection/tab (in reality, this would be separate)
// For testing, we can just use the same pool instance if the worker assigns unique IDs
if (connectionId2) {
console.log(`\nMainThread: Enqueuing task for connection ${connectionId2.toString()}`);
const result2 = await pool.enqueue(connectionId2, 'processData', 12);
console.log(`MainThread: Result 2 from SharedWorker: ${result2}`); // Expected: 120
} else {
// In a real scenario, a second pool instance in another context would connect.
// We simulate waiting for a potential second connection ID here for demo purposes.
console.log("MainThread: Second connection ID not received (as expected in single context test).");
}
// Broadcast a message from the main thread to the SharedWorker (and potentially other tabs)
console.log('\nMainThread: Broadcasting message to SharedWorker...');
pool.cast('broadcastToAll', 'Hello from main thread!');
// Wait a bit to see if the broadcast is echoed back by the worker
await new Promise(resolve => setTimeout(resolve, 100));
console.log("MainThread: Received Broadcasts:", receivedBroadcasts);
// Terminate the pool (closes the connection for this context)
pool.terminate();
console.log('\nMainThread: SharedWorkerPool terminated for this context.');
} else {
console.log('SharedWorker is not supported in this environment.');
}
Bun-Specific Features
The following features are available when running in Bun and are silently ignored (or throw errors where noted) in other environments.
Module Preloading (preload)
Preload modules before the worker function executes. Useful for initializing monitoring agents, polyfills, or shared dependencies.
import { WorkerPool } from '@davo/wrkr';
// Preload a single module
const pool = new WorkerPool(async (data) => {
// Sentry is preloaded and available
Sentry.captureMessage('Processing started');
return processData(data);
}, {
preload: './sentry-init.js',
size: 4
});
// Preload multiple modules (array)
const pool2 = new WorkerPool(async (data) => {
// Both modules are loaded before this function runs
return data * 2;
}, {
preload: ['./monitoring.js', './polyfills.js'],
size: 4
});
Notes:
- Preload modules execute via dynamic import() before your worker function and preamble
- Order is preserved: preload → preamble → worker function
- In RPC mode, preload works the same way (see the sketch below)
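For example, a minimal sketch combining preload with RPC mode; both options are documented in this README, while ./telemetry-init.js is a hypothetical module path used only for illustration:
import { WorkerPool } from '@davo/wrkr';
const rpcPool = new WorkerPool(async (payload) => {
  // Complex types arrive intact because rpc: true is enabled
  return { keys: [...payload.map.keys()], seenAt: new Date() };
}, {
  preload: './telemetry-init.js', // hypothetical preload module
  rpc: true,
  size: 2
});
const summary = await rpcPool.enqueue({ map: new Map([['a', 1]]) });
console.log(summary.keys); // ['a']
rpcPool.terminate();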
Worker 'open' Event (onOpen)
Bun emits an 'open' event when a worker is ready to receive messages (not available in browsers). Use onOpen to run setup code.
import { createWorker } from '@davo/wrkr';
const worker = createWorker((x) => x * 2, {
onOpen: () => {
console.log('Worker is ready to receive messages');
}
});
Notes:
- Only fires in Bun; silently ignored in Node.js and browsers
- The event fires immediately after worker creation
- For WorkerPool, pass onOpen in options (it will be attached to each worker), as sketched below
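A minimal sketch of the WorkerPool form, per the note above; in Node.js and browsers the callback is simply never called:
import { WorkerPool } from '@davo/wrkr';
const pool = new WorkerPool((x) => x * 2, {
  size: 2,
  onOpen: () => {
    // Fires once per worker in Bun; silently ignored elsewhere
    console.log('worker ready');
  }
});
const doubled = await pool.enqueue(21); // 42
pool.terminate();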
Environment Data Sharing
Share data between main thread and workers using setEnvironmentData / getEnvironmentData (from Node.js worker_threads).
import { setEnvironmentData, getEnvironmentData, WorkerPool } from '@davo/wrkr';
// Set data in main thread (Node.js/Bun only)
await setEnvironmentData('config', {
apiUrl: 'https://api.example.com',
apiKey: 'secret123'
});
// Access in worker
const pool = new WorkerPool(async (data) => {
const config = await getEnvironmentData('config');
const response = await fetch(`${config.apiUrl}/process`, {
headers: { 'Authorization': config.apiKey }
});
return response.json();
}, { size: 4 });
// Clean up when done
await setEnvironmentData('config', undefined);
Notes:
- Only works in Node.js and Bun (uses worker_threads)
- Throws Error: not supported in browser in browser environments
- Data must be set before the worker is created
- Works in both main thread and worker threads in Node.js/Bun
Feature Support Matrix
| Feature | Bun | Node.js | Browser |
|---------|-----|---------|---------|
| preload | ✅ | ✅ | ✅ |
| onOpen | ✅ | ❌ (ignored) | ❌ (ignored) |
| setEnvironmentData | ✅ | ✅ | ❌ (throws) |
| getEnvironmentData | ✅ | ✅ | ❌ (throws) |
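Because setEnvironmentData / getEnvironmentData throw in browsers, a portable sketch can gate the call on the runtime helper shown later in the Edge Runtime section and fall back to carrying the config in each task payload; the config object here is hypothetical:
import { WorkerPool, setEnvironmentData, runtime } from '@davo/wrkr';
const config = { apiUrl: 'https://api.example.com' }; // hypothetical config
if (runtime.capabilities.environmentData) {
  // Node.js/Bun path: share the config once via environment data
  await setEnvironmentData('config', config);
}
// Browser-safe fallback: include the config in the payload itself
const pool = new WorkerPool(async ({ value, config }) => `${config.apiUrl}/${value}`, { size: 2 });
const url = await pool.enqueue({ value: 'process', config });
console.log(url);
pool.terminate();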
Development
Prerequisites
- Bun v1.0+ (recommended) or Node.js v20+
- Git for version control
Local Setup
Fork and clone the repository:
git clone https://github.com/davo/wrkr.git
cd wrkr
Install dependencies:
bun install # or npm install
Run tests to verify setup:
bun test
Development Commands
# Development
bun run dev # Watch mode for development
bun test # Run all tests
bun test:watch # Run tests in watch mode
bun test:coverage # Run tests with coverage
# Building & Publishing
bun run build # Build for production
bun run prepublishOnly # Build before publish
npm publish # Publish to npm
# Specific Test Suites
bun test:worker # Test worker.test.js
bun test:pool # Test pool.test.js
bun test:shared # Test shared-pool.test.js
bun test:utils # Test utils.test.js
# Feature-Based Testing
bun test:feature:basic # Test pattern "Worker Tests"
bun test:feature:pool # Test pattern "Worker Pool Tests"
bun test:feature:shared # Test pattern "SharedWorkerPool Tests"
bun test:feature:errors # Test pattern "Error Handling"
bun test:feature:lifecycle # Test pattern "Lifecycle"
Examples Dev Server (Bun + Vite)
Run the examples with a local Vite server via Bun workspaces. For more details on how workspaces are configured, see the Bun Workspaces Guide.
# Install dependencies for all workspaces (root + examples/dev-server)
bun install
# Start only the examples server (opens Three.js example by default)
bun run dev:examples
# Or build library in watch mode and serve examples together
bun run dev:all
Vite serves from examples/ at http://localhost:5173 and opens /three-offscreen-worker/.
Available Examples
- Three.js OffscreenCanvas (/three-offscreen-worker/): Basic worker-based rendering with Three.js
- MLXR Examples (/mlxr/): Advanced multi-canvas and OffscreenCanvas patterns
  - Multi-canvas synchronized rendering
  - OffscreenCanvas with OrbitControls
  - OffscreenCanvas with picking/raycasting
  - Multi-view and multi-screen coordination
- Spherical Strip (/spherical-strip/): Complex geometry generation in workers
- Image Filters (/image-filters/): Image processing using worker pools
The examples import ../../src/index.js directly; the Vite config allows
serving source files from the repository root for fast iteration without building.
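The repository's actual Vite config may differ, but Vite's standard mechanism for serving files outside the project root is server.fs.allow; a minimal sketch:
// vite.config.js (sketch, not the repo's exact configuration)
import { defineConfig } from 'vite';
export default defineConfig({
  server: {
    fs: {
      // Allow imports from examples/ that reach up to the repository root (../../src)
      allow: ['..']
    }
  }
});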
RPC Mode (Opt-in)
Enable richer serialization for complex types using a devalue-backed protocol. RPC is off by default; turn it on per pool.
import { WorkerPool } from '@davo/wrkr'
const pool = new WorkerPool((data) => {
// `data` supports Date, Map, Set, RegExp, BigInt
return {
echoed: data,
seenAt: new Date()
}
}, { rpc: true, size: 2, onBroadcast: (type, payload) => console.log(type, payload) })
const result = await pool.enqueue({
date: new Date('2024-01-01'),
map: new Map([["a", 1]]),
set: new Set([1, 2, 3]),
regex: /test/gi,
bigint: BigInt('12345678901234567890')
})
// result.echoed preserves complex types
pool.broadcast('tick', 42) // broadcast in RPC mode
pool.terminate()
Notes
- Protocol: Single-string messages prefer devalue.stringify/parse with a JSON fallback (see the sketch after these notes).
- Worker: The library injects a pre-bundled/shimmed devalue into the worker when rpc: true (no network calls).
- Backward compatibility: Non-RPC mode is unchanged and continues to use structured clone.
- Status: Function/promise remoting and streaming are planned next (see docs/planned/prd_rpc_layer.md).
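A rough illustration of the single-string idea described above, not the library's internal implementation; the envelope shape is invented for the sketch:
import { stringify, parse } from 'devalue';
// Encode: try devalue first (handles Date, Map, Set, RegExp, BigInt), else best-effort JSON
function encodeMessage(value) {
  try {
    return JSON.stringify({ format: 'devalue', body: stringify(value) });
  } catch {
    return JSON.stringify({ format: 'json', body: JSON.stringify(value) });
  }
}
// Decode: route on the recorded format
function decodeMessage(text) {
  const { format, body } = JSON.parse(text);
  return format === 'devalue' ? parse(body) : JSON.parse(body);
}
const wire = encodeMessage({ when: new Date(), tags: new Set(['a', 'b']) });
const roundTripped = decodeMessage(wire); // Date and Set survive the round trip
console.log(roundTripped.tags.has('a')); // true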
Node integration tests (optional)
- Gate Node worker_threads RPC tests with WRKR_NODE_RPC_TESTS=1 to avoid CI/sandbox issues: WRKR_NODE_RPC_TESTS=1 bun test src/__tests__/rpc-node-integration.test.js
- Or run WRKR_NODE_RPC_TESTS=1 bun test to include them with the full suite
Project Structure
@davo/wrkr/
├── src/
│ ├── index.js # Main entry point, exports all classes
│ ├── worker.js # Worker creation utilities
│ ├── pool.js # WorkerPool implementation
│ ├── shared-pool.js # SharedWorkerPool implementation
│ ├── utils.js # Utility functions
│ ├── runtime/ # Runtime compatibility layer (Node.js support)
│ └── __tests__/ # Test suite with .test.js suffix
├── examples/ # Usage examples
├── docs/ # Technical documentation & PRDs
├── data/ # Research documentation
├── chats/ # Development session logs
├── dist/ # Built output (bun build target)
└── bunfig.toml # Bun configuration with coverage thresholds
Runtime Compatibility
This library supports multiple JavaScript runtimes with varying feature sets:
| Runtime | Worker API | SharedWorker | worker_threads | Status |
|---------|------------|--------------|----------------|--------|
| Bun | ✅ | ❌ | ✅ | Stable (Primary) |
| Browser | ✅ | ✅ | ❌ | Stable |
| Node.js | ⚠️ | ❌ | ✅ | Phase 1 (In Progress) |
| Cloudflare Workers | ✅ | ❌ | ❌ | Experimental |
| Deno | ✅ | ❌ | ❌ | Experimental |
Edge Runtime Support (Experimental)
Edge runtimes like Cloudflare Workers and Deno use V8 isolates instead of OS threads, which creates different constraints:
What works:
- ✅ WorkerPool with standard Worker API
- ✅ Message passing via postMessage
- ✅ Inline worker functions
What doesn't work:
- ❌ setEnvironmentData / getEnvironmentData (requires worker_threads)
- ❌ SharedWorker API (not available in most edge runtimes)
- ❌ Bun-specific features (preload, onOpen)
Check runtime support:
import { runtime } from '@davo/wrkr';
if (runtime.capabilities.environmentData) {
// Use setEnvironmentData
} else {
// Fallback to postMessage
}
See docs/EDGE_RUNTIME_SUPPORT.md for a detailed compatibility matrix and usage patterns.
Node.js Compatibility Status
- Current: Phase 1 runtime abstraction layer implemented
- Issues: Async/sync impedance mismatch, message queue timing problems
- Test Coverage: 81.61% function, 77.83% line (down from 94% target)
- Status: Requires Phase 2 completion for production use
See docs/in-progress/PRD_NODE_COMPATIBILITY.md for detailed implementation status and completion roadmap.
Contributing
- Create a feature branch: git checkout -b feature/your-feature-name
- Make your changes and ensure tests pass: bun test
- Check coverage: bun test:coverage (maintain >94% coverage)
- Build the project: bun run build
- Submit a pull request with a clear description of changes
Testing Strategy
- Unit Tests: Individual component functionality
- Integration Tests: Cross-component interaction
- Performance Tests: Worker pool efficiency and memory usage
- Coverage Requirements: >94% function coverage, >94% line coverage
Known Issues & Limitations
- Node.js: Phase 1 foundation only - async initialization issues, 8/10 pool tests failing
- SharedWorker: Not available in Node.js (emulation planned for Phase 2)
- Edge Runtimes: Experimental support - detection heuristics may need refinement based on community feedback
- Test Coverage: Reduced to 81.61%/77.83% due to incomplete Node.js compatibility layer
For detailed technical information, see the docs/ directory.
For AI Assistants
This section provides a structured overview and ready-to-use code snippets to help AI assistants understand and integrate the @davo/wrkr library for managing Web Workers and Shared Workers.
Library Specification
name: @davo/wrkr
type: JavaScript/TypeScript Library
primary_classes:
- WorkerPool: Manages a pool of dedicated Web Workers.
- SharedWorkerPool: Manages connections to a single SharedWorker.
purpose: Simplify creation, management, and communication with Web Workers and SharedWorkers.
inputs:
- WorkerPool: Task function, data payloads, pool options (size, onBroadcast).
- SharedWorkerPool: Path to SharedWorker script, connection options (onConnect, onCast, onError).
outputs:
- WorkerPool: Task results (via Promises or AsyncGenerator), broadcasted messages.
- SharedWorkerPool: Task results (via Promises), messages cast from SharedWorker.
dependencies: Relies on native Worker and SharedWorker APIs in the environment (Browser, Bun).
environment: Browser environments supporting Workers/SharedWorkers, Bun runtime. Node.js support may require polyfills or adjustments.
Common Usage Patterns
WorkerPool
import { WorkerPool } from '@davo/wrkr';
// Define task
const heavyTask = async (input) => {
// ... perform work ...
broadcast('update', /* progress */);
return /* result */;
};
// Create pool
const pool = new WorkerPool(heavyTask, {
size: navigator.hardwareConcurrency || 4,
onBroadcast: (type, payload) => console.log(`Worker says: ${type}`, payload)
});
// Enqueue single task
pool.enqueue(data)
.then(result => console.log('Result:', result))
.catch(error => console.error('Task failed:', error));
// Enqueue multiple tasks (results order matches input order)
async function processBatch(inputs) {
const results = [];
for await (const result of pool.enqueueGenerator(inputs)) {
results.push(result);
}
return results;
}
// Terminate pool
pool.terminate();
SharedWorkerPool
import { SharedWorkerPool } from '@davo/wrkr';
let myConnectionId = null;
// Create pool (connects to the SharedWorker)
const sharedPool = new SharedWorkerPool('./my-shared-worker.js', {
onConnect: (id) => {
console.log('Connected with ID:', id);
myConnectionId = id; // Store the unique ID for this connection
},
onCast: (type, payload) => console.log(`SharedWorker broadcast: ${type}`, payload),
onError: (error) => console.error('SharedWorker error:', error)
});
// Wait for connection before sending tasks
await new Promise(resolve => setTimeout(resolve, 100)); // Or use a more robust connection check
// Enqueue task for this specific connection
if (myConnectionId) {
sharedPool.enqueue(myConnectionId, 'commandName', arg1, arg2)
.then(result => console.log('SharedWorker result:', result))
.catch(error => console.error('SharedWorker task failed:', error));
}
// Broadcast message from main thread to SharedWorker (and potentially other tabs)
sharedPool.cast('messageToAllTabs', { info: 'Update available' });
// Terminate this connection
sharedPool.terminate(); // Closes the port for this context
Common Questions & Scenarios
- How to handle worker errors?
  - WorkerPool: Individual enqueue promises will reject. Use .catch() or try...catch with await.
  - SharedWorkerPool: Individual enqueue promises will reject. The global onError handler can catch connection or unhandled worker errors. See the sketch below.
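A short sketch of both patterns, reusing the example worker script from the SharedWorkerPool section above:
import { WorkerPool, SharedWorkerPool } from '@davo/wrkr';
// WorkerPool: failures surface as rejected enqueue promises
const failingPool = new WorkerPool(() => { throw new Error('boom'); }, { size: 1 });
try {
  await failingPool.enqueue('anything');
} catch (error) {
  console.error('Task failed:', error.message);
}
failingPool.terminate();
// SharedWorkerPool: per-task rejections plus a global onError hook
const shared = new SharedWorkerPool('./my-shared-worker.js', {
  onConnect: (id) => {
    shared.enqueue(id, 'processData', 5).catch((error) => console.error('Task failed:', error));
  },
  onError: (error) => console.error('Connection-level error:', error)
});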
- Difference between WorkerPool.enqueue and WorkerPool.enqueueGenerator? (Both are compared in the sketch below.)
  - enqueue: Returns a Promise for a single task. Multiple calls resolve as tasks complete (order not guaranteed).
  - enqueueGenerator: Takes an iterable of inputs, yields results as an AsyncGenerator. Results are yielded in the order of the input iterable, regardless of completion time. Ideal for processing batches where input order matters.
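Side by side, using only the APIs shown above:
import { WorkerPool } from '@davo/wrkr';
const pool = new WorkerPool(async (n) => n * n, { size: 2 });
// enqueue: one promise per task; collect a batch with Promise.all
const batch = await Promise.all([3, 1, 2].map((n) => pool.enqueue(n))); // [9, 1, 4]
// enqueueGenerator: stream results, yielded in input order
const ordered = [];
for await (const square of pool.enqueueGenerator([3, 1, 2])) {
  ordered.push(square); // pushes 9, then 1, then 4
}
pool.terminate();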
- How does SharedWorkerPool manage multiple tabs/connections?
  - The SharedWorker script itself runs once. Each SharedWorkerPool instance (in different tabs/contexts) creates a unique connection (MessagePort) to that single worker.
  - The onConnect callback provides a unique connectionId (Symbol) for each connection. You must use this connectionId when calling sharedPool.enqueue to target the correct context within the SharedWorker.
  - sharedPool.cast sends a message from the main thread to the SharedWorker, which can then decide how to handle it (e.g., broadcast to other connected ports).
  - Messages from the SharedWorker intended for all connections should be handled via the onCast callback.
- How to pass complex data or transferables?
  - Both pools use postMessage internally. You can pass any data cloneable via the structured clone algorithm.
  - For large data (like ArrayBuffer), the feature list mentions transferable-object support, but the exact transfer API is not documented here; assume standard structured-clone postMessage behavior unless the test suite shows explicit transferable handling.
- Environment Compatibility?
  - Designed primarily for Browsers and Bun, which have native Worker and SharedWorker.
  - SharedWorker is not available in Node.js. Worker (dedicated) might work in Node.js via worker_threads if the internal worker creation logic adapts or is polyfilled.
Parameter Reference
WorkerPool(taskFunction, options)
- taskFunction: async (data) => result - The function to execute in the worker. Can use broadcast(type, payload).
- options:
  - size: number (Optional) - Number of workers in the pool. Defaults to navigator.hardwareConcurrency.
  - onBroadcast: (type, payload) => void (Optional) - Callback for messages sent via broadcast() from workers.
  - workerOptions: object (Optional) - Options passed directly to the Worker constructor (e.g., { type: 'module' }).
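For instance, a sketch that exercises every documented WorkerPool option at once; the workload itself is arbitrary:
import { WorkerPool } from '@davo/wrkr';
const pool = new WorkerPool(async (chunk) => {
  broadcast('progress', chunk.length); // broadcast() is available inside the task
  return chunk.toUpperCase();
}, {
  size: navigator.hardwareConcurrency,  // same value used when the option is omitted
  onBroadcast: (type, payload) => console.log(type, payload),
  workerOptions: { type: 'module' }     // forwarded to the Worker constructor
});
const shouted = await pool.enqueue('hello'); // 'HELLO'
pool.terminate();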
SharedWorkerPool(scriptURL, options)
- scriptURL: string | URL - Path to the SharedWorker JavaScript file.
- options:
  - onConnect: (connectionId: symbol) => void (Optional) - Called when a connection is established, providing the unique ID for this context.
  - onCast: (type, payload) => void (Optional) - Callback for messages broadcast from the SharedWorker to all connections.
  - onError: (error) => void (Optional) - Global error handler for the SharedWorker connection.
  - workerOptions: object (Optional) - Options passed directly to the SharedWorker constructor (e.g., { type: 'module', name: 'mySharedWorker' }).
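And a matching SharedWorkerPool sketch; the script path and the processData command reuse the example worker shown earlier in this README:
import { SharedWorkerPool } from '@davo/wrkr';
const sharedPool = new SharedWorkerPool('./my-shared-worker.js', {
  onConnect: (connectionId) => sharedPool.enqueue(connectionId, 'processData', 5),
  onCast: (type, payload) => console.log('broadcast', type, payload),
  onError: (error) => console.error(error),
  workerOptions: { type: 'module', name: 'mySharedWorker' }
});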
Running Tests
To run the included tests:
bun test
This project was created using bun init in bun v1.1.42. Bun is a fast all-in-one JavaScript runtime.
