accumulating-processor

v1.0.0

batch processor that accumulates items by count, delay, or size

A JavaScript batch processor that supports accumulating items by count, delay, or size.

Installation

In Node:

npm install accumulating-processor

import { AccumulatingProcessor } from "accumulating-processor";

In Deno:

import { AccumulatingProcessor } from "https://raw.githubusercontent.com/dstelljes/accumulating-processor/1.0.0/mod.ts";

or

import { AccumulatingProcessor } from "npm:accumulating-processor";

Usage

AccumulatingProcessor is instantiated with a function that will be invoked whenever there is a batch ready to be processed:

const processor = new AccumulatingProcessor((entities) =>
  repo.process(entities)
);

The process method returns a Promise that will resolve when the item's batch is processed or reject when the batch fails:

// adds the item to a batch and blocks until the batch is processed:
await processor.process({ id: "jpqlbd" });

If no threshold options are specified, the processor will accumulate items until release is called:

const promises = [
  processor.process({ id: "apbker" }),
  processor.process({ id: "mzlexi" }),
];

processor.release();

// blocks until the batch is processed:
const [first, second] = await Promise.all(promises);

The flush method can be used to ensure that all pending items are processed:

processor.process({ id: "hhqpro" });
processor.process({ id: "pnojwe" });
processor.release();
processor.process({ id: "mbypsd" });

// releases a batch containing the third item and blocks until all three items
// are processed:
await processor.flush();

Count thresholds

Use the count.max option to specify the maximum number of items that may be included in a batch:

const processor = new AccumulatingProcessor(
  (entities) => repo.process(entities),
  {
    count: { max: 3 },
  },
);

const promises = [
  processor.process({ id: "bliauf" }),
  processor.process({ id: "etbkte" }),
  processor.process({ id: "hpgnou" }),
];

// blocks until the batch is processed:
const [first, second, third] = await Promise.all(promises);
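To make the accumulate-then-release behavior concrete, here is a minimal sketch of how a count threshold could work. `CountBatcher` is a hypothetical stand-in written for illustration, not the library's implementation: it ignores the processing function's return value and supports only a maximum count plus a manual `release`.

```javascript
// Hypothetical stand-in, not part of accumulating-processor.
class CountBatcher {
  constructor(processBatch, max) {
    this.processBatch = processBatch;
    this.max = max;
    this.pending = [];
  }

  // Queues an item and returns a promise tied to the item's batch.
  process(item) {
    return new Promise((resolve, reject) => {
      this.pending.push({ item, resolve, reject });
      // Release automatically once the batch reaches the maximum count.
      if (this.pending.length >= this.max) {
        this.release();
      }
    });
  }

  // Hands the pending items to the processing function as one batch.
  release() {
    const batch = this.pending;
    this.pending = [];
    if (batch.length === 0) return;

    Promise.resolve()
      .then(() => this.processBatch(batch.map((entry) => entry.item)))
      .then(
        () => batch.forEach((entry) => entry.resolve()),
        (error) => batch.forEach((entry) => entry.reject(error)),
      );
  }
}

const batcher = new CountBatcher((items) => console.log("batch:", items), 3);
batcher.process("bliauf");
batcher.process("etbkte");
console.log(batcher.pending.length); // 2
batcher.process("hpgnou");
console.log(batcher.pending.length); // 0
```

In this sketch the third call fills the batch, so it is released without an explicit `release` call, and every promise in the batch settles together.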

Delay thresholds

Use the delay.max option to specify the maximum amount of time that a batch may accumulate:

const processor = new AccumulatingProcessor(
  (entities) => repo.process(entities),
  {
    delay: { max: 1000 },
  },
);

const start = Date.now();
await processor.process({ id: "spjlwr" });
const end = Date.now();

// end - start ≈ 1000 + processing time

Size thresholds

Use the size.max and size.calculate options to specify the maximum total size of a batch:

const processor = new AccumulatingProcessor(
  (messages) => bus.produce(messages),
  {
    size: {
      max: 1024,
      calculate: ({ key, value }) => key.byteLength + value.byteLength,
    },
  },
);

processor.process({
  key: Buffer.from("albpre"),
  value: Buffer.alloc(512),
});

processor.process({
  key: Buffer.from("albpre"),
  value: Buffer.alloc(256),
});

processor.process({
  key: Buffer.from("mspvjj"),
  value: Buffer.alloc(768),
});

// releases two batches, one with the first and second items and one with the
// third item:
await processor.flush();

By default, the processor will reject any item whose size exceeds size.max:

processor.process({
  key: Buffer.from("ghiphr"),
  value: Buffer.alloc(1024),
});

// Error: item has size 1030, greater than 1024 allowed

To allow individual items to exceed size.max, size.strict may be set to false:

const processor = new AccumulatingProcessor(
  (messages) => bus.produce(messages),
  {
    size: {
      max: 1024,
      calculate: ({ key, value }) => key.byteLength + value.byteLength,
      strict: false,
    },
  },
);

processor.process({
  key: Buffer.from("bnaser"),
  value: Buffer.alloc(512),
});

processor.process({
  key: Buffer.from("nksdfd"),
  value: Buffer.alloc(1024),
});

// releases two batches, one with the first item (total calculated size 518)
// and one with the second item (total calculated size 1030):
await processor.flush();
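The batch boundaries in the size examples above follow from simple arithmetic, which the sketch below reproduces. `partitionBySize` is a hypothetical helper, not part of the library, and the library's internals may differ. Plain byte counts stand in for the Buffers, and the sketch throws on an oversized item, whereas the library rejects that item's promise.

```javascript
// Hypothetical helper: greedily groups items so that each batch's total
// calculated size stays at or below `max`.
function partitionBySize(items, { max, calculate, strict = true }) {
  const batches = [];
  let current = [];
  let total = 0;

  for (const item of items) {
    const size = calculate(item);
    if (strict && size > max) {
      throw new Error(`item has size ${size}, greater than ${max} allowed`);
    }
    // Close the current batch if adding this item would exceed the maximum.
    if (current.length > 0 && total + size > max) {
      batches.push(current);
      current = [];
      total = 0;
    }
    current.push(item);
    total += size;
  }

  if (current.length > 0) {
    batches.push(current);
  }
  return batches;
}

const calculate = ({ key, value }) => key + value;

// 6-byte keys with 512-, 256-, and 768-byte values: 518 + 262 fits, 774 does not.
const strictBatches = partitionBySize(
  [{ key: 6, value: 512 }, { key: 6, value: 256 }, { key: 6, value: 768 }],
  { max: 1024, calculate },
);
console.log(strictBatches.map((batch) => batch.length)); // [ 2, 1 ]

// With strict disabled, the 1030-byte item is placed in a batch of its own.
const looseBatches = partitionBySize(
  [{ key: 6, value: 512 }, { key: 6, value: 1024 }],
  { max: 1024, calculate, strict: false },
);
console.log(looseBatches.map((batch) => batch.length)); // [ 1, 1 ]
```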

Per-item results

A processing function may return a Dataloader-style array of values or Error instances that will be mapped back to individual items by index:

function reciprocate(n) {
  if (n === 0) {
    return new Error("divide by zero");
  }

  return 1 / n;
}

const processor = new AccumulatingProcessor((numbers) =>
  numbers.map(reciprocate)
);

const two = processor.process(2);
const one = processor.process(1);
const zero = processor.process(0);
processor.release();

await two; // 0.5
await one; // 1
await zero; // throws Error: divide by zero
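The index-based mapping described above can be sketched on its own. `toPromises` is a hypothetical helper, not a library function. It only illustrates the Dataloader-style convention that an Error instance at a given index rejects that item while any other value resolves it.

```javascript
// Hypothetical helper: turns a result array into one promise per item.
// Error instances become rejections; any other value resolves.
function toPromises(results) {
  return results.map((result) =>
    result instanceof Error ? Promise.reject(result) : Promise.resolve(result)
  );
}

const outcomes = toPromises([0.5, 1, new Error("divide by zero")]);

Promise.allSettled(outcomes).then((settled) => {
  for (const outcome of settled) {
    console.log(
      outcome.status,
      outcome.status === "fulfilled" ? outcome.value : outcome.reason.message,
    );
  }
});
// fulfilled 0.5
// fulfilled 1
// rejected divide by zero
```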