@pgflow/dsl

v0.13.2

The TypeScript Domain Specific Language (DSL) for defining type-safe workflow definitions in pgflow.

[!NOTE] This project and all its components are licensed under the Apache 2.0 license.

Overview

@pgflow/dsl provides a type-safe, fluent interface for defining data-driven workflows with explicit dependencies. The DSL ensures that data flows correctly between steps and maintains type safety throughout the entire workflow definition.

Key features:

  • Type Safety - Complete TypeScript type checking from flow inputs to outputs
  • Fluent Interface - Chainable method calls for defining steps and dependencies
  • Functional Approach - Clean separation between task implementation and flow orchestration
  • JSON-Compatible - All inputs and outputs are JSON-serializable for database storage
  • Immutable Flow Definitions - Each step operation returns a new Flow instance
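
The immutability point can be pictured with a toy builder. This is a minimal sketch, not pgflow's Flow class: MiniFlow, step and stepSlugs are hypothetical names chosen for illustration. It only shows what "each step operation returns a new instance" means for code that branches from a shared base.

```typescript
// Toy model of an immutable fluent builder (hypothetical, not @pgflow/dsl).
class MiniFlow {
  private readonly slugs: readonly string[];

  constructor(slugs: readonly string[] = []) {
    this.slugs = slugs;
  }

  // Returns a new MiniFlow instead of mutating the current one.
  step(slug: string): MiniFlow {
    return new MiniFlow(this.slugs.concat(slug));
  }

  // Returns a copy of the step slugs collected so far.
  stepSlugs(): string[] {
    return this.slugs.slice();
  }
}

const baseFlow = new MiniFlow().step('website');
const withSentiment = baseFlow.step('sentiment');
const withSummary = baseFlow.step('summary');
```

Because baseFlow is never modified, withSentiment and withSummary each extend it independently.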

Usage

Basic Example

import { Flow } from '@pgflow/dsl';

// Define input type for the flow
type Input = {
  url: string;
};

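// Define a flow with steps and dependencies.
// scrapeWebsite, analyzeSentiment, summarizeWithAI and saveToDb are task
// functions assumed to be implemented elsewhere in your project.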
export const AnalyzeWebsite = new Flow<Input>({
  slug: 'analyzeWebsite',
  maxAttempts: 3,
  baseDelay: 5,
  timeout: 10,
})
  .step(
    { slug: 'website' },
    async (flowInput) => await scrapeWebsite(flowInput.url)
  )
  .step(
    { slug: 'sentiment', dependsOn: ['website'] },
    async (deps) => await analyzeSentiment(deps.website.content)
  )
  .step(
    { slug: 'summary', dependsOn: ['website'] },
    async (deps) => await summarizeWithAI(deps.website.content)
  )
  .step(
    { slug: 'saveToDb', dependsOn: ['sentiment', 'summary'] },
    async (deps, ctx) => {
      return await saveToDb({
        websiteUrl: ctx.flowInput.url,
        sentiment: deps.sentiment.score,
        summary: deps.summary.aiSummary,
      });
    }
  );

Understanding Data Flow

In pgflow, step handlers use asymmetric signatures based on whether they have dependencies:

Root steps (no dependencies):

  • First parameter: flowInput - the original flow input directly
  • Second parameter: ctx - context object (env, supabase, flowInput, etc.)

Dependent steps (with dependsOn):

  • First parameter: deps - object with outputs from dependency steps (deps.{stepName})
  • Second parameter: ctx - context object (includes ctx.flowInput if needed)

This design ensures:

  • Root steps receive flow input directly for clean, simple handlers
  • Dependent steps focus on their dependencies without wrapping
  • Original flow input is always accessible via ctx.flowInput when needed
  • Steps can combine dependency outputs with original input via context
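
The two handler shapes can be written down as plain TypeScript types. This is a sketch under assumptions: FlowInput, Ctx, RootHandler and DependentHandler are hypothetical names that are not exported by @pgflow/dsl, the context is reduced to flowInput only, and the handlers are synchronous to keep the example short.

```typescript
// Hypothetical types modelling the two handler shapes described above.
type FlowInput = { url: string };
type Ctx = { flowInput: FlowInput };

// Root step: the first parameter is the flow input itself.
type RootHandler<Out> = (flowInput: FlowInput, ctx: Ctx) => Out;

// Dependent step: the first parameter holds outputs keyed by dependency slug.
type DependentHandler<Deps, Out> = (deps: Deps, ctx: Ctx) => Out;

const websiteHandler: RootHandler<{ content: string }> = (flowInput) => ({
  content: `contents of ${flowInput.url}`,
});

const reportHandler: DependentHandler<{ website: { content: string } }, string> = (
  deps,
  ctx
) => `${ctx.flowInput.url}: ${deps.website.content.length} chars`;

// Manual wiring for illustration; in pgflow the runtime supplies these arguments.
const exampleCtx: Ctx = { flowInput: { url: 'https://example.com' } };
const websiteOutput = websiteHandler(exampleCtx.flowInput, exampleCtx);
const reportOutput = reportHandler({ website: websiteOutput }, exampleCtx);
```

The dependent handler reads the original URL from ctx.flowInput and the scraped content from deps.website, which is the combination the last bullet describes.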

Step Methods

The Flow DSL provides three methods for defining steps in your workflow:

.step() - Regular Steps

The standard method for adding steps to a flow. Each step processes input and returns output.

.step(
  { slug: 'process', dependsOn: ['previous'] },
  async (deps, ctx) => {
    // Access deps.previous for dependency output
    // Access ctx.flowInput if original flow input is needed
    return { result: 'processed' };
  }
)

.array() - Array-Returning Steps

A semantic wrapper around .step() that provides type enforcement for steps that return arrays. Useful for data fetching or collection steps that will be processed by map steps.

// Fetch an array of items to be processed
.array(
  { slug: 'fetchItems' },
  async () => [1, 2, 3, 4, 5]
)

// With dependencies - combining data from multiple sources
.array(
  { slug: 'combineResults', dependsOn: ['source1', 'source2'] },
  async (deps) => [...deps.source1, ...deps.source2]
)

Key Points:

  • Return type is enforced to be an array at compile time
  • Commonly used as input for subsequent map steps
  • Can depend on other steps just like regular steps
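
Compile-time enforcement of an array return type can be expressed with a generic constraint. The following is a sketch of that general technique, not pgflow's implementation: arrayStep is a hypothetical helper and its return shape is invented for the example.

```typescript
// Hypothetical helper: the constraint on T rejects handlers whose return
// type is not an array.
function arrayStep<T extends readonly unknown[]>(
  slug: string,
  handler: () => T
): { slug: string; run: () => T } {
  return { slug, run: handler };
}

// Accepted: the handler returns number[].
const fetchItems = arrayStep('fetchItems', () => [1, 2, 3, 4, 5]);

// @ts-expect-error a handler returning a non-array is rejected by the type checker
arrayStep('count', () => 42);
```

The second call is only there to show the rejection; the @ts-expect-error comment keeps the file compiling while documenting that the type checker flags it.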

.map() - Array Processing Steps

Processes arrays element-by-element, similar to JavaScript's Array.map(). The handler receives individual items instead of the full input object.

Two Modes of Operation:

  1. Root Map (no array: property): Processes the flow's input array directly

    • The flow input MUST be an array when using root maps
    • Omitting the array: property tells pgflow to use the flow input

  2. Dependent Map (with array: property): Processes another step's array output

    • The array: property specifies which step's output to process
    • That step must return an array

// ROOT MAP - No array: property means use flow input
// Flow input MUST be an array (e.g., ["hello", "world"])
new Flow<string[]>({ slug: 'processStrings' })
  .map(
    { slug: 'uppercase' }, // No array: property!
    (item) => item.toUpperCase()
  );
// Each string in the input array gets uppercased in parallel

// DEPENDENT MAP - array: property specifies the source step
new Flow<{}>({ slug: 'dataPipeline' })
  .array({ slug: 'numbers' }, () => [1, 2, 3])
  .map(
    { slug: 'double', array: 'numbers' }, // Processes 'numbers' output
    (n) => n * 2
  )
  .map(
    { slug: 'square', array: 'double' }, // Chains from 'double'
    (n) => n * n
  );
// Results: numbers: [1,2,3] → double: [2,4,6] → square: [4,16,36]

Key differences from regular steps:

  • Uses array: to specify dependency (not dependsOn:)
  • When array: is omitted, uses flow input array (root map)
  • Handler signature is (item, context) => result instead of (input, context) => result
  • Return type is always an array
  • Map steps can have at most one dependency (the array source)
  • Generates SQL with step_type => 'map' parameter for pgflow's map processing
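
To make the per-item handler signature concrete, the numbers → double → square chain can be modelled in plain TypeScript. This is a sketch only: runMap is a hypothetical helper, and a synchronous loop stands in for the separate tasks that pgflow creates for each element.

```typescript
// Hypothetical stand-in for a map step: the handler sees one item at a time
// and the collected results always form an array.
function runMap<In, Out>(items: In[], handler: (item: In) => Out): Out[] {
  return items.map((item) => handler(item));
}

const numbers = [1, 2, 3];
const doubled = runMap(numbers, (n) => n * 2);
const squared = runMap(doubled, (n) => n * n);
```

Each stage has exactly one array source, mirroring the rule that a map step can have at most one dependency.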

Type Safety: The .map() method provides full TypeScript type inference for array elements:

type User = { id: number; name: string };

new Flow<{}>({ slug: 'userFlow' })
  .array({ slug: 'users' }, (): User[] => [
    { id: 1, name: 'Alice' },
    { id: 2, name: 'Bob' }
  ])
  .map({ slug: 'greet', array: 'users' }, (user) => {
    // TypeScript knows user is of type User
    return `Hello, ${user.name} (ID: ${user.id})`;
  });

Common Patterns:

// Batch processing - process multiple items in parallel
new Flow<number[]>({ slug: 'batchProcessor' })
  .map({ slug: 'validate' }, (item) => {
    if (item < 0) throw new Error('Invalid item');
    return item;
  })
  .map({ slug: 'process', array: 'validate' }, async (item) => {
    // Each item processed in its own task
    return await expensiveOperation(item);
  });

// Data transformation pipeline
new Flow<{}>({ slug: 'etlPipeline' })
  .step({ slug: 'fetchUrls' }, () => ['url1', 'url2', 'url3'])
  .map({ slug: 'scrape', array: 'fetchUrls' }, async (url) => {
    return await fetchContent(url);
  })
  .map({ slug: 'extract', array: 'scrape' }, (html) => {
    return extractData(html);
  })
  .step({ slug: 'aggregate', dependsOn: ['extract'] }, (deps) => {
    // deps.extract is the aggregated array from all map tasks
    return consolidateResults(deps.extract);
  });

Limitations:

  • Can only depend on a single array-returning step
  • TypeScript may not track type transformations between chained maps (use type assertions if needed)
  • Root maps require the entire flow input to be an array

Context Object

Step handlers can optionally receive a second parameter - the context object - which provides access to platform resources and runtime information.

.step(
  { slug: 'saveToDb' },
  async (flowInput, ctx) => {
    // Access platform resources through context
    // (ctx.sql is provided by the Supabase platform, described below)
    const result = await ctx.sql`SELECT * FROM users WHERE id = ${flowInput.userId}`;
    return result[0];
  }
)

Core Context Resources

All platforms provide these core resources:

  • ctx.env - Environment variables (Record<string, string | undefined>)
  • ctx.flowInput - Original flow input (typed as the flow's input type)
  • ctx.shutdownSignal - AbortSignal for graceful shutdown handling
  • ctx.rawMessage - Original pgmq message with metadata
    interface PgmqMessageRecord<T> {
      msg_id: number;
      read_ct: number;
      enqueued_at: Date;
      vt: Date;
      message: T;
    }
  • ctx.stepTask - Current step task details (flow handlers only)
    interface StepTaskRecord<TFlow> {
      flow_slug: string;
      run_id: string;
      step_slug: string;
      msg_id: number;
    }
  • ctx.workerConfig - Resolved worker configuration with all defaults applied
    // Provides access to worker settings like retry limits
    const isLastAttempt = ctx.rawMessage.read_ct >= ctx.workerConfig.retry.limit;
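
The last-attempt check above can be wrapped in a small helper so that handlers share one definition. This is a sketch: the two types list only the fields used in the comparison shown above, and isLastAttempt as a standalone function is a hypothetical convenience, not part of @pgflow/dsl.

```typescript
// Minimal shapes containing only the fields used in the comparison above.
type RawMessageLike = { read_ct: number };
type WorkerConfigLike = { retry: { limit: number } };

// Hypothetical helper: true once the message has been read at least as many
// times as the configured retry limit.
function isLastAttempt(
  rawMessage: RawMessageLike,
  workerConfig: WorkerConfigLike
): boolean {
  return rawMessage.read_ct >= workerConfig.retry.limit;
}

const firstTry = isLastAttempt({ read_ct: 1 }, { retry: { limit: 3 } });
const finalTry = isLastAttempt({ read_ct: 3 }, { retry: { limit: 3 } });
```

A handler might call it as isLastAttempt(ctx.rawMessage, ctx.workerConfig), for example to decide whether a failure is worth logging as final.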

Supabase Platform Resources

When using the Supabase platform with EdgeWorker, additional resources are available:

  • ctx.sql - PostgreSQL client (postgres.js)
  • ctx.supabase - Supabase client with service role key for full database access

To use Supabase resources, import the Flow class from the Supabase preset:

import { Flow } from '@pgflow/dsl/supabase';

const MyFlow = new Flow<{ userId: string }>({
  slug: 'myFlow',
}).step({ slug: 'process' }, async (flowInput, ctx) => {
  // TypeScript knows ctx includes Supabase resources
  const { data } = await ctx.supabase
    .from('users')
    .select('*')
    .eq('id', flowInput.userId);

  // Use SQL directly
  const stats = await ctx.sql`
      SELECT COUNT(*) as total FROM events
      WHERE user_id = ${flowInput.userId}
    `;

  // data is null when the query fails, so guard before indexing
  return { user: data?.[0] ?? null, eventCount: stats[0].total };
});

[!NOTE] Context is optional - handlers that don't need platform resources can omit the second parameter for backward compatibility.

For more details on available resources and platform configuration, see the @pgflow/edge-worker documentation.

Flow Configuration

Configure flows and steps with runtime options:

new Flow<Input>({
  slug: 'myFlow', // Required: Unique flow identifier
  maxAttempts: 3, // Optional: Maximum retry attempts (default: 1)
  baseDelay: 5, // Optional: Base delay in seconds for retries (default: 1)
  timeout: 10, // Optional: Task timeout in seconds (default: 30)
});

Compiling Flows

Use the compileFlow utility to convert a flow definition into SQL statements:

import { compileFlow } from '@pgflow/dsl';

const sqlStatements = compileFlow(MyFlow);
console.log(sqlStatements.join('\n'));

Alternatively, use the pgflow CLI to compile flows directly to migration files:

npx pgflow compile path/to/flow.ts

Requirements

  • All step inputs and outputs MUST be JSON-serializable
  • Use only: primitive types, plain objects, and arrays
  • Convert dates to ISO strings (new Date().toISOString())
  • Avoid: class instances, functions, symbols, undefined values, and circular references
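
The reason for these rules shows up in a plain JSON round trip. The sketch below uses only the standard JSON.stringify and JSON.parse functions. The object shapes are invented for the example, and whether pgflow serializes values in exactly this way is an assumption.

```typescript
// Hypothetical step output that breaks the rules above.
const unsafeOutput = {
  createdAt: new Date(0), // class instance
  note: undefined, // undefined value
  format: () => 'formatted', // function
};

// The round trip silently drops `note` and `format`, and turns the Date
// into an ISO string.
const roundTripped = JSON.parse(JSON.stringify(unsafeOutput));

// Safer equivalent: convert the date explicitly and use null for "no value".
const safeOutput = {
  createdAt: new Date(0).toISOString(),
  note: null,
};
const safeRoundTripped = JSON.parse(JSON.stringify(safeOutput));
```

The unsafe object comes back with fewer keys and a string where the Date was, while the safe object comes back unchanged.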

Building

Run nx build dsl to build the library.

Running unit tests

Run nx test dsl to execute the unit tests via Vitest.

Documentation

For detailed documentation on the Flow DSL, visit: