
@unrdf/yawl-queue

v26.4.4

Distributed YAWL workflow execution using BullMQ and Redis.

Overview

@unrdf/yawl-queue bridges UNRDF's YAWL workflow engine with BullMQ's robust distributed queue system, enabling:

  • Distributed Task Execution: Execute YAWL workflows across multiple worker processes
  • Redis-Backed State: Reliable distributed coordination and state management
  • Intelligent Retry Policies: Leverage YAWL cancellation regions for retry logic
  • Cryptographic Receipts: Maintain YAWL's BLAKE3-based proof chains
  • Priority & Delay: Fine-grained control over task scheduling
  • Worker Pools: Scale horizontally with multiple workers

Architecture

┌─────────────────────────────────────────────────────────────┐
│                    YAWLQueueAdapter                         │
│  ┌──────────────┐      ┌──────────────┐                    │
│  │ YAWL Engine  │◄────►│  BullMQ      │                    │
│  │              │      │  Queue       │                    │
│  │ - Workflows  │      │              │                    │
│  │ - Cases      │      │ - Job Queue  │                    │
│  │ - Receipts   │      │ - Priority   │                    │
│  └──────────────┘      └──────┬───────┘                    │
│                               │                             │
└───────────────────────────────┼─────────────────────────────┘
                                │
                        ┌───────▼────────┐
                        │  Redis         │
                        │  - State       │
                        │  - Job Data    │
                        │  - Metadata    │
                        └───────┬────────┘
                                │
                 ┌──────────────┼──────────────┐
                 │              │              │
            ┌────▼────┐    ┌────▼────┐   ┌────▼────┐
            │ Worker 1│    │ Worker 2│   │ Worker 3│
            │         │    │         │   │         │
            │ Process │    │ Process │   │ Process │
            │ YAWL    │    │ YAWL    │   │ YAWL    │
            │ Tasks   │    │ Tasks   │   │ Tasks   │
            └─────────┘    └─────────┘   └─────────┘

Installation

pnpm add @unrdf/yawl-queue

Prerequisites

  • Node.js >= 18.0.0
  • Redis >= 6.0.0
  • pnpm >= 7.0.0

Quick Start

1. Create a Workflow

import { createWorkflow } from '@unrdf/yawl';

const workflow = createWorkflow({
  id: 'data-processing',
  name: 'Data Processing Pipeline',
  tasks: [
    { id: 'fetch', name: 'Fetch Data', priority: 10 },
    { id: 'validate', name: 'Validate', priority: 8 },
    { id: 'transform', name: 'Transform', priority: 5 },
    { id: 'store', name: 'Store Results', priority: 9 },
  ],
  flows: [
    { from: 'fetch', to: 'validate' },
    { from: 'validate', to: 'transform' },
    { from: 'transform', to: 'store' },
  ],
});

2. Setup the Adapter

import { YAWLQueueAdapter } from '@unrdf/yawl-queue';

const adapter = new YAWLQueueAdapter({
  redis: {
    host: 'localhost',
    port: 6379,
  },
  queueName: 'data-processing',
  defaultJobOptions: {
    attempts: 3,
    backoff: {
      type: 'exponential',
      delay: 1000,
    },
  },
});

await adapter.registerWorkflow(workflow);

3. Create Workers

// Define task handlers (fetchData, validateData, transformData, and
// storeData are placeholders for your own implementations)
const taskHandler = async (job, task) => {
  const taskId = task.taskDefId || task.id;

  switch (taskId) {
    case 'fetch':
      return { data: await fetchData() };
    case 'validate':
      return { valid: validateData(job.data.input) };
    case 'transform':
      return { transformed: transformData(job.data.input) };
    case 'store':
      await storeData(job.data.input);
      return { stored: true };
    default:
      throw new Error(`Unknown task: ${taskId}`);
  }
};

// Create worker pool (3 workers)
for (let i = 0; i < 3; i++) {
  adapter.createWorker({
    concurrency: 2,
    taskHandler,
  });
}

4. Execute Workflow

const { caseId, jobId } = await adapter.executeCase('data-processing', {
  source: 'database',
  batchSize: 100,
});

console.log(`Case ${caseId} started with job ${jobId}`);

// Monitor status
const status = await adapter.getCaseStatus(caseId);
console.log(status);
// {
//   caseId: '...',
//   workflowId: 'data-processing',
//   status: 'running',
//   enabledTasks: 1,
//   activeTasks: 2,
//   completedTasks: 3,
//   receipts: 7
// }
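If you want to block until the case finishes, a small polling helper can wrap `getCaseStatus`. A minimal sketch; the terminal `'completed'` status value is an assumption here, so adapt the check to the statuses your workflows actually report:

```javascript
// Minimal polling sketch. `getStatus` stands in for a bound call to
// adapter.getCaseStatus(caseId). The loop exits on any non-'running'
// status ('completed', 'failed', ...), which is an assumption.
async function pollUntilComplete(getStatus, { intervalMs = 500, timeoutMs = 30_000 } = {}) {
  const deadline = Date.now() + timeoutMs;
  for (;;) {
    const status = await getStatus();
    if (status.status !== 'running') return status;
    if (Date.now() >= deadline) throw new Error(`Timed out waiting for case ${status.caseId}`);
    await new Promise((resolve) => setTimeout(resolve, intervalMs));
  }
}

// Usage with the adapter from step 4:
// const finalStatus = await pollUntilComplete(() => adapter.getCaseStatus(caseId));
```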

Task-to-Job Mapping

The adapter automatically maps YAWL task lifecycle to BullMQ jobs:

| YAWL Action | BullMQ Job | Priority | Delay |
|-------------|------------|----------|-------|
| Enable Task | `enable-{taskId}` | From `task.priority` | 0 |
| Start Task | `start-{taskId}` | From `task.priority` | From `task.delay` |
| Complete Task | `complete-{taskId}` | From `task.priority` | 0 |
| Cancel Task | `cancel-{taskId}` | High (99) | 0 |

Job Data Schema

Each BullMQ job contains:

{
  caseId: string,
  workflowId: string,
  taskId: string,         // Task definition ID
  workItemId: string,     // Task instance ID
  action: 'enable' | 'start' | 'complete' | 'cancel',
  input: object,          // Task input data
  output: object,         // Task output data (for complete)
  actor: string,          // Optional actor identifier
  reason: string          // Optional reason (for cancel)
}
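When constructing job data by hand (for example in tests), a quick shape check can catch mistakes early. A hypothetical helper, not part of the package, derived from the schema above:

```javascript
// Hypothetical validator (not part of @unrdf/yawl-queue): checks the
// required fields of the job data schema before enqueueing.
const REQUIRED_FIELDS = ['caseId', 'workflowId', 'taskId', 'workItemId', 'action'];
const ACTIONS = new Set(['enable', 'start', 'complete', 'cancel']);

function isValidJobData(job) {
  return (
    REQUIRED_FIELDS.every((key) => typeof job[key] === 'string' && job[key].length > 0) &&
    ACTIONS.has(job.action)
  );
}
```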

Distributed Execution Flow

Sequential Execution

const workflow = createWorkflow({
  id: 'sequential',
  tasks: [
    { id: 'task1', splitType: 'sequence' },
    { id: 'task2', splitType: 'sequence' },
    { id: 'task3', splitType: 'sequence' },
  ],
  flows: [
    { from: 'task1', to: 'task2' },
    { from: 'task2', to: 'task3' },
  ],
});

// Execution: task1 → task2 → task3 (one after another)

Parallel Execution

const workflow = createWorkflow({
  id: 'parallel',
  tasks: [
    { id: 'start', splitType: 'and' },        // Split
    { id: 'task-a', joinType: 'and' },
    { id: 'task-b', joinType: 'and' },
    { id: 'join', joinType: 'and' },          // Join
  ],
  flows: [
    { from: 'start', to: 'task-a' },
    { from: 'start', to: 'task-b' },
    { from: 'task-a', to: 'join' },
    { from: 'task-b', to: 'join' },
  ],
});

// Execution: start → [task-a, task-b] (parallel) → join
// Multiple workers can process task-a and task-b concurrently

Retry Policies

The adapter uses YAWL's cancellation regions to implement intelligent retry policies:

const workflow = createWorkflow({
  id: 'with-retries',
  tasks: [
    {
      id: 'fetch',
      cancellationRegion: 'fetch-region',  // Retry on failure
      timeout: 5000,
    },
    {
      id: 'process',
      cancellationRegion: 'process-region',
      timeout: 10000,
    },
  ],
  flows: [{ from: 'fetch', to: 'process' }],
});

Retry behavior:

  • With cancellation region: Uses defaultJobOptions.attempts for retry count
  • Without cancellation region: Retries based on BullMQ job options
  • After max retries: Task is cancelled in YAWL engine

YAWL Receipts

The adapter maintains YAWL's cryptographic receipt chain:

const { caseId } = await adapter.executeCase('workflow-id');

// Receipts are automatically generated for each transition
const receipts = adapter.receipts.get(caseId);

receipts.forEach((receipt, index) => {
  console.log(`Receipt ${index}:`, {
    id: receipt.id,                    // BLAKE3 hash
    timestamp: receipt.timestamp,      // Nanosecond precision
    eventType: receipt.eventType,      // 'task:enabled', 'task:completed', etc.
    previousHash: receipt.previousHash // Chain to previous receipt
  });
});

Receipt chain guarantees:

  • Immutability: Each receipt is cryptographically signed
  • Ordering: Receipts form a hash chain
  • Verification: Can verify entire execution history
  • Audit Trail: Complete provenance of all state transitions
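The chaining idea can be sketched in a few lines. This stand-alone illustration uses SHA-256 from `node:crypto` in place of BLAKE3 so it runs without extra dependencies; the real receipt format carries more fields than this:

```javascript
import { createHash } from 'node:crypto';

// SHA-256 stands in for BLAKE3 here, purely for illustration.
const hash = (value) => createHash('sha256').update(JSON.stringify(value)).digest('hex');

// Append a receipt whose id commits to its body, including the
// previous receipt's id -- this is what forms the hash chain.
function appendReceipt(chain, eventType, payload) {
  const previousHash = chain.length ? chain[chain.length - 1].id : null;
  const body = { eventType, payload, previousHash, timestamp: Date.now() };
  return [...chain, { ...body, id: hash(body) }];
}

// Verify both properties: each id matches its body, and each
// previousHash points at the preceding receipt's id.
function verifyChain(chain) {
  return chain.every((receipt, i) => {
    const { id, ...body } = receipt;
    const linked = i === 0 || receipt.previousHash === chain[i - 1].id;
    return linked && id === hash(body);
  });
}
```

Tampering with any receipt changes its hash, which breaks every link after it.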

Complete Example: ETL Pipeline

See src/examples/data-pipeline.mjs for a full ETL pipeline with:

  • 7 tasks: ingest, validate, transform (3 parallel), aggregate, store
  • 5 workers: Distributed across multiple processes
  • Priority scheduling: High-priority tasks processed first
  • Error handling: Automatic retries with exponential backoff
  • Status monitoring: Real-time pipeline progress tracking

Run the example:

# Start Redis
docker run -d -p 6379:6379 redis:7-alpine

# Run pipeline
pnpm example

Expected output:

================================================================================
YAWL-Queue: Distributed ETL Data Pipeline
================================================================================

Registered workflow: etl-pipeline (7 tasks)

Creating 5 workers...
Created 5 workers

================================================================================
Executing ETL Pipeline...
================================================================================

Case created: case-etl-pipeline-1703...
Initial job queued: case-etl-pipeline-1703-start-...

[Worker 12345] INGEST: Loading raw data...
[Worker 12345] INGEST: Loaded 6 records

[Worker 12346] VALIDATE: Checking data quality...
[Worker 12346] VALIDATE: 6/6 valid

[Worker 12347] TRANSFORM-1: Processing batch 1...
[Worker 12348] TRANSFORM-2: Processing batch 2...
[Worker 12349] TRANSFORM-3: Processing batch 3...

[Worker 12347] TRANSFORM-1: Processed 2 records
[Worker 12348] TRANSFORM-2: Processed 2 records
[Worker 12349] TRANSFORM-3: Processed 2 records

[Worker 12345] AGGREGATE: Combining transformed batches...
[Worker 12345] AGGREGATE: Combined 6 records

[Worker 12346] STORE: Persisting final data...
[Worker 12346] STORE: Saved aggregated data

================================================================================
Pipeline Completed Successfully!
================================================================================

API Reference

YAWLQueueAdapter

Constructor

new YAWLQueueAdapter(config)

Config Options:

{
  redis?: {
    host?: string;          // Default: 'localhost'
    port?: number;          // Default: 6379
    password?: string;
    db?: number;
  };
  queueName?: string;       // Default: 'yawl-workflows'
  defaultJobOptions?: {
    attempts?: number;      // Default: 3
    backoff?: {
      type: 'exponential' | 'fixed';
      delay: number;        // Milliseconds
    };
    removeOnComplete?: boolean;
    removeOnFail?: boolean;
  };
  engineConfig?: object;    // YAWL engine config
}

Methods

registerWorkflow(workflow)

Register a YAWL workflow definition.

await adapter.registerWorkflow(workflow);

executeCase(workflowId, initialData, options)

Execute a new workflow case.

const { caseId, jobId } = await adapter.executeCase(
  'workflow-id',
  { input: 'data' },
  { caseId: 'custom-id' }
);

getCaseStatus(caseId)

Get current case status.

const status = await adapter.getCaseStatus(caseId);

createWorker(options)

Create a worker to process jobs.

const worker = adapter.createWorker({
  concurrency: 2,
  taskHandler: async (job, task) => {
    // Process task
    return { result: 'data' };
  },
});

getStats()

Get adapter statistics.

const stats = await adapter.getStats();
// {
//   queue: { name, waiting, active, completed, failed },
//   workers: { count, ids },
//   engine: { casesCreated, tasksCompleted, ... },
//   receipts: { totalCases, totalReceipts }
// }

close()

Close all workers and connections.

await adapter.close();
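A common pattern is to call `close()` from a signal handler so workers drain before the process exits. A hypothetical helper, not part of the package, that works with any object exposing an async `close()`:

```javascript
// Hypothetical shutdown hook (not part of @unrdf/yawl-queue). The
// `proc` parameter defaults to the real process; it is injectable so
// the helper can be exercised without killing the current process.
function registerShutdown(adapter, proc = process) {
  const shutdown = async (signal) => {
    console.log(`Received ${signal}, closing adapter...`);
    await adapter.close();
    proc.exit(0);
  };
  proc.once('SIGINT', shutdown);
  proc.once('SIGTERM', shutdown);
  return shutdown;
}

// Usage: registerShutdown(adapter);
```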

Testing

Run tests:

# Unit tests
pnpm test

# Watch mode
pnpm test:watch

# Coverage
pnpm test:coverage

Test requirements:

  • Redis server running on localhost:6379
  • Database 15 is used for tests (isolated from production)
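To keep tests isolated, point the adapter at database 15 via the `redis.db` config option. A sketch; the queue name is an arbitrary example:

```javascript
import { YAWLQueueAdapter } from '@unrdf/yawl-queue';

// Database 15 keeps test data away from the default database 0.
const testAdapter = new YAWLQueueAdapter({
  redis: { host: 'localhost', port: 6379, db: 15 },
  queueName: 'yawl-test',
});
```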

Performance Considerations

Worker Scaling

// CPU-bound tasks: workers ≈ CPU cores
for (let i = 0; i < os.cpus().length; i++) {
  adapter.createWorker({ concurrency: 1 });
}

// I/O-bound tasks: higher concurrency per worker
for (let i = 0; i < 4; i++) {
  adapter.createWorker({ concurrency: 10 });
}

Job Options

{
  // Fast retry for transient failures
  attempts: 3,
  backoff: {
    type: 'exponential',
    delay: 1000  // 1s, 2s, 4s
  },

  // Cleanup completed jobs
  removeOnComplete: {
    age: 3600,  // Keep 1 hour
    count: 1000 // Keep last 1000
  },

  // Retain failed jobs for debugging
  removeOnFail: false
}
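With exponential backoff, attempt n waits `delay * 2^(n-1)` milliseconds, which is where the 1s, 2s, 4s progression in the comment above comes from:

```javascript
// Compute the wait before each retry attempt under exponential
// backoff: delay, 2*delay, 4*delay, ...
function backoffDelays({ attempts, delay }) {
  return Array.from({ length: attempts }, (_, i) => delay * 2 ** i);
}

console.log(backoffDelays({ attempts: 3, delay: 1000 })); // → [1000, 2000, 4000]
```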

Priority Queue

Higher priority = processed first (1-100 scale):

{
  id: 'critical-task',
  priority: 99  // Highest priority
}

Troubleshooting

Jobs Not Processing

  1. Check Redis connection:

    redis-cli ping  # Should return PONG
  2. Verify workers are running:

    console.log(adapter.workers.size);  // Should be > 0
  3. Check queue status:

    const stats = await adapter.getStats();
    console.log(stats.queue);

Task Failures

Enable verbose logging:

adapter.engine.on('TASK_FAILED', (event) => {
  console.error('Task failed:', event);
});

worker.on('failed', (job, err) => {
  console.error('Job failed:', job?.id, err.message);
});

Receipt Verification

import { verifyReceipt, verifyChainLink } from '@unrdf/yawl/receipt';

const receipts = adapter.receipts.get(caseId);

for (let i = 1; i < receipts.length; i++) {
  const valid = await verifyChainLink(receipts[i - 1], receipts[i]);
  console.log(`Receipt ${i} chain valid:`, valid);
}

License

MIT

Contributing

Contributions welcome! Please read CONTRIBUTING.md first.

Related Packages

Acknowledgments

Built on top of:

  • YAWL (van der Aalst et al.) - Workflow patterns and semantics
  • BullMQ - Robust Redis-based queue
  • UNRDF - Unified RDF framework