@convex-dev/batch-worker
v0.2.0
A batch worker component for Convex.
Batch Worker
Run a single background "main loop" over work you insert into your own table — with scheduling, debouncing, and recovery built in.
You bring two functions:
- A work query that returns the next batch of work, or explicitly reports that the queue is idle.
- A worker mutation that processes that batch.
After inserting work, call ping(...). The component takes care of the rest:
- Runs exactly one loop at a time per named Worker.
- Supports debouncing bursts so they batch together.
- Keeps the loop "warm" with a short polling cooldown so a trickle of new work does not thrash the running status.
- Uses snapshot reads while draining so concurrent inserts don't cause OCC retries, and confirms with a real read before going idle so nothing is lost.
- Goes idle when the queue drains, and restarts automatically the next time you ping.
- Monitors the loop and restarts it if it ever dies (e.g. an unexpected error), logging the failure so you can alert on it.
This is the pattern behind components like Workpool — extracted so you can build your own "process a queue" components on top of it.
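To make the query/worker contract concrete, here is a toy in-memory model of the loop described above. It is only a sketch and not the component's implementation: the `queue` array stands in for your own table, and `drain`, `getBatch`, and `processBatch` are hypothetical plain functions rather than Convex functions. Scheduling, debouncing, and recovery are left out.

```typescript
// Toy model only: plain arrays and functions, no Convex APIs involved.
type BatchResult<B> = { kind: "work"; batch: B } | { kind: "idle" };

// Hypothetical stand-in for "your own table".
const queue: number[] = [];
const processed: number[] = [];

// Work query: return up to `size` items, or report idle when nothing is left.
function getBatch(size: number): BatchResult<number[]> {
  if (queue.length === 0) return { kind: "idle" };
  return { kind: "work", batch: queue.slice(0, size) };
}

// Worker: process the batch, then remove it (the worker owns cleanup).
function processBatch(batch: number[]): void {
  processed.push(...batch);
  queue.splice(0, batch.length);
}

// Loop: keep asking for work until the query reports idle.
function drain(size: number): number {
  let iterations = 0;
  for (;;) {
    const result = getBatch(size);
    if (result.kind === "idle") return iterations;
    processBatch(result.batch);
    iterations++;
  }
}

queue.push(1, 2, 3, 4, 5);
const iterations = drain(2);
console.log(iterations, processed, queue.length);
```

With five items and a batch size of two, the loop runs three batches and then stops on the first idle result.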
Found a bug? Feature request? File it here.
Installation
Create a convex.config.ts file in your app's convex/ folder and install the
component by calling use:
// convex/convex.config.ts
import { defineApp } from "convex/server";
import batchWorker from "@convex-dev/batch-worker/convex.config.js";
const app = defineApp();
app.use(batchWorker, { env: { LOG_LEVEL: "REPORT" } });
export default app;
Usage
Insert work into your own table, then call ping. Provide a query (typed with
vBatchQueryArgs / vBatchResult) that returns the next batch or idle, and a
mutation that processes it. The query's batch shape must match the mutation's
args.
import { v } from "convex/values";
import { ping, vBatchQueryArgs, vBatchResult } from "@convex-dev/batch-worker";
import { components, internal } from "./_generated/api";
import { internalMutation, internalQuery, mutation } from "./_generated/server";
const BATCH_SIZE = 10;
// Insert work, then make sure the loop is running.
export const addEvent = mutation({
args: { value: v.number() },
handler: async (ctx, { value }) => {
await ctx.db.insert("events", { value });
await ping(ctx, components.batchWorker, {
name: "events", // distinct names give you independent queues
workQuery: internal.example.getBatch,
workerMutation: internal.example.processBatch,
});
},
});
// Return the next batch of work, or `idle` when there's nothing to do.
export const getBatch = internalQuery({
args: vBatchQueryArgs, // { name } — lets one query serve multiple queues
returns: vBatchResult(v.object({ ids: v.array(v.id("events")) })),
handler: async (ctx) => {
const events = await ctx.db.query("events").take(BATCH_SIZE);
if (events.length === 0) {
return { kind: "idle" as const };
// Or, if you know when the next item is due:
// return { kind: "idle" as const, timeoutMs: 30_000 };
}
return { kind: "work" as const, batch: { ids: events.map((e) => e._id) } };
},
});
// Process one batch. The worker owns cleanup — delete what you process!
export const processBatch = internalMutation({
args: { ids: v.array(v.id("events")) },
handler: async (ctx, { ids }) => {
// ... do the work (sum, call an API, schedule downstream jobs, etc.) ...
for (const id of ids) {
await ctx.db.delete("events", id);
}
// Returning nothing re-runs immediately to drain the rest.
},
});
The component does not clean up your work for you: your worker mutation is responsible for deleting (or marking complete / advancing past) the rows it processed, otherwise the next query will return them again.
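If you would rather keep rows than delete them, "advancing past" them could look like the cursor sketch below. This is an in-memory illustration under assumptions, not component code: the `seq` field, the `cursor` variable, and the function bodies are hypothetical, and in a real app the cursor would live in one of your own tables.

```typescript
// Toy model only: rows are kept, and a cursor records how far we have processed.
type Row = { seq: number; value: number };
type BatchResult = { kind: "work"; batch: Row[] } | { kind: "idle" };

const rows: Row[] = [
  { seq: 1, value: 10 },
  { seq: 2, value: 20 },
  { seq: 3, value: 30 },
];
let cursor = 0; // highest `seq` processed so far (hypothetical bookkeeping)
let total = 0;

// Work query: only rows past the cursor count as pending work.
function getBatch(size: number): BatchResult {
  const pending = rows.filter((r) => r.seq > cursor).slice(0, size);
  if (pending.length === 0) return { kind: "idle" };
  return { kind: "work", batch: pending };
}

// Worker: do the work, then advance the cursor instead of deleting rows.
function processBatch(batch: Row[]): void {
  for (const row of batch) {
    total += row.value;
    cursor = row.seq;
  }
}

let runs = 0;
for (;;) {
  const result = getBatch(2);
  if (result.kind === "idle") break;
  processBatch(result.batch);
  runs++;
}
console.log(runs, total, cursor, rows.length);
```

If `processBatch` skipped the cursor update, `getBatch` would keep returning the same first two rows and the loop would never reach idle.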
Steering the loop dynamically
Your worker mutation may return { debounceMs } to throttle the
loop:
return {
// Don't run again — and ignore pings — for at least this long (debounce).
debounceMs: 30_000,
};
Similarly, when there's no work, your query can return
{ kind: "idle", timeoutMs } to ensure it wakes up after some time even if ping is not called. A ping still wakes it immediately.
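One way to choose `timeoutMs`, sketched under assumptions: if your table records when the next item becomes due, the query can turn that deadline into a relative timeout each time it runs. The `idleUntil` helper and its `nextDueAt` parameter are hypothetical and not part of the package. Whether the component accepts a `timeoutMs` of `0` is not stated in this README, so a query that finds an item already due should probably return it as work instead.

```typescript
// Hypothetical helper: convert an absolute deadline into a relative idle timeout.
type IdleResult = { kind: "idle"; timeoutMs?: number };

// `nextDueAt` is an assumed timestamp (ms since epoch) of the earliest
// not-yet-due item, or null when nothing is scheduled.
function idleUntil(nextDueAt: number | null, now: number): IdleResult {
  // Nothing scheduled: plain idle, wait for the next ping.
  if (nextDueAt === null) return { kind: "idle" };
  // Recomputed on every call, so the wake-up keeps tracking the same deadline.
  return { kind: "idle", timeoutMs: Math.max(0, nextDueAt - now) };
}

const early = idleUntil(1_030_000, 1_000_000);
const later = idleUntil(1_030_000, 1_020_000);
const none = idleUntil(null, 1_000_000);
console.log(early.timeoutMs, later.timeoutMs, none.timeoutMs);
```

The two calls with the same deadline yield 30000 and 10000 milliseconds respectively, which is the "recompute on each query" behavior needed to track a fixed deadline.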
return {
kind: "idle",
// Keep polling this long before transitioning to idle.
cooldownMs: 10_000,
// How often to poll while cooling down.
pollIntervalMs: 250,
// After cooling down, wake again after at most this long even if no ping
// arrives. Measured from this query response, so recompute it on each query
// if you want it to track a fixed deadline. A ping still wakes it sooner.
timeoutMs: 60_000,
};
Multiple queues
Give each queue a distinct name. The name is passed to your query as
args.name, so one query/mutation pair can serve many queues:
await ping(ctx, components.batchWorker, {
name: "emails",
workQuery: internal.email.getBatch,
workerMutation: internal.email.send,
});
Configuration
Pass config to ping (it's stored on the worker and refreshed when it
changes):
await ping(ctx, components.batchWorker, {
name: "events",
workQuery: internal.example.getBatch,
workerMutation: internal.example.processBatch,
config: {
debounceMs: 100, // wait before the first batch so a burst accumulates
// Schedule the liveness monitor this long after the loop's next run.
// Default 1 minute, minimum 10 seconds. Also the retry cadence if your
// work query or worker mutation throws (the loop dies; the monitor restarts
// it).
monitorLagMs: 15_000,
},
});
Log level is set via the component's LOG_LEVEL env var (see Installation).
Stopping & resuming
stop halts processing entirely: the loop stops and ping is ignored, so no
new work is picked up. start resumes it (reusing the last pinged
query/mutation). Call them on the component:
await ctx.runMutation(components.batchWorker.lib.stop, { name: "events" });
// ...later, when you want it processing again:
await ctx.runMutation(components.batchWorker.lib.start, { name: "events" });
status reports the run state, including whether the worker is stopped.
ping vs start
- ping creates the worker on first call and resumes it when it's idle. It's a no-op while the loop is running or stopped.
- start resumes a stopped worker, and only start will; ping won't.
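The difference between the two calls can be pictured as a small state machine. The sketch below is a plain TypeScript model, not the component's code. The `Workers` map and the three functions are hypothetical, and a few details are assumptions that this README does not confirm: that `start` is a no-op unless the worker is stopped, that `stop` on an unknown name does nothing, and that a resumed worker goes straight to `running`.

```typescript
// Toy model of the run states named in this README.
type Status = "idle" | "running" | "stopped";
type Workers = Map<string, Status>;

// ping: creates the worker on first call and resumes an idle one.
// It is a no-op while the worker is running or stopped.
function ping(workers: Workers, name: string): void {
  const status = workers.get(name);
  if (status === undefined || status === "idle") workers.set(name, "running");
}

// stop: halts processing (assumed to do nothing for an unknown worker).
function stop(workers: Workers, name: string): void {
  if (workers.has(name)) workers.set(name, "stopped");
}

// start: resumes a stopped worker (assumed no-op in any other state).
function start(workers: Workers, name: string): void {
  if (workers.get(name) === "stopped") workers.set(name, "running");
}

const workers: Workers = new Map();
ping(workers, "events");
const afterPing = workers.get("events");
stop(workers, "events");
ping(workers, "events"); // ignored: the worker is stopped
const afterPingWhileStopped = workers.get("events");
start(workers, "events");
const afterStart = workers.get("events");
console.log(afterPing, afterPingWhileStopped, afterStart);
```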
See the full working example in example.ts.
Development
Run the example app with a file watcher that rebuilds the component:
npm i
npm run dev
Run npm run dev:frontend to interact with it through a Vite app.
How it works
| Table | Written by | Read by |
| ------------- | ----------------------------------- | ----------------------- |
| workers | ping/start/loop (transitions) | ping/start, monitor |
| workerState | loop (every iteration) | loop, monitor |
The high-churn loop state lives in workerState (generation, heartbeat, the
scheduled runner, and the monitor), separate from the rarely-written workers
doc (which holds the handles, config, and run-status: idle / running /
stopped, plus a pointer to its workerState). That lets ping/start —
which you call on every insert — read workers and return without conflicting
(OCC) with the fast-looping loop. A monotonic generation (in workerState)
guarantees only one loop chain runs at a time: a superseded loop sees a
mismatched generation and exits. workerState is looked up by id and
re-created if it's ever missing. The liveness monitor is scheduled
~monitorLagMs after the loop's next run and pushed back as the loop keeps
running, so it only fires (and restarts the loop) if the loop actually died.
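The generation check can be illustrated with a minimal in-memory sketch. It assumes a simplified `WorkerState` holding only the generation; `startChain` and `runIteration` are hypothetical names, and the real component also tracks the heartbeat, the scheduled runner, and the monitor.

```typescript
// Toy model of the generation guard; not the component's actual schema.
type WorkerState = { generation: number };

// Starting or restarting a loop chain bumps the generation and hands the
// new value to that chain.
function startChain(state: WorkerState): number {
  state.generation += 1;
  return state.generation;
}

// Each iteration compares its own generation with the stored one.
// A mismatch means a newer chain has taken over, so this one exits.
function runIteration(state: WorkerState, myGeneration: number): "ran" | "exited" {
  if (state.generation !== myGeneration) return "exited";
  return "ran";
}

const state: WorkerState = { generation: 0 };
const first = startChain(state);
const second = startChain(state); // e.g. the monitor restarted the loop
const oldChain = runIteration(state, first);
const newChain = runIteration(state, second);
console.log(oldChain, newChain);
```

Here the first chain exits and the second one runs, so at most one chain makes progress even if an old scheduled iteration fires late.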
