@krutai/worker
v1.0.0
Background worker with retry mechanism for KrutAI (powered by BullMQ)
Background job producer + worker wrapper for KrutAI, powered by BullMQ.
Install
npm install @krutai/worker
Requirements
- Node.js 18+ (uses global fetch)
- Access to a KrutAI backend that exposes worker management endpoints
- KrutAI server URL (example: http://localhost:8000)
- A valid KrutAI API key
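The last two requirements can be checked at startup, before any worker is created. A minimal sketch, assuming the values come from environment variables: KRUTAI_API_KEY is the name used in the quick start, while KRUTAI_SERVER_URL and the readWorkerEnv helper are hypothetical and not part of the package.

```typescript
// Hypothetical startup check; not part of @krutai/worker.
interface WorkerEnv {
  apiKey: string;
  serverUrl: string;
}

function readWorkerEnv(env: Record<string, string | undefined>): WorkerEnv {
  const apiKey = (env['KRUTAI_API_KEY'] ?? '').trim();
  // KRUTAI_SERVER_URL is an assumed variable name; the fallback is the
  // example URL from the requirements list.
  const serverUrl = (env['KRUTAI_SERVER_URL'] ?? 'http://localhost:8000').trim();

  if (apiKey === '') {
    throw new Error('KRUTAI_API_KEY is not set');
  }
  if (!/^https?:\/\/\S+$/.test(serverUrl)) {
    throw new Error(`KRUTAI_SERVER_URL is not an http(s) URL: ${serverUrl}`);
  }
  // Drop trailing slashes so the value has one canonical form.
  return { apiKey, serverUrl: serverUrl.replace(/\/+$/, '') };
}
```

In a Node.js app, the result of readWorkerEnv(process.env) could be passed as apiKey and serverUrl to workerService(...).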
Quick start
import { workerService } from '@krutai/worker';

const worker = workerService({
  apiKey: process.env.KRUTAI_API_KEY!,
  serverUrl: 'http://localhost:8000',
  defaultJobOptions: {
    attempts: 5,
    backoff: { type: 'exponential', delay: 1000 },
  },
});

await worker.initialize();

initialize() does two things:
- validates your API key
- fetches Redis connection config from the backend (/worker-manage/config)
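Because initialize() contacts the backend, an application with several entry points may prefer to run it once and share the result. A minimal sketch of that pattern, independent of @krutai/worker: the once helper is hypothetical, and allowing a retry after a failed attempt is a design choice made here, not documented library behavior.

```typescript
// Hypothetical helper, not part of @krutai/worker.
// Wraps an async function so concurrent and repeated callers share one run.
function once<T>(fn: () => Promise<T>): () => Promise<T> {
  let pending: Promise<T> | undefined;
  return () => {
    if (pending !== undefined) {
      return pending;
    }
    const run = fn().catch((err: unknown) => {
      // Forget the failed run so a later call can try again.
      pending = undefined;
      throw err;
    });
    pending = run;
    return run;
  };
}

// Usage with the `worker` from the quick start (kept as a comment so this
// block stays self-contained):
//   const ensureReady = once(() => worker.initialize());
//   await ensureReady(); // safe to call before every addJob(...)
```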
Enqueue jobs
const jobId = await worker.addJob(
  'emails',
  'send-welcome-email',
  { userId: 'u_123', email: '[email protected]' },
  {
    attempts: 3,
    backoff: { type: 'fixed', delay: 2000 },
    removeOnComplete: true,
  },
);
console.log('enqueued job:', jobId);
Register a processor
worker.registerProcessor(
  'emails',
  async (job) => {
    // your business logic
    await sendWelcomeEmail(job.data.email);
    return { ok: true };
  },
  {
    concurrency: 5,
    events: {
      onCompleted: async (job, result) => {
        console.log('completed', job.id, result);
      },
      onFailed: async (job, error) => {
        console.error('failed', job?.id, error.message);
      },
      onError: async (error) => {
        console.error('worker error', error.message);
      },
    },
  },
);
Retry behavior
Retry options are resolved in this order:
- options passed to addJob(...)
- defaultJobOptions from workerService(...) config
- fallback defaults: attempts = 3, backoff = { type: 'exponential', delay: 1000 }
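The resolution order above can be sketched as a small pure function. This is an illustration, not the package's source: resolveRetryOptions and backoffDelays are hypothetical names, and resolving each field independently (rather than the options object as a whole) is an assumption. The delay formula follows BullMQ's built-in strategies, where the exponential delay is delay * 2^(attemptsMade - 1) and attempts counts the first try; whether the wrapper changes this is not stated.

```typescript
type Backoff = { type: 'fixed' | 'exponential'; delay: number };

interface RetryOptions {
  attempts?: number;
  backoff?: Backoff;
}

// Fallback defaults as listed in the README.
const FALLBACK: Required<RetryOptions> = {
  attempts: 3,
  backoff: { type: 'exponential', delay: 1000 },
};

// Assumed per-field precedence: addJob options, then defaultJobOptions, then fallback.
function resolveRetryOptions(
  jobOptions: RetryOptions = {},
  defaultJobOptions: RetryOptions = {},
): Required<RetryOptions> {
  return {
    attempts: jobOptions.attempts ?? defaultJobOptions.attempts ?? FALLBACK.attempts,
    backoff: jobOptions.backoff ?? defaultJobOptions.backoff ?? FALLBACK.backoff,
  };
}

// Waiting time in ms before each retry, assuming BullMQ's built-in
// fixed/exponential strategies without jitter.
function backoffDelays(options: Required<RetryOptions>): number[] {
  const delays: number[] = [];
  // `attempts` includes the first try, so there are attempts - 1 retries.
  for (let attemptsMade = 1; attemptsMade < options.attempts; attemptsMade++) {
    const { type, delay } = options.backoff;
    delays.push(type === 'exponential' ? delay * 2 ** (attemptsMade - 1) : delay);
  }
  return delays;
}
```

Under these assumptions, the quick start configuration (5 attempts, exponential, 1000 ms) would wait 1000, 2000, 4000, and 8000 ms between tries, and the fallback defaults would wait 1000 and 2000 ms.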
Queue controls and metrics
await worker.pauseQueue('emails');
await worker.resumeQueue('emails');

const counts = await worker.getJobCounts('emails');
console.log(counts); // wait, active, completed, failed, delayed, paused
Shutdown
Always close workers and queues during app shutdown:
await worker.close();
API summary
- workerService(config) -> creates a WorkerService
- initialize() -> validates key + loads Redis config
- fetchRedisConfig() -> refreshes Redis config manually
- addJob(queueName, jobName, data, options?) -> enqueue a job
- registerProcessor(queueName, processor, options?) -> attach worker processor
- pauseQueue(queueName) / resumeQueue(queueName) -> queue state control
- getJobCounts(queueName) -> queue counters
- close() -> graceful shutdown
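As an illustration of how close() from the summary might be wired into application shutdown, here is a minimal sketch. It only assumes an object with an async close() method; createShutdown, the Closable interface, and the 10-second timeout are hypothetical choices, not part of the package.

```typescript
// Minimal structural type; the real WorkerService has more methods.
interface Closable {
  close(): Promise<void>;
}

// Returns an idempotent shutdown function: close() runs at most once, and
// the returned promise rejects if close() takes longer than timeoutMs.
function createShutdown(target: Closable, timeoutMs = 10000): () => Promise<void> {
  let inFlight: Promise<void> | undefined;
  return () => {
    if (inFlight !== undefined) {
      return inFlight;
    }
    let timer: ReturnType<typeof setTimeout> | undefined;
    const timeout = new Promise<never>((_resolve, reject) => {
      timer = setTimeout(
        () => reject(new Error(`close() timed out after ${timeoutMs} ms`)),
        timeoutMs,
      );
    });
    const run = Promise.race([target.close(), timeout]).finally(() => {
      // Clear the timer so it does not keep the process alive.
      if (timer !== undefined) {
        clearTimeout(timer);
      }
    });
    inFlight = run;
    return run;
  };
}
```

In a Node.js service, the returned function could be registered for SIGTERM and SIGINT with process.on(...); because it is idempotent, receiving both signals still closes the worker only once.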
Error handling
- Invalid API key format throws KrutAIKeyValidationError
- Calling queue methods before initialization throws an initialization error
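The two cases above could be told apart at the call site roughly as follows. This is a sketch under an explicit assumption: it matches on the error's name property being 'KrutAIKeyValidationError', which the README does not confirm (if the package exports the class, an instanceof check would be the safer option). The explainWorkerError helper is hypothetical.

```typescript
// Assumption: the thrown error's `name` equals its class name.
function isKeyValidationError(err: unknown): boolean {
  return err instanceof Error && err.name === 'KrutAIKeyValidationError';
}

// Hypothetical helper that turns a caught error into an operator-facing message.
function explainWorkerError(err: unknown): string {
  if (isKeyValidationError(err)) {
    return 'API key rejected: check the KRUTAI_API_KEY format';
  }
  const detail = err instanceof Error ? err.message : String(err);
  return `worker error: ${detail}`;
}

// Usage (as a comment, since it needs the real `worker`):
//   try {
//     await worker.initialize();
//   } catch (err) {
//     console.error(explainWorkerError(err));
//   }
```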
