@objectstack/service-queue
v9.11.0
Queue Service for ObjectStack — implements IQueueService with an in-memory
adapter and a durable, database-backed adapter (sys_job_queue).
Adapters
| Adapter | Durable | Multi-node | Use |
| --- | --- | --- | --- |
| memory | No (in-process) | No | dev / test / ephemeral work |
| db | Yes (sys_job_queue) | Yes (lease-based claim) | production default |
| auto (default) | — | — | db when an ObjectQL engine is present, else memory |
The db adapter persists messages, retries, and the dead-letter queue to the
sys_job_queue object. Multiple worker processes claim messages from the
shared table with a lease (leaseMs), so it works across a multi-node
deployment without any external broker — no Redis required. Studio can list
and replay the DLQ because sys_job_queue is a first-class object.
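The lease-based claim described above can be illustrated with a small in-memory simulation. This is only a sketch of the general technique, not the adapter's actual code: the JobRow shape, its field names, and the claimBatch helper are hypothetical, and a real database would have to perform the check-and-update atomically.

```typescript
// Hypothetical row shape; the real sys_job_queue fields are not documented here.
interface JobRow {
  id: string;
  status: 'pending' | 'done';
  leaseOwner: string | null;
  leaseExpiresAt: number; // epoch ms; 0 means never leased
}

// Claim up to batchSize pending rows that are not under a live lease.
// In a real database this must be one atomic conditional update.
function claimBatch(
  rows: JobRow[],
  workerId: string,
  now: number,
  leaseMs: number,
  batchSize: number,
): JobRow[] {
  const claimed: JobRow[] = [];
  for (const row of rows) {
    if (claimed.length >= batchSize) break;
    const leaseFree = row.leaseOwner === null || row.leaseExpiresAt <= now;
    if (row.status === 'pending' && leaseFree) {
      row.leaseOwner = workerId;
      row.leaseExpiresAt = now + leaseMs;
      claimed.push(row);
    }
  }
  return claimed;
}

const table: JobRow[] = [
  { id: 'a', status: 'pending', leaseOwner: null, leaseExpiresAt: 0 },
  { id: 'b', status: 'pending', leaseOwner: null, leaseExpiresAt: 0 },
];

// worker-1 claims both rows at t = 0 with a 30 s lease.
const first = claimBatch(table, 'worker-1', 0, 30_000, 10);
// worker-2 polls at t = 10 s: both leases are still live, so nothing is claimed.
const blocked = claimBatch(table, 'worker-2', 10_000, 30_000, 10);
// worker-2 polls at t = 30 s: the leases have expired, so the rows are reclaimed.
const reclaimed = claimBatch(table, 'worker-2', 30_000, 30_000, 10);
```

Because a row becomes claimable again once its lease expires without the job being marked done, a crashed worker does not strand messages, but a slow worker can see its message delivered a second time.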
A BullMQ/Redis adapter is not shipped. The durable path is the DB adapter; it rides on the same datasource the runtime already uses. If you genuinely need a Redis-backed broker, register a custom IQueueService via ctx.registerService('queue', myAdapter).
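As a rough sketch of what a custom adapter could look like, the class below implements the three required methods in process memory. The local types are stand-ins so the example compiles on its own: QueueMessage is an assumed shape (only msg.data appears in this README), MinimalQueueService omits the options argument and the optional methods, and ArrayQueueAdapter is a hypothetical name. A Redis-backed adapter would replace the Map storage with broker calls.

```typescript
// Local stand-ins; in a real project the contract comes from
// @objectstack/spec/contracts. QueueMessage is an assumed shape.
interface QueueMessage<T> { id: string; data: T }
type QueueHandler<T> = (msg: QueueMessage<T>) => Promise<void>;

interface MinimalQueueService {
  publish<T>(queue: string, data: T): Promise<string>;
  subscribe<T>(queue: string, handler: QueueHandler<T>): Promise<void>;
  unsubscribe(queue: string): Promise<void>;
}

class ArrayQueueAdapter implements MinimalQueueService {
  private handlers = new Map<string, QueueHandler<unknown>>();
  private backlog = new Map<string, QueueMessage<unknown>[]>();
  private nextId = 1;

  // Deliver immediately if a handler is subscribed, otherwise buffer.
  async publish<T>(queue: string, data: T): Promise<string> {
    const msg: QueueMessage<unknown> = { id: String(this.nextId++), data };
    const handler = this.handlers.get(queue);
    if (handler) {
      await handler(msg);
    } else {
      const pending = this.backlog.get(queue) ?? [];
      pending.push(msg);
      this.backlog.set(queue, pending);
    }
    return msg.id;
  }

  // Register the handler, then drain anything published before it existed.
  async subscribe<T>(queue: string, handler: QueueHandler<T>): Promise<void> {
    this.handlers.set(queue, handler as QueueHandler<unknown>);
    const pending = this.backlog.get(queue) ?? [];
    this.backlog.delete(queue);
    for (const msg of pending) await handler(msg as QueueMessage<T>);
  }

  async unsubscribe(queue: string): Promise<void> {
    this.handlers.delete(queue);
  }
}

// Registration as described above; `ctx` is whatever context object your
// plugin receives, so the call is left as a comment here:
// ctx.registerService('queue', new ArrayQueueAdapter());
```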
Installation
pnpm add @objectstack/service-queue
Usage
import { ObjectKernel } from '@objectstack/core';
import { QueueServicePlugin } from '@objectstack/service-queue';
const kernel = new ObjectKernel();
// 'auto' (default): durable DbQueueAdapter when ObjectQL is available, else memory
kernel.use(new QueueServicePlugin({ adapter: 'auto' }));
await kernel.bootstrap();
const queue = kernel.getService('queue'); // IQueueService
// Subscribe a handler
await queue.subscribe('email', async (msg) => {
  await sendEmail(msg.data);
});
// Publish a message (the address is a placeholder)
await queue.publish('email', { to: 'user@example.com', template: 'welcome' }, {
  // delay / priority / retries (see QueuePublishOptions)
  attempts: 3,
});
Configuration
// Force the durable DB adapter (requires an ObjectQL engine)
new QueueServicePlugin({
  adapter: 'db',
  db: {
    pollIntervalMs: 1000, // worker poll cadence
    batchSize: 10, // messages claimed per tick
    leaseMs: 30000, // lease before another worker may reclaim
    idempotencyWindowMs: 24 * 60 * 60 * 1000,
  },
});
// In-process only (non-durable), for dev / test
new QueueServicePlugin({ adapter: 'memory' });
Service API
Implements IQueueService from @objectstack/spec/contracts:
interface IQueueService {
  publish<T>(queue: string, data: T, options?: QueuePublishOptions): Promise<string>;
  subscribe<T>(queue: string, handler: QueueHandler<T>): Promise<void>;
  unsubscribe(queue: string): Promise<void>;
  getQueueSize?(queue: string): Promise<number>;
  purge?(queue: string): Promise<void>;
  // Dead-letter queue (db adapter)
  listFailed?(queue?: string, options?: { limit?: number; offset?: number }): Promise<QueueMessageRecord[]>;
  replay?(messageId: string): Promise<void>;
  purgeFailed?(messageId: string): Promise<void>;
}
Best Practices
- Idempotent handlers: messages may be re-delivered after a lease expires.
- Small payloads: keep message data compact for fast serialization.
- Handle the DLQ: monitor listFailed() and replay() poisoned messages.
- Use db in production: memory loses in-flight work on restart and does not coordinate across nodes.
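The first and third practices can be sketched as two small helpers. Both are illustrative assumptions rather than part of the package: idempotent and replayFailed are hypothetical names, the message is assumed to carry an id field, and QueueMessageRecord is assumed to expose id as well. The in-memory Set only deduplicates within one process; a multi-node deployment would need a shared store.

```typescript
// Assumed message shape; only msg.data is shown in this README.
interface QueueMessage<T> { id: string; data: T }
type QueueHandler<T> = (msg: QueueMessage<T>) => Promise<void>;

// Wrap a handler so that a redelivered message id is processed at most once.
function idempotent<T>(
  handler: QueueHandler<T>,
  seen: Set<string> = new Set(),
): QueueHandler<T> {
  return async (msg) => {
    if (seen.has(msg.id)) return; // duplicate delivery, skip
    await handler(msg);
    seen.add(msg.id); // record only after success so failures can retry
  };
}

// Subset of the optional DLQ methods from the Service API section.
// The `id` field on failed records is an assumption.
interface DlqCapable {
  listFailed?(
    queue?: string,
    options?: { limit?: number; offset?: number },
  ): Promise<{ id: string }[]>;
  replay?(messageId: string): Promise<void>;
}

// Replay up to `limit` failed messages for one queue; returns how many.
async function replayFailed(
  service: DlqCapable,
  queueName: string,
  limit = 50,
): Promise<number> {
  const list = service.listFailed?.bind(service);
  const replay = service.replay?.bind(service);
  if (!list || !replay) return 0; // adapter has no DLQ support
  const failed = await list(queueName, { limit });
  for (const record of failed) await replay(record.id);
  return failed.length;
}
```

A handler wrapped with idempotent can be passed directly to queue.subscribe. Blindly replaying everything in the DLQ will re-fail genuinely poisoned messages, so in practice a helper like replayFailed is best run after the underlying fault has been fixed.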
License
Apache-2.0. See LICENSING.md.
See Also
- @objectstack/spec/contracts
- @objectstack/trigger-schedule — cron/interval triggers on top of the job service
