@emkodev/emkew
v0.1.0-beta.0
Durable, multi-tenant work queue primitive for emko-based applications. SQLite-backed, with retry, dead-letter, lease-based crash recovery, and on-failure compensation.
Durable work queue primitive for emko-based applications. SQLite-backed, multi-tenant, retry-aware, with crash recovery and on-failure compensation.
The package is emkew; the primitive it provides is a Queue. emkew is the emkodev brand prefix; Queue is the noun you actually call in code.
What
A small, focused queue primitive that covers every "do this work later, durably, possibly retried, possibly across processes" need with one consistent shape:
- Durability: tasks survive process crashes; orphaned tasks are reclaimed by the next worker.
- Multi-tenancy: every operation is strictly scoped by actor.businessId; cross-tenant leaks are not possible at the API.
- Retry: exponential backoff on explicit fail(); immediate re-run on lease-expiry reclaim.
- Dead-letter: after maxAttempts, status flips to dead-letter.
- Compensation: onFailureName auto-enqueues a follow-up task on terminal failure, inheriting the original workflow.
- Idempotency: per-tenant idempotency keys make duplicate enqueue safe.
- Scheduling: scheduledAt defers a task to a specific future time.
- Workflow correlation: workflowId ties multi-step flows for observability.
The design intent: one primitive covers the work that usually takes several separate pieces of infrastructure (engines, schedulers, retry libraries, pub/sub, outbox patterns).
Install
bun add @emkodev/emkew @emkodev/emkore
@emkodev/emkore is a peer dependency; its Actor is the identity passed to every queue operation.
Quick start
import { SqliteQueue, runWorker } from "@emkodev/emkew";
import { Actor } from "@emkodev/emkore";
import { Database } from "bun:sqlite";

class MyActor extends Actor {
  constructor(
    private readonly _id: string,
    private readonly _businessId: string,
  ) {
    super();
  }
  override get id(): string { return this._id; }
  override get businessId(): string { return this._businessId; }
  override get token(): string { return ""; }
}

const actor = new MyActor("worker-1", "tenant-a");
const queue = new SqliteQueue(new Database("queue.db"));

// Producer: enqueue work somewhere
await queue.enqueue(actor, {
  name: "create-order-from-quote",
  input: JSON.stringify({ quoteId: "q-42" }),
  workflowId: "opp-close-won-42",
});

// Consumer: a worker process picks it up
const controller = new AbortController();
await runWorker(
  queue,
  async (task) => {
    if (task.name === "create-order-from-quote") {
      const { quoteId } = JSON.parse(task.input);
      // ... business logic. Throw to fail, return string to store as result.
    }
  },
  { actor, pollMs: 100, leaseMs: 60_000, signal: controller.signal },
);

API
class Queue (abstract)
The contract any backend implements:
| Method | Purpose |
| --- | --- |
| enqueue(actor, spec) | Add a task. Returns the task id (or existing id if idempotencyKey matches). |
| claim(actor, leaseMs) | Atomically pick the next pending task for the actor's tenant. Flips to running. Returns null if nothing is ready. |
| complete(actor, taskId, result?) | Mark a task done. Caller's tenant must match. |
| fail(actor, taskId, error) | Record failure. Either retry-with-backoff or DLQ-with-optional-compensation, based on attempts vs maxAttempts. |
| requeue(actor, taskId) | Pull a task out of dead-letter back to pending. Resets attempts. |
| listByWorkflow(actor, workflowId) | Observability: every task tagged with this workflow, ordered by createdAt. |
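
The methods in the table combine into a simple claim, run, ack cycle. The sketch below spells out one iteration against a minimal structural interface. QueueLike, TaskLike, and processOne are hypothetical names used only for illustration, and the parameter types (Promise-returning methods, error passed as a string) are assumptions rather than the package's actual signatures.

```typescript
// Hypothetical minimal shapes; the real Task and Queue types may differ.
interface TaskLike {
  id: string;
  name: string;
  input: string;
}

interface QueueLike<A> {
  claim(actor: A, leaseMs: number): Promise<TaskLike | null>;
  complete(actor: A, taskId: string, result?: string): Promise<void>;
  fail(actor: A, taskId: string, error: string): Promise<void>;
}

type Handler = (task: TaskLike) => Promise<string | void>;

// Runs one claim -> handle -> ack cycle.
// Returns false when nothing was ready, true when a task was processed.
async function processOne<A>(
  queue: QueueLike<A>,
  actor: A,
  handler: Handler,
  leaseMs: number,
): Promise<boolean> {
  const task = await queue.claim(actor, leaseMs);
  if (task === null) return false;
  try {
    const result = await handler(task);
    // A string return value is stored as the result; void stores nothing.
    await queue.complete(actor, task.id, typeof result === "string" ? result : undefined);
  } catch (err) {
    // A throw becomes an explicit fail, which leads to retry or dead-letter.
    await queue.fail(actor, task.id, err instanceof Error ? err.message : String(err));
  }
  return true;
}
```

In most cases runWorker is the better choice; a hand-rolled cycle like this is mainly useful when polling needs custom control.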
class SqliteQueue extends Queue
bun:sqlite-backed reference implementation.
new SqliteQueue(db, opts?)
Constructor options:
| Option | Default | Purpose |
| --- | --- | --- |
| now | () => Date.now() | Clock injection for tests. |
| backoff | (attempts) => 1000 * 2 ** (attempts - 1) | Backoff function used after explicit fail. |
| defaultMaxAttempts | 3 | Used when a TaskSpec doesn't specify. |
| applySchema | true | Pass false if your project runs DDL via its own loader, then import SQLITE_QUEUE_SCHEMA_SQL and apply it yourself. |
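
As a small illustration of the backoff option, the sketch below writes the documented default as a plain function and adds a capped variant. cappedBackoff is a hypothetical helper, not part of the package, and the values are presumably milliseconds (given the Date.now() default clock), though the source does not state the unit.

```typescript
// Mirrors the documented default: 1000, 2000, 4000, ... for attempts 1, 2, 3, ...
const defaultBackoff = (attempts: number): number => 1000 * 2 ** (attempts - 1);

// Hypothetical variant: same curve, but never longer than capMs.
const cappedBackoff = (capMs: number) =>
  (attempts: number): number => Math.min(capMs, defaultBackoff(attempts));

// Assumed usage, based on the options table above:
// new SqliteQueue(db, { backoff: cappedBackoff(30_000) });
```

A cap keeps late retries from drifting far into the future when defaultMaxAttempts is raised well above 3.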
runWorker(queue, handler, options)
Polling loop that claims, dispatches to your handler, and acks (complete on return, fail on throw).
type WorkerHandler = (task: Task) => Promise<string | void>;
Return a string to store as task.result; return nothing for void semantics.
Other exports
- Task, TaskSpec: domain types.
- TaskStatus, TASK_STATUS: "pending" | "running" | "done" | "failed" | "dead-letter".
- WorkerOptions: { actor, pollMs, leaseMs, signal }.
- SQLITE_QUEUE_SCHEMA_SQL: raw DDL constant.
Semantics
Multi-tenancy
Every queue operation takes an Actor and gates on actor.businessId. A worker for tenant A cannot claim, complete, fail, requeue, or observe tenant B's tasks. Idempotency keys are scoped per-tenant: tenants A and B can both use the key "k" for unrelated tasks without collision.
One worker process = one tenant. To serve N tenants concurrently, spawn N runWorker instances, one per tenant.
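
One way to serve several tenants from a single entry point is to start one worker loop per actor and wait on all of them. The helper below is a hypothetical sketch: serveTenants is not part of the package, and it assumes that runWorker returns a promise that settles when its loop stops.

```typescript
// Hypothetical helper: starts one worker loop per tenant and waits for all of them.
// `startWorker` stands in for a call such as
//   (actor) => runWorker(queue, handler, { actor, pollMs, leaseMs, signal })
async function serveTenants<A>(
  actors: readonly A[],
  startWorker: (actor: A) => Promise<void>,
): Promise<void> {
  await Promise.all(actors.map((actor) => startWorker(actor)));
}
```

Note that Promise.all rejects as soon as one loop rejects while the others keep running, so sharing one AbortSignal across the loops is a reasonable way to stop them together.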
Retry and dead-letter
- Explicit fail() → attempts++, scheduled for retry with backoff. When attempts >= maxAttempts, status flips to dead-letter.
- Lease expiry (worker disappeared without ack) → on the next claim, the original task is reclaimed with attempts++. If that crosses maxAttempts it goes straight to dead-letter. Otherwise it becomes immediately re-claimable (no backoff; the work never had a fair chance).
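
The two rules above can be read as a pair of state transitions. The sketch below models them in memory and is not the package's implementation: RetryState and its runAt field are hypothetical, "crosses maxAttempts" is read as attempts >= maxAttempts, and a retried task is assumed to return to pending (the source does not say which status a task holds while it waits for its retry).

```typescript
type Status = "pending" | "running" | "done" | "failed" | "dead-letter";

// Hypothetical in-memory shape; field names such as runAt are illustrative only.
interface RetryState {
  status: Status;
  attempts: number;
  maxAttempts: number;
  runAt: number; // earliest time (ms) at which the task may be claimed again
}

const backoff = (attempts: number): number => 1000 * 2 ** (attempts - 1);

// Explicit fail(): count the attempt, then back off or dead-letter.
function onExplicitFail(task: RetryState, now: number): RetryState {
  const attempts = task.attempts + 1;
  if (attempts >= task.maxAttempts) {
    return { ...task, attempts, status: "dead-letter" };
  }
  return { ...task, attempts, status: "pending", runAt: now + backoff(attempts) };
}

// Lease expiry: count the attempt, but skip the backoff delay.
function onLeaseExpiry(task: RetryState, now: number): RetryState {
  const attempts = task.attempts + 1;
  if (attempts >= task.maxAttempts) {
    return { ...task, attempts, status: "dead-letter" };
  }
  return { ...task, attempts, status: "pending", runAt: now };
}
```

With maxAttempts set to 3, two explicit failures schedule retries after 1000 and 2000, and the third moves the task to dead-letter.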
Compensation
If a task is enqueued with onFailureName, that follow-up is automatically enqueued the moment the task lands in dead-letter — regardless of whether DLQ was reached via explicit fail or via lease expiry. The compensation inherits the original task's workflowId and workflowType for grouping in listByWorkflow.
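
To make the inheritance rule concrete, the sketch below derives the follow-up task from a dead-lettered one. FlowTask and compensationFor are hypothetical names, the task names in the comment are made up, and passing the original input through to the follow-up is an assumption; the source only states that workflowId and workflowType are inherited.

```typescript
// Assumed producer-side usage, based on the TaskSpec fields named in this README:
// await queue.enqueue(actor, { name: "charge-card", input, onFailureName: "refund-card" });

// Hypothetical minimal task shape for illustration.
interface FlowTask {
  name: string;
  input: string;
  workflowId?: string;
  workflowType?: string;
  onFailureName?: string;
}

// Builds the follow-up task for a dead-lettered task, or null if none was requested.
function compensationFor(dead: FlowTask): FlowTask | null {
  if (dead.onFailureName === undefined) return null;
  return {
    name: dead.onFailureName,
    // Assumption: the follow-up receives the original input; the source does not specify this.
    input: dead.input,
    workflowId: dead.workflowId,
    workflowType: dead.workflowType,
  };
}
```

Because the follow-up shares the workflowId, it would appear next to the failed task in a listByWorkflow result.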
Idempotency
Pass idempotencyKey at enqueue. If a non-terminal task with that key already exists for the same tenant, enqueue returns the existing task's id and does not create a duplicate. Useful for retried webhook handlers and double-click safety.
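
The lookup rule can be sketched with a small in-memory model. InMemoryDedup is hypothetical and is not how SqliteQueue stores tasks; it also assumes that "done" and "dead-letter" are the terminal statuses, which the source does not spell out.

```typescript
type KeyedStatus = "pending" | "running" | "done" | "failed" | "dead-letter";

interface KeyedTask {
  id: string;
  businessId: string;
  idempotencyKey?: string;
  status: KeyedStatus;
}

// Assumption: "done" and "dead-letter" are the terminal statuses.
const isTerminal = (s: KeyedStatus): boolean => s === "done" || s === "dead-letter";

class InMemoryDedup {
  private readonly tasks: KeyedTask[] = [];
  private nextId = 1;

  // Returns the existing id when a live task with the same tenant and key exists.
  enqueue(businessId: string, idempotencyKey?: string): string {
    if (idempotencyKey !== undefined) {
      const existing = this.tasks.find(
        (t) =>
          t.businessId === businessId &&
          t.idempotencyKey === idempotencyKey &&
          !isTerminal(t.status),
      );
      if (existing) return existing.id;
    }
    const id = `task-${this.nextId++}`;
    this.tasks.push({ id, businessId, idempotencyKey, status: "pending" });
    return id;
  }

  // Helper for this sketch only: lets a caller simulate task completion.
  setStatus(id: string, status: KeyedStatus): void {
    const task = this.tasks.find((t) => t.id === id);
    if (task) task.status = status;
  }
}
```

In this model, a second enqueue with the same tenant and key returns the first id, a different tenant with the same key gets a new task, and the key becomes reusable once the first task reaches a terminal status.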
Atomicity
SqliteQueue.claim and SqliteQueue.fail use BEGIN IMMEDIATE transactions. As a result:
- Concurrent claimers can't both grab the same task.
- Lease-expiry reclaim + DLQ + compensation insert are atomic.
- Explicit fail + DLQ + compensation insert are atomic.
Contract testing
If you write a non-SQLite backend (Postgres, etc.), the contract suite is re-runnable:
import { runQueueContract } from "@emkodev/emkew/test/queue-contract.ts";
import { runWorkerContract } from "@emkodev/emkew/test/worker-contract.ts";

runQueueContract("MyQueue", (opts) => new MyQueue(opts));
runWorkerContract("MyQueue + runWorker", () => new MyQueue());

License
MIT.
