npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

smart-pool

v1.0.0

Published

High-performance priority promise pool with concurrency control

Readme

Smart Pool CI

A high-performance, feature-rich task queue and concurrency management library for Node.js. Built for production workloads requiring advanced scheduling, priority management, batching, rate limiting, circuit breaking, worker thread support, and adaptive concurrency.

Features

  • Priority Queue: Binary max-heap with dynamic priority adjustments
  • Concurrency Control: Fixed or adaptive concurrency limits
  • Rate Limiting: Per-type rate limits with token bucket algorithm
  • Circuit Breakers: Automatic failure detection and recovery
  • Task Batching: Group similar tasks for efficient processing
  • Worker Threads: Offload CPU-intensive tasks to worker threads
  • Task Dependencies: Execute tasks only after dependencies complete
  • Caching: Deduplicate identical pending tasks
  • Retry Logic: Exponential backoff with configurable limits
  • Abort Support: Cancel tasks via AbortSignal
  • Priority Aging: Prevent starvation with automatic priority boosts
  • Priority Decay: Reduce priority of stale high-priority tasks
  • Metrics: Real-time performance tracking with percentiles
  • Lifecycle Hooks: Execute code at key points in task execution
  • Sub-queues: Isolated queues with independent concurrency limits
  • Weight-based Load: Track and limit load by task weight

Installation

npm install smart-pool

Quick Start

import smartPool from 'smart-pool';

const pool = smartPool(5);

const result = await pool(async () => {
  return 'Task completed';
});

console.log(result);

API Reference

smartPool(concurrency, options)

Creates a new task pool instance.

Parameters:

  • concurrency (number): Maximum number of concurrent tasks (minimum 1)
  • options (object, optional): Global configuration options

Returns: PoolInstance

Pool Instance

Methods

pool(task, options)

Enqueue and execute a task.

Parameters:

  • task (function): Async function to execute
  • options (number | object): Priority (number) or task options (object)

Task Options:

  • priority (number): Task priority (higher = executed sooner). Default: 0
  • weight (number): Task weight for load tracking. Default: 1
  • type (string): Task type for rate limiting and circuit breaking
  • cacheKey (string): Deduplicate identical pending tasks
  • batchKey (string): Group tasks for batch processing
  • id (string | number): Unique task identifier
  • tags (string[]): Tags for filtering/cancellation
  • metadata (object): Custom metadata
  • dependsOn (array): Task IDs that must complete first
  • deadline (number): Unix timestamp when task expires
  • signal (AbortSignal): Abort signal for cancellation
  • timeout (number): Task timeout in milliseconds
  • retryCount (number): Maximum retry attempts
  • retryDelay (number): Initial retry delay in milliseconds
  • worker (object): Worker thread configuration
    • path (string): Path to worker script
    • data (any): Data to pass to worker

Returns: Promise resolving to task result

Example:

const result = await pool(
  async () => fetchData(),
  {
    priority: 10,
    type: 'api',
    retryCount: 3,
    timeout: 5000
  }
);
pool.map(items, fn, options)

Map function over array items using the pool.

Parameters:

  • items (array): Items to process
  • fn (function): Async function to apply to each item
  • options (number | object): Priority or task options
    • throwOnError (boolean): Throw on first error. Default: true

Returns: Promise with results

Example:

const results = await pool.map(
  [1, 2, 3, 4, 5],
  async (n) => n * 2,
  { priority: 5 }
);
pool.pause()

Pause task execution. Queued tasks remain in queue.

Example:

pool.pause();
console.log(pool.isPaused);
pool.resume()

Resume task execution after pause.

Example:

pool.resume();
pool.cancel(query)

Cancel pending tasks matching criteria.

Parameters:

  • query (object | function):
    • Object: { id, tag } to match tasks
    • Function: (task) => boolean predicate

Returns: Number of cancelled tasks

Example:

await pool(async () => work(), { id: 'task-1', tags: ['batch-1'] });
await pool(async () => work(), { id: 'task-2', tags: ['batch-1'] });

pool.cancel({ tag: 'batch-1' });

pool.cancel((task) => task.priority < 5);
pool.onIdle()

Wait for all tasks to complete, including batched and blocked tasks.

Returns: Promise<{ errors, failed, metrics }>

Example:

const { errors, failed, metrics } = await pool.onIdle();
console.log(`Completed with ${errors.length} errors`);
pool.drain()

Stop accepting new tasks and wait for completion. Equivalent to setting drain mode + onIdle.

Returns: Promise<{ errors, failed, metrics }>

