@eyjs/jobs

v1.0.0

Job queue and worker management package for the EyJS framework. Provides background job processing, queue management, and seamless integration with EyJS applications using BullMQ and Redis.

Features

  • 🚀 BullMQ Integration - Built on top of the powerful BullMQ job queue
  • 🎯 Decorator-Based Jobs - Simple @WorkerJob() decorator for job handlers
  • 🔧 TypeScript Support - Full TypeScript definitions and type safety
  • EyJS Integration - Works seamlessly with EyJS dependency injection
  • 📝 Event Integration - Integrates with EyJS event bus for job events
  • 🔄 Redis Support - Uses Redis for job persistence and distribution
  • Scheduled Jobs - Support for cron-based scheduled jobs
  • 🛡️ Error Handling - Robust error handling and retry mechanisms
  • 📊 Job Monitoring - Built-in job monitoring and statistics

Installation

# Using Bun (recommended)
bun add @eyjs/jobs

# Using npm
npm install @eyjs/jobs

# Using yarn
yarn add @eyjs/jobs

# Using pnpm
pnpm add @eyjs/jobs

Prerequisites

This package requires:

  • @eyjs/core@^1.0.4 - EyJS core framework
  • @eyjs/event-bus@^1.0.0 - EyJS event bus (for job events)
  • bullmq@^5.52.2 - BullMQ job queue
  • ioredis@^5.6.1 - Redis client
  • moment@^2.30.1 - Date manipulation

Quick Start

1. Redis Setup

First, ensure Redis is running:

# Using Docker
docker run -d -p 6379:6379 redis:alpine

# Or install locally
# Ubuntu/Debian
sudo apt-get install redis-server

# macOS
brew install redis

2. Environment Configuration

# Redis configuration
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=
REDIS_DB=0

# Job queue configuration
JOB_QUEUE_CONCURRENCY=5
JOB_QUEUE_RETRY_ATTEMPTS=3
JOB_QUEUE_RETRY_DELAY=1000

3. Basic Job Setup

import { WorkerJob, QueueService } from '@eyjs/jobs'
import { Injectable } from '@eyjs/core'

// Define job data interface
interface EmailJobData {
  to: string
  subject: string
  body: string
}

interface ProcessOrderJobData {
  orderId: string
  customerId: string
  items: any[]
}

// Job handlers
@Injectable()
export class EmailJobHandler {
  @WorkerJob('send-email')
  async handleEmailJob(data: EmailJobData) {
    console.log(`Sending email to ${data.to}: ${data.subject}`)
    
    // Simulate email sending
    await new Promise(resolve => setTimeout(resolve, 1000))
    
    console.log('Email sent successfully')
  }
}

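@Injectable()
export class OrderJobHandler {
  // OrderService is an application-level service assumed to exist; it is not part of @eyjs/jobs
  constructor(
    private readonly orderService: OrderService,
    private readonly queueService: QueueService
  ) {}

  @WorkerJob('process-order')
  async handleProcessOrder(data: ProcessOrderJobData) {
    console.log(`Processing order ${data.orderId}`)
    
    // Process the order
    await this.orderService.processOrder(data.orderId)
    
    // Send confirmation email by queueing a follow-up job
    // (customerId stands in for the recipient; a real handler would look up the customer's email address)
    await this.queueService.add('send-email', {
      to: data.customerId,
      subject: 'Order Confirmation',
      body: `Your order ${data.orderId} has been processed`
    })
  }
}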

4. Using the Queue Service

import { QueueService } from '@eyjs/jobs'
import { Injectable } from '@eyjs/core'

@Injectable()
export class UserService {
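  // UserRepository is an application-level dependency assumed to exist
  constructor(
    private readonly queueService: QueueService,
    private readonly userRepository: UserRepository
  ) {}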

  async createUser(data: { email: string; name: string }) {
    const user = await this.userRepository.create(data)
    
    // Add job to queue
    await this.queueService.add('send-welcome-email', {
      userId: user.id,
      email: user.email,
      name: user.name
    })
    
    return user
  }

  async processBulkUsers(userIds: string[]) {
    // Add bulk job
    await this.queueService.addBulk('process-user', 
      userIds.map(id => ({ userId: id }))
    )
  }
}
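
Because job names are plain strings, a typo in a name or a mismatched payload only surfaces when the job runs. One way to catch this at compile time is to map each job name to its payload type. The sketch below illustrates the idea with a hypothetical in-memory `TypedQueue`; it does not use the real `QueueService`, whose exact signature is not shown in this README, and `JobPayloads` is an assumed name.

```typescript
// Hypothetical map from job name to payload shape; the names mirror the examples above.
interface JobPayloads {
  'send-welcome-email': { userId: string; email: string; name: string }
  'process-user': { userId: string }
}

// In-memory stand-in for a queue (NOT the real QueueService); it only records what was added.
class TypedQueue {
  private readonly jobs: Array<{ name: string; data: unknown }> = []

  // The payload type is derived from the job name, so mismatches fail at compile time.
  add<K extends keyof JobPayloads>(name: K, data: JobPayloads[K]): void {
    this.jobs.push({ name, data })
  }

  // Adds one job per item, all under the same name.
  addBulk<K extends keyof JobPayloads>(name: K, items: ReadonlyArray<JobPayloads[K]>): void {
    for (const data of items) this.add(name, data)
  }

  get size(): number {
    return this.jobs.length
  }

  namesInOrder(): string[] {
    return this.jobs.map(job => job.name)
  }
}
```

With this shape, a call such as `add('process-user', { email: 'x' })` is rejected by the compiler because the payload does not match the name. The same idea could be applied as a thin wrapper around the real service.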

API Reference

Core Classes

QueueService

Main service for managing job queues.

import { QueueService } from '@eyjs/jobs'

const queueService = new QueueService()

// Add a job
await queueService.add('job-name', { data: 'value' })

// Add a delayed job
await queueService.add('job-name', { data: 'value' }, { delay: 5000 })

// Add a scheduled job
await queueService.add('job-name', { data: 'value' }, { 
  repeat: { cron: '0 0 * * *' } 
})

@WorkerJob(jobName)

Decorator for marking methods as job handlers.

@WorkerJob('send-email')
async handleEmailJob(data: EmailJobData) {
  // Handle the job
}
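
Conceptually, a decorator like `@WorkerJob('send-email')` associates a job name with the method that handles it, so a worker can look up the handler when a job arrives. How @eyjs/jobs does this internally is not documented here; the following is only a minimal sketch of the idea using a plain `Map`, with hypothetical names (`HandlerRegistry`, `register`, `resolve`).

```typescript
type JobHandler = (data: unknown) => Promise<unknown>

// Hypothetical name-to-handler registry; illustrates the lookup a decorator could populate.
class HandlerRegistry {
  private readonly handlers = new Map<string, JobHandler>()

  // Associates a job name with its handler; duplicate names are rejected.
  register(jobName: string, handler: JobHandler): void {
    if (this.handlers.has(jobName)) {
      throw new Error(`Handler already registered for "${jobName}"`)
    }
    this.handlers.set(jobName, handler)
  }

  // Returns the handler for a job name, or throws if none was registered.
  resolve(jobName: string): JobHandler {
    const handler = this.handlers.get(jobName)
    if (!handler) {
      throw new Error(`No handler registered for "${jobName}"`)
    }
    return handler
  }
}
```

Failing loudly on an unknown name makes the "jobs not being processed" case easier to diagnose than silently dropping the job.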

Job Options

interface JobOptions {
  delay?: number           // Delay in milliseconds
  attempts?: number       // Number of retry attempts
  backoff?: {             // Backoff strategy
    type: 'fixed' | 'exponential'
    delay: number
  }
  removeOnComplete?: number // Keep N completed jobs
  removeOnFail?: number    // Keep N failed jobs
  repeat?: {               // Repeat options
    cron?: string         // Cron expression
    every?: number        // Interval in milliseconds
    tz?: string           // Timezone
  }
}

Usage Examples

Basic Job Processing

import { WorkerJob, QueueService } from '@eyjs/jobs'
import { Injectable } from '@eyjs/core'

interface NotificationJobData {
  userId: string
  message: string
  type: 'email' | 'sms' | 'push'
}

@Injectable()
export class NotificationJobHandler {
  constructor(
    private readonly emailService: EmailService,
    private readonly smsService: SmsService,
    private readonly pushService: PushService
  ) {}

  @WorkerJob('send-notification')
  async handleNotification(data: NotificationJobData) {
    switch (data.type) {
      case 'email':
        await this.emailService.send(data.userId, data.message)
        break
      case 'sms':
        await this.smsService.send(data.userId, data.message)
        break
      case 'push':
        await this.pushService.send(data.userId, data.message)
        break
    }
  }
}

// Usage in service
@Injectable()
export class NotificationService {
  constructor(private readonly queueService: QueueService) {}

  async sendNotification(
    userId: string, 
    message: string, 
    type: 'email' | 'sms' | 'push'
  ) {
    await this.queueService.add('send-notification', {
      userId,
      message,
      type
    })
  }
}

Scheduled Jobs

import { WorkerJob, QueueService } from '@eyjs/jobs'
import { Injectable } from '@eyjs/core'

@Injectable()
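export class ScheduledJobHandler {
  // ReportService, EmailService, and DataService are application-level services assumed to exist
  constructor(
    private readonly reportService: ReportService,
    private readonly emailService: EmailService,
    private readonly dataService: DataService
  ) {}
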
  @WorkerJob('daily-report')
  async handleDailyReport() {
    console.log('Generating daily report...')
    
    const report = await this.reportService.generateDailyReport()
    await this.emailService.sendReport(report)
    
    console.log('Daily report sent')
  }

  @WorkerJob('cleanup-old-data')
  async handleCleanup() {
    console.log('Cleaning up old data...')
    
    const deletedCount = await this.dataService.cleanupOldData()
    console.log(`Cleaned up ${deletedCount} records`)
  }
}

// Schedule jobs
@Injectable()
export class JobScheduler {
  constructor(private readonly queueService: QueueService) {}

  async scheduleJobs() {
    // Daily report at 9 AM
    await this.queueService.add('daily-report', {}, {
      repeat: { cron: '0 9 * * *' }
    })

    // Cleanup every Sunday at 2 AM
    await this.queueService.add('cleanup-old-data', {}, {
      repeat: { cron: '0 2 * * 0' }
    })

    // Health check every 5 minutes (requires a 'health-check' handler, which is not shown here)
    await this.queueService.add('health-check', {}, {
      repeat: { every: 5 * 60 * 1000 }
    })
  }
}
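
The cron strings above use the standard five-field form: minute, hour, day of month, month, day of week. As a reading aid, here is a small sketch that splits an expression into named fields. `splitCron` is a hypothetical helper, not part of @eyjs/jobs, and it does no validation beyond counting fields; some cron parsers also accept a sixth seconds field, which this sketch rejects.

```typescript
interface CronFields {
  minute: string
  hour: string
  dayOfMonth: string
  month: string
  dayOfWeek: string
}

// Splits a five-field cron expression into named parts (no range or step validation).
function splitCron(expression: string): CronFields {
  const parts = expression.trim().split(/\s+/)
  if (parts.length !== 5) {
    throw new Error(`Expected 5 cron fields, got ${parts.length}`)
  }
  const [minute, hour, dayOfMonth, month, dayOfWeek] =
    parts as [string, string, string, string, string]
  return { minute, hour, dayOfMonth, month, dayOfWeek }
}
```

For example, `'0 2 * * 0'` splits into minute `0`, hour `2`, and day of week `0` (Sunday), which matches the "every Sunday at 2 AM" comment above.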

Job with Retry Logic

import { WorkerJob } from '@eyjs/jobs'
import { Injectable } from '@eyjs/core'

@Injectable()
export class ApiJobHandler {
  @WorkerJob('call-external-api')
  async handleApiCall(data: { url: string; payload: any }) {
    try {
      const response = await fetch(data.url, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify(data.payload)
      })

      if (!response.ok) {
        throw new Error(`API call failed: ${response.status}`)
      }

      return await response.json()
    } catch (error) {
      console.error('API call failed:', error)
      throw error // This will trigger retry if configured
    }
  }
}

// Add job with retry configuration (call this from a class that has QueueService injected)
await this.queueService.add('call-external-api', {
  url: 'https://api.example.com/webhook',
  payload: { event: 'user.created', userId: '123' }
}, {
  attempts: 5,
  backoff: {
    type: 'exponential',
    delay: 2000
  }
})
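
To see what `attempts: 5` with an exponential backoff of 2000 ms means in practice, the sketch below lists the wait before each retry. It assumes the doubling formula that BullMQ documents for its built-in exponential strategy (`delay * 2^(retry - 1)`); whether @eyjs/jobs passes these options through unchanged is an assumption, and `retryDelays` is a hypothetical helper.

```typescript
type BackoffType = 'fixed' | 'exponential'

// Returns the wait (in ms) before each retry. The first attempt runs immediately,
// so `attempts` total attempts produce `attempts - 1` delays.
function retryDelays(attempts: number, type: BackoffType, delay: number): number[] {
  const delays: number[] = []
  for (let retry = 1; retry < attempts; retry++) {
    // Assumed formula: fixed keeps the delay constant, exponential doubles it each retry.
    delays.push(type === 'fixed' ? delay : delay * Math.pow(2, retry - 1))
  }
  return delays
}
```

Under that assumption, the configuration above waits 2000, 4000, 8000, and 16000 ms between attempts, so a job that keeps failing is given up on roughly 30 seconds after its first attempt, plus processing time.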

Bulk Job Processing

import { WorkerJob, QueueService } from '@eyjs/jobs'
import { Injectable } from '@eyjs/core'

interface BulkEmailJobData {
  emails: string[]
  subject: string
  template: string
}

@Injectable()
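export class BulkEmailJobHandler {
  // EmailService is an application-level service assumed to exist
  constructor(private readonly emailService: EmailService) {}
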
  @WorkerJob('send-bulk-email')
  async handleBulkEmail(data: BulkEmailJobData) {
    console.log(`Sending bulk email to ${data.emails.length} recipients`)
    
    const promises = data.emails.map(email => 
      this.emailService.sendTemplate(email, data.template, {
        subject: data.subject
      })
    )
    
    await Promise.all(promises)
    console.log('Bulk email sent successfully')
  }
}

// Usage
@Injectable()
export class MarketingService {
  constructor(private readonly queueService: QueueService) {}

  async sendNewsletter(emails: string[], subject: string) {
    await this.queueService.add('send-bulk-email', {
      emails,
      subject,
      template: 'newsletter'
    })
  }
}
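
Note that `Promise.all` in the handler above starts every send at once, which can be heavy for large recipient lists. One option is to split the list into fixed-size chunks and queue one job per chunk. The sketch below shows only the chunking step; `chunk` is a hypothetical helper and the chunk size is yours to choose.

```typescript
// Splits a list into consecutive chunks of at most `size` items.
function chunk<T>(items: readonly T[], size: number): T[][] {
  if (!Number.isInteger(size) || size < 1) {
    throw new Error('size must be a positive integer')
  }
  const out: T[][] = []
  for (let i = 0; i < items.length; i += size) {
    out.push(items.slice(i, i + size))
  }
  return out
}
```

Each chunk could then be passed as the `emails` field of its own `send-bulk-email` job, so a failure only retries that chunk.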

Job Events Integration

import { WorkerJob } from '@eyjs/jobs'
import { Injectable } from '@eyjs/core'
import { EventBus } from '@eyjs/event-bus'

export class JobStartedEvent {
  constructor(
    public readonly jobId: string,
    public readonly jobName: string,
    public readonly data: any,
    public readonly timestamp: Date = new Date()
  ) {}
}

export class JobCompletedEvent {
  constructor(
    public readonly jobId: string,
    public readonly jobName: string,
    public readonly result: any,
    public readonly timestamp: Date = new Date()
  ) {}
}

export class JobFailedEvent {
  constructor(
    public readonly jobId: string,
    public readonly jobName: string,
    public readonly error: string,
    public readonly timestamp: Date = new Date()
  ) {}
}

@Injectable()
export class JobEventHandler {
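  // PaymentService is an application-level service assumed to exist
  constructor(
    private readonly eventBus: EventBus,
    private readonly paymentService: PaymentService
  ) {}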

  @WorkerJob('process-payment')
  async handlePaymentJob(data: { orderId: string; amount: number }) {
    // Emit job started event ('payment-job-123' is a placeholder; use the real job ID in practice)
    await this.eventBus.publish(new JobStartedEvent(
      'payment-job-123',
      'process-payment',
      data
    ))

    try {
      // Process payment
      const result = await this.paymentService.processPayment(data.orderId, data.amount)
      
      // Emit job completed event
      await this.eventBus.publish(new JobCompletedEvent(
        'payment-job-123',
        'process-payment',
        result
      ))
      
      return result
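    } catch (error) {
      // Emit job failed event; `error` is `unknown` in strict TypeScript, so narrow it first
      const message = error instanceof Error ? error.message : String(error)
      await this.eventBus.publish(new JobFailedEvent(
        'payment-job-123',
        'process-payment',
        message
      ))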
      
      throw error
    }
  }
}

Advanced Usage

Custom Job Processor

import { WorkerService } from '@eyjs/jobs'
import { Injectable } from '@eyjs/core'

@Injectable()
export class CustomWorkerService extends WorkerService {
  constructor() {
    super({
      connection: {
        host: process.env.REDIS_HOST || 'localhost',
        port: parseInt(process.env.REDIS_PORT || '6379', 10),
        password: process.env.REDIS_PASSWORD
      },
      concurrency: parseInt(process.env.JOB_QUEUE_CONCURRENCY || '5', 10)
    })
  }

  async processJob(jobName: string, data: any) {
    // Custom job processing logic
    console.log(`Processing custom job: ${jobName}`)
    
    // Call parent method for standard processing
    return await super.processJob(jobName, data)
  }
}

Job Monitoring

import { QueueService } from '@eyjs/jobs'
import { Injectable } from '@eyjs/core'

@Injectable()
export class JobMonitoringService {
  constructor(private readonly queueService: QueueService) {}

  async getJobStats() {
    const queues = await this.queueService.getQueues()
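    // Typed so that indexing by queue name compiles under strict TypeScript
    const stats: Record<string, { waiting: number; active: number; completed: number; failed: number }> = {}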

    for (const queue of queues) {
      const [waiting, active, completed, failed] = await Promise.all([
        queue.getWaiting(),
        queue.getActive(),
        queue.getCompleted(),
        queue.getFailed()
      ])

      stats[queue.name] = {
        waiting: waiting.length,
        active: active.length,
        completed: completed.length,
        failed: failed.length
      }
    }

    return stats
  }

  async getJobDetails(jobId: string) {
    const queues = await this.queueService.getQueues()
    
    for (const queue of queues) {
      const job = await queue.getJob(jobId)
      if (job) {
        return {
          id: job.id,
          name: job.name,
          data: job.data,
          progress: job.progress,
          state: await job.getState(),
          createdAt: new Date(job.timestamp),
          processedAt: job.processedOn ? new Date(job.processedOn) : null,
          failedReason: job.failedReason
        }
      }
    }
    
    return null
  }
}
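
The per-queue counts returned by `getJobStats()` can be rolled up into totals and a failure rate for dashboards or alerts. The following sketch assumes the stats object has the shape built above; `summarize` and `QueueCounts` are hypothetical names, not part of @eyjs/jobs.

```typescript
interface QueueCounts {
  waiting: number
  active: number
  completed: number
  failed: number
}

// Sums counts across queues and derives the share of finished jobs that failed.
function summarize(stats: Record<string, QueueCounts>) {
  const totals: QueueCounts = { waiting: 0, active: 0, completed: 0, failed: 0 }
  for (const name of Object.keys(stats)) {
    const counts = stats[name]
    if (!counts) continue
    totals.waiting += counts.waiting
    totals.active += counts.active
    totals.completed += counts.completed
    totals.failed += counts.failed
  }
  const finished = totals.completed + totals.failed
  // Avoid dividing by zero when nothing has finished yet.
  const failureRate = finished === 0 ? 0 : totals.failed / finished
  return { totals, failureRate }
}
```

Keep in mind that if `removeOnComplete` or `removeOnFail` trims history, these counts only reflect the jobs that are still retained.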

Job Priority and Rate Limiting

import { WorkerJob, QueueService } from '@eyjs/jobs'
import { Injectable } from '@eyjs/core'

@Injectable()
export class PriorityJobHandler {
  @WorkerJob('high-priority-task')
  async handleHighPriorityTask(data: any) {
    console.log('Processing high priority task')
    // Process high priority task
  }

  @WorkerJob('low-priority-task')
  async handleLowPriorityTask(data: any) {
    console.log('Processing low priority task')
    // Process low priority task
  }
}

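// Usage with priority
// Note: BullMQ itself treats a lower number as higher priority (1 is the highest),
// so confirm which convention @eyjs/jobs applies before relying on these values
await this.queueService.add('high-priority-task', data, {
  priority: 10 // Higher number = higher priority
})

await this.queueService.add('low-priority-task', data, {
  priority: 1 // Lower number = lower priority
})

// Rate limiting
// Note: in BullMQ, rate limits are configured on the worker (the `limiter` option) rather than per job,
// so check whether @eyjs/jobs accepts `rateLimiter` at this position
await this.queueService.add('api-call', data, {
  rateLimiter: {
    max: 100,      // Maximum 100 jobs
    duration: 60000 // Per minute
  }
})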
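
The `max` and `duration` pair reads as "at most `max` jobs per `duration` milliseconds". To make that concrete, here is a minimal fixed-window counter. This is an illustration only: `FixedWindowLimiter` is a hypothetical class, the clock is passed in explicitly to keep the example deterministic, and the limiter actually used by BullMQ runs in Redis and may behave differently at window boundaries.

```typescript
// Allows at most `max` acquisitions per `duration` ms, counted in fixed windows.
class FixedWindowLimiter {
  private readonly max: number
  private readonly duration: number
  private windowStart = -Infinity
  private count = 0

  constructor(max: number, duration: number) {
    this.max = max
    this.duration = duration
  }

  // `now` is a timestamp in ms (for example Date.now()); returns true if the job may run.
  tryAcquire(now: number): boolean {
    if (now - this.windowStart >= this.duration) {
      // The previous window has expired: start a new one at this call.
      this.windowStart = now
      this.count = 0
    }
    if (this.count >= this.max) return false
    this.count++
    return true
  }
}
```

With `new FixedWindowLimiter(100, 60000)`, the 101st call within the same minute returns `false`, and calls are accepted again once the window rolls over.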

Configuration

Redis Configuration

import { QueueService } from '@eyjs/jobs'

const queueService = new QueueService({
  connection: {
    host: process.env.REDIS_HOST || 'localhost',
    port: parseInt(process.env.REDIS_PORT || '6379', 10),
    password: process.env.REDIS_PASSWORD,
    db: parseInt(process.env.REDIS_DB || '0', 10)
  },
  defaultJobOptions: {
    removeOnComplete: 100,
    removeOnFail: 50,
    attempts: 3,
    backoff: {
      type: 'exponential',
      delay: 2000
    }
  }
})

Environment Variables

# Redis Configuration
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=
REDIS_DB=0

# Job Queue Configuration
JOB_QUEUE_CONCURRENCY=5
JOB_QUEUE_RETRY_ATTEMPTS=3
JOB_QUEUE_RETRY_DELAY=1000
JOB_QUEUE_REMOVE_ON_COMPLETE=100
JOB_QUEUE_REMOVE_ON_FAIL=50

# Monitoring
JOB_MONITORING_ENABLED=true
JOB_MONITORING_PORT=3001
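
Environment variables arrive as strings, so numeric settings need parsing and sensible fallbacks. The sketch below turns the variables listed above into a typed object. It is an assumption-laden example: `loadJobConfig` and `intFromEnv` are hypothetical helpers, the fallback values simply reuse the sample values shown in this README, and the package may read these variables differently (or not at all).

```typescript
interface JobEnvConfig {
  redis: { host: string; port: number; password?: string; db: number }
  concurrency: number
  retryAttempts: number
  retryDelay: number
}

// Parses an integer setting, falling back when the variable is unset or empty.
function intFromEnv(value: string | undefined, fallback: number): number {
  if (value === undefined || value.trim() === '') return fallback
  const parsed = Number.parseInt(value, 10)
  if (Number.isNaN(parsed)) {
    throw new Error(`Expected an integer, got "${value}"`)
  }
  return parsed
}

// Builds a typed config from an env-like object (pass process.env in a real app).
function loadJobConfig(env: Record<string, string | undefined>): JobEnvConfig {
  return {
    redis: {
      host: env.REDIS_HOST || 'localhost',
      port: intFromEnv(env.REDIS_PORT, 6379),
      // An empty REDIS_PASSWORD is treated as "no password".
      password: env.REDIS_PASSWORD || undefined,
      db: intFromEnv(env.REDIS_DB, 0)
    },
    concurrency: intFromEnv(env.JOB_QUEUE_CONCURRENCY, 5),
    retryAttempts: intFromEnv(env.JOB_QUEUE_RETRY_ATTEMPTS, 3),
    retryDelay: intFromEnv(env.JOB_QUEUE_RETRY_DELAY, 1000)
  }
}
```

Throwing on a non-numeric value surfaces a misconfigured deployment at startup instead of passing `NaN` to the Redis client.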

Testing

Unit Tests

import { describe, it, expect, beforeEach } from 'bun:test'
import { QueueService } from '@eyjs/jobs'

describe('QueueService', () => {
  let queueService: QueueService

  beforeEach(async () => {
    queueService = new QueueService({
      connection: {
        host: 'localhost',
        port: 6379
      }
    })
  })

  it('should add job to queue', async () => {
    const job = await queueService.add('test-job', { data: 'test' })
    
    expect(job).toBeDefined()
    expect(job.name).toBe('test-job')
  })

  it('should process job', async () => {
    let processed = false
    
    queueService.registerHandler('test-job', async (data) => {
      processed = true
      return data
    })

    await queueService.add('test-job', { data: 'test' })
    
    // Wait for job to be processed
    await new Promise(resolve => setTimeout(resolve, 100))
    
    expect(processed).toBe(true)
  })
})

Integration Tests

import { describe, it, expect, spyOn } from 'bun:test'
import { QueueService } from '@eyjs/jobs'
// `container` and `EmailService` come from your application; their import paths are not shown here

describe('Job Integration', () => {
  it('should process email job', async () => {
    const emailService = container.get(EmailService)
    const queueService = container.get(QueueService)
    
    const sendEmailSpy = spyOn(emailService, 'sendEmail')
    
    await queueService.add('send-email', {
      to: 'test@example.com',
      subject: 'Test',
      body: 'Test email'
    })
    
    // Wait for job processing
    await new Promise(resolve => setTimeout(resolve, 1000))
    
    expect(sendEmailSpy).toHaveBeenCalledWith('test@example.com', 'Test', 'Test email')
  })
})

Performance

Optimization Tips

  1. Batch Processing

@WorkerJob('process-batch')
async handleBatch(data: { items: any[] }) {
  // Process multiple items in one job
  const results = await Promise.all(
    data.items.map(item => this.processItem(item))
  )
  return results
}

  2. Job Concurrency

const queueService = new QueueService({
  concurrency: 10 // Process up to 10 jobs simultaneously
})

  3. Memory Management

const queueService = new QueueService({
  defaultJobOptions: {
    removeOnComplete: 50,  // Keep only 50 completed jobs
    removeOnFail: 20       // Keep only 20 failed jobs
  }
})
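
As a mental model for `removeOnComplete` and `removeOnFail`, think of each as a cap on how many finished jobs are retained, with the oldest dropped first. The sketch below expresses that with a plain array. It assumes the list is ordered oldest to newest and that "keep N" means "keep the most recent N"; `trimHistory` is a hypothetical helper, and the real trimming happens inside Redis.

```typescript
// Keeps only the most recent `keep` entries of a list ordered oldest to newest.
function trimHistory<T>(finished: readonly T[], keep: number): T[] {
  if (keep <= 0) return []
  // A negative start index counts from the end; shorter lists are returned whole.
  return finished.slice(-keep)
}
```

Lower caps reduce Redis memory use, at the cost of less history being available for monitoring and debugging.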

Troubleshooting

Common Issues

1. "Redis connection failed" error

  • Ensure Redis server is running
  • Check Redis connection configuration
  • Verify network connectivity

2. Jobs not being processed

  • Check if workers are running
  • Verify job handlers are registered
  • Check Redis connection

3. Memory usage issues

  • Configure removeOnComplete and removeOnFail
  • Monitor job queue sizes
  • Use job batching for large datasets

Debug Mode

Enable debug logging:

DEBUG=eyjs:jobs

Contributing

Contributions are welcome! Please read our Contributing Guide for details.

License

This project is licensed under the MIT License - see the LICENSE file for details.

Support