npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@zintrust/workers

v0.1.52

Published

Enterprise-grade worker management system for ZinTrust framework. Provides comprehensive background job processing, monitoring, resilience, and orchestration capabilities.

Downloads

2,588

Readme

@zintrust/workers

Enterprise-grade worker management system for ZinTrust framework. Provides comprehensive background job processing, monitoring, resilience, and orchestration capabilities.

npm version License: MIT

Features

  • 🚀 Core Infrastructure: Worker factory, metrics, registry, distributed locks
  • 💪 Resilience: Circuit breakers, dead letter queues, auto-scaling
  • 📊 Monitoring: Health checks, resource monitoring, observability
  • 🔐 Compliance: GDPR support, data subject management
  • 🎯 Advanced: Canary deployments, multi-datacenter orchestration, versioning
  • 🔌 Extensible: Plugin system, multi-queue support, custom workers

Installation

npm install @zintrust/workers

Peer Dependencies

npm install @zintrust/core

Optional Dependencies

For AI-powered anomaly detection (if using AnomalyDetection module):

npm install brain.js ml.js simple-statistics
# Optional: TensorFlow.js for advanced ML
npm install @tensorflow/tfjs-node

Quick Start

Basic Worker Creation

import { createQueueWorker } from '@zintrust/workers';

const emailWorker = await createQueueWorker({
  name: 'email-sender',
  queueName: 'emails',
  handler: async (job) => {
    Logger.info('Processing email:', job.data);
    // Send email logic here
    return { sent: true };
  },
  config: {
    concurrency: 5,
    maxRetries: 3,
  },
});

Using Worker Factory

import { WorkerFactory } from '@zintrust/workers';

const worker = WorkerFactory.create({
  name: 'data-processor',
  type: 'queue',
  handler: async (job) => {
    // Process job
  },
});

await worker.start();

Core Modules

WorkerFactory

Central factory for creating and managing workers.

import { WorkerFactory } from '@zintrust/workers';

// Create worker
const worker = WorkerFactory.create({
  name: 'my-worker',
  type: 'queue',
  handler: async (job) => {
    /* ... */
  },
});

// List all workers
const workers = WorkerFactory.listWorkers();

// Get worker by name
const myWorker = WorkerFactory.getWorker('my-worker');

// Remove worker
await WorkerFactory.removeWorker('my-worker');

WorkerMetrics

Collect and analyze worker performance metrics.

import { WorkerMetrics } from '@zintrust/workers';

// Record job metrics
WorkerMetrics.recordJob('email-sender', {
  duration: 150,
  success: true,
});

// Get metrics
const metrics = WorkerMetrics.getMetrics('email-sender');
Logger.info(metrics.totalJobs, metrics.avgDuration, metrics.errorRate);

// Get historical data
const history = WorkerMetrics.getHistory('email-sender', {
  from: new Date('2026-01-01'),
  to: new Date(),
});

WorkerRegistry

Register and manage worker metadata.

import { WorkerRegistry } from '@zintrust/workers';

// Register worker
WorkerRegistry.register({
  name: 'email-sender',
  version: '1.0.0',
  type: 'queue',
  metadata: {
    description: 'Sends transactional emails',
    author: '[email protected]',
  },
});

// Get worker info
const info = WorkerRegistry.get('email-sender');

// List all registered workers
const allWorkers = WorkerRegistry.list();

// Check if worker exists
const exists = WorkerRegistry.has('email-sender');

Resilience & Recovery

Circuit Breaker

Prevent cascading failures with circuit breaker pattern.

import { CircuitBreaker } from '@zintrust/workers';

// Configure circuit breaker
CircuitBreaker.configure('external-api', {
  failureThreshold: 5,
  resetTimeout: 60000, // 1 minute
  halfOpenRequests: 3,
});

// Execute with circuit breaker
const result = await CircuitBreaker.execute('external-api', async () => {
  // Call external API
  return await fetch('https://api.example.com');
});

