npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@celom/prose

v0.3.2

Published

Declarative workflow DSL for orchestrating complex business operations in Javascript/Typescript.

Readme

@celom/prose

Declarative workflow DSL for orchestrating complex business operations in Node.js.

Define multi-step business logic as type-safe pipelines with built-in retries, timeouts, transactions, event publishing, and observability — using plain async/await.

import { createFlow, ValidationError } from '@celom/prose';

const onboardUser = createFlow<{ email: string; name: string }>('onboard-user')
  .validate('checkEmail', (ctx) => {
    if (!ctx.input.email.includes('@'))
      throw ValidationError.single('email', 'Invalid email');
  })
  .step('createAccount', async (ctx) => {
    const user = await db.createUser(ctx.input);
    return { user };
  })
  .withRetry({ maxAttempts: 3, delayMs: 200, backoffMultiplier: 2 })
  .step('sendWelcome', async (ctx) => {
    await mailer.send(ctx.state.user.email, 'Welcome!');
  })
  .event('users', (ctx) => ({
    eventType: 'user.onboarded',
    userId: ctx.state.user.id,
  }))
  .build();

const result = await onboardUser.execute(
  { email: '[email protected]', name: 'Alice' },
  { db, eventPublisher }
);

Install

npm install @celom/prose

Features

  • Type-safe state threading — each step's return type merges into ctx.state, giving you full autocomplete and compile-time checks across the entire pipeline
  • Retries with exponential backoff — per-step retry policies with configurable delays, backoff multipliers, caps, and conditional retry predicates
  • Timeouts — flow-level and step-level timeouts backed by AbortSignal, with actual interruption of async operations
  • Cooperative cancellation — pass an external AbortSignal to cancel a running flow
  • Database transactions — wrap steps in db.transaction() with any ORM (Drizzle, Knex, Prisma)
  • Event publishing — emit domain events to named channels with automatic correlation IDs
  • Parallel execution — run independent steps concurrently with configurable merge strategies
  • Conditional steps & early exit — skip steps based on runtime conditions or short-circuit the flow entirely
  • Composable sub-flows — extract and reuse step sequences via .pipe()
  • Observability hooks — plug in logging, metrics, or tracing through the observer interface
  • Zero dependencies — runs in-process with no external infrastructure

Guide

Creating a flow

createFlow returns a builder. Chain steps onto it and call .build() to get an executable flow.

import { createFlow } from '@celom/prose';

const flow = createFlow<{ orderId: string }>('process-order')
  .step('fetch', async (ctx) => {
    const order = await db.getOrder(ctx.input.orderId);
    return { order };
  })
  .step('charge', async (ctx) => {
    const receipt = await payments.charge(ctx.state.order.total);
    return { receipt };
  })
  .build();

The generic parameter defines the input shape. TypeScript infers the state type as steps accumulate — after the fetch step, ctx.state.order is available with full type information.

Running a flow

const result = await flow.execute(
  { orderId: 'ord_123' },    // input
  { db, eventPublisher },     // dependencies
  { timeout: 30_000 }         // options (optional)
);

Execution options:

| Option | Type | Description | |--------|------|-------------| | timeout | number | Max duration for the entire flow (ms) | | stepTimeout | number | Default max duration per step (ms) | | signal | AbortSignal | External signal for cancellation | | observer | FlowObserver | Lifecycle hooks for logging/metrics | | throwOnError | boolean | false returns partial state instead of throwing | | correlationId | string | Custom ID propagated to events and observers | | errorHandling | object | Control behavior for missing deps (see below) |

Validation

Validation steps run before processing and are never retried. Throw ValidationError to fail fast.

import { ValidationError } from '@celom/prose';

flow.validate('checkInput', (ctx) => {
  if (ctx.input.amount <= 0)
    throw ValidationError.single('amount', 'Must be positive');
});

ValidationError accepts an optional array of issues for multi-field validation:

throw new ValidationError('Validation failed', [
  { field: 'email', message: 'Required' },
  { field: 'age', message: 'Must be at least 18' },
]);

Retries

Chain .withRetry() after any step to add a retry policy.

flow
  .step('callExternalApi', async (ctx) => {
    const data = await api.fetch(ctx.input.url);
    return { data };
  })
  .withRetry({
    maxAttempts: 5,
    delayMs: 100,
    backoffMultiplier: 2,
    maxDelayMs: 5_000,
    shouldRetry: (err) => err.status !== 400,
    stepTimeout: 10_000, // override the flow-level stepTimeout for this step
  })

