@eyjs/jobs
v1.0.0
Published
Job queue and worker management package for EyJS framework
Maintainers
Readme
@eyjs/jobs
Job queue and worker management package for the EyJS framework. Provides background job processing, queue management, and seamless integration with EyJS applications using BullMQ and Redis.
Features
- 🚀 BullMQ Integration - Built on top of the powerful BullMQ job queue
- 🎯 Decorator-Based Jobs - Simple
@WorkerJob()decorator for job handlers - 🔧 TypeScript Support - Full TypeScript definitions and type safety
- ⚡ EyJS Integration - Works seamlessly with EyJS dependency injection
- 📝 Event Integration - Integrates with EyJS event bus for job events
- 🔄 Redis Support - Uses Redis for job persistence and distribution
- ⏰ Scheduled Jobs - Support for cron-based scheduled jobs
- 🛡️ Error Handling - Robust error handling and retry mechanisms
- 📊 Job Monitoring - Built-in job monitoring and statistics
Installation
# Using Bun (recommended)
bun add @eyjs/jobs
# Using npm
npm install @eyjs/jobs
# Using yarn
yarn add @eyjs/jobs
# Using pnpm
pnpm add @eyjs/jobsPrerequisites
This package requires:
@eyjs/core@^1.0.4- EyJS core framework@eyjs/event-bus@^1.0.0- EyJS event bus (for job events)bullmq@^5.52.2- BullMQ job queueioredis@^5.6.1- Redis clientmoment@^2.30.1- Date manipulation
Quick Start
1. Redis Setup
First, ensure Redis is running:
# Using Docker
docker run -d -p 6379:6379 redis:alpine
# Or install locally
# Ubuntu/Debian
sudo apt-get install redis-server
# macOS
brew install redis2. Environment Configuration
# Redis configuration
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=
REDIS_DB=0
# Job queue configuration
JOB_QUEUE_CONCURRENCY=5
JOB_QUEUE_RETRY_ATTEMPTS=3
JOB_QUEUE_RETRY_DELAY=10003. Basic Job Setup
import { WorkerJob, QueueService } from '@eyjs/jobs'
import { Injectable } from '@eyjs/core'
// Define job data interface
interface EmailJobData {
to: string
subject: string
body: string
}
interface ProcessOrderJobData {
orderId: string
customerId: string
items: any[]
}
// Job handlers
@Injectable()
export class EmailJobHandler {
@WorkerJob('send-email')
async handleEmailJob(data: EmailJobData) {
console.log(`Sending email to ${data.to}: ${data.subject}`)
// Simulate email sending
await new Promise(resolve => setTimeout(resolve, 1000))
console.log('Email sent successfully')
}
}
@Injectable()
export class OrderJobHandler {
@WorkerJob('process-order')
async handleProcessOrder(data: ProcessOrderJobData) {
console.log(`Processing order ${data.orderId}`)
// Process the order
await this.orderService.processOrder(data.orderId)
// Send confirmation email
await this.queueService.add('send-email', {
to: data.customerId,
subject: 'Order Confirmation',
body: `Your order ${data.orderId} has been processed`
})
}
}4. Using the Queue Service
import { QueueService } from '@eyjs/jobs'
import { Injectable } from '@eyjs/core'
@Injectable()
export class UserService {
constructor(private readonly queueService: QueueService) {}
async createUser(data: { email: string; name: string }) {
const user = await this.userRepository.create(data)
// Add job to queue
await this.queueService.add('send-welcome-email', {
userId: user.id,
email: user.email,
name: user.name
})
return user
}
async processBulkUsers(userIds: string[]) {
// Add bulk job
await this.queueService.addBulk('process-user',
userIds.map(id => ({ userId: id }))
)
}
}API Reference
Core Classes
QueueService
Main service for managing job queues.
import { QueueService } from '@eyjs/jobs'
const queueService = new QueueService()
// Add a job
await queueService.add('job-name', { data: 'value' })
// Add a delayed job
await queueService.add('job-name', { data: 'value' }, { delay: 5000 })
// Add a scheduled job
await queueService.add('job-name', { data: 'value' }, {
repeat: { cron: '0 0 * * *' }
})@WorkerJob(jobName)
Decorator for marking methods as job handlers.
@WorkerJob('send-email')
async handleEmailJob(data: EmailJobData) {
// Handle the job
}Job Options
interface JobOptions {
delay?: number // Delay in milliseconds
attempts?: number // Number of retry attempts
backoff?: { // Backoff strategy
type: 'fixed' | 'exponential'
delay: number
}
removeOnComplete?: number // Keep N completed jobs
removeOnFail?: number // Keep N failed jobs
repeat?: { // Repeat options
cron?: string // Cron expression
every?: number // Interval in milliseconds
tz?: string // Timezone
}
}Usage Examples
Basic Job Processing
import { WorkerJob, QueueService } from '@eyjs/jobs'
import { Injectable } from '@eyjs/core'
interface NotificationJobData {
userId: string
message: string
type: 'email' | 'sms' | 'push'
}
@Injectable()
export class NotificationJobHandler {
constructor(
private readonly emailService: EmailService,
private readonly smsService: SmsService,
private readonly pushService: PushService
) {}
@WorkerJob('send-notification')
async handleNotification(data: NotificationJobData) {
switch (data.type) {
case 'email':
await this.emailService.send(data.userId, data.message)
break
case 'sms':
await this.smsService.send(data.userId, data.message)
break
case 'push':
await this.pushService.send(data.userId, data.message)
break
}
}
}
// Usage in service
@Injectable()
export class NotificationService {
constructor(private readonly queueService: QueueService) {}
async sendNotification(
userId: string,
message: string,
type: 'email' | 'sms' | 'push'
) {
await this.queueService.add('send-notification', {
userId,
message,
type
})
}
}Scheduled Jobs
import { WorkerJob, QueueService } from '@eyjs/jobs'
import { Injectable } from '@eyjs/core'
@Injectable()
export class ScheduledJobHandler {
@WorkerJob('daily-report')
async handleDailyReport() {
console.log('Generating daily report...')
const report = await this.reportService.generateDailyReport()
await this.emailService.sendReport(report)
console.log('Daily report sent')
}
@WorkerJob('cleanup-old-data')
async handleCleanup() {
console.log('Cleaning up old data...')
const deletedCount = await this.dataService.cleanupOldData()
console.log(`Cleaned up ${deletedCount} records`)
}
}
// Schedule jobs
@Injectable()
export class JobScheduler {
constructor(private readonly queueService: QueueService) {}
async scheduleJobs() {
// Daily report at 9 AM
await this.queueService.add('daily-report', {}, {
repeat: { cron: '0 9 * * *' }
})
// Cleanup every Sunday at 2 AM
await this.queueService.add('cleanup-old-data', {}, {
repeat: { cron: '0 2 * * 0' }
})
// Health check every 5 minutes
await this.queueService.add('health-check', {}, {
repeat: { every: 5 * 60 * 1000 }
})
}
}Job with Retry Logic
import { WorkerJob } from '@eyjs/jobs'
import { Injectable } from '@eyjs/core'
@Injectable()
export class ApiJobHandler {
@WorkerJob('call-external-api')
async handleApiCall(data: { url: string; payload: any }) {
try {
const response = await fetch(data.url, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(data.payload)
})
if (!response.ok) {
throw new Error(`API call failed: ${response.status}`)
}
return await response.json()
} catch (error) {
console.error('API call failed:', error)
throw error // This will trigger retry if configured
}
}
}
// Add job with retry configuration
await this.queueService.add('call-external-api', {
url: 'https://api.example.com/webhook',
payload: { event: 'user.created', userId: '123' }
}, {
attempts: 5,
backoff: {
type: 'exponential',
delay: 2000
}
})Bulk Job Processing
import { WorkerJob, QueueService } from '@eyjs/jobs'
import { Injectable } from '@eyjs/core'
interface BulkEmailJobData {
emails: string[]
subject: string
template: string
}
@Injectable()
export class BulkEmailJobHandler {
@WorkerJob('send-bulk-email')
async handleBulkEmail(data: BulkEmailJobData) {
console.log(`Sending bulk email to ${data.emails.length} recipients`)
const promises = data.emails.map(email =>
this.emailService.sendTemplate(email, data.template, {
subject: data.subject
})
)
await Promise.all(promises)
console.log('Bulk email sent successfully')
}
}
// Usage
@Injectable()
export class MarketingService {
constructor(private readonly queueService: QueueService) {}
async sendNewsletter(emails: string[], subject: string) {
await this.queueService.add('send-bulk-email', {
emails,
subject,
template: 'newsletter'
})
}
}Job Events Integration
import { WorkerJob } from '@eyjs/jobs'
import { Injectable } from '@eyjs/core'
import { EventBus } from '@eyjs/event-bus'
export class JobStartedEvent {
constructor(
public readonly jobId: string,
public readonly jobName: string,
public readonly data: any,
public readonly timestamp: Date = new Date()
) {}
}
export class JobCompletedEvent {
constructor(
public readonly jobId: string,
public readonly jobName: string,
public readonly result: any,
public readonly timestamp: Date = new Date()
) {}
}
export class JobFailedEvent {
constructor(
public readonly jobId: string,
public readonly jobName: string,
public readonly error: string,
public readonly timestamp: Date = new Date()
) {}
}
@Injectable()
export class JobEventHandler {
constructor(private readonly eventBus: EventBus) {}
@WorkerJob('process-payment')
async handlePaymentJob(data: { orderId: string; amount: number }) {
// Emit job started event
await this.eventBus.publish(new JobStartedEvent(
'payment-job-123',
'process-payment',
data
))
try {
// Process payment
const result = await this.paymentService.processPayment(data.orderId, data.amount)
// Emit job completed event
await this.eventBus.publish(new JobCompletedEvent(
'payment-job-123',
'process-payment',
result
))
return result
} catch (error) {
// Emit job failed event
await this.eventBus.publish(new JobFailedEvent(
'payment-job-123',
'process-payment',
error.message
))
throw error
}
}
}Advanced Usage
Custom Job Processor
import { WorkerService } from '@eyjs/jobs'
import { Injectable } from '@eyjs/core'
@Injectable()
export class CustomWorkerService extends WorkerService {
constructor() {
super({
connection: {
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379'),
password: process.env.REDIS_PASSWORD
},
concurrency: parseInt(process.env.JOB_QUEUE_CONCURRENCY || '5')
})
}
async processJob(jobName: string, data: any) {
// Custom job processing logic
console.log(`Processing custom job: ${jobName}`)
// Call parent method for standard processing
return await super.processJob(jobName, data)
}
}Job Monitoring
import { QueueService } from '@eyjs/jobs'
import { Injectable } from '@eyjs/core'
@Injectable()
export class JobMonitoringService {
constructor(private readonly queueService: QueueService) {}
async getJobStats() {
const queues = await this.queueService.getQueues()
const stats = {}
for (const queue of queues) {
const [waiting, active, completed, failed] = await Promise.all([
queue.getWaiting(),
queue.getActive(),
queue.getCompleted(),
queue.getFailed()
])
stats[queue.name] = {
waiting: waiting.length,
active: active.length,
completed: completed.length,
failed: failed.length
}
}
return stats
}
async getJobDetails(jobId: string) {
const queues = await this.queueService.getQueues()
for (const queue of queues) {
const job = await queue.getJob(jobId)
if (job) {
return {
id: job.id,
name: job.name,
data: job.data,
progress: job.progress,
state: await job.getState(),
createdAt: new Date(job.timestamp),
processedAt: job.processedOn ? new Date(job.processedOn) : null,
failedReason: job.failedReason
}
}
}
return null
}
}Job Priority and Rate Limiting
import { WorkerJob, QueueService } from '@eyjs/jobs'
import { Injectable } from '@eyjs/core'
@Injectable()
export class PriorityJobHandler {
@WorkerJob('high-priority-task')
async handleHighPriorityTask(data: any) {
console.log('Processing high priority task')
// Process high priority task
}
@WorkerJob('low-priority-task')
async handleLowPriorityTask(data: any) {
console.log('Processing low priority task')
// Process low priority task
}
}
// Usage with priority
await this.queueService.add('high-priority-task', data, {
priority: 10 // Higher number = higher priority
})
await this.queueService.add('low-priority-task', data, {
priority: 1 // Lower number = lower priority
})
// Rate limiting
await this.queueService.add('api-call', data, {
rateLimiter: {
max: 100, // Maximum 100 jobs
duration: 60000 // Per minute
}
})Configuration
Redis Configuration
import { QueueService } from '@eyjs/jobs'
const queueService = new QueueService({
connection: {
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379'),
password: process.env.REDIS_PASSWORD,
db: parseInt(process.env.REDIS_DB || '0')
},
defaultJobOptions: {
removeOnComplete: 100,
removeOnFail: 50,
attempts: 3,
backoff: {
type: 'exponential',
delay: 2000
}
}
})Environment Variables
# Redis Configuration
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=
REDIS_DB=0
# Job Queue Configuration
JOB_QUEUE_CONCURRENCY=5
JOB_QUEUE_RETRY_ATTEMPTS=3
JOB_QUEUE_RETRY_DELAY=1000
JOB_QUEUE_REMOVE_ON_COMPLETE=100
JOB_QUEUE_REMOVE_ON_FAIL=50
# Monitoring
JOB_MONITORING_ENABLED=true
JOB_MONITORING_PORT=3001Testing
Unit Tests
import { describe, it, expect, beforeEach } from 'bun:test'
import { QueueService } from '@eyjs/jobs'
describe('QueueService', () => {
let queueService: QueueService
beforeEach(async () => {
queueService = new QueueService({
connection: {
host: 'localhost',
port: 6379
}
})
})
it('should add job to queue', async () => {
const job = await queueService.add('test-job', { data: 'test' })
expect(job).toBeDefined()
expect(job.name).toBe('test-job')
})
it('should process job', async () => {
let processed = false
queueService.registerHandler('test-job', async (data) => {
processed = true
return data
})
await queueService.add('test-job', { data: 'test' })
// Wait for job to be processed
await new Promise(resolve => setTimeout(resolve, 100))
expect(processed).toBe(true)
})
})Integration Tests
import { describe, it, expect } from 'bun:test'
describe('Job Integration', () => {
it('should process email job', async () => {
const emailService = container.get(EmailService)
const queueService = container.get(QueueService)
const sendEmailSpy = spyOn(emailService, 'sendEmail')
await queueService.add('send-email', {
to: '[email protected]',
subject: 'Test',
body: 'Test email'
})
// Wait for job processing
await new Promise(resolve => setTimeout(resolve, 1000))
expect(sendEmailSpy).toHaveBeenCalledWith('[email protected]', 'Test', 'Test email')
})
})Performance
Optimization Tips
- Batch Processing
@WorkerJob('process-batch')
async handleBatch(data: { items: any[] }) {
// Process multiple items in one job
const results = await Promise.all(
data.items.map(item => this.processItem(item))
)
return results
}- Job Concurrency
const queueService = new QueueService({
concurrency: 10 // Process up to 10 jobs simultaneously
})- Memory Management
const queueService = new QueueService({
defaultJobOptions: {
removeOnComplete: 50, // Keep only 50 completed jobs
removeOnFail: 20 // Keep only 20 failed jobs
}
})Troubleshooting
Common Issues
1. "Redis connection failed" error
- Ensure Redis server is running
- Check Redis connection configuration
- Verify network connectivity
2. Jobs not being processed
- Check if workers are running
- Verify job handlers are registered
- Check Redis connection
3. Memory usage issues
- Configure
removeOnCompleteandremoveOnFail - Monitor job queue sizes
- Use job batching for large datasets
Debug Mode
Enable debug logging:
DEBUG=eyjs:jobsContributing
Contributions are welcome! Please read our Contributing Guide for details.
License
This project is licensed under the MIT License - see the LICENSE file for details.