Example:

await pool.drain();
console.log('All tasks completed, pool drained');
pool.clear()

Cancel all pending tasks and terminate all workers. Resets pool state.

Returns: Promise

Example:

await pool.clear();
console.log('Pool cleared');
pool.setConcurrency(limit)

Dynamically adjust concurrency limit.

Parameters:

  • limit (number): New concurrency limit

Example:

pool.setConcurrency(10);
console.log(pool.concurrency);
pool.peek()

View the next task to be executed without removing it.

Returns: Task object or null

Example:

const nextTask = pool.peek();
console.log(nextTask?.priority);
pool.remove(predicate)

Remove tasks from queue matching predicate.

Parameters:

  • predicate (function): (task) => boolean

Returns: Boolean indicating if any tasks were removed

Example:

pool.remove((task) => task.priority < 3);
pool.useQueue(name, concurrency)

Create or get an isolated sub-queue with independent concurrency control.

Parameters:

  • name (string): Sub-queue identifier
  • concurrency (number, optional): Sub-queue concurrency. Default: parent concurrency

Returns: PoolInstance for the sub-queue

Example:

const apiQueue = pool.useQueue('api', 3);
const dbQueue = pool.useQueue('database', 5);

await apiQueue(async () => fetchAPI());
await dbQueue(async () => queryDB());
pool.getWorkerHealth()

Get health status of all worker threads.

Returns: Array<{ path, busy, active }>

Example:

const health = pool.getWorkerHealth();
health.forEach(w => {
  console.log(`Worker ${w.path}: ${w.busy ? 'busy' : 'idle'}`);
});

Properties

pool.activeCount

Number of currently executing tasks (read-only).

pool.pendingCount

Number of tasks waiting in queue, batches, or blocked by dependencies (read-only).

pool.currentLoad

Current total weight of active tasks (read-only).

pool.concurrency

Current concurrency limit (read-only).

pool.isDraining

Whether pool is in drain mode (read-only).

pool.isPaused

Whether pool is paused (read-only).

pool.metrics

Performance metrics object (read-only):

  • totalTasks: Total tasks processed
  • successfulTasks: Successful task count
  • failedTasks: Failed task count
  • throughput: Tasks per second (formatted string)
  • errorRate: Failure rate (formatted string)
  • percentiles: Latency percentiles
    • p50: Median latency (ms)
    • p90: 90th percentile (ms)
    • p99: 99th percentile (ms)

Global Options

Configure pool behavior during initialization:

const pool = smartPool(5, {
  // Queue Management
  maxQueueSize: 10000,
  
  // Adaptive Concurrency
  adaptive: true,
  minConcurrency: 2,
  maxConcurrency: 20,
  
  // Rate Limiting
  rateLimits: {
    api: { interval: 1000, tasksPerInterval: 10 },
    database: { interval: 100, tasksPerInterval: 5 }
  },
  
  // Circuit Breaker
  circuitThreshold: 5,
  circuitResetTimeout: 30000,
  
  // Batching
  batchSize: 10,
  batchTimeout: 100,
  
  // Retry
  retryCount: 3,
  initialRetryDelay: 100,
  retryFactor: 2,
  maxRetryDelay: 10000,
  
  // Priority Management
  agingThreshold: 5,
  agingBoost: 1,
  decayThreshold: 10,
  decayAmount: 1,
  
  // Worker Threads
  workerPoolSize: 4,
  workerPathWhitelist: ['/app/workers/'],
  
  // Maintenance
  interval: 1000,
  completedTaskCleanupMs: 60000,
  maxLatencyHistory: 10000,
  maxErrorHistory: 1000,
  
  // Events
  emitter: eventEmitter,
  
  // Lifecycle Hooks
  onEnqueue: (task) => console.log('Enqueued:', task.id),
  onDequeue: (task) => console.log('Dequeued:', task.id),
  beforeExecute: (task) => console.log('Executing:', task.id),
  afterExecute: (task, profile) => {
    console.log('Completed:', task.id, profile.duration, 'ms');
  }
});

Option Descriptions:

Queue Management:

  • maxQueueSize: Maximum number of queued tasks. Default: 10000

Adaptive Concurrency:

  • adaptive: Enable automatic concurrency adjustment. Default: false
  • minConcurrency: Minimum concurrent tasks. Default: 1
  • maxConcurrency: Maximum concurrent tasks. Default: 2x initial concurrency

Rate Limiting:

  • rateLimits: Per-type rate limits using token bucket
    • interval: Time window in milliseconds
    • tasksPerInterval: Tasks allowed per interval