| Option | Type | Default | Description | |--------|------|---------|-------------| | maxAttempts | number | — | Total attempts (including the first) | | delayMs | number | — | Initial delay between retries | | backoffMultiplier | number | 1 | Multiplier applied to delay after each retry | | maxDelayMs | number | Infinity | Upper bound on delay | | shouldRetry | (error) => boolean | — | Predicate to conditionally retry | | stepTimeout | number | — | Timeout override for this step |

Timeouts & cancellation

const controller = new AbortController();

const result = await flow.execute(input, deps, {
  timeout: 30_000,       // abort if the flow exceeds 30s
  stepTimeout: 5_000,    // abort any step that exceeds 5s
  signal: controller.signal, // cancel from outside
});

// later, to cancel:
controller.abort();

Inside step handlers, ctx.signal exposes the combined signal so you can pass it to fetch, database calls, or check ctx.signal.aborted for cooperative cancellation.

flow.step('longOperation', async (ctx) => {
  const resp = await fetch(url, { signal: ctx.signal });
  return { data: await resp.json() };
});

Conditional steps

stepIf runs the handler only when the condition returns true. Skipped steps don't affect state and don't consume retry attempts.

flow
  .step('checkCache', (ctx) => {
    return { cached: cache.has(ctx.input.key) };
  })
  .stepIf('fromCache', (ctx) => ctx.state.cached, (ctx) => {
    return { value: cache.get(ctx.input.key) };
  })
  .stepIf('fromDb', (ctx) => !ctx.state.cached, async (ctx) => {
    return { value: await db.get(ctx.input.key) };
  })

Early exit with breakIf

breakIf short-circuits the flow, skipping all remaining steps and the .map() transformer. An optional second argument defines the return value.

flow
  .step('findUser', async (ctx) => {
    const existing = await db.findByEmail(ctx.input.email);
    return { existing };
  })
  .breakIf(
    (ctx) => ctx.state.existing != null,
    (ctx) => ({ user: ctx.state.existing, created: false })
  )
  .step('createUser', async (ctx) => {
    const user = await db.createUser(ctx.input);
    return { user };
  })
  .map((input, state) => ({ user: state.user, created: true }))
  .build();

Database transactions

Use .transaction() to wrap a step in db.transaction(). The transaction client is passed as the second argument.

flow.transaction('persist', async (ctx, tx) => {
  const id = await tx.insert('users', { name: ctx.input.name });
  return { userId: id };
});

Requires a db dependency conforming to:

interface DatabaseClient {
  transaction<T>(fn: (tx: TransactionClient) => Promise<T>): Promise<T>;
}

Works with Drizzle, Knex, Prisma, or any ORM exposing a transaction() method.

Event publishing

Emit domain events to named channels. Events are automatically enriched with correlationId.

// single event
flow.event('orders', (ctx) => ({
  eventType: 'order.created',
  orderId: ctx.state.orderId,
}));

// multiple events on the same channel
flow.events('notifications', [
  (ctx) => ({ eventType: 'email.send', to: ctx.input.email }),
  (ctx) => ({ eventType: 'sms.send', to: ctx.input.phone }),
]);

Requires an eventPublisher dependency conforming to:

interface FlowEventPublisher {
  publish(channel: string, event: FlowEvent): Promise<void> | void;
}

Parallel execution

Run independent handlers concurrently and merge results into state.

flow.parallel('fetchAll', 'deep',
  async (ctx) => ({ users: await fetchUsers() }),
  async (ctx) => ({ posts: await fetchPosts() }),
);
// ctx.state now has both `users` and `posts`

Merge strategies:

| Strategy | Behavior | |----------|----------| | 'shallow' | Object.assign() — later results override earlier ones | | 'error-on-conflict' | Throws if any keys overlap between results | | 'deep' | Recursive merge; arrays are concatenated |

Output transformation

.map() transforms the accumulated state into a custom output shape.

flow
  .step('fetch', async (ctx) => {
    const user = await db.getUser(ctx.input.id);
    return { user };
  })
  .map((input, state) => ({
    id: state.user.id,
    displayName: state.user.name,
  }))
  .build();

Composable sub-flows with .pipe()