// Get circuit state
const state = CircuitBreaker.getState('external-api');
Logger.info(state); // 'closed' | 'open' | 'half-open'

// Reset circuit
CircuitBreaker.reset('external-api');

Dead Letter Queue

Handle failed jobs gracefully.

import { DeadLetterQueue } from '@zintrust/workers';

// Move job to DLQ
await DeadLetterQueue.add('email-sender', {
  jobId: 'job-123',
  data: { email: '[email protected]' },
  error: 'SMTP connection failed',
  attempts: 3,
});

// List failed jobs
const failedJobs = await DeadLetterQueue.list('email-sender');

// Retry failed job
await DeadLetterQueue.retry('email-sender', 'job-123');

// Get statistics
const stats = await DeadLetterQueue.getStats('email-sender');
Logger.info(stats.totalFailed, stats.retrySuccess);

Auto-Scaling

Automatically scale workers based on load.

import { AutoScaler } from '@zintrust/workers';

// Configure auto-scaling
AutoScaler.configure('email-sender', {
  minWorkers: 2,
  maxWorkers: 10,
  scaleUpThreshold: 80, // CPU %
  scaleDownThreshold: 20,
  cooldownPeriod: 300000, // 5 minutes
});

// Start auto-scaling
await AutoScaler.start('email-sender');

// Get scaling status
const status = AutoScaler.getStatus('email-sender');
Logger.info(status.currentWorkers, status.targetWorkers);

// Stop auto-scaling
await AutoScaler.stop('email-sender');

Monitoring & Observability

Health Monitor

Monitor worker health with configurable checks.

import { HealthMonitor } from '@zintrust/workers';

// Configure health checks
HealthMonitor.configure('email-sender', {
  checks: ['memory', 'cpu', 'queue-depth'],
  interval: 30000, // 30 seconds
  thresholds: {
    memory: 80, // percent
    cpu: 70,
    queueDepth: 1000,
  },
});

// Start monitoring
await HealthMonitor.start('email-sender');

// Get health status
const health = HealthMonitor.getHealth('email-sender');
Logger.info(health.status, health.checks);

// Get health history
const history = HealthMonitor.getHistory('email-sender', {
  hours: 24,
});

Resource Monitor

Track system resource usage.

import { ResourceMonitor } from '@zintrust/workers';

// Start monitoring
ResourceMonitor.start();

// Get current resource usage
const usage = ResourceMonitor.getCurrentUsage();
Logger.info(usage.cpu, usage.memory, usage.disk);

// Get resource trends
const trends = ResourceMonitor.getTrends({ hours: 1 });

// Set resource alerts
ResourceMonitor.setAlert({
  metric: 'memory',
  threshold: 85,
  callback: (usage) => {
    console.warn('Memory usage high:', usage);
  },
});

Observability

Distributed tracing and metrics collection.

import { Observability } from '@zintrust/workers';

// Start a trace
const span = Observability.startSpan('email-send', {
  attributes: {
    'job.id': 'job-123',
    'job.type': 'email',
  },
});

// Add events
span.addEvent('smtp-connect');
span.addEvent('email-sent');

// End trace
span.end();

// Record custom metric
Observability.recordMetric('emails_sent', 1, {
  labels: { type: 'transactional' },
});

// Get metrics (Prometheus format)
const metrics = await Observability.getMetrics();

Advanced Features

Canary Deployments

Safely deploy new worker versions.

import { CanaryController } from '@zintrust/workers';

// Start canary deployment
await CanaryController.start('email-sender', {
  newVersion: '2.0.0',
  trafficPercentage: 10, // 10% to new version
  duration: 600000, // 10 minutes
  successCriteria: {
    maxErrorRate: 2, // percent
    minSuccessRate: 95,
  },
});

// Monitor canary
const status = CanaryController.getStatus('email-sender');
Logger.info(status.trafficPercentage, status.metrics);

// Promote or rollback
if (status.success) {
  await CanaryController.promote('email-sender');
} else {
  await CanaryController.rollback('email-sender');
}