Circuit Breaker:

  • circuitThreshold: Consecutive failures to open circuit. Default: 5
  • circuitResetTimeout: Time before retry after circuit opens (ms). Default: 30000

Batching:

  • batchSize: Tasks per batch. Default: 10
  • batchTimeout: Max wait time before flushing partial batch (ms). Default: 100

Retry:

  • retryCount: Default retry attempts. Default: 0
  • initialRetryDelay: Initial retry delay (ms). Default: 100
  • retryFactor: Backoff multiplier. Default: 2
  • maxRetryDelay: Maximum retry delay (ms). Default: 10000

Priority Management:

  • agingThreshold: Cycles before boosting low-priority tasks. Default: undefined
  • agingBoost: Priority increase amount. Default: 1
  • decayThreshold: Cycles before decaying high-priority tasks. Default: undefined
  • decayAmount: Priority decrease amount. Default: 1

Worker Threads:

  • workerPoolSize: Maximum worker threads. Default: 0 (disabled)
  • workerPathWhitelist: Allowed worker script paths

Maintenance:

  • interval: Maintenance cycle interval (ms). Default: 1000
  • completedTaskCleanupMs: Time before cleaning completed task records (ms). Default: 60000
  • maxLatencyHistory: Maximum latency samples to retain. Default: 10000
  • maxErrorHistory: Maximum error samples to retain. Default: 1000

Events:

  • emitter: EventEmitter instance for pool events

Lifecycle Hooks:

  • onEnqueue: Called when task added to queue
  • onDequeue: Called when task removed from queue
  • beforeExecute: Called before task execution
  • afterExecute: Called after task execution with profile data

Tutorials

1. Basic Task Queue

Simple task queue with priority management:

import smartPool from 'smart-pool';

const pool = smartPool(3);

await pool(async () => {
  console.log('Low priority task');
}, 1);

await pool(async () => {
  console.log('High priority task');
}, 10);

await pool.onIdle();

2. API Rate Limiting

Respect API rate limits with type-based limiting:

const pool = smartPool(10, {
  rateLimits: {
    github: { interval: 3600000, tasksPerInterval: 5000 },
    twitter: { interval: 900000, tasksPerInterval: 300 }
  }
});

async function fetchGithubUser(username) {
  return pool(
    async () => {
      const res = await fetch(`https://api.github.com/users/${username}`);
      return res.json();
    },
    { type: 'github', priority: 5 }
  );
}

async function fetchTweet(id) {
  return pool(
    async () => {
      const res = await fetch(`https://api.twitter.com/tweets/${id}`);
      return res.json();
    },
    { type: 'twitter', priority: 3 }
  );
}

const users = await Promise.all([
  fetchGithubUser('alice'),
  fetchGithubUser('bob'),
  fetchGithubUser('charlie')
]);

3. Task Batching

Batch database operations for efficiency:

const pool = smartPool(5, {
  batchSize: 50,
  batchTimeout: 100
});

async function insertUser(user) {
  return pool(
    async (batch) => {
      const ids = await db.users.insertMany(batch.map(t => t.data));
      return ids[batch.indexOf(user)];
    },
    {
      batchKey: 'user-insert',
      data: user
    }
  );
}

const users = Array.from({ length: 200 }, (_, i) => ({
  name: `User ${i}`,
  email: `user${i}@example.com`
}));

const ids = await Promise.all(users.map(insertUser));
console.log(`Inserted ${ids.length} users in batches`);

4. Circuit Breaker

Protect external services from cascading failures:

const pool = smartPool(5, {
  circuitThreshold: 3,
  circuitResetTimeout: 30000,
  retryCount: 2,
  initialRetryDelay: 1000
});

async function callUnstableAPI(endpoint) {
  return pool(
    async () => {
      const res = await fetch(endpoint);
      if (!res.ok) throw new Error(`HTTP ${res.status}`);
      return res.json();
    },
    { type: 'unstable-api' }
  );
}

try {
  const data = await callUnstableAPI('https://api.example.com/data');
  console.log(data);
} catch (err) {
  console.error('API call failed:', err.message);
}

5. Worker Threads

Offload CPU-intensive work to worker threads:

worker.js:

import { parentPort } from 'node:worker_threads';

parentPort.on('message', ({ data }) => {
  const result = expensiveComputation(data);
  parentPort.postMessage({ result });
});

function expensiveComputation(n) {
  let sum = 0;
  for (let i = 0; i < n; i++) {
    sum += Math.sqrt(i);
  }
  return sum;
}

main.js:

const pool = smartPool(5, {
  workerPoolSize: 4,
  workerPathWhitelist: ['/app/workers/']
});

const results = await pool.map(
  [1000000, 2000000, 3000000],
  async (n) => {
    return pool(
      async () => {},
      {
        worker: {
          path: '/app/workers/worker.js',
          data: n
        }
      }
    );
  }
);

console.log(results);

6. Task Dependencies

Execute tasks after dependencies complete:

const pool = smartPool(10);

const userId = await pool(
  async () => db.users.create({ name: 'Alice' }),
  { id: 'create-user' }
);

const profileId = await pool(
  async () => db.profiles.create({ userId, bio: 'Developer' }),
  { id: 'create-profile', dependsOn: ['create-user'] }
);

await pool(
  async () => sendWelcomeEmail(userId),
  { dependsOn: ['create-user', 'create-profile'] }
);

7. Task Caching

Deduplicate identical pending requests:

const pool = smartPool(5);

async function fetchUserData(userId) {
  return pool(
    async () => {
      console.log(`Fetching user ${userId}`);
      const res = await fetch(`https://api.example.com/users/${userId}`);
      return res.json();
    },
    { cacheKey: `user-${userId}` }
  );
}

const [user1, user2, user3] = await Promise.all([
  fetchUserData(123),
  fetchUserData(123),
  fetchUserData(123)
]);

console.log('Only one request made');

8. Adaptive Concurrency

Automatically adjust concurrency based on performance:

const pool = smartPool(5, {
  adaptive: true,
  minConcurrency: 2,
  maxConcurrency: 20,
  adaptiveLatencyLow: 50,
  adaptiveLatencyHigh: 200
});

for (let i = 0; i < 1000; i++) {
  pool(async () => {
    await simulateWork();
  });
}

setInterval(() => {
  console.log(`Current concurrency: ${pool.concurrency}`);
  console.log(`Active tasks: ${pool.activeCount}`);
  console.log(`Pending tasks: ${pool.pendingCount}`);
}, 1000);

await pool.onIdle();

9. Priority Aging

Prevent task starvation with automatic priority boosts:

const pool = smartPool(3, {
  agingThreshold: 5,
  agingBoost: 1,
  interval: 1000
});

for (let i = 0; i < 100; i++) {
  pool(
    async () => {
      console.log(`Task ${i}`);
      await sleep(100);
    },
    { priority: i < 10 ? 1 : 10 }
  );
}

await pool.onIdle();

10. Sub-queues

Isolate different workload types:

const pool = smartPool(10);

const criticalQueue = pool.useQueue('critical', 5);
const backgroundQueue = pool.useQueue('background', 2);

await criticalQueue(async () => {
  await processPayment();
});

await backgroundQueue(async () => {
  await generateReport();
});

console.log(`Critical active: ${criticalQueue.activeCount}`);
console.log(`Background active: ${backgroundQueue.activeCount}`);

11. Timeout and Abort

Cancel tasks via timeout or AbortSignal:

const pool = smartPool(5);

const controller = new AbortController();

const timeoutTask = pool(
  async () => {
    await longRunningOperation();
  },
  { timeout: 5000 }
);

const abortTask = pool(
  async () => {
    await anotherOperation();
  },
  { signal: controller.signal }
);

setTimeout(() => controller.abort(), 2000);

try {
  await Promise.all([timeoutTask, abortTask]);
} catch (err) {
  console.error('Task cancelled:', err.message);
}

12. Metrics and Monitoring

Track performance metrics:

const pool = smartPool(10);

for (let i = 0; i < 1000; i++) {
  pool(async () => {
    await simulateWork();
  });
}

await pool.onIdle();

const metrics = pool.metrics;
console.log(`Total tasks: ${metrics.totalTasks}`);
console.log(`Success rate: ${((1 - parseFloat(metrics.errorRate)) * 100).toFixed(2)}%`);
console.log(`Throughput: ${metrics.throughput} tasks/sec`);
console.log(`Latency p50: ${metrics.percentiles.p50}ms`);
console.log(`Latency p90: ${metrics.percentiles.p90}ms`);
console.log(`Latency p99: ${metrics.percentiles.p99}ms`);

13. Lifecycle Hooks

Monitor task execution:

const pool = smartPool(5, {
  onEnqueue: (task) => {
    console.log(`[ENQUEUE] ${task.id || 'anonymous'} (priority: ${task.priority})`);
  },
  onDequeue: (task) => {
    console.log(`[DEQUEUE] ${task.id || 'anonymous'}`);
  },
  beforeExecute: (task) => {
    console.log(`[EXECUTE] ${task.id || 'anonymous'}`);
  },
  afterExecute: (task, profile) => {
    console.log(`[COMPLETE] ${task.id || 'anonymous'} in ${profile.duration}ms`);
    if (profile.error) {
      console.error(`[ERROR] ${profile.error}`);
    }
  }
});

await pool(async () => {
  await performWork();
}, { id: 'my-task', priority: 10 });

14. Weight-based Load

Track and limit load by task weight:

const pool = smartPool(100);

async function cpuIntensiveTask() {
  return pool(
    async () => {
      return performComputation();
    },
    { weight: 10 }
  );
}

async function lightweightTask() {
  return pool(
    async () => {
      return fetchData();
    },
    { weight: 1 }
  );
}

await Promise.all([
  ...Array(5).fill().map(cpuIntensiveTask),
  ...Array(50).fill().map(lightweightTask)
]);

console.log(`Current load: ${pool.currentLoad}`);

15. Task Cancellation

Cancel tasks by ID, tag, or predicate:

const pool = smartPool(5);

for (let i = 0; i < 100; i++) {
  pool(
    async () => {
      await processItem(i);
    },
    {
      id: `task-${i}`,
      tags: i % 2 === 0 ? ['even'] : ['odd'],
      priority: i
    }
  ).catch(err => {
    if (err.message === 'Task cancelled via API') {
      console.log(`Task ${i} was cancelled`);
    }
  });
}

pool.cancel({ tag: 'even' });

pool.cancel((task) => task.priority < 50);

await pool.onIdle();

Events

When an emitter is provided, the pool emits these events:

  • circuit:open - Circuit breaker opened
  • circuit:closed - Circuit breaker closed
  • concurrency:adjust - Adaptive concurrency changed
  • task:retry - Task retry attempt
  • task:timeout - Task timeout
  • batch:flush - Batch flushed
import { EventEmitter } from 'events';

const emitter = new EventEmitter();
const pool = smartPool(5, { emitter });

emitter.on('circuit:open', ({ type }) => {
  console.log(`Circuit opened for ${type}`);
});

emitter.on('concurrency:adjust', ({ concurrency, reason }) => {
  console.log(`Concurrency adjusted to ${concurrency}: ${reason}`);
});

emitter.on('task:retry', ({ id, attempt, delay }) => {
  console.log(`Retrying task ${id} (attempt ${attempt}) after ${delay}ms`);
});

Best Practices

1. Choose Appropriate Concurrency

Start conservative and adjust based on metrics:

const pool = smartPool(5, {
  adaptive: true,
  minConcurrency: 2,
  maxConcurrency: 20
});

2. Use Type-based Rate Limiting

Respect external API limits:

const pool = smartPool(10, {
  rateLimits: {
    'api-provider': { interval: 60000, tasksPerInterval: 100 }
  }
});

3. Implement Circuit Breakers

Protect against cascading failures:

const pool = smartPool(5, {
  circuitThreshold: 5,
  circuitResetTimeout: 30000,
  retryCount: 3
});

4. Batch Similar Operations

Reduce overhead for bulk operations:

const pool = smartPool(5, {
  batchSize: 100,
  batchTimeout: 50
});

5. Use Sub-queues for Isolation

Separate critical and background work:

const critical = pool.useQueue('critical', 10);
const background = pool.useQueue('background', 2);

6. Monitor Metrics

Track performance and adjust configuration:

setInterval(() => {
  const { throughput, errorRate, percentiles } = pool.metrics;
  console.log({ throughput, errorRate, p99: percentiles.p99 });
}, 5000);

7. Handle Errors Gracefully

Always catch and handle task errors:

try {
  await pool(async () => riskyOperation());
} catch (err) {
  console.error('Task failed:', err);
}

8. Clean Up Resources

Always drain or clear the pool on shutdown:

process.on('SIGTERM', async () => {
  await pool.drain();
  process.exit(0);
});

Performance Tips

  1. Batch when possible: Use batchKey for operations that can be grouped
  2. Enable adaptive mode: Let the pool optimize concurrency automatically
  3. Use worker threads: Offload CPU-intensive tasks to avoid blocking
  4. Cache duplicate requests: Use cacheKey to deduplicate pending tasks
  5. Set appropriate priorities: High-priority tasks execute first
  6. Monitor metrics: Use percentiles to identify bottlenecks
  7. Tune rate limits: Match external service limits
  8. Use sub-queues: Isolate different workload types

License

MIT