npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@phyxiusjs/queue

v0.2.0

Published

Queue adapter — broker-agnostic consumer that invokes handlers per message

Downloads

182

Readme

Queue

The queue adapter for @phyxiusjs/handler. Broker-agnostic — one consumer, one handler, one journal event per message. Every stability concern (timeout, retry, circuit breaker, concurrency) already lives on the handler. The adapter is the translator.


What this really is

A createQueueConsumer({ source, handler, decode }) that pulls messages from a MessageSource, invokes the handler, and translates the Result into ack / nack. That's the whole surface.

MessageSource is the contract broker adapters implement — SQS, Kafka, Redis Streams, RabbitMQ, BullMQ, wherever. Your code is coupled to the contract, not to the broker.

The payoff is the claim the handler made months ago, now concrete: the same handler runs behind HTTP and behind a queue with identical stability guarantees and identical journal events. A TIMEOUT is a 504 on HTTP and a retry-nack on queue. A BACKPRESSURE_REJECT is a 503 on HTTP and a requeue-now on queue. One source of truth for "what went wrong" — different transport translations.


Installation

npm install @phyxiusjs/queue @phyxiusjs/handler @phyxiusjs/clock

Quick start

import { createSystemClock, ms } from "@phyxiusjs/clock";
import { Journal } from "@phyxiusjs/journal";
import { observe } from "@phyxiusjs/observe";
import { cb, defineHandler, retry, spawn } from "@phyxiusjs/handler";
import { createQueueConsumer } from "@phyxiusjs/queue";

// Wire your broker adapter here (SQS, Redis, etc.) — it implements MessageSource.
import { createSqsSource } from "@phyxiusjs/queue-sqs";

const orderFields = observe.fields({
  customerId: observe.field<string>(),
  amount: observe.number(),
});

const orderSpec = defineHandler({
  name: "order.process",
  input: z.object({ customerId: z.string(), amount: z.number().positive() }),
  output: z.object({ chargeId: z.string() }),
  fields: orderFields,

  timeout: ms(5_000),
  concurrency: { max: 20, queueSize: 100, backpressure: "reject" },
  retry: retry.exponential({ maxAttempts: 3, initialDelay: ms(200) }),
  circuitBreaker: cb.policy({ failureThreshold: 10, resetTimeout: ms(30_000) }),

  run: async ({ customerId, amount }) => {
    orderFields.customerId.set(customerId);
    orderFields.amount.set(amount);
    return { chargeId: await charge(customerId, amount) };
  },
});

const clock = createSystemClock();
const journal = new Journal({ clock });
const handler = await spawn(orderSpec, { clock, journal });

const source = createSqsSource({
  queueUrl: "https://sqs.us-east-1.amazonaws.com/.../orders",
  waitTimeSeconds: 20,
});

const consumer = createQueueConsumer({
  source,
  handler,
  decode: (msg) => msg.body as { customerId: string; amount: number },
  maxConcurrent: 4,
  clock,
});

await consumer.start();

// Graceful shutdown on SIGTERM:
process.on("SIGTERM", async () => {
  await consumer.stop();
  await handler.stop();
});

Same handler spec, same observability, same stability. Swap createSqsSource for a Redis or Kafka source; nothing else changes.


The QueueConsumer surface

interface QueueConsumer {
  start(): Promise<void>; // begin the consume loop
  stop(): Promise<void>; // drain in-flight, close source
  getStatus(): QueueConsumerStatus; // "idle" | "running" | "stopping" | "stopped"
  getInFlight(): number; // how many messages are being processed now
}

start resolves once the loop is running; the loop itself runs until stop. stop is idempotent — safe to call from multiple SIGTERM handlers, unhandledRejection cleanup, or a final afterAll in tests.


The MessageSource contract

interface MessageSource {
  receive(signal?: AbortSignal): Promise<QueueMessage | null>;
  ack(message: QueueMessage): Promise<void>;
  nack(message: QueueMessage, reason: NackReason): Promise<void>;
  close?(): Promise<void>;
}

interface QueueMessage {
  id: string;
  body: unknown;
  headers?: Readonly<Record<string, string>>;
  receivedAt: Instant;
  deliveryCount?: number;
  metadata?: Readonly<Record<string, unknown>>;
}

type NackReason =
  | { type: "retry"; delayMs?: Millis; cause?: string }
  | { type: "dead-letter"; cause: string }
  | { type: "requeue-now"; cause?: string };

Pull-based by design. Any push-based broker can be wrapped in a pull interface (see createMemorySource for the reference implementation); the reverse coupling is much harder to undo. receive should long-poll until a message arrives or return null on idle timeout / abort so the consumer loop can re-check its running flag.

Nack reasons are intent, not implementation. Sources translate them to broker-native operations (SQS ChangeMessageVisibility, Kafka commit-offset skip, Redis XCLAIM, etc.). A broker that doesn't support delayed retry can treat { type: "retry", delayMs: 1000 } the same as { type: "retry" } — no data is lost, the consumer just doesn't get the back-pressure it asked for.


Default onResult mapping

Every HandlerError variant maps to a sensible default outcome. Override per-route with onResult for domain-specific policy.