Extract reusable step sequences as functions and compose them with .pipe().

function withAuth(builder) {
  return builder
    .step('validateToken', async (ctx) => {
      const session = await auth.verify(ctx.input.token);
      return { session };
    })
    .step('loadUser', async (ctx) => {
      const user = await db.getUser(ctx.state.session.userId);
      return { user };
    });
}

const flow = createFlow<{ token: string }>('protected-action')
  .pipe(withAuth)
  .step('doAction', (ctx) => {
    // ctx.state.user is fully typed here
    return { result: `Hello, ${ctx.state.user.name}` };
  })
  .build();

Observability

Pass an observer to hook into flow and step lifecycle events.

import { PinoFlowObserver } from '@celom/prose';
import pino from 'pino';

const logger = pino();
const observer = new PinoFlowObserver(logger);

await flow.execute(input, deps, { observer });

Observer hooks:

| Hook | Called when | |------|------------| | onFlowStart | Flow begins | | onFlowComplete | Flow finishes successfully | | onFlowError | Flow fails | | onFlowBreak | Flow exits early via breakIf | | onStepStart | Step begins | | onStepComplete | Step finishes | | onStepError | Step fails (after exhausting retries) | | onStepRetry | Step is about to be retried | | onStepSkipped | Conditional step is skipped |

All hooks are optional — implement only what you need:

await flow.execute(input, deps, {
  observer: {
    onStepComplete: (name, _result, duration) =>
      console.log(`${name} took ${duration}ms`),
  },
});

Built-in observers: DefaultObserver (console), NoOpObserver (silent), PinoFlowObserver (structured logging).

Error handling

By default, step errors are wrapped in FlowExecutionError and thrown.

import { FlowExecutionError, ValidationError, TimeoutError } from '@celom/prose';

try {
  await flow.execute(input, deps);
} catch (err) {
  if (err instanceof ValidationError) {
    // fail-fast validation — err.issues has field-level details
  } else if (err instanceof TimeoutError) {
    // flow or step exceeded its timeout
  } else if (err instanceof FlowExecutionError) {
    // step execution failure — err.stepName, err.originalError
  }
}

Set throwOnError: false to return partial state instead of throwing:

const result = await flow.execute(input, deps, { throwOnError: false });

Control behavior when optional dependencies are missing:

await flow.execute(input, deps, {
  errorHandling: {
    throwOnMissingDatabase: false,        // warn instead of throwing
    throwOnMissingEventPublisher: false,   // warn instead of throwing
  },
});

Flow metadata

Every step handler receives ctx.meta with runtime metadata:

flow.step('example', (ctx) => {
  ctx.meta.flowName;      // 'process-order'
  ctx.meta.currentStep;   // 'example'
  ctx.meta.startedAt;     // Date
  ctx.meta.correlationId; // auto-generated or custom
});

What this isn't

Prose is an in-process workflow orchestration library. It runs inside your existing Node.js process with zero external dependencies. Before adopting it, it's worth understanding what it does not try to be:

Not a durable execution engine. If you need workflows that survive process restarts, resume after hours or days, or coordinate across distributed services, look at Temporal, Inngest, or Trigger.dev. These require infrastructure (servers, queues, databases) but give you persistence and replay guarantees that an in-process library fundamentally cannot.

Not a full effect system. Effect-TS is more powerful in every technical dimension — typed errors in the return signature, type-level dependency injection via Layers, fibers, streams, and a massive standard library. If your team can invest in learning its functional programming model, Effect is the more capable choice. Prose trades that power for simplicity: pure async/await, no monads, no new paradigms to learn.

Not a state machine. XState models workflows as finite state machines with explicit states, transitions, and guards — ideal for complex non-linear flows with many possible state transitions. Prose is designed for sequential (or branching) business logic pipelines where a state machine's verbosity would be overhead.

Not a result type library. Libraries like neverthrow or fp-ts encode errors in return types (Result<T, E>, Either<E, A>). Prose does not — steps throw, and failures are wrapped in FlowExecutionError. If typed error channels are critical to you, Effect or neverthrow are better fits.

Where Prose fits

Prose is for teams building backend services with multi-step business logic (process an order, onboard a user, handle a payment) who want structured retries, timeouts, transactions, observability, and type-safe state threading — without adopting new infrastructure or a new programming paradigm.

License

MIT