qios
v0.1.0
A flexible queue system built on top of BullMQ with bypass mode and extensible job handling
qios
Typed, producer-first BullMQ wrapper: bypass mode, bulk-first, multi-job, idempotent enqueue, and observability tags.
Install
pnpm add qios bullmq
# or
npm i qios bullmq
Quick start
import { Qios } from "qios";
type Payload = { id: string; message: string };
type Result = { ok: boolean; at: number };
const jobFn = async (data: Payload): Promise<Result> => ({
  ok: true,
  at: Date.now(),
});
const queue = new Qios<Payload, Result>({
  name: "emails",
  jobName: "send-email",
  bypass: process.env.NODE_ENV !== "production",
});
await queue.enqueue({ id: "1", message: "Hello" });
await queue.enqueueBulk([{ id: "2", message: "Hi" }]);
Multi-job queues
type JobName = "send-email" | "send-sms";
const q = new Qios<Payload, Result, JobName>({
  name: "notifications",
  jobName: "send-email",
});
await q.enqueueAs("send-email", { id: "e1", message: "Email" });
await q.enqueueBulkAs("send-sms", [
  { id: "s1", message: "SMS 1" },
  { id: "s2", message: "SMS 2" },
]);
Idempotency (dedup)
await queue.enqueue(
  { id: "abc", message: "Hello" },
  { idempotencyKey: "user-123:hello" } // maps to jobId if none is set
);
Observability tags in results
const r = await queue.enqueue({ id: "x", message: "A" });
// r = { mode: 'queued' | 'bypass', jobId?, result?, queueName: 'emails', jobName: 'send-email' }
Typing with job-name unions
type Name = "JOB_A" | "JOB_B";
const q = new Qios<Payload, Result, Name>({ name: "pipe", jobName: "JOB_A" });
const r = await q.enqueueAs("JOB_B", { id: "1", message: "ok" }); // r.jobName typed as Name
Configuration
- Minimal:
new Qios<Payload, Result>({
  name: "queue",
  jobName: "default",
  bypass: false,
});
- Advanced:
new Qios<Payload, Result>({
  name: "queue",
  jobName: "default",
  bypass: false,
  retry: { attempts: 5, backoff: { type: "exponential", delay: 2000 } }, // also 'fixed' | 'custom'
  defaultJobOptions: {
    priority: 0,
    delay: 0,
    removeOnComplete: 100,
    removeOnFail: 100,
  },
  // Redis via env:
  // REDIS_HOST, REDIS_PORT, REDIS_DB, REDIS_PASSWORD, REDIS_USERNAME
  // REDIS_TLS=true, REDIS_TLS_REJECT_UNAUTHORIZED=false
});
API
- new Qios<T, R, N extends string>()
- enqueue(data, options?)
- enqueueBulk(items, options?)
- enqueueAs(name, data, options?)
- enqueueBulkAs(name, items, options?)
- options extends BullMQ JobsOptions with idempotencyKey?: string
- lifecycle: start(), stop(opts?), cleanup(graceMs?, limit?), getStatus()
- create({ jobFunction?, config? })
- Alias: export { Qios as ExtensibleQueue }
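The enqueue methods return the tagged result described under "Observability tags in results". The sketch below shows one way a caller might branch on it. `EnqueueResult` and `describeResult` are hypothetical names defined locally for illustration, and the field types (for example `jobId` being a string) are assumptions; check the types qios actually exports before relying on them.

```typescript
// Local mirror of the result shape shown in this README.
// Assumption: field types are guesses, not taken from the qios type definitions.
type EnqueueResult<R, N extends string = string> = {
  mode: "queued" | "bypass";
  jobId?: string;
  result?: R;
  queueName: string;
  jobName: N;
};

// Hypothetical helper: builds a log-friendly line from the observability tags.
function describeResult<R, N extends string>(r: EnqueueResult<R, N>): string {
  const tag = `${r.queueName}/${r.jobName}`;
  return r.mode === "bypass"
    ? `${tag} handled in bypass mode`
    : `${tag} queued as job ${r.jobId ?? "(unknown id)"}`;
}
```

Keeping the queue and job names in every log line makes it easier to correlate producer logs with worker logs.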
Backoff
- Producer sets retry.backoff: 'exponential' | 'fixed' | 'custom'
- For 'custom', define worker-side strategy via BullMQ backoffStrategies
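As a rough illustration of what a worker-side 'custom' strategy could compute, here is a minimal sketch of exponential backoff with full jitter. `jitteredBackoff`, its parameters, and its defaults are hypothetical and not part of qios or BullMQ. How the function is registered depends on your BullMQ version (recent releases accept a `settings.backoffStrategy` function on the Worker, older ones a `backoffStrategies` map), so treat the wiring as something to confirm against the BullMQ docs.

```typescript
// Hypothetical helper: names and defaults are illustrative only.
// `random` is injectable so the function can be tested deterministically.
function jitteredBackoff(
  attemptsMade: number,
  baseMs: number = 1000,
  maxMs: number = 60_000,
  random: () => number = Math.random,
): number {
  // Exponential ceiling for this attempt: baseMs * 2^(attempt - 1), capped at maxMs.
  const exponent = Math.max(0, attemptsMade - 1);
  const ceiling = Math.min(maxMs, baseMs * 2 ** exponent);
  // Full jitter: pick a delay uniformly below the ceiling so retries spread out.
  return Math.floor(random() * ceiling);
}
```

Because each retry picks a random delay below the exponential ceiling, jobs that failed together are unlikely to retry at the same instant.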
Production notes
- Prefer bulk enqueues
- Use idempotency for at-least-once pipelines
- Use TLS/auth via env
- Add jitter in worker custom backoff to avoid retry storms
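The first two notes can be combined on the producer side. The sketch below uses two hypothetical helpers that are not part of qios: `chunk` splits a large list into batches suitable for `enqueueBulk`, and `idempotencyKeyFor` builds a stable key in the same `user-123:hello` style used earlier. The batch size is an assumption to tune for your Redis setup.

```typescript
// Hypothetical helper, not part of qios: split items into fixed-size batches.
function chunk<T>(items: readonly T[], size: number): T[][] {
  if (!Number.isInteger(size) || size <= 0) {
    throw new RangeError("size must be a positive integer");
  }
  const batches: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size));
  }
  return batches;
}

// Hypothetical key builder: the same inputs always yield the same key,
// so a re-sent message maps to the same idempotencyKey.
function idempotencyKeyFor(userId: string, action: string): string {
  return `${userId}:${action}`;
}

// Possible usage with a qios queue (assumed batch size of 500):
//   for (const batch of chunk(payloads, 500)) {
//     await queue.enqueueBulk(batch);
//   }
```

Deriving the key from business identifiers rather than a random value is what lets a retried producer call deduplicate against the original.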
License
MIT