Multi-Datacenter Orchestration

Deploy workers across multiple datacenters.

import { DatacenterOrchestrator } from '@zintrust/workers';

// Register datacenter
DatacenterOrchestrator.registerDatacenter({
  id: 'us-east-1',
  region: 'us-east',
  capacity: 100,
  latency: 50, // ms
});

// Place worker
const placement = await DatacenterOrchestrator.placeWorker('email-sender', {
  requirements: {
    minCapacity: 10,
    maxLatency: 100,
    preferredRegions: ['us-east', 'us-west'],
  },
});

Logger.info('Worker placed in:', placement.datacenterId);

// Get topology
const topology = DatacenterOrchestrator.getTopology();

Worker Versioning

Manage multiple worker versions.

import { WorkerVersioning } from '@zintrust/workers';

// Register version
WorkerVersioning.registerVersion('email-sender', {
  version: '2.0.0',
  handler: async (job) => {
    /* new logic */
  },
  metadata: {
    releaseDate: new Date(),
    changes: ['Added retry logic', 'Improved error handling'],
  },
});

// List versions
const versions = WorkerVersioning.listVersions('email-sender');

// Get active version
const active = WorkerVersioning.getActiveVersion('email-sender');

// Deprecate version
await WorkerVersioning.deprecateVersion('email-sender', '1.0.0');

Plugin System

Extend worker functionality with plugins.

import { PluginManager } from '@zintrust/workers';

// Register plugin
PluginManager.register('email-sender', {
  name: 'rate-limiter',
  hooks: {
    beforeJob: async (job) => {
      // Rate limiting logic
      if (await isRateLimited(job.data.email)) {
        throw new Error('Rate limit exceeded');
      }
    },
    afterJob: async (job, result) => {
      // Track sent emails
      await trackEmail(job.data.email);
    },
  },
});

// Enable plugin
await PluginManager.enable('email-sender', 'rate-limiter');

// List plugins
const plugins = PluginManager.list('email-sender');

// Disable plugin
await PluginManager.disable('email-sender', 'rate-limiter');

Multi-Queue Worker

Process jobs from multiple queues.

import { MultiQueueWorker } from '@zintrust/workers';

// Create multi-queue worker
const worker = await MultiQueueWorker.create({
  name: 'notifications',
  queues: [
    { name: 'emails', priority: 1, concurrency: 5 },
    { name: 'sms', priority: 2, concurrency: 3 },
    { name: 'push', priority: 3, concurrency: 10 },
  ],
  handler: async (job, queueName) => {
    Logger.info(`Processing ${queueName} job:`, job.data);
    // Route to appropriate handler
  },
});

// Get queue stats
const stats = await worker.getQueueStats('emails');
Logger.info(stats.pending, stats.active, stats.completed);

// Pause queue
await worker.pauseQueue('sms');

// Resume queue
await worker.resumeQueue('sms');

Compliance & Security

Compliance Manager (GDPR)

Handle GDPR compliance for worker data.

import { ComplianceManager } from '@zintrust/workers';

// Register data subject
ComplianceManager.registerDataSubject({
  subjectId: 'user-123',
  type: 'user',
  metadata: { email: '[email protected]' },
});

// Record consent
ComplianceManager.recordConsent({
  subjectId: 'user-123',
  purpose: 'email-marketing',
  granted: true,
});

// Check compliance
const compliant = await ComplianceManager.checkCompliance('email-sender', {
  subjectId: 'user-123',
});

// Create access request
await ComplianceManager.createAccessRequest({
  subjectId: 'user-123',
  type: 'data-export',
});

// Get audit logs
const logs = await ComplianceManager.getAuditLogs({
  subjectId: 'user-123',
  from: new Date('2026-01-01'),
});

Specialized Workers

Broadcast Worker

Send jobs to multiple handlers.

import { BroadcastWorker } from '@zintrust/workers';

