npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@emkodev/emkew

v0.1.0-beta.0

Published

Durable, multi-tenant work queue primitive for emko-based applications. SQLite-backed, with retry, dead-letter, lease-based crash recovery, and on-failure compensation.

Readme

@emkodev/emkew

Durable work queue primitive for emko-based applications. SQLite-backed, multi-tenant, retry-aware, with crash recovery and on-failure compensation.

The package is emkew; the primitive it provides is a Queue. emkew is the emkodev brand prefix; Queue is the noun you actually call in code.

What

A small, focused queue primitive that covers every "do this work later, durably, possibly retried, possibly across processes" need with one consistent shape:

  • Durability — tasks survive process crashes; orphaned tasks are reclaimed by the next worker.
  • Multi-tenancy — every operation strictly scoped by actor.businessId; cross-tenant leaks are not possible at the API.
  • Retry — exponential backoff on explicit fail(); immediate re-run on lease-expiry reclaim.
  • Dead-letter — after maxAttempts, status flips to dead-letter.
  • CompensationonFailureName auto-enqueues a follow-up task on terminal failure, inheriting the original workflow.
  • Idempotency — per-tenant idempotency keys make duplicate enqueue safe.
  • SchedulingscheduledAt defers a task to a specific future time.
  • Workflow correlationworkflowId ties multi-step flows for observability.

The design intent: one primitive does the work that "engines, schedulers, retry libraries, pub/sub, outbox patterns" usually require three separate pieces of infrastructure to cover.

Install

bun add @emkodev/emkew @emkodev/emkore

@emkodev/emkore is a peer dependency — its Actor is the identity passed to every queue operation.

Quick start

import { SqliteQueue, runWorker } from "@emkodev/emkew";
import { Actor } from "@emkodev/emkore";
import { Database } from "bun:sqlite";

class MyActor extends Actor {
  constructor(
    private readonly _id: string,
    private readonly _businessId: string,
  ) {
    super();
  }
  override get id(): string { return this._id; }
  override get businessId(): string { return this._businessId; }
  override get token(): string { return ""; }
}

const actor = new MyActor("worker-1", "tenant-a");
const queue = new SqliteQueue(new Database("queue.db"));

// Producer: enqueue work somewhere
await queue.enqueue(actor, {
  name: "create-order-from-quote",
  input: JSON.stringify({ quoteId: "q-42" }),
  workflowId: "opp-close-won-42",
});

// Consumer: a worker process picks it up
const controller = new AbortController();
await runWorker(
  queue,
  async (task) => {
    if (task.name === "create-order-from-quote") {
      const { quoteId } = JSON.parse(task.input);
      // ... business logic. Throw to fail, return string to store as result.
    }
  },
  { actor, pollMs: 100, leaseMs: 60_000, signal: controller.signal },
);

API

class Queue (abstract)

The contract any backend implements:

| Method | Purpose | | --- | --- | | enqueue(actor, spec) | Add a task. Returns the task id (or existing id if idempotencyKey matches). | | claim(actor, leaseMs) | Atomically pick the next pending task for the actor's tenant. Flips to running. Returns null if nothing is ready. | | complete(actor, taskId, result?) | Mark a task done. Caller's tenant must match. | | fail(actor, taskId, error) | Record failure. Either retry-with-backoff or DLQ-with-optional-compensation, based on attempts vs maxAttempts. | | requeue(actor, taskId) | Pull a task out of dead-letter back to pending. Resets attempts. | | listByWorkflow(actor, workflowId) | Observability: every task tagged with this workflow, ordered by createdAt. |

class SqliteQueue extends Queue

bun:sqlite-backed reference implementation.

new SqliteQueue(db, opts?)

Constructor options:

| Option | Default | Purpose | | --- | --- | --- | | now | () => Date.now() | Clock injection for tests. | | backoff | (attempts) => 1000 * 2^(attempts-1) | Backoff function used after explicit fail. | | defaultMaxAttempts | 3 | Used when a TaskSpec doesn't specify. | | applySchema | true | Pass false if your project runs DDL via its own loader, then import SQLITE_QUEUE_SCHEMA_SQL and apply it yourself. |

runWorker(queue, handler, options)

Polling loop that claims, dispatches to your handler, and acks (complete on return, fail on throw).

type WorkerHandler = (task: Task) => Promise<string | void>;

Return a string to store as task.result; return nothing for void semantics.

Other exports

  • Task, TaskSpec — domain types.
  • TaskStatus, TASK_STATUS"pending" | "running" | "done" | "failed" | "dead-letter".
  • WorkerOptions{ actor, pollMs, leaseMs, signal }.
  • SQLITE_QUEUE_SCHEMA_SQL — raw DDL constant.

Semantics

Multi-tenancy

Every queue operation takes an Actor and gates on actor.businessId. A worker for tenant A cannot claim, complete, fail, requeue, or observe tenant B's tasks. Idempotency keys are scoped per-tenant: tenants A and B can both use the key "k" for unrelated tasks without collision.

One worker process = one tenant. To serve N tenants concurrently, spawn N runWorker instances, one per tenant.

Retry and dead-letter

  • Explicit fail()attempts++, scheduled for retry with backoff. When attempts >= maxAttempts, status flips to dead-letter.
  • Lease expiry (worker disappeared without ack) → on the next claim, the original task is reclaimed with attempts++. If that crosses maxAttempts it goes straight to dead-letter. Otherwise it becomes immediately re-claimable (no backoff — the work never had a fair chance).

Compensation

If a task is enqueued with onFailureName, that follow-up is automatically enqueued the moment the task lands in dead-letter — regardless of whether DLQ was reached via explicit fail or via lease expiry. The compensation inherits the original task's workflowId and workflowType for grouping in listByWorkflow.

Idempotency

Pass idempotencyKey at enqueue. If a non-terminal task with that key already exists for the same tenant, enqueue returns the existing task's id and does not create a duplicate. Useful for retried webhook handlers and double-click safety.

Atomicity

SqliteQueue.claim and SqliteQueue.fail use BEGIN IMMEDIATE transactions. As a result:

  • Concurrent claimers can't both grab the same task.
  • Lease-expiry reclaim + DLQ + compensation insert are atomic.
  • Explicit fail + DLQ + compensation insert are atomic.

Contract testing

If you write a non-SQLite backend (Postgres, etc.), the contract suite is re-runnable:

import { runQueueContract } from "@emkodev/emkew/test/queue-contract.ts";
import { runWorkerContract } from "@emkodev/emkew/test/worker-contract.ts";

runQueueContract("MyQueue", (opts) => new MyQueue(opts));
runWorkerContract("MyQueue + runWorker", () => new MyQueue());

License

MIT.