npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2025 – Pkg Stats / Ryan Hefner

@memberjunction/queue

v2.128.0

Published

MemberJunction: Queue Library for managing server side queues

Readme

@memberjunction/queue

A flexible queue management system for MemberJunction applications that enables background task processing, job scheduling, and asynchronous execution with database persistence.

Overview

The @memberjunction/queue package provides a robust framework for implementing persistent queues in MemberJunction applications. It offers:

  • Database-backed task persistence
  • Automatic queue creation and management
  • Concurrent task processing with configurable limits
  • Heartbeat monitoring for process health
  • Type-safe task definitions
  • Extensible queue implementations

Installation

npm install @memberjunction/queue

Dependencies

This package requires the following MemberJunction packages:

  • @memberjunction/core - Core functionality and entity management
  • @memberjunction/global - Global utilities and class registration
  • @memberjunction/core-entities - Entity type definitions
  • @memberjunction/ai - AI functionality (for AI-related queues)
  • @memberjunction/aiengine - AI Engine integration

Additional dependencies:

  • uuid - For generating unique identifiers

Core Components

TaskBase

The TaskBase class represents an individual task in a queue:

export class TaskBase {
  constructor(
    taskRecord: QueueTaskEntity,
    data: any,
    options: TaskOptions
  )
  
  // Properties
  ID: string              // Unique task identifier
  Status: TaskStatus      // Current task status
  Data: any              // Task payload data
  Options: TaskOptions   // Task configuration
  TaskRecord: QueueTaskEntity // Database entity
}

TaskStatus

Available task statuses:

export const TaskStatus = {
  Pending: 'Pending',
  InProgress: 'InProgress',
  Complete: 'Complete',
  Failed: 'Failed',
  Cancelled: 'Cancelled',
} as const;

QueueBase

The abstract QueueBase class serves as the foundation for all queue implementations:

export abstract class QueueBase {
  constructor(
    QueueRecord: QueueEntity,
    QueueTypeID: string,
    ContextUser: UserInfo
  )
  
  // Public methods
  AddTask(task: TaskBase): boolean
  FindTask(ID: string): TaskBase
  
  // Protected abstract method to implement
  protected abstract ProcessTask(
    task: TaskBase, 
    contextUser: UserInfo
  ): Promise<TaskResult>
}

QueueManager

The QueueManager is a singleton that manages all active queues:

export class QueueManager {
  // Singleton access
  static get Instance(): QueueManager
  
  // Static methods
  static async Config(contextUser: UserInfo): Promise<void>
  static async AddTask(
    QueueType: string,
    data: any,
    options: any,
    contextUser: UserInfo
  ): Promise<TaskBase | undefined>
  
  // Instance methods
  async AddTask(
    QueueTypeID: string,
    data: any,
    options: any,
    contextUser: UserInfo
  ): Promise<TaskBase | undefined>
}

TaskResult

Structure returned by task processing:

export class TaskResult {
  success: boolean      // Whether task completed successfully
  userMessage: string   // User-friendly message
  output: any          // Task output data
  exception: any       // Exception details if failed
}

Usage Examples

Basic Queue Implementation

Create a custom queue by extending QueueBase:

import { QueueBase, TaskBase, TaskResult } from '@memberjunction/queue';
import { RegisterClass } from '@memberjunction/global';
import { UserInfo } from '@memberjunction/core';

// Register your queue with a specific queue type name
@RegisterClass(QueueBase, 'Email Notification')
export class EmailNotificationQueue extends QueueBase {
  protected async ProcessTask(
    task: TaskBase, 
    contextUser: UserInfo
  ): Promise<TaskResult> {
    try {
      // Extract task data
      const { recipient, subject, body } = task.Data;
      
      // Implement your email sending logic here
      console.log(`Sending email to ${recipient}`);
      console.log(`Subject: ${subject}`);
      
      // Simulate email sending
      await this.sendEmail(recipient, subject, body);
      
      // Return success result
      return {
        success: true,
        userMessage: 'Email sent successfully',
        output: { sentAt: new Date() },
        exception: null
      };
    } catch (error) {
      // Return failure result
      return {
        success: false,
        userMessage: `Failed to send email: ${error.message}`,
        output: null,
        exception: error
      };
    }
  }
  