const worker = await BroadcastWorker.create({
  name: 'system-events',
  handlers: [
    async (job) => {
      /* Log to file */
    },
    async (job) => {
      /* Send to analytics */
    },
    async (job) => {
      /* Update dashboard */
    },
  ],
});

await worker.broadcast({ event: 'user-signup', userId: '123' });

Notification Worker

Send notifications through multiple channels.

import { NotificationWorker } from '@zintrust/workers';

const worker = await NotificationWorker.create({
  name: 'notifications',
  channels: {
    email: { provider: 'sendgrid', apiKey: 'xxx' },
    sms: { provider: 'twilio', apiKey: 'yyy' },
    push: { provider: 'fcm', apiKey: 'zzz' },
  },
});

await worker.send({
  userId: '123',
  channels: ['email', 'push'],
  subject: 'Welcome!',
  message: 'Thanks for signing up.',
});

Utilities

Cluster Lock

Distributed locking for worker coordination.

import { ClusterLock } from '@zintrust/workers';

// Acquire lock
const acquired = await ClusterLock.acquire('critical-section', {
  ttl: 30000, // 30 seconds
  waitTimeout: 10000, // Wait up to 10 seconds
});

if (acquired) {
  try {
    // Critical section
    await processExclusiveTask();
  } finally {
    // Release lock
    await ClusterLock.release('critical-section');
  }
}

Priority Queue

Handle jobs with different priorities.

import { PriorityQueue } from '@zintrust/workers';

// Add high priority job
await PriorityQueue.add('tasks', {
  data: { taskId: '123' },
  priority: 1, // Higher number = higher priority
});

// Add normal priority job
await PriorityQueue.add('tasks', {
  data: { taskId: '456' },
  priority: 5,
});

// Jobs with priority 1 will be processed before priority 5

HTTP API

The package includes a full REST API for worker management. Register routes in your ZinTrust application:

import { registerWorkerRoutes } from '@zintrust/workers';
import { Router } from '@zintrust/core';

// Register all worker routes
registerWorkerRoutes(Router);

// Routes available at:
// GET    /api/workers - List workers
// POST   /api/workers/create - Create worker
// GET    /api/workers/:name - Get worker details
// POST   /api/workers/:name/start - Start worker
// POST   /api/workers/:name/stop - Stop worker
// POST   /api/workers/:name/restart - Restart worker
// DELETE /api/workers/:name - Remove worker
// ... and many more endpoints

Processor specs can be file paths or URL specs (recommended for production). Remote processors must export a named ZinTrustProcessor function.

Workers support activeStatus to pause without deletion; inactive workers do not auto-start.

See the API Reference section for all available endpoints.

Lifecycle Management

Worker Initialization

Initialize the worker system at application startup:

import { WorkerInit } from '@zintrust/workers';

// Initialize workers
await WorkerInit.initialize({
  redis: {
    host: 'localhost',
    port: 6379,
  },
  autoStart: true, // Auto-start registered workers
  healthChecks: true, // Enable health monitoring
});

Graceful Shutdown

Shutdown workers gracefully:

import { WorkerShutdown } from '@zintrust/workers';

// Shutdown all workers
await WorkerShutdown.shutdown({
  gracePeriod: 30000, // Wait up to 30 seconds for jobs to complete
  force: false, // Don't force terminate
});

Configuration

Environment Variables

# Redis connection
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=secret

# Worker API URL (for HTTP clients)
WORKER_API_URL=http://localhost:3001

# Worker connection timeout (milliseconds)
WORKER_CONNECTION_TIMEOUT=5000

# Monitoring
ENABLE_METRICS=true
ENABLE_HEALTH_CHECKS=true

# Observability
OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4318

# Processor spec resolver
REMOTE_PROCESSOR_ALLOWLIST=wk.zintrust.com
PROCESSOR_FETCH_TIMEOUT=30000
PROCESSOR_FETCH_MAX_SIZE=524288
PROCESSOR_FETCH_RETRY_ATTEMPTS=3
PROCESSOR_FETCH_RETRY_BACKOFF_MS=1000
PROCESSOR_CACHE_DEFAULT_TTL=3600
PROCESSOR_CACHE_MAX_TTL=604800
PROCESSOR_CACHE_MAX_SIZE=52428800

