@node-in-layers/tasks

v1.0.5

A Node In Layers Package for creating and running Asynchronous Tasks and Workflows.

Distributed Tasks and Workflows - The Node In Layers Package for Creating / Running Asynchronous Tasks and Workflows

This repository provides a standardized interface, models, and functional code for running distributed tasks within the Node In Layers framework. It handles complex workflows with hierarchical task spawning, future scheduling, and event-driven callbacks.

All models support @node-in-layers/core's integer and UUID primary keys, selected through configuration (the default is UUID).

How To Use

1. Install in your project (frontend, backend, or SDK)

npm install @node-in-layers/tasks

2. Add Domains to your Config

import { CoreNamespace } from '@node-in-layers/core'
// Front and backend safe import.
import { tasksCore, TasksNamespace } from '@node-in-layers/tasks'
// Only for backends.
import * as tasksBackend from '@node-in-layers/tasks/backend/index.js'

const config = {
  [CoreNamespace.root]: {
    apps: [
      // @node-in-layers/data: If you are using a database for a backend, include it here, before tasksBackend

      // This has all the interfaces, and needs to come before your domains and other @node-in-layers/tasks domains
      tasksCore,
      // Only include if this is a backend, otherwise don't include this.
      tasksBackend,

      // Insert other domains.
    ],
    //...
  },
  // Optional: For any backend based configurations.
  [TasksNamespace.Backend]: {
    // ...
  },
}

3. Basic Usage

To run a distributed task, you must wrap your feature using createTaskFeature from the Tasks Backend feature layer. Because Node In Layers uses closure factories for dependency injection, you do this inside your domain's features.ts.

import { FeaturesContext, Config, CrossLayerProps } from '@node-in-layers/core'
import { TasksNamespace, TasksFeaturesLayer } from '@node-in-layers/tasks'
import { z } from 'zod'

export const create = (
  context: FeaturesContext<Config, TasksFeaturesLayer>
) => {
  // 1. Wrap your feature
  const myDistributedFeature = context.features[
    TasksNamespace.Backend
  ].createTaskFeature(
    {
      domain: 'myDomain',
      functionName: 'myDistributedFeature',
      args: z.object({ someData: z.string() }),
      returns: z.object({ success: z.boolean(), data: z.string() }),
    },
    async (props, crossLayerProps) => {
      const log = context.log.getInnerLogger(
        'myDistributedFeature',
        crossLayerProps
      )
      log.info('Executing distributed task', { someData: props.someData })

      // Your business logic here
      return { success: true, data: props.someData }
    }
  )

  // 2. Execute and await the task
  const anotherLongRunningFunction = async (
    someData: string,
    crossLayerProps?: CrossLayerProps
  ) => {
    // Execute it to get a taskId
    const response = await myDistributedFeature({ someData }, crossLayerProps)

    if (!response.error) {
      // Poll for the result on your behalf
      const result = await context.features[TasksNamespace.Backend].awaitTask(
        { taskId: response.taskId },
        crossLayerProps
      )
      return result
    }
    return response
  }

  // 3. Or use executeTaskAndWait for sub-tasks
  const highLevelTaskWithSubTasks = context.features[
    TasksNamespace.Backend
  ].createTaskFeature(
    {
      domain: 'myDomain',
      functionName: 'highLevelTaskWithSubTasks',
      args: z.object({ someData: z.string() }),
      returns: z.object({ success: z.boolean() }),
    },
    async (props, crossLayerProps) => {
      // props matches the args schema above, so someData is read from it
      const subTaskResults = await context.features[
        TasksNamespace.Backend
      ].executeTaskAndWait(
        {
          taskFunction: myDistributedFeature,
          payload: { someData: props.someData },
        },
        crossLayerProps
      )

      const anotherSubTaskResult = await context.features[
        TasksNamespace.Backend
      ].executeTaskAndWait(
        {
          taskFunction: context.features.someOtherDomain.myTaskFeature,
          payload: subTaskResults,
        },
        crossLayerProps
      )

      return { success: true }
    }
  )

  return {
    myDistributedFeature,
    anotherLongRunningFunction,
    highLevelTaskWithSubTasks,
  }
}

Domains

  • Core (TasksNamespace.Core) - Defines Types and Models
  • Backend (TasksNamespace.Backend) - Defines backend Task implementation

Tasks are executed at the "feature" level, ensuring all business logic remains decoupled from the distributed queueing infrastructure.

Architecture

The @node-in-layers/tasks package decouples the request to run a feature from its execution. This allows features to be executed asynchronously, distributed across multiple workers, and retried automatically.

flowchart LR
    subgraph API["API / Caller"]
        caller["Caller Feature"]
        wrapper["Task Wrapper"]
        db[("Database")]
    end

    queue[("Queue / Event Bus")]

    subgraph Worker["Worker Node"]
        consumer["Queue Consumer"]
        target["Target Feature"]
        callbacks["Event / Callback Enqueuer"]
    end

    caller --> wrapper
    wrapper --> db
    wrapper --> queue

    queue --> consumer
    consumer --> db
    consumer --> target
    consumer --> callbacks
    callbacks -->|Enqueues Event Task| queue

The Execution Flow

  1. Invocation: A caller invokes a wrapped feature (e.g., myFeature(args)).
  2. Task Creation: The wrapper intercepts the call, creates a Task record in the database (status: Pending), and saves the arguments as the task's payload.
  3. Enqueueing: The wrapper enqueues the Task onto the distributed Queue (e.g., BullMQ) and synchronously returns the { taskId } to the caller.
  4. Consumption: A separate long-running Consumer process (typically a CLI application) polls the Queue and dequeues the Task.
  5. Execution: The Consumer updates the Task status to Running, looks up the registered feature, and executes it with the original payload.
  6. Completion: Once the feature finishes, the Consumer updates the Task status to Completed (or Failed) and saves the result.
  7. Event Callbacks: The Consumer triggers the callback lifecycle. It queries the database for any TaskCallbacks linked to the task. For each matching callback (based on success/failure conditions), it enqueues a new Task for the callback feature. These callbacks act as an event-driven infrastructure.

sequenceDiagram
    participant Caller
    participant Task Wrapper
    participant Database
    participant Queue
    participant Consumer
    participant Feature

    Caller->>Task Wrapper: Call Feature (args)
    Task Wrapper->>Database: Create Task (Pending)
    Task Wrapper->>Queue: Enqueue Task
    Task Wrapper-->>Caller: Return { taskId }

    Note over Queue, Consumer: Asynchronous / Distributed
    Queue->>Consumer: Dequeue Task
    Consumer->>Database: Update Task (Running)
    Consumer->>Feature: Execute Feature (payload)
    Feature-->>Consumer: Return Result / Error
    Consumer->>Database: Update Task (Completed/Failed)

    Note over Consumer, Queue: Event / Callback Lifecycle
    Consumer->>Database: Fetch TaskCallbacks
    loop For each matching Event/Callback condition
        Consumer->>Database: Create Event Task (Pending)
        Consumer->>Queue: Enqueue Event Task
    end

    Note over Queue, Consumer: Event Execution
    Queue->>Consumer: Dequeue Event Task
    Consumer->>Feature: Execute Event Feature
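The execution flow above can be modeled in memory to make the hand-offs concrete. This is only a sketch for illustration: `createTaskSystem`, `TaskRecord`, `CallbackRecord`, and every other name below are hypothetical and are not part of @node-in-layers/tasks. A plain array stands in for the queue, a `Map` stands in for the database, and features run synchronously for brevity.

```typescript
// Hypothetical in-memory model of the execution flow.
// None of these names come from @node-in-layers/tasks.
type TaskRecordStatus = 'Pending' | 'Running' | 'Completed' | 'Failed'

type TaskRecord = {
  id: string
  featureName: string
  payload: unknown
  status: TaskRecordStatus
  result?: unknown
  error?: string
}

type CallbackCondition = 'onSuccess' | 'onFailure' | 'onAnyCompletion'

type CallbackRecord = {
  taskId: string
  condition: CallbackCondition
  featureName: string
}

type FeatureFn = (payload: unknown) => unknown

const createTaskSystem = (features: Record<string, FeatureFn>) => {
  const database = new Map<string, TaskRecord>() // stands in for the database
  const queue: string[] = [] // stands in for the distributed queue
  const callbacks: CallbackRecord[] = []
  let nextId = 1

  // Steps 2-3: create a Pending record, enqueue it, return the taskId.
  const enqueue = (featureName: string, payload: unknown) => {
    const id = `task-${nextId++}`
    database.set(id, { id, featureName, payload, status: 'Pending' })
    queue.push(id)
    return { taskId: id }
  }

  const registerCallback = (callback: CallbackRecord) => {
    callbacks.push(callback)
  }

  const matches = (condition: CallbackCondition, status: TaskRecordStatus) =>
    condition === 'onAnyCompletion' ||
    (condition === 'onSuccess' && status === 'Completed') ||
    (condition === 'onFailure' && status === 'Failed')

  // Steps 4-7: dequeue one task, run it, store the outcome, enqueue callbacks.
  // Returns false when the queue is empty.
  const consumeOne = (): boolean => {
    const id = queue.shift()
    if (id === undefined) return false
    const task = database.get(id)
    if (!task) return false
    task.status = 'Running'
    try {
      const feature = features[task.featureName]
      if (!feature) throw new Error(`Unknown feature: ${task.featureName}`)
      task.result = feature(task.payload)
      task.status = 'Completed'
    } catch (e) {
      task.error = e instanceof Error ? e.message : String(e)
      task.status = 'Failed'
    }
    const finalStatus: TaskRecordStatus = task.status
    callbacks
      .filter(c => c.taskId === id && matches(c.condition, finalStatus))
      .forEach(c => enqueue(c.featureName, { sourceTaskId: id }))
    return true
  }

  return { enqueue, registerCallback, consumeOne, database, queue }
}
```

Calling `enqueue` returns immediately with a `taskId` while the record stays `Pending`. Each `consumeOne` call plays the role of one consumer poll, and any callback whose condition matches the final status becomes a new task on the same queue.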

Implementations

While the system can be customized to use any implementation, the out-of-the-box implementation uses BullMQ for queueing and Docker or Local for task runners.

Adding or Changing the Queue Service

BullMQ is provided by default. To change the queue service, provide a domain that implements QueueService, then set queue.enqueueService in the backend configuration to that domain's name.

import { TasksNamespace } from '@node-in-layers/tasks'

const config = {
  // ...
  [TasksNamespace.Backend]: {
    queue: {
      // Set this to the domain name that implements QueueService
      enqueueService: 'myCustomQueueDomain',
    },
  },
}
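As a rough illustration of what such a domain could look like, here is a minimal in-memory sketch. It rests on assumptions: the real QueueService interface is not shown in this README, so `HypotheticalQueueService`, its `enqueue` / `dequeue` methods, the `QueueDomain` shape, and `resolveQueueService` are all invented for this example, and the actual method names and signatures in @node-in-layers/tasks may differ.

```typescript
// ASSUMPTION: the real QueueService interface may use different method names
// and signatures. Every type and function here is hypothetical.
type QueuedTask = { taskId: string; payload: unknown }

type HypotheticalQueueService = {
  enqueue: (task: QueuedTask) => void
  dequeue: () => QueuedTask | undefined
}

type QueueDomain = {
  name: string
  services: { create: () => HypotheticalQueueService }
}

// A custom domain backed by a plain array (first in, first out).
const myCustomQueueDomain: QueueDomain = {
  name: 'myCustomQueueDomain',
  services: {
    create: () => {
      const items: QueuedTask[] = []
      return {
        enqueue: task => {
          items.push(task)
        },
        dequeue: () => items.shift(),
      }
    },
  },
}

// Looks up the domain named by `enqueueService` and builds its service,
// failing loudly when the configured name matches no domain.
const resolveQueueService = (
  enqueueService: string,
  domains: readonly QueueDomain[]
): HypotheticalQueueService => {
  const domain = domains.find(d => d.name === enqueueService)
  if (!domain) {
    throw new Error(`No domain named "${enqueueService}" provides a queue service`)
  }
  return domain.services.create()
}
```

The point of the sketch is the indirection: the configuration holds only a domain name, so swapping queue implementations means registering a different domain and changing one string.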

The Consumer Domain

To actually process tasks, the implementing system must create a long-running consumer. This is typically a separate domain (e.g., @events or @workers) with a feature that polls the queue and calls tasks.features.registerTaskConsumer(). A CLI application then executes this consumer feature to keep the worker alive.

Features & Capabilities

  • Smart Wrappers: Wrap any annotatedFunction to automatically make it a distributed task.
  • Synchronous Bypass: Callers can pass { tasks: { executeSync: true } } to bypass the queue and run the feature immediately in the current process.
  • Event-Driven Callbacks: Register callbacks that fire onSuccess, onFailure, or onAnyCompletion. Callbacks are enqueued as their own distributed tasks.
  • Retries & Backoff: Configure automatic retries for flaky tasks.
  • Hierarchical Tasks: Tasks can spawn sub-tasks, and parent tasks are automatically evaluated when all children complete.
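To make the retry behavior more concrete, the sketch below shows one common approach: capped exponential backoff. This README does not say which backoff strategy or option names the package uses, so `RetryPolicy`, `backoffDelayMs`, and `runWithRetries` are hypothetical names chosen for this example, not the package's configuration.

```typescript
// Hypothetical retry policy. These option names are assumptions and are not
// taken from @node-in-layers/tasks.
type RetryPolicy = {
  maxAttempts: number // total attempts, including the first
  baseDelayMs: number // delay before the first retry
  maxDelayMs: number // upper bound for any single delay
}

// Delay before retry number `retry` (1-based): base * 2^(retry - 1), capped.
const backoffDelayMs = (policy: RetryPolicy, retry: number): number =>
  Math.min(policy.baseDelayMs * Math.pow(2, retry - 1), policy.maxDelayMs)

type Attempt<T> = { ok: true; value: T } | { ok: false; error: string }

// Runs `task` up to maxAttempts times. `wait` is injected so a real caller
// could sleep or re-enqueue with a delay, while a test can record the delays.
const runWithRetries = <T>(
  policy: RetryPolicy,
  task: (attempt: number) => T,
  wait: (delayMs: number) => void
): Attempt<T> => {
  let lastError = 'no attempts made'
  for (let attempt = 1; attempt <= policy.maxAttempts; attempt++) {
    try {
      return { ok: true, value: task(attempt) }
    } catch (e) {
      lastError = e instanceof Error ? e.message : String(e)
      // Only wait when another attempt will follow.
      if (attempt < policy.maxAttempts) wait(backoffDelayMs(policy, attempt))
    }
  }
  return { ok: false, error: lastError }
}
```

With `baseDelayMs: 100` and `maxDelayMs: 300`, the waits before the first three retries are 100, 200, and 300 ms, because the third delay (400 ms uncapped) is clamped to the maximum. In a queue-backed system the `wait` step would more likely be a delayed re-enqueue than an in-process sleep.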