@blunge/fluid-workflows

v0.25.0

Type-safe workflow building + job-queue execution with adapters.

Downloads

547

fluid-workflows

Type-safe durable workflows with fluid builder syntax and optional job-queue execution with adapters.

The main problem this project attempts to solve is the separation of workflow definition from execution. Execution can involve persisting intermediate step results through a storage adapter (e.g. Redis or Cloudflare Durable Objects) or using queues through a queueing adapter (e.g. BullMQ or Cloudflare Queues).
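
To make the idea concrete, here is a minimal sketch in plain TypeScript of one definition being run by two different executors. It does not use this package: `Definition`, `runInMemory`, `runWithCheckpoints` and the `Map`-based store are hypothetical names chosen for illustration, and merging each step's output into the accumulated state is an assumption about the behaviour, not a description of the library's internals.

```typescript
// Conceptual sketch only: nothing here is part of the fluid-workflows API.
type State = Record<string, unknown>;
type Step = (state: State) => State;

// The definition is plain data: a name plus an ordered list of steps.
interface Definition {
  name: string;
  steps: Step[];
}

// Executor 1: run every step in-process and keep state in a local variable.
function runInMemory(def: Definition, input: State): State {
  let state = input;
  for (const step of def.steps) {
    state = { ...state, ...step(state) };
  }
  return state;
}

// Executor 2: checkpoint after each step so an interrupted run can resume.
// `store` stands in for a storage adapter such as Redis.
interface Checkpoint {
  next: number; // index of the next step to run
  state: State;
}

function runWithCheckpoints(
  def: Definition,
  input: State,
  store: Map<string, Checkpoint>,
  runId: string,
): State {
  let { next, state } = store.get(runId) ?? { next: 0, state: input };
  while (next < def.steps.length) {
    const step = def.steps[next];
    state = { ...state, ...step(state) };
    next += 1;
    store.set(runId, { next, state });
  }
  return state;
}

// The same definition works with either executor.
const add: Definition = {
  name: 'add',
  steps: [({ a, b }) => ({ sum: (a as number) + (b as number) })],
};
```

Because the definition carries no knowledge of how it is run, swapping the executor does not require touching the steps.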

Install

npm install @blunge/fluid-workflows bullmq ioredis zod

Quick Start

This defines a simple workflow with a single step that adds two numbers together.

import fwf from '@blunge/fluid-workflows';

const wf = fwf.create({ name: 'add', version: 1 })
  .step(async ({ a, b }: { a: number; b: number }) => ({ sum: a + b }));

// Run the workflow directly (uses in-memory storage by default)
const out = await wf.run({ a: 2, b: 3 });
console.log(out.sum); // 5

BullMQ Adapter

To use the BullMQ adapter (BullMqAdapter), you need a running Redis instance and the REDIS_URL environment variable defined.

For example, to run tests:

REDIS_URL=redis://127.0.0.1:6379 npm run test

Using job queues

import fwf from '@blunge/fluid-workflows';

// Workflow definitions

const child = fwf.create({ name: 'child', version: 1 })
  .step(async ({ s }: { s: string }) => ({ s2: `child(${s})` }));

const parent = fwf.create({ name: 'parent', version: 1 })
  .step(async ({ n }: { n: number }) => ({ s: `n=${n}` }))
  .step(child)
  .step(async ({ s2 }) => ({ out: s2 }));

// Configuration

const { worker, dispatcher } = fwf.jobQueueConfig({
  engine: new fwf.BullMqAdapter({ attempts: 1, lockTimeoutMs: 8000 }),
  workflows: [parent],
  queues: { parent: 'parent-queue', child: 'child-queue' },
  jobTimeoutMs: 60000, // Maximum time a job is expected to run (also used for state TTL)
});

// On a worker machine, run parent workflows
const stopParent = worker.run(['parent-queue']);

// Possibly on a separate machine, run child workflows
const stopChild = worker.run(['child-queue']);

// Alternatively, run all workflows on the same machine
// const stopAll = worker.run('all');

// On the app server, dispatch workflows and optionally await the output
const result = await dispatcher.dispatchAwaitingOutput(parent, { n: 5 });
console.log(result.out); // 'child(n=5)'

await stopParent();
await stopChild();

Advanced features

Parallel execution

Use .parallel() to run multiple child workflows in parallel. Each child receives the full accumulated state, and their outputs are merged into the result with the specified keys.

const child1 = fwf.create({ name: 'child1', version: 1 })
  .step(async ({ n }: { n: number }) => `child1(${n})`);

const child2 = fwf.create({ name: 'child2', version: 1 })
  .step(async ({ n }: { n: number }) => `child2(${n})`);

const parent = fwf.create({ name: 'parent', version: 1 })
  .step(async ({ n }: { n: number }) => ({ n2: n * 2 }))
  .parallel({ c1: child1, c2: child2 })
  .step(async ({ n, n2, c1, c2 }) => ({ 
    out: `${c1}, ${c2}` // 'child1(5), child2(5)'
  }));
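
The merge behaviour described for .parallel() can be pictured with the following sketch in plain TypeScript. It is only an illustration of the semantics stated above (every child sees the same accumulated state, and each output lands under its key); `runParallel` is a hypothetical helper and not how the library implements it.

```typescript
// Conceptual sketch only: `runParallel` is not part of the fluid-workflows API.
type State = Record<string, unknown>;
type Child = (state: State) => Promise<unknown>;

async function runParallel(
  state: State,
  children: Record<string, Child>,
): Promise<State> {
  const entries = Object.entries(children);
  // Start every child with the same accumulated state, then wait for all of them.
  const outputs = await Promise.all(entries.map(([, child]) => child(state)));
  // Merge each child's output into a copy of the state under its key.
  const merged: State = { ...state };
  entries.forEach(([key], i) => {
    merged[key] = outputs[i];
  });
  return merged;
}

// Resolves to { n: 5, n2: 10, c1: 'child1(5)', c2: 'child2(5)' }
const demo = runParallel(
  { n: 5, n2: 10 },
  {
    c1: async ({ n }) => `child1(${n})`,
    c2: async ({ n }) => `child2(${n})`,
  },
);
```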

Zod schema support

You may have noticed that the input type of the first step has to be specified explicitly. Alternatively, you can pass a Zod schema and the input type of the first step will be inferred from it. The workflow input is also validated against the schema during workflow execution.

import { z } from 'zod';

const inputSchema = z.object({ a: z.number(), b: z.number() });

const parent = fwf.workflow({ name: 'parent', version: 1, inputSchema })
  .step(async ({ a, b }) => ({ 
     ...
  }));

Control flow

  • restart - allows a workflow to be restarted with new input data
  • complete - allows the workflow to be completed early with the given result

The control flow functions only wrap their argument and return it. The step must return the wrapped result to tell the workflow runner what should happen next.

const wf = fwf.workflow({ name: 'iterate', version: 1, inputSchema })
  .step(async ({ iterations }, { restart, complete }) => {
    if (iterations < 10) {
      return restart({ iterations: iterations + 1 });
    }
    // will skip any following steps
    return complete({ iterations });
  })
  .step(async () => {
     // will never reach here
  });

const out = await wf.run({ iterations: 0 });
console.log(out.iterations); // 10
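
Why the wrapped value has to be returned becomes clearer with a small model of a runner. The sketch below is plain TypeScript and rests on assumptions: the tagged-object shape, the `Control` type and the `runToCompletion` loop are hypothetical and are not taken from the library's source.

```typescript
// Conceptual sketch only: not the fluid-workflows implementation.
type Control<T> =
  | { kind: 'restart'; input: T }
  | { kind: 'complete'; result: T };

// The helpers have no side effects: they only tag the value they are given.
const restart = <T>(input: T): Control<T> => ({ kind: 'restart', input });
const complete = <T>(result: T): Control<T> => ({ kind: 'complete', result });

type Counter = { iterations: number };

function step({ iterations }: Counter): Control<Counter> {
  return iterations < 10
    ? restart({ iterations: iterations + 1 })
    : complete({ iterations });
}

// The runner inspects the returned tag to decide what happens next.
function runToCompletion(input: Counter): Counter {
  let current = input;
  for (;;) {
    const outcome = step(current);
    if (outcome.kind === 'complete') return outcome.result;
    current = outcome.input; // restart: run the step again with the new input
  }
}

console.log(runToCompletion({ iterations: 0 }).iterations); // 10
```

In this model, calling restart(...) without returning its result would have no effect, because the runner only ever sees what the step returns.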

update

The update function allows the step's input data to be updated, such that if the step does not run to completion, it will be retried with the updated input data. Additional information, such as the current workflow progress, can be reported to job status listeners by passing an info property in the options object.

const parent = fwf.workflow({ name: 'parent', version: 1, inputSchema })
  .step(async ({ iterations, maxIterations }, { update }) => {
    for (; iterations > 0; iterations--) {

      // by convention we pass a value from 0 to 1 to report the current progress
      const progress = Math.min(1, iterations / maxIterations);

      // calling update here updates the input data and reports the current progress
      const { interrupt } = await update({ input: { iterations }, info: { phase: 'iterating', progress } });

      // or just report progress, without updating the step's input data
      // const { interrupt } = await update({ info: { phase: 'iterating', progress } });

      if (interrupt) {
        throw Error('interrupted');
      }
    }
  });

subscribe

The subscribe method allows job status listeners to receive real-time updates when a job's state changes (e.g., progress updates from update() calls or job completion).

const unsubscribe = await wf.subscribe(jobId, (status) => {
  console.log('Job status update:', status);
});

// Later, stop listening
unsubscribe();

Note: This example uses in-memory storage by default. To use Redis pub/sub for status updates across processes, pass a RedisStorage instance when creating the workflow.

License

MIT © Blunge