chasquimq
v1.3.0
The fastest open-source message broker for Redis.
chasquimq (Node.js)
Node.js bindings for ChasquiMQ — the fastest open-source message broker for Redis. The Rust engine pulls jobs; JavaScript / TypeScript handlers process them.
Status: 1.0. Prebuilt native binaries for darwin-arm64, darwin-x64, linux-x64-gnu, linux-arm64-gnu, and win32-x64-msvc.
Install
npm install chasquimq
The install resolves a prebuilt platform package (chasquimq-<platform>-<arch>), so no Rust toolchain or node-gyp is required.
Quickstart
import {
  Queue, Worker, BackoffSpec, UnrecoverableError, type Job,
} from "chasquimq"

const conn = { host: "127.0.0.1", port: 6379 }

async function main() {
  await using queue = new Queue<{ to: string }, { sentAt: number }>("emails", { connection: conn })
  await using worker = new Worker<{ to: string }, { sentAt: number }>("emails",
    async (job) => {
      const to = job.data.to
      console.log(`sending to ${to} (attempt ${job.attemptsMade + 1})`)
      if (to.includes("@unrecoverable")) {
        const err = new UnrecoverableError(`hard bounce: ${to}`)
        throw err
      }
      return { sentAt: Date.now() / 1000 }
    },
    { connection: conn, storeResults: true })

  // Plain enqueue.
  await queue.add("welcome", { to: "[email protected]" })

  // Stable jobId — second call with the same id is a no-op (idempotent).
  await queue.addUnique("welcome", { to: "[email protected]" }, { jobId: "welcome:alice" })

  // Per-job retry with exponential backoff.
  await queue.add("welcome", { to: "[email protected]" }, {
    attempts: 3,
    backoff: BackoffSpec.exponential(100, { multiplier: 2.0, maxMs: 10_000 }),
  })

  // Delayed enqueue (milliseconds).
  await queue.add("welcome", { to: "[email protected]" }, { delay: 2_000 })

  // Block on a single job's result.
  const job = await queue.add("welcome", { to: "[email protected]" })
  const result = await job.waitForResult({ timeoutMs: 30_000 })
  console.log(result)

  // Drain the worker (interrupt with SIGINT in real usage).
  await worker.run()
}
main()
await using (TypeScript 5.2+) calls worker.close() and queue.close() automatically when the block exits, even if a step throws or the user presses Ctrl+C. If you can't use await using yet, fall back to a manual try/finally with await worker.close() and await queue.close().
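As a rough illustration of the try/finally fallback, the sketch below closes resources in reverse order once the body finishes or throws, which is the order await using would dispose them in. The Closable interface and the withResources helper are hypothetical names introduced here, not part of chasquimq; the only assumption about Queue and Worker is the async close() method mentioned above.

```typescript
// Structural stand-in for Queue / Worker: anything with an async close().
interface Closable {
  close(): Promise<void>
}

// Hypothetical helper: runs `body`, then closes every resource in reverse
// order of the array, whether `body` resolved or threw.
async function withResources<T>(
  resources: Closable[],
  body: () => Promise<T>,
): Promise<T> {
  try {
    return await body()
  } finally {
    for (const resource of [...resources].reverse()) {
      await resource.close()
    }
  }
}
```

Under those assumptions, usage would look like await withResources([queue, worker], async () => { await worker.run() }), which closes the worker first and the queue second. If one close() rejects, later ones are skipped in this sketch; wrap each call in its own try/catch if that matters for your deployment.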
What's in the box
| Surface | What it does |
|---|---|
| Queue<DataType, ResultType, NameType> | Producer + queue inspection. add / addBulk / addUnique / getJobResult / peekDlq / replayDlq / cancelDelayed / getRepeatableJobs / removeRepeatableByKey. [Symbol.asyncDispose]. |
| Worker<DataType, ResultType, NameType> | Consumer pool. tokio-side dispatch, opt-in result storage (storeResults: true), EventEmitter events (completed / failed / error). [Symbol.asyncDispose]. |
| Job<DataType, ResultType, NameType> | Read-only handle. id, name, data, attemptsMade, waitForResult({ timeoutMs, intervalMs, signal }). |
| QueueEvents | Cross-process pub/sub via the events stream. Subscribe to completed / failed / dlq / retry-scheduled / delayed / drained. [Symbol.asyncDispose]. |
| BackoffSpec | Builders: .fixed(delayMs) / .exponential(initialMs, { multiplier, maxMs, jitterMs }). |
| UnrecoverableError | Throw from your handler to bypass retries and route the job directly to DLQ. |
| NotSupportedError | Surfaces from APIs that aren't on the chasquimq roadmap (e.g. parent/child flows). |
Queue.add(name, data, opts) accepts: delay (ms), attempts, backoff, jobId, repeat: { kind: 'cron' | 'every', ... }, missedFires: { kind: 'skip' | 'fire-once' | 'fire-all', maxCatchup? }.
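To make the exponential backoff options concrete, here is a minimal sketch of how a retry delay could be derived from initialMs, multiplier, and maxMs. The formula (initialMs times multiplier to the power of retry minus one, capped at maxMs), the default multiplier of 2, and the exponentialDelayMs name are assumptions for illustration; the engine's actual schedule, including jitterMs, may differ.

```typescript
interface ExponentialOptions {
  multiplier?: number
  maxMs?: number
}

// Assumed formula, not the engine's: delay before retry number `retry`
// (1 = first retry), growing geometrically and capped at `maxMs`.
function exponentialDelayMs(
  initialMs: number,
  retry: number,
  opts: ExponentialOptions = {},
): number {
  const multiplier = opts.multiplier ?? 2
  const maxMs = opts.maxMs ?? Number.POSITIVE_INFINITY
  const uncapped = initialMs * Math.pow(multiplier, retry - 1)
  return Math.min(uncapped, maxMs)
}

// Same parameters as the Quickstart: 100 ms initial, x2 growth, 10 s cap.
const delays = [1, 2, 3, 4, 5, 6, 7, 8].map((retry) =>
  exponentialDelayMs(100, retry, { multiplier: 2, maxMs: 10_000 }),
)
// delays: 100, 200, 400, 800, 1600, 3200, 6400, 10000
```

With these numbers the cap only takes effect on the eighth retry, so a job configured with attempts: 3 would never reach it under this assumed formula.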
TLS / rediss://
For TLS-fronted Redis (ElastiCache encryption-in-transit, or any non-cluster Redis with TLS), set tls: true on connection, or pass a rediss:// URL directly:
const conn = { host: "my-cluster.cache.amazonaws.com", port: 6379, tls: true }
// or:
const conn = { url: "rediss://my-cluster.cache.amazonaws.com:6379" }
Trust roots come from the platform store via rustls-native-certs: the keychain on macOS, the OS CA bundle on Linux (probed by openssl-probe), and the system store on Windows. Endpoints signed by Amazon Trust Services therefore work out of the box. For private CAs, point SSL_CERT_FILE at a PEM bundle before launching Node; that env var takes precedence over the platform store.
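The host/port/tls form and the rediss:// URL form describe the same endpoint. As a rough illustration only (this is not how the library parses its options), the sketch below uses the standard URL class to split a redis:// or rediss:// URL into those three fields. The connectionFromUrl helper and the 6379 fallback port are assumptions introduced here.

```typescript
// Hypothetical helper: not part of chasquimq.
interface HostPortConnection {
  host: string
  port: number
  tls: boolean
}

function connectionFromUrl(raw: string): HostPortConnection {
  const url = new URL(raw)
  if (url.protocol !== "redis:" && url.protocol !== "rediss:") {
    throw new Error(`unsupported scheme: ${url.protocol}`)
  }
  return {
    host: url.hostname,
    // Assumption: fall back to the conventional Redis port when none is given.
    port: url.port === "" ? 6379 : Number(url.port),
    // The extra "s" in rediss:// is what signals TLS.
    tls: url.protocol === "rediss:",
  }
}

const parsed = connectionFromUrl("rediss://my-cluster.cache.amazonaws.com:6379")
```

A helper like this can be handy when the endpoint arrives as a single environment variable but other tooling expects separate host and port values.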
Rotating IAM tokens
For deployments that use short-lived auth tokens (ElastiCache IAM auth, Vault, AWS Secrets Manager, GCP Secret Manager, ...), pass an async credentialProvider on connection. The engine invokes it on every reconnect / AUTH cycle — never per job, so the hot path is unaffected:
import { ElastiCacheClient } from "@aws-sdk/client-elasticache"

// Sketch: swap in your own token source. `mintToken` is a placeholder for
// that source; it is not exported by chasquimq or the AWS SDK.
declare function mintToken(host: string | null): Promise<string>

async function fetchIamToken(host: string | null) {
  // Real implementations call AWS SDK's `signer.presign` against
  // `redis://<replication-group-id>/?Action=connect&User=<user>`, or
  // hit Vault's `/database/creds/<role>`. Both return a short-lived
  // credential pair; cache + refresh on the schedule your provider
  // recommends (ElastiCache rotates every 15 minutes).
  return {
    username: "iam-user",
    password: await mintToken(host),
  }
}

const conn = {
  url: "rediss://my-cluster.cache.amazonaws.com:6379",
  credentialProvider: fetchIamToken,
}
A thrown error or rejected Promise from the callback maps to a fred auth error. On the initial connect (the first Queue.add / Worker.run), an auth failure surfaces as a hard rejection on the caller, which is useful for fail-loud startup of a misconfigured deployment. After the pool is up, the default reconnect_on_auth_error policy treats subsequent auth failures as transient and retries on the next reconnect attempt, so a brief blip in your secrets backend doesn't take the worker down. A provider that stays broken after startup will retry-loop inside fred indefinitely, because reconnect_max_attempts is not yet exposed to the Node shim.
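The "cache + refresh" advice in the provider comment can be sketched as a small wrapper that reuses a credential pair until a TTL elapses. The Credentials and CredentialProvider types mirror the shape of fetchIamToken above and are assumptions about what the connection option accepts; cachedProvider is a hypothetical helper, not part of chasquimq, and it keeps a single cache entry regardless of host.

```typescript
interface Credentials {
  username: string
  password: string
}

// Assumed callback shape, inferred from the fetchIamToken sketch.
type CredentialProvider = (host: string | null) => Promise<Credentials>

// Hypothetical helper: reuse the last credentials until `ttlMs` has passed.
// `now` is injectable so the expiry logic can be exercised without waiting.
function cachedProvider(
  inner: CredentialProvider,
  ttlMs: number,
  now: () => number = Date.now,
): CredentialProvider {
  let cached: { value: Credentials; expiresAt: number } | undefined
  return async (host) => {
    if (cached !== undefined && now() < cached.expiresAt) {
      return cached.value
    }
    // Failures propagate to the caller and are never cached.
    const value = await inner(host)
    cached = { value, expiresAt: now() + ttlMs }
    return value
  }
}
```

If this shape matches your setup, you would pass something like credentialProvider: cachedProvider(fetchIamToken, 10 * 60_000), choosing a TTL comfortably below the token lifetime so a reconnect never receives a credential that is about to expire.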
Power-user surface
The native engine handles ship from the same top-level package:
import { Producer, Consumer, Scheduler } from "chasquimq"
There is one user-facing Job: the high-level class returned by Queue.add and passed to your Worker processor. The native binding's Job value-type is internal-only and not re-exported (mirrors the Python shim).
TODOs / known limitations
reconnect_max_attempts is not yet exposed to the Node shim. A permanently misconfigured credentialProvider will retry-loop inside fred indefinitely. The engine's ConnectionTuning::reconnect_max_attempts field needs a sibling option on ConnectionOptions (Node) to cap retries; this is tracked for a follow-up slice.
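Until such an option exists, one possible application-level guard is to count consecutive provider failures yourself and raise an alarm past a threshold. The sketch below is an assumption-laden workaround, not a library feature: withFailureLimit and its onLimit callback are hypothetical names, the callback shape is inferred from the fetchIamToken sketch, and the wrapper does not stop fred's reconnect loop; it only gives your code a hook to alert or shut down.

```typescript
interface Credentials {
  username: string
  password: string
}

// Assumed callback shape, inferred from the fetchIamToken sketch.
type CredentialProvider = (host: string | null) => Promise<Credentials>

// Hypothetical helper: calls `onLimit` once `maxConsecutiveFailures` failures
// occur in a row (and on each further failure), then rethrows the error.
function withFailureLimit(
  inner: CredentialProvider,
  maxConsecutiveFailures: number,
  onLimit: (lastError: unknown, failures: number) => void,
): CredentialProvider {
  let failures = 0
  return async (host) => {
    try {
      const credentials = await inner(host)
      failures = 0 // any success resets the streak
      return credentials
    } catch (err) {
      failures += 1
      if (failures >= maxConsecutiveFailures) {
        onLimit(err, failures)
      }
      throw err
    }
  }
}
```

What onLimit does is up to the deployment: logging, paging, or closing the worker and exiting so a supervisor can restart the process are all reasonable choices.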
See also
- Main repo README — pitch, headline numbers, feature comparison
- Engine internals — retry semantics, delayed jobs, result backends, observability
- Phase 3 design doc — the NAPI-RS binding architecture
License
MIT — see LICENSE at the workspace root.