  private async sendEmail(to: string, subject: string, body: string) {
    // Your email service integration here
  }
}

Adding Tasks to Queue

import { QueueManager } from '@memberjunction/queue';
import { UserInfo } from '@memberjunction/core';

// Initialize queue manager (typically done once at app startup)
await QueueManager.Config(contextUser);

// Add a task using queue type name
const task = await QueueManager.AddTask(
  'Email Notification',  // Queue type name
  {                     // Task data
    recipient: '[email protected]',
    subject: 'Welcome to MemberJunction',
    body: 'Thank you for joining!'
  },
  {                     // Task options
    priority: 1
  },
  contextUser
);

if (task) {
  console.log(`Task created with ID: ${task.ID}`);
}

AI Action Queue Example

The package includes built-in queues for AI operations:

import { AIActionQueue, EntityAIActionQueue } from '@memberjunction/queue';

// These queues are automatically registered and available for use
// Add an AI action task
const aiTask = await QueueManager.AddTask(
  'AI Action',
  {
    actionName: 'GenerateText',
    prompt: 'Write a product description',
    parameters: { maxTokens: 100 }
  },
  {},
  contextUser
);

// Add an entity-specific AI action
const entityAITask = await QueueManager.AddTask(
  'Entity AI Action',
  {
    entityName: 'Products',
    entityID: 123,
    actionName: 'GenerateDescription'
  },
  {},
  contextUser
);

Database Schema

The queue system requires the following database tables:

Queue Types Table (__mj.QueueType)

Stores definitions of different queue types (e.g., "Email Notification", "AI Action")

Queues Table (__mj.Queue)

Tracks active queue instances with process information:

  • Queue type reference
  • Process details (PID, platform, hostname)
  • Heartbeat timestamp
  • Network information

Queue Tasks Table (__mj.QueueTask)

Stores individual tasks:

  • Queue reference
  • Task status
  • Task data (JSON)
  • Task options (JSON)
  • Output and error information

Process Management

The QueueManager automatically captures process information for monitoring:

  • Process ID (PID)
  • Platform and version
  • Working directory
  • Network interfaces
  • Operating system details
  • User information
  • Heartbeat timestamps

This information helps track queue health and enables failover scenarios.

Configuration

Queue behavior can be configured through the implementation:

export class CustomQueue extends QueueBase {
  private _maxTasks = 5;        // Maximum concurrent tasks
  private _checkInterval = 500;  // Check interval in milliseconds
  
  // Override these values in your constructor
  constructor(QueueRecord: QueueEntity, QueueTypeID: string, ContextUser: UserInfo) {
    super(QueueRecord, QueueTypeID, ContextUser);
    // Customize queue behavior
    this._maxTasks = 10;
    this._checkInterval = 1000;
  }
}

Best Practices

  1. Task Data Structure: Keep task data serializable as JSON
  2. Error Handling: Always return proper TaskResult with error details
  3. Queue Registration: Use @RegisterClass decorator for automatic registration
  4. Idempotency: Design tasks to be safely retryable
  5. Resource Cleanup: Clean up resources in finally blocks
  6. Monitoring: Check heartbeat timestamps for queue health

Integration with MemberJunction

The queue system integrates seamlessly with:

  • Entity System: Use entities for task data and processing
  • User Context: All operations respect user permissions
  • Global Registry: Automatic queue discovery via class registration
  • AI Engine: Built-in support for AI task processing

Build & Development

# Build the package
npm run build

# Development mode with auto-reload
npm run start

# TypeScript compilation only
npm run build

License

ISC