@davo/wrkr
v0.0.2
Published
A worker utility for creative coding.
Readme
@davo/wrkr
A simple and efficient library for managing Web Workers and Shared Workers in JavaScript/TypeScript environments, optimized for Bun but usable elsewhere.
Features
WorkerPool: Manages a pool of dedicated workers for CPU-intensive tasks.- Concurrent task execution.
- Automatic worker management (creation, reuse, termination).
- Error handling.
- Support for transferable objects.
- Broadcasting messages from workers to the main thread.
SharedWorkerPool: Manages connections to a singleSharedWorker.- Facilitates communication between multiple browser tabs/windows and a single background worker instance.
- Connection management.
- Broadcasting messages from the main thread to all connected contexts.
Installation
pnpm add @davo/wrkr
# or
bun install @davo/wrkr
# or
npm install @davo/wrkr
# or
yarn add @davo/wrkrUsage
WorkerPool
Manages a pool of workers to execute tasks concurrently.
import { WorkerPool } from '@davo/wrkr';
// 1. Define your worker task function
const taskFunction = async (data) => {
// Perform CPU-intensive work
await new Promise(resolve => setTimeout(resolve, data * 10)); // Simulate work
const result = data * 2;
// Optionally broadcast messages back
broadcast('progress', result / 2);
return result;
};
// 2. Create a pool
const pool = new WorkerPool(taskFunction, {
size: 4, // Number of workers in the pool (defaults to navigator.hardwareConcurrency)
onBroadcast: (type, payload) => {
console.log(`Broadcast received: ${type} - ${payload}`);
}
});
// 3. Enqueue tasks
const inputs = [10, 20, 5, 15, 25];
// Using async generator
async function processTasksGenerator() {
console.log('Processing with generator...');
const results = [];
for await (const result of pool.enqueueGenerator(inputs)) {
console.log(`Generator Result: ${result}`);
results.push(result);
}
console.log('Generator Final Results:', results); // e.g., [20, 40, 10, 30, 50] (order matches input)
}
// Or using individual promises
async function processTasksPromises() {
console.log('\nProcessing with promises...');
const promises = inputs.map(input => pool.enqueue(input));
const results = await Promise.all(promises);
console.log('Promise Results:', results); // e.g., [10, 20, 40, 30, 50] (order depends on completion time)
}
// Run examples
await processTasksGenerator();
await processTasksPromises();
// 4. Handle errors
try {
const errorPool = new WorkerPool(() => {
throw new Error('Worker failed!');
}, { size: 1 });
await errorPool.enqueue('fail');
} catch (error) {
console.error('\nCaught worker error:', error.message); // Caught worker error: Worker failed!
}
// 5. Terminate the pool when done
pool.terminate();
console.log('\nPool terminated.');SharedWorkerPool
Manages connections to a SharedWorker for inter-context communication.
my-shared-worker.js (Example Shared Worker Script):
// my-shared-worker.js
const ports = new Map(); // Map connectionId (Symbol) to port
self.onconnect = function (e) {
const port = e.ports[0];
const connectionId = Symbol('connection-' + Math.random().toString(36).substring(7));
ports.set(connectionId, port);
console.log(`SharedWorker: Port connected, assigned ID: ${connectionId.toString()}`);
port.onmessage = async function (event) {
const [channel, id, name, args] = event.data;
console.log(`SharedWorker received [${channel}]: ID=${id}, Name=${name}, Args=`, args);
try {
// Initial channel setup confirmation
if (id === 0 && channel === name) {
// Send back [channel, id, connectionId] to confirm connection and provide ID
port.postMessage([channel, id, connectionId]);
console.log(`SharedWorker: Sent connection confirmation for ${connectionId.toString()}`);
return;
}
// Handle specific commands (e.g., 'processData')
if (name === 'processData') {
const result = args[0] * 10; // Example processing
port.postMessage([channel, id, result]); // Send result back [channel, id, result]
console.log(`SharedWorker: Processed data for ${connectionId.toString()}, result: ${result}`);
}
// Handle broadcasts from main thread (e.g., 'broadcastToAll')
else if (name === 'broadcastToAll') {
console.log(`SharedWorker: Received broadcast command: ${args[0]}`);
// Echo broadcast back to all ports (optional)
ports.forEach(p => {
if (p !== port) { // Don't send back to originator
p.postMessage([channel, 0, 'messageFromWorker', `Broadcast received: ${args[0]}`]);
}
});
}
else {
// Echo back unknown messages for simplicity in this example
port.postMessage([channel, id, args]);
}
} catch (error) {
console.error(`SharedWorker error processing message for ${connectionId.toString()}:`, error);
port.postMessage([channel, id, { error: error.message }]);
}
};
port.onclose = () => {
console.log(`SharedWorker: Port disconnected: ${connectionId.toString()}`);
ports.delete(connectionId);
};
port.start(); // Required for MessageChannel ports
};
console.log("SharedWorker script loaded.");Main Thread Usage:
import { SharedWorkerPool } from '@davo/wrkr';
// Ensure SharedWorker is available
if (typeof SharedWorker !== 'undefined') {
let connectionId1;
let connectionId2;
const receivedBroadcasts = [];
// Create a pool instance connected to your shared worker script
const pool = new SharedWorkerPool('./my-shared-worker.js', {
onConnect: (id) => {
console.log(`MainThread: Connected to SharedWorker, Connection ID: ${id.toString()}`);
// Assign connection IDs as they come in
if (!connectionId1) connectionId1 = id;
else if (!connectionId2) connectionId2 = id;
},
onCast: (type, payload) => {
console.log(`MainThread: Received broadcast from SharedWorker: ${type} - ${payload}`);
receivedBroadcasts.push({ type, payload });
},
onError: (error) => {
console.error("MainThread: SharedWorkerPool Error:", error);
}
});
// Wait a bit for connections to establish (in real apps, manage connection state)
await new Promise(resolve => setTimeout(resolve, 200));
if (connectionId1) {
console.log(`\nMainThread: Enqueuing task for connection ${connectionId1.toString()}`);
const result1 = await pool.enqueue(connectionId1, 'processData', 5);
console.log(`MainThread: Result 1 from SharedWorker: ${result1}`); // Expected: 50
// Example of sending a simple message (echoed back by example worker)
const echoResult = await pool.enqueue(connectionId1, 'echoMessage', 'hello');
console.log(`MainThread: Echo result from SharedWorker: ${echoResult}`); // Expected: ['hello']
} else {
console.error("MainThread: Connection 1 failed to establish.");
}
// Simulate another connection/tab (in reality, this would be separate)
// For testing, we can just use the same pool instance if the worker assigns unique IDs
if (connectionId2) {
console.log(`\nMainThread: Enqueuing task for connection ${connectionId2.toString()}`);
const result2 = await pool.enqueue(connectionId2, 'processData', 12);
console.log(`MainThread: Result 2 from SharedWorker: ${result2}`); // Expected: 120
} else {
// In a real scenario, a second pool instance in another context would connect.
// We simulate waiting for a potential second connection ID here for demo purposes.
console.log("MainThread: Second connection ID not received (as expected in single context test).");
}
// Broadcast a message from the main thread to the SharedWorker (and potentially other tabs)
console.log('\nMainThread: Broadcasting message to SharedWorker...');
pool.cast('broadcastToAll', 'Hello from main thread!');
// Wait a bit to see if the broadcast is echoed back by the worker
await new Promise(resolve => setTimeout(resolve, 100));
console.log("MainThread: Received Broadcasts:", receivedBroadcasts);
// Terminate the pool (closes the connection for this context)
pool.terminate();
console.log('\nMainThread: SharedWorkerPool terminated for this context.');
} else {
console.log('SharedWorker is not supported in this environment.');
}For AI Assistants
This section provides a structured overview and ready-to-use code snippets to help AI assistants understand and integrate the @davo/wrkr library for managing Web Workers and Shared Workers.
Library Specification
name: @davo/wrkr
type: JavaScript/TypeScript Library
primary_classes:
- WorkerPool: Manages a pool of dedicated Web Workers.
- SharedWorkerPool: Manages connections to a single SharedWorker.
purpose: Simplify creation, management, and communication with Web Workers and SharedWorkers.
inputs:
- WorkerPool: Task function, data payloads, pool options (size, onBroadcast).
- SharedWorkerPool: Path to SharedWorker script, connection options (onConnect, onCast, onError).
outputs:
- WorkerPool: Task results (via Promises or AsyncGenerator), broadcasted messages.
- SharedWorkerPool: Task results (via Promises), messages cast from SharedWorker.
dependencies: Relies on native Worker and SharedWorker APIs in the environment (Browser, Bun).
environment: Browser environments supporting Workers/SharedWorkers, Bun runtime. Node.js support may require polyfills or adjustments.Common Usage Patterns
WorkerPool
import { WorkerPool } from '@davo/wrkr';
// Define task
const heavyTask = async (input) => {
// ... perform work ...
broadcast('update', /* progress */);
return /* result */;
};
// Create pool
const pool = new WorkerPool(heavyTask, {
size: navigator.hardwareConcurrency || 4,
onBroadcast: (type, payload) => console.log(`Worker says: ${type}`, payload)
});
// Enqueue single task
pool.enqueue(data)
.then(result => console.log('Result:', result))
.catch(error => console.error('Task failed:', error));
// Enqueue multiple tasks (results order matches input order)
async function processBatch(inputs) {
const results = [];
for await (const result of pool.enqueueGenerator(inputs)) {
results.push(result);
}
return results;
}
// Terminate pool
pool.terminate();SharedWorkerPool
import { SharedWorkerPool } from '@davo/wrkr';
let myConnectionId = null;
// Create pool (connects to the SharedWorker)
const sharedPool = new SharedWorkerPool('./my-shared-worker.js', {
onConnect: (id) => {
console.log('Connected with ID:', id);
myConnectionId = id; // Store the unique ID for this connection
},
onCast: (type, payload) => console.log(`SharedWorker broadcast: ${type}`, payload),
onError: (error) => console.error('SharedWorker error:', error)
});
// Wait for connection before sending tasks
await new Promise(resolve => setTimeout(resolve, 100)); // Or use a more robust connection check
// Enqueue task for this specific connection
if (myConnectionId) {
sharedPool.enqueue(myConnectionId, 'commandName', arg1, arg2)
.then(result => console.log('SharedWorker result:', result))
.catch(error => console.error('SharedWorker task failed:', error));
}
// Broadcast message from main thread to SharedWorker (and potentially other tabs)
sharedPool.cast('messageToAllTabs', { info: 'Update available' });
// Terminate this connection
sharedPool.terminate(); // Closes the port for this contextCommon Questions & Scenarios
- How to handle worker errors?
WorkerPool: Individualenqueuepromises will reject. Use.catch()ortry...catchwithawait.SharedWorkerPool: Individualenqueuepromises will reject. The globalonErrorhandler can catch connection or unhandled worker errors.
- Difference between
WorkerPool.enqueueandWorkerPool.enqueueGenerator?enqueue: Returns aPromisefor a single task. Multiple calls resolve as tasks complete (order not guaranteed).enqueueGenerator: Takes an iterable of inputs, yields results as anAsyncGenerator. Results are yielded in the order of the input iterable, regardless of completion time. Ideal for processing batches where input order matters.
- How does
SharedWorkerPoolmanage multiple tabs/connections?- The
SharedWorkerscript itself runs once. EachSharedWorkerPoolinstance (in different tabs/contexts) creates a unique connection (MessagePort) to that single worker. - The
onConnectcallback provides a uniqueconnectionId(Symbol) for each connection. You must use thisconnectionIdwhen callingsharedPool.enqueueto target the correct context within the SharedWorker. sharedPool.castsends a message from the main thread to the SharedWorker, which can then decide how to handle it (e.g., broadcast to other connected ports).- Messages from the SharedWorker intended for all connections should be handled via the
onCastcallback.
- The
- How to pass complex data or transferables?
- Both pools use
postMessageinternally. You can pass any data cloneable via the structured clone algorithm. - For large data (like
ArrayBuffer), wrap the arguments intended for transfer in an array as the last argument toenqueue. The library attempts to identify and transfer these objects automatically. (Self-correction: Need to verify if the library explicitly supports transferable objects this way or if manual handling is needed). Let's assume standardpostMessagebehavior for now unless the tests show specific transferable handling.
- Both pools use
- Environment Compatibility?
- Designed primarily for Browsers and Bun, which have native
WorkerandSharedWorker. SharedWorkeris not available in Node.js.Worker(Dedicated) might work in Node.js viaworker_threadsif the internal worker creation logic adapts or is polyfilled.
- Designed primarily for Browsers and Bun, which have native
Parameter Reference
WorkerPool(taskFunction, options)
taskFunction:async (data) => result- The function to execute in the worker. Can usebroadcast(type, payload).options:size:number(Optional) - Number of workers in the pool. Defaults tonavigator.hardwareConcurrency.onBroadcast:(type, payload) => void(Optional) - Callback for messages sent viabroadcast()from workers.workerOptions:object(Optional) - Options passed directly to theWorkerconstructor (e.g.,{ type: 'module' }).
SharedWorkerPool(scriptURL, options)
scriptURL:string | URL- Path to the SharedWorker JavaScript file.options:onConnect:(connectionId: symbol) => void(Optional) - Called when a connection is established, providing the unique ID for this context.onCast:(type, payload) => void(Optional) - Callback for messages broadcast from the SharedWorker to all connections.onError:(error) => void(Optional) - Global error handler for the SharedWorker connection.workerOptions:object(Optional) - Options passed directly to theSharedWorkerconstructor (e.g.,{ type: 'module', name: 'mySharedWorker' }).
Running Tests
To run the included tests:
bun testThis project was created using bun init in bun v1.1.42. Bun is a fast all-in-one JavaScript runtime.