| Result | Action | Reason | | ---------------------------- | ------ | -------------------------------------------- | | Ok(T) | ack | | | VALIDATION_ERROR("input") | nack | dead-letter (cause: "validation:input") | | VALIDATION_ERROR("output") | nack | dead-letter (cause: "validation:output") | | TIMEOUT | nack | retry (cause: "timeout:Nms") | | HANDLER_ERROR | nack | retry (cause: "handler_error") | | RETRY_EXHAUSTED | nack | dead-letter (cause: "retry_exhausted:N") | | CIRCUIT_OPEN | nack | retry (delayMs = time-until-close) | | BACKPRESSURE_REJECT | nack | requeue-now (cause: "queue_full") | | DROPPED | nack | requeue-now (cause: "dropped") | | HANDLER_NOT_RUNNING | nack | requeue-now (cause: "shutting_down") |

Why these choices:

  • Input validation → DLQ. A malformed message will never succeed; retrying is a guaranteed loop. Route it to a DLQ so an operator can inspect.
  • Output validation → DLQ. The server is buggy for this message shape; retrying won't help until the code changes.
  • Timeout → retry. Probably transient (slow downstream); the broker's visibility window picks it up.
  • Handler error → retry. Unknown cause — one more attempt might succeed. If it won't, the handler's own retry policy will surface RETRY_EXHAUSTED instead.
  • Retry exhausted → DLQ. The handler already retried internally. Don't retry again at the broker layer; that's how duplicate work happens.
  • Circuit open → retry with delay. The breaker's willRetryAfter - openedAt becomes the broker delay. Sources that can't honor it just retry normally — the breaker will re-open until it closes, so the nack path repeats until the downstream recovers.
  • Backpressure / dropped / shutting down → requeue-now. We can't take this message, but another worker (or this worker later) probably can. Put it back immediately.

Override any of this per-consumer with a custom onResult:

createQueueConsumer({
  source,
  handler,
  decode,
  clock,
  onResult: (result, message) => {
    if (result._tag === "Err" && result.error.type === "TIMEOUT") {
      // We log timeouts upstream; don't retry at the broker.
      return { action: "ack" };
    }
    return defaultOnResult(result, message);
  },
});

Concurrency

maxConcurrent (default 1) is the consumer's own in-flight cap. Tune it based on broker semantics:

  • SQS / single-partition brokers: match your Visibility Timeout budget. Higher parallelism means more work in flight, but each message still has the same visibility window.
  • Kafka / partitioned brokers: one consumer per partition is the norm; leave maxConcurrent: 1 and run multiple consumers.
  • Redis Streams with XGROUP: pick a value based on worker CPU, not broker capacity.

The handler's concurrency.max is the real ceiling — messages beyond it will be rejected with BACKPRESSURE_REJECT and nacked with requeue-now. If you see this in your journal, lower the consumer's maxConcurrent rather than raising the handler's concurrency.max, so load-shedding stays visible.


Correlation IDs

The consumer extracts a correlation ID from message headers in this order:

  1. x-correlation-id
  2. correlation-id
  3. The message's own id (always present)

That ID flows into the handler invocation and onto every HandlerEvent in the journal. An HTTP-to-queue hop keeps a single trace — set x-correlation-id on the enqueued message's headers upstream and the downstream consumer inherits it automatically.


Testing

createMemorySource is the reference MessageSource — deterministic, in-memory, Clock-driven. Use it to drive consumer tests without a real broker:

import { createControlledClock, ms } from "@phyxiusjs/clock";
import { createMemorySource, createQueueConsumer } from "@phyxiusjs/queue";

const clock = createControlledClock({ initialTime: 0 });
const source = createMemorySource({ clock });

const consumer = createQueueConsumer({ source, handler, decode, clock });

await consumer.start();
source.enqueue({ body: { customerId: "alice", amount: 42 } });

// Poll for observable effects.
await vi.waitFor(() => {
  expect(source.getAckHistory()).toHaveLength(1);
});

expect(journal.getSnapshot().entries[0].data.source).toBe("queue");

The memory source also exposes getDeadLettered(), getNackHistory(), getPending(), and getInFlight() — use them for precise assertions.


What this does NOT do

  • No broker implementation. This package is the consumer shape; broker adapters (@phyxiusjs/queue-sqs, @phyxiusjs/queue-redis, etc.) live in their own packages with their own SDK dependencies.
  • No transactional outbox / dedupe. Idempotency is a handler concern (the handler sees deliveryCount in the invocation meta). The adapter's job is just ack / nack.
  • No batch consumption. One message = one handler invocation = one journal event. Batching breaks the "same shape as HTTP" invariant.
  • No scheduler / cron. Time-driven work is a different adapter.

What you get

  • Transport-stable observability. Every queue message produces the same HandlerEvent shape as an HTTP request or a cron tick. Single dashboard, single query layer.
  • Every failure mode mapped. No generic retries, no silent swallows. RETRY_EXHAUSTED goes to DLQ automatically so you don't retry at two layers. CIRCUIT_OPEN carries the expected recovery delay.
  • Broker portable. Swap SQS for Redis Streams without touching the handler, the observability schema, the retry policy, or the stability guarantees. The consumer's onResult table is the full translation layer.
  • Deterministic tests. createMemorySource + ControlledClock = no real brokers, no flaky backoff, no race-condition flakes.

The queue adapter is deliberately small. The handler is where the work lives.