smart-pool
v1.0.0
Published
High-performance priority promise pool with concurrency control
Maintainers
Readme
Smart Pool 
A high-performance, feature-rich task queue and concurrency management library for Node.js. Built for production workloads requiring advanced scheduling, priority management, batching, rate limiting, circuit breaking, worker thread support, and adaptive concurrency.
Features
- Priority Queue: Binary max-heap with dynamic priority adjustments
- Concurrency Control: Fixed or adaptive concurrency limits
- Rate Limiting: Per-type rate limits with token bucket algorithm
- Circuit Breakers: Automatic failure detection and recovery
- Task Batching: Group similar tasks for efficient processing
- Worker Threads: Offload CPU-intensive tasks to worker threads
- Task Dependencies: Execute tasks only after dependencies complete
- Caching: Deduplicate identical pending tasks
- Retry Logic: Exponential backoff with configurable limits
- Abort Support: Cancel tasks via AbortSignal
- Priority Aging: Prevent starvation with automatic priority boosts
- Priority Decay: Reduce priority of stale high-priority tasks
- Metrics: Real-time performance tracking with percentiles
- Lifecycle Hooks: Execute code at key points in task execution
- Sub-queues: Isolated queues with independent concurrency limits
- Weight-based Load: Track and limit load by task weight
Installation
npm install smart-poolQuick Start
import smartPool from 'smart-pool';
const pool = smartPool(5);
const result = await pool(async () => {
return 'Task completed';
});
console.log(result);API Reference
smartPool(concurrency, options)
Creates a new task pool instance.
Parameters:
concurrency(number): Maximum number of concurrent tasks (minimum 1)options(object, optional): Global configuration options
Returns: PoolInstance
Pool Instance
Methods
pool(task, options)
Enqueue and execute a task.
Parameters:
task(function): Async function to executeoptions(number | object): Priority (number) or task options (object)
Task Options:
priority(number): Task priority (higher = executed sooner). Default: 0weight(number): Task weight for load tracking. Default: 1type(string): Task type for rate limiting and circuit breakingcacheKey(string): Deduplicate identical pending tasksbatchKey(string): Group tasks for batch processingid(string | number): Unique task identifiertags(string[]): Tags for filtering/cancellationmetadata(object): Custom metadatadependsOn(array): Task IDs that must complete firstdeadline(number): Unix timestamp when task expiressignal(AbortSignal): Abort signal for cancellationtimeout(number): Task timeout in millisecondsretryCount(number): Maximum retry attemptsretryDelay(number): Initial retry delay in millisecondsworker(object): Worker thread configurationpath(string): Path to worker scriptdata(any): Data to pass to worker
Returns: Promise resolving to task result
Example:
const result = await pool(
async () => fetchData(),
{
priority: 10,
type: 'api',
retryCount: 3,
timeout: 5000
}
);pool.map(items, fn, options)
Map function over array items using the pool.
Parameters:
items(array): Items to processfn(function): Async function to apply to each itemoptions(number | object): Priority or task optionsthrowOnError(boolean): Throw on first error. Default: true
Returns: Promise with results
Example:
const results = await pool.map(
[1, 2, 3, 4, 5],
async (n) => n * 2,
{ priority: 5 }
);pool.pause()
Pause task execution. Queued tasks remain in queue.
Example:
pool.pause();
console.log(pool.isPaused);pool.resume()
Resume task execution after pause.
Example:
pool.resume();pool.cancel(query)
Cancel pending tasks matching criteria.
Parameters:
query(object | function):- Object:
{ id, tag }to match tasks - Function:
(task) => booleanpredicate
- Object:
Returns: Number of cancelled tasks
Example:
await pool(async () => work(), { id: 'task-1', tags: ['batch-1'] });
await pool(async () => work(), { id: 'task-2', tags: ['batch-1'] });
pool.cancel({ tag: 'batch-1' });
pool.cancel((task) => task.priority < 5);pool.onIdle()
Wait for all tasks to complete, including batched and blocked tasks.
Returns: Promise<{ errors, failed, metrics }>
Example:
const { errors, failed, metrics } = await pool.onIdle();
console.log(`Completed with ${errors.length} errors`);pool.drain()
Stop accepting new tasks and wait for completion. Equivalent to setting drain mode + onIdle.
Returns: Promise<{ errors, failed, metrics }>
Example:
await pool.drain();
console.log('All tasks completed, pool drained');pool.clear()
Cancel all pending tasks and terminate all workers. Resets pool state.
Returns: Promise
Example:
await pool.clear();
console.log('Pool cleared');pool.setConcurrency(limit)
Dynamically adjust concurrency limit.
Parameters:
limit(number): New concurrency limit
Example:
pool.setConcurrency(10);
console.log(pool.concurrency);pool.peek()
View the next task to be executed without removing it.
Returns: Task object or null
Example:
const nextTask = pool.peek();
console.log(nextTask?.priority);pool.remove(predicate)
Remove tasks from queue matching predicate.
Parameters:
predicate(function):(task) => boolean
Returns: Boolean indicating if any tasks were removed
Example:
pool.remove((task) => task.priority < 3);pool.useQueue(name, concurrency)
Create or get an isolated sub-queue with independent concurrency control.
Parameters:
name(string): Sub-queue identifierconcurrency(number, optional): Sub-queue concurrency. Default: parent concurrency
Returns: PoolInstance for the sub-queue
Example:
const apiQueue = pool.useQueue('api', 3);
const dbQueue = pool.useQueue('database', 5);
await apiQueue(async () => fetchAPI());
await dbQueue(async () => queryDB());pool.getWorkerHealth()
Get health status of all worker threads.
Returns: Array<{ path, busy, active }>
Example:
const health = pool.getWorkerHealth();
health.forEach(w => {
console.log(`Worker ${w.path}: ${w.busy ? 'busy' : 'idle'}`);
});Properties
pool.activeCount
Number of currently executing tasks (read-only).
pool.pendingCount
Number of tasks waiting in queue, batches, or blocked by dependencies (read-only).
pool.currentLoad
Current total weight of active tasks (read-only).
pool.concurrency
Current concurrency limit (read-only).
pool.isDraining
Whether pool is in drain mode (read-only).
pool.isPaused
Whether pool is paused (read-only).
pool.metrics
Performance metrics object (read-only):
totalTasks: Total tasks processedsuccessfulTasks: Successful task countfailedTasks: Failed task countthroughput: Tasks per second (formatted string)errorRate: Failure rate (formatted string)percentiles: Latency percentilesp50: Median latency (ms)p90: 90th percentile (ms)p99: 99th percentile (ms)
Global Options
Configure pool behavior during initialization:
const pool = smartPool(5, {
// Queue Management
maxQueueSize: 10000,
// Adaptive Concurrency
adaptive: true,
minConcurrency: 2,
maxConcurrency: 20,
// Rate Limiting
rateLimits: {
api: { interval: 1000, tasksPerInterval: 10 },
database: { interval: 100, tasksPerInterval: 5 }
},
// Circuit Breaker
circuitThreshold: 5,
circuitResetTimeout: 30000,
// Batching
batchSize: 10,
batchTimeout: 100,
// Retry
retryCount: 3,
initialRetryDelay: 100,
retryFactor: 2,
maxRetryDelay: 10000,
// Priority Management
agingThreshold: 5,
agingBoost: 1,
decayThreshold: 10,
decayAmount: 1,
// Worker Threads
workerPoolSize: 4,
workerPathWhitelist: ['/app/workers/'],
// Maintenance
interval: 1000,
completedTaskCleanupMs: 60000,
maxLatencyHistory: 10000,
maxErrorHistory: 1000,
// Events
emitter: eventEmitter,
// Lifecycle Hooks
onEnqueue: (task) => console.log('Enqueued:', task.id),
onDequeue: (task) => console.log('Dequeued:', task.id),
beforeExecute: (task) => console.log('Executing:', task.id),
afterExecute: (task, profile) => {
console.log('Completed:', task.id, profile.duration, 'ms');
}
});Option Descriptions:
Queue Management:
maxQueueSize: Maximum number of queued tasks. Default: 10000
Adaptive Concurrency:
adaptive: Enable automatic concurrency adjustment. Default: falseminConcurrency: Minimum concurrent tasks. Default: 1maxConcurrency: Maximum concurrent tasks. Default: 2x initial concurrency
Rate Limiting:
rateLimits: Per-type rate limits using token bucketinterval: Time window in millisecondstasksPerInterval: Tasks allowed per interval
Circuit Breaker:
circuitThreshold: Consecutive failures to open circuit. Default: 5circuitResetTimeout: Time before retry after circuit opens (ms). Default: 30000
Batching:
batchSize: Tasks per batch. Default: 10batchTimeout: Max wait time before flushing partial batch (ms). Default: 100
Retry:
retryCount: Default retry attempts. Default: 0initialRetryDelay: Initial retry delay (ms). Default: 100retryFactor: Backoff multiplier. Default: 2maxRetryDelay: Maximum retry delay (ms). Default: 10000
Priority Management:
agingThreshold: Cycles before boosting low-priority tasks. Default: undefinedagingBoost: Priority increase amount. Default: 1decayThreshold: Cycles before decaying high-priority tasks. Default: undefineddecayAmount: Priority decrease amount. Default: 1
Worker Threads:
workerPoolSize: Maximum worker threads. Default: 0 (disabled)workerPathWhitelist: Allowed worker script paths
Maintenance:
interval: Maintenance cycle interval (ms). Default: 1000completedTaskCleanupMs: Time before cleaning completed task records (ms). Default: 60000maxLatencyHistory: Maximum latency samples to retain. Default: 10000maxErrorHistory: Maximum error samples to retain. Default: 1000
Events:
emitter: EventEmitter instance for pool events
Lifecycle Hooks:
onEnqueue: Called when task added to queueonDequeue: Called when task removed from queuebeforeExecute: Called before task executionafterExecute: Called after task execution with profile data
Tutorials
1. Basic Task Queue
Simple task queue with priority management:
import smartPool from 'smart-pool';
const pool = smartPool(3);
await pool(async () => {
console.log('Low priority task');
}, 1);
await pool(async () => {
console.log('High priority task');
}, 10);
await pool.onIdle();2. API Rate Limiting
Respect API rate limits with type-based limiting:
const pool = smartPool(10, {
rateLimits: {
github: { interval: 3600000, tasksPerInterval: 5000 },
twitter: { interval: 900000, tasksPerInterval: 300 }
}
});
async function fetchGithubUser(username) {
return pool(
async () => {
const res = await fetch(`https://api.github.com/users/${username}`);
return res.json();
},
{ type: 'github', priority: 5 }
);
}
async function fetchTweet(id) {
return pool(
async () => {
const res = await fetch(`https://api.twitter.com/tweets/${id}`);
return res.json();
},
{ type: 'twitter', priority: 3 }
);
}
const users = await Promise.all([
fetchGithubUser('alice'),
fetchGithubUser('bob'),
fetchGithubUser('charlie')
]);3. Task Batching
Batch database operations for efficiency:
const pool = smartPool(5, {
batchSize: 50,
batchTimeout: 100
});
async function insertUser(user) {
return pool(
async (batch) => {
const ids = await db.users.insertMany(batch.map(t => t.data));
return ids[batch.indexOf(user)];
},
{
batchKey: 'user-insert',
data: user
}
);
}
const users = Array.from({ length: 200 }, (_, i) => ({
name: `User ${i}`,
email: `user${i}@example.com`
}));
const ids = await Promise.all(users.map(insertUser));
console.log(`Inserted ${ids.length} users in batches`);4. Circuit Breaker
Protect external services from cascading failures:
const pool = smartPool(5, {
circuitThreshold: 3,
circuitResetTimeout: 30000,
retryCount: 2,
initialRetryDelay: 1000
});
async function callUnstableAPI(endpoint) {
return pool(
async () => {
const res = await fetch(endpoint);
if (!res.ok) throw new Error(`HTTP ${res.status}`);
return res.json();
},
{ type: 'unstable-api' }
);
}
try {
const data = await callUnstableAPI('https://api.example.com/data');
console.log(data);
} catch (err) {
console.error('API call failed:', err.message);
}5. Worker Threads
Offload CPU-intensive work to worker threads:
worker.js:
import { parentPort } from 'node:worker_threads';
parentPort.on('message', ({ data }) => {
const result = expensiveComputation(data);
parentPort.postMessage({ result });
});
function expensiveComputation(n) {
let sum = 0;
for (let i = 0; i < n; i++) {
sum += Math.sqrt(i);
}
return sum;
}main.js:
const pool = smartPool(5, {
workerPoolSize: 4,
workerPathWhitelist: ['/app/workers/']
});
const results = await pool.map(
[1000000, 2000000, 3000000],
async (n) => {
return pool(
async () => {},
{
worker: {
path: '/app/workers/worker.js',
data: n
}
}
);
}
);
console.log(results);6. Task Dependencies
Execute tasks after dependencies complete:
const pool = smartPool(10);
const userId = await pool(
async () => db.users.create({ name: 'Alice' }),
{ id: 'create-user' }
);
const profileId = await pool(
async () => db.profiles.create({ userId, bio: 'Developer' }),
{ id: 'create-profile', dependsOn: ['create-user'] }
);
await pool(
async () => sendWelcomeEmail(userId),
{ dependsOn: ['create-user', 'create-profile'] }
);7. Task Caching
Deduplicate identical pending requests:
const pool = smartPool(5);
async function fetchUserData(userId) {
return pool(
async () => {
console.log(`Fetching user ${userId}`);
const res = await fetch(`https://api.example.com/users/${userId}`);
return res.json();
},
{ cacheKey: `user-${userId}` }
);
}
const [user1, user2, user3] = await Promise.all([
fetchUserData(123),
fetchUserData(123),
fetchUserData(123)
]);
console.log('Only one request made');8. Adaptive Concurrency
Automatically adjust concurrency based on performance:
const pool = smartPool(5, {
adaptive: true,
minConcurrency: 2,
maxConcurrency: 20,
adaptiveLatencyLow: 50,
adaptiveLatencyHigh: 200
});
for (let i = 0; i < 1000; i++) {
pool(async () => {
await simulateWork();
});
}
setInterval(() => {
console.log(`Current concurrency: ${pool.concurrency}`);
console.log(`Active tasks: ${pool.activeCount}`);
console.log(`Pending tasks: ${pool.pendingCount}`);
}, 1000);
await pool.onIdle();9. Priority Aging
Prevent task starvation with automatic priority boosts:
const pool = smartPool(3, {
agingThreshold: 5,
agingBoost: 1,
interval: 1000
});
for (let i = 0; i < 100; i++) {
pool(
async () => {
console.log(`Task ${i}`);
await sleep(100);
},
{ priority: i < 10 ? 1 : 10 }
);
}
await pool.onIdle();10. Sub-queues
Isolate different workload types:
const pool = smartPool(10);
const criticalQueue = pool.useQueue('critical', 5);
const backgroundQueue = pool.useQueue('background', 2);
await criticalQueue(async () => {
await processPayment();
});
await backgroundQueue(async () => {
await generateReport();
});
console.log(`Critical active: ${criticalQueue.activeCount}`);
console.log(`Background active: ${backgroundQueue.activeCount}`);11. Timeout and Abort
Cancel tasks via timeout or AbortSignal:
const pool = smartPool(5);
const controller = new AbortController();
const timeoutTask = pool(
async () => {
await longRunningOperation();
},
{ timeout: 5000 }
);
const abortTask = pool(
async () => {
await anotherOperation();
},
{ signal: controller.signal }
);
setTimeout(() => controller.abort(), 2000);
try {
await Promise.all([timeoutTask, abortTask]);
} catch (err) {
console.error('Task cancelled:', err.message);
}12. Metrics and Monitoring
Track performance metrics:
const pool = smartPool(10);
for (let i = 0; i < 1000; i++) {
pool(async () => {
await simulateWork();
});
}
await pool.onIdle();
const metrics = pool.metrics;
console.log(`Total tasks: ${metrics.totalTasks}`);
console.log(`Success rate: ${((1 - parseFloat(metrics.errorRate)) * 100).toFixed(2)}%`);
console.log(`Throughput: ${metrics.throughput} tasks/sec`);
console.log(`Latency p50: ${metrics.percentiles.p50}ms`);
console.log(`Latency p90: ${metrics.percentiles.p90}ms`);
console.log(`Latency p99: ${metrics.percentiles.p99}ms`);13. Lifecycle Hooks
Monitor task execution:
const pool = smartPool(5, {
onEnqueue: (task) => {
console.log(`[ENQUEUE] ${task.id || 'anonymous'} (priority: ${task.priority})`);
},
onDequeue: (task) => {
console.log(`[DEQUEUE] ${task.id || 'anonymous'}`);
},
beforeExecute: (task) => {
console.log(`[EXECUTE] ${task.id || 'anonymous'}`);
},
afterExecute: (task, profile) => {
console.log(`[COMPLETE] ${task.id || 'anonymous'} in ${profile.duration}ms`);
if (profile.error) {
console.error(`[ERROR] ${profile.error}`);
}
}
});
await pool(async () => {
await performWork();
}, { id: 'my-task', priority: 10 });14. Weight-based Load
Track and limit load by task weight:
const pool = smartPool(100);
async function cpuIntensiveTask() {
return pool(
async () => {
return performComputation();
},
{ weight: 10 }
);
}
async function lightweightTask() {
return pool(
async () => {
return fetchData();
},
{ weight: 1 }
);
}
await Promise.all([
...Array(5).fill().map(cpuIntensiveTask),
...Array(50).fill().map(lightweightTask)
]);
console.log(`Current load: ${pool.currentLoad}`);15. Task Cancellation
Cancel tasks by ID, tag, or predicate:
const pool = smartPool(5);
for (let i = 0; i < 100; i++) {
pool(
async () => {
await processItem(i);
},
{
id: `task-${i}`,
tags: i % 2 === 0 ? ['even'] : ['odd'],
priority: i
}
).catch(err => {
if (err.message === 'Task cancelled via API') {
console.log(`Task ${i} was cancelled`);
}
});
}
pool.cancel({ tag: 'even' });
pool.cancel((task) => task.priority < 50);
await pool.onIdle();Events
When an emitter is provided, the pool emits these events:
circuit:open- Circuit breaker openedcircuit:closed- Circuit breaker closedconcurrency:adjust- Adaptive concurrency changedtask:retry- Task retry attempttask:timeout- Task timeoutbatch:flush- Batch flushed
import { EventEmitter } from 'events';
const emitter = new EventEmitter();
const pool = smartPool(5, { emitter });
emitter.on('circuit:open', ({ type }) => {
console.log(`Circuit opened for ${type}`);
});
emitter.on('concurrency:adjust', ({ concurrency, reason }) => {
console.log(`Concurrency adjusted to ${concurrency}: ${reason}`);
});
emitter.on('task:retry', ({ id, attempt, delay }) => {
console.log(`Retrying task ${id} (attempt ${attempt}) after ${delay}ms`);
});Best Practices
1. Choose Appropriate Concurrency
Start conservative and adjust based on metrics:
const pool = smartPool(5, {
adaptive: true,
minConcurrency: 2,
maxConcurrency: 20
});2. Use Type-based Rate Limiting
Respect external API limits:
const pool = smartPool(10, {
rateLimits: {
'api-provider': { interval: 60000, tasksPerInterval: 100 }
}
});3. Implement Circuit Breakers
Protect against cascading failures:
const pool = smartPool(5, {
circuitThreshold: 5,
circuitResetTimeout: 30000,
retryCount: 3
});4. Batch Similar Operations
Reduce overhead for bulk operations:
const pool = smartPool(5, {
batchSize: 100,
batchTimeout: 50
});5. Use Sub-queues for Isolation
Separate critical and background work:
const critical = pool.useQueue('critical', 10);
const background = pool.useQueue('background', 2);6. Monitor Metrics
Track performance and adjust configuration:
setInterval(() => {
const { throughput, errorRate, percentiles } = pool.metrics;
console.log({ throughput, errorRate, p99: percentiles.p99 });
}, 5000);7. Handle Errors Gracefully
Always catch and handle task errors:
try {
await pool(async () => riskyOperation());
} catch (err) {
console.error('Task failed:', err);
}8. Clean Up Resources
Always drain or clear the pool on shutdown:
process.on('SIGTERM', async () => {
await pool.drain();
process.exit(0);
});Performance Tips
- Batch when possible: Use
batchKeyfor operations that can be grouped - Enable adaptive mode: Let the pool optimize concurrency automatically
- Use worker threads: Offload CPU-intensive tasks to avoid blocking
- Cache duplicate requests: Use
cacheKeyto deduplicate pending tasks - Set appropriate priorities: High-priority tasks execute first
- Monitor metrics: Use percentiles to identify bottlenecks
- Tune rate limits: Match external service limits
- Use sub-queues: Isolate different workload types
License
MIT