Testing

Unit Tests

import { describe, it, expect } from 'vitest';
import { WorkerFactory } from '@zintrust/workers';

describe('WorkerFactory', () => {
  it('should create worker', () => {
    const worker = WorkerFactory.create({
      name: 'test-worker',
      type: 'queue',
      handler: async (job) => ({ success: true }),
    });

    expect(worker.name).toBe('test-worker');
  });
});

API Reference

Core Worker Operations

POST /api/workers/create - Create worker
POST /api/workers/:name/start - Start worker
POST /api/workers/:name/stop - Stop worker
POST /api/workers/:name/restart - Restart worker
POST /api/workers/:name/pause - Pause worker
POST /api/workers/:name/resume - Resume worker
DELETE /api/workers/:name - Remove worker

Worker Information

GET /api/workers - List all workers
GET /api/workers/:name - Get worker details
GET /api/workers/:name/status - Worker status
GET /api/workers/:name/metrics - Performance metrics
GET /api/workers/:name/health - Health metrics

Health Monitoring

POST /api/workers/:name/monitoring/start - Start monitoring
POST /api/workers/:name/monitoring/stop - Stop monitoring
GET /api/workers/:name/monitoring/history - Health history
GET /api/workers/:name/monitoring/trend - Health trend
PUT /api/workers/:name/monitoring/config - Update config

Versioning

POST /api/workers/:name/versions - Register version
GET /api/workers/:name/versions - List versions
GET /api/workers/:name/versions/:version - Get version
POST /api/workers/:name/versions/:version/deprecate - Deprecate
POST /api/workers/:name/versions/:version/activate - Activate
POST /api/workers/:name/versions/:version/deactivate - Deactivate

See the complete API documentation for all endpoints.

Performance

  • Handles 10,000+ jobs per second per worker
  • Sub-millisecond job routing latency
  • Horizontal scaling across multiple nodes
  • Minimal memory footprint (~50MB per worker)
  • Efficient connection pooling

Best Practices

  1. Use appropriate concurrency: Don't over-provision workers
  2. Enable health checks: Monitor worker health proactively
  3. Implement circuit breakers: Prevent cascading failures
  4. Use DLQ: Handle failures gracefully
  5. Monitor metrics: Track performance and errors
  6. Version workers: Use versioning for safe deployments
  7. Test canaries: Always test new versions with canary deployments
  8. Set resource limits: Prevent resource exhaustion
  9. Enable observability: Use distributed tracing for debugging
  10. Comply with regulations: Use ComplianceManager for GDPR

Troubleshooting

Worker not starting

// Check worker status
const status = WorkerFactory.getWorker('my-worker').status;
Logger.info(status);

// Check health
const health = HealthMonitor.getHealth('my-worker');
Logger.info(health);

Jobs stuck in queue

// Check queue depth
const metrics = WorkerMetrics.getMetrics('my-worker');
Logger.info(metrics.queueDepth);

// Check worker load
const load = ResourceMonitor.getCurrentUsage();
Logger.info(load);

High error rate

// Check DLQ
const failed = await DeadLetterQueue.list('my-worker');
Logger.info(failed);

// Check circuit breaker state
const state = CircuitBreaker.getState('my-worker');
Logger.info(state);

Contributing

See CONTRIBUTING.md for development guidelines.

License

MIT © ZinTrust

Support

  • Documentation: https://doc.zintrust.com/workers
  • Issues: https://github.com/ZinTrust/zintrust/issues
  • Discord: https://discord.gg/zintrust

Status: Production-ready (26/32 tasks completed)

Coming Soon:

  • SLA Monitoring
  • AI-Powered Anomaly Detection
  • Chaos Engineering Tools
  • Telemetry Dashboard UI
  • Queue-Monitor Worker UI