npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@convex-dev/batch-worker

v0.2.0

Published

A batch worker component for Convex.

Downloads

237

Readme

Batch Worker

npm version

Run a single background "main loop" over work you insert into your own table — with scheduling, debouncing, and recovery built in.

You bring two functions:

  • A work query that returns the next batch of work, or explicitly go idle.
  • A worker mutation that processes that batch.

After inserting work, call ping(...). The component takes care of the rest:

  • Runs exactly one loop at a time per named Worker.
  • Supports debouncing bursts so they batch together.
  • Keeps the loop "warm" with a short polling cooldown so a trickle of new work does not thrash the running status.
  • uses snapshot reads while draining so concurrent inserts don't cause OCC retries, and confirms with a real read before going idle so nothing is lost,
  • goes idle when the queue drains, and restarts automatically the next time you ping,
  • monitors the loop and restarts it if it ever dies (e.g. an unexpected error), logging the failure so you can alert on it.

This is the pattern behind components like Workpool — extracted so you can build your own "process a queue" components on top of it.

Found a bug? Feature request? File it here.

Installation

Create a convex.config.ts file in your app's convex/ folder and install the component by calling use:

// convex/convex.config.ts
import { defineApp } from "convex/server";
import batchWorker from "@convex-dev/batch-worker/convex.config.js";

const app = defineApp();
app.use(batchWorker, { env: { LOG_LEVEL: "REPORT" } });

export default app;

Usage

Insert work into your own table, then call ping. Provide a query (typed with vBatchQueryArgs / vBatchResult) that returns the next batch or idle, and a mutation that processes it. The query's batch shape must match the mutation's args.

import { v } from "convex/values";
import { ping, vBatchQueryArgs, vBatchResult } from "@convex-dev/batch-worker";
import { components, internal } from "./_generated/api";
import { internalMutation, internalQuery, mutation } from "./_generated/server";

const BATCH_SIZE = 10;

// Insert work, then make sure the loop is running.
export const addEvent = mutation({
  args: { value: v.number() },
  handler: async (ctx, { value }) => {
    await ctx.db.insert("events", { value });
    await ping(ctx, components.batchWorker, {
      name: "events", // distinct names give you independent queues
      workQuery: internal.example.getBatch,
      workerMutation: internal.example.processBatch,
    });
  },
});

// Return the next batch of work, or `idle` when there's nothing to do.
export const getBatch = internalQuery({
  args: vBatchQueryArgs, // { name } — lets one query serve multiple queues
  returns: vBatchResult(v.object({ ids: v.array(v.id("events")) })),
  handler: async (ctx) => {
    const events = await ctx.db.query("events").take(BATCH_SIZE);
    if (events.length === 0) {
      return { kind: "idle" as const };
      // Or, if you know when the next item is due:
      // return { kind: "idle" as const, timeoutMs: 30_000 };
    }
    return { kind: "work" as const, batch: { ids: events.map((e) => e._id) } };
  },
});

// Process one batch. The worker owns cleanup — delete what you process!
export const processBatch = internalMutation({
  args: { ids: v.array(v.id("events")) },
  handler: async (ctx, { ids }) => {
    // ... do the work (sum, call an API, schedule downstream jobs, etc.) ...
    for (const id of ids) {
      await ctx.db.delete("events", id);
    }
    // Returning nothing re-runs immediately to drain the rest.
  },
});

The component does not clean up your work for you — your worker mutation is responsible for deleting (or marking complete / advancing past) the rows it processed, otherwise the next query will return them again.

Steering the loop dynamically

Your worker mutation may return { debounceMs } to throttle the loop:

return {
  // Don't run again — and ignore pings — for at least this long (debounce).
  debounceMs: 30_000,
};

Similarly, when there's no work your query can return { kind: "idle", timeoutMs } to ensure it wakes up after some time even if ping is not called. A ping still wakes it immediately.

return {
  kind: "idle",
  // Keep polling this long before transitioning to idle.
  cooldownMs: 10_000,
  // How often to poll while cooling down.
  pollIntervalMs: 250,
  // After cooling down, wake again after at most this long even if no ping
  // arrives. Measured from this query response, so re-run it each query if you
  // want it to track a fixed deadline. A ping still wakes it sooner.
  timeoutMs: 60_000,
};

Multiple queues

Give each queue a distinct name. The name is passed to your query as args.name, so one query/mutation pair can serve many queues:

await ping(ctx, components.batchWorker, {
  name: "emails",
  workQuery: internal.email.getBatch,
  workerMutation: internal.email.send,
});

Configuration

Pass config to ping (it's stored on the worker and refreshed when it changes):

await ping(ctx, components.batchWorker, {
  name: "events",
  workQuery: internal.example.getBatch,
  workerMutation: internal.example.processBatch,
  config: {
    debounceMs: 100, // wait before the first batch so a burst accumulates
    // Schedule the liveness monitor this long after the loop's next run.
    // Default 1 minute, minimum 10 seconds. Also the retry cadence if your
    // work query or worker mutation throws (the loop dies; the monitor restarts
    // it).
    monitorLagMs: 15_000,
  },
});

Log level is set via the component's LOG_LEVEL env var (see Installation).

Stopping & resuming

stop halts processing entirely: the loop stops and ping is ignored, so no new work is picked up. start resumes it (reusing the last pinged query/mutation). Call them on the component:

await ctx.runMutation(components.batchWorker.lib.stop, { name: "events" });
// ...later, when you want it processing again:
await ctx.runMutation(components.batchWorker.lib.start, { name: "events" });

status reports the run state, including whether the worker is stopped.

ping vs start

  • ping creates the worker on first call and resumes it when it's idle. It's a no-op while the loop is running or stopped.
  • start resumes a stopped worker, and only start will — ping won't.

See the full working example in example.ts.

Development

Run the example app with a file watcher that rebuilds the component:

npm i
npm run dev

Run npm run dev:frontend to interact with it through a Vite app.

How it works

| Table | Written by | Read by | | ------------- | ----------------------------------- | ----------------------- | | workers | ping/start/loop (transitions) | ping/start, monitor | | workerState | loop (every iteration) | loop, monitor |

The high-churn loop state lives in workerState (generation, heartbeat, the scheduled runner, and the monitor), separate from the rarely-written workers doc (which holds the handles, config, and run-status: idle / running / stopped, plus a pointer to its workerState). That lets ping/start — which you call on every insert — read workers and return without conflicting (OCC) with the fast-looping loop. A monotonic generation (in workerState) guarantees only one loop chain runs at a time: a superseded loop sees a mismatched generation and exits. workerState is looked up by id and re-created if it's ever missing. The liveness monitor is scheduled ~monitorLagMs after the loop's next run and pushed back as the loop keeps running, so it only fires (and restarts the loop) if the loop actually died.