stepkit

v0.2.3

Type-safe pipeline builder for Node.js/TypeScript with powerful step composition

A minimal, type-safe pipeline builder for TypeScript. Built for the AI SDK, but works anywhere.

Installation

npm install stepkit
# or pnpm, yarn, bun

Core Idea: Pipelines

Build small, named steps that pass a typed context forward. Each step returns a plain object merged into the context. Keep it obvious and composable.

import { stepkit } from 'stepkit'
import { openai } from '@ai-sdk/openai'
import { generateText } from 'ai'

// verifyEmail, sendEmail, and requestVerification are app-specific helpers (not shown)

const signup = stepkit<{ email: string }>()
  .step('normalize-email', ({ email }) => ({ normalizedEmail: email.trim().toLowerCase() }))
  .step('check-email-deliverability', async ({ normalizedEmail }) => ({
    isDeliverable: await verifyEmail(normalizedEmail),
  }))
  .branchOn(
    'delivery-route',
    {
      name: 'deliverable',
      when: ({ isDeliverable }) => isDeliverable,
      then: (b) =>
        b
          .step('draft-welcome', async ({ normalizedEmail }) => {
            const { text } = await generateText({
              model: openai('gpt-4.1'),
              prompt: `Write a friendly one-line welcome for ${normalizedEmail}. Max 12 words.`,
            })
            return { welcomeSubject: 'Welcome to Acme', welcomeBody: text }
          })
          .step(
            'send-welcome-email',
            async ({ normalizedEmail, welcomeSubject, welcomeBody }) => {
              const subject = welcomeSubject ?? 'Welcome!'
              const body = welcomeBody ?? 'Welcome aboard!'
              const id = await sendEmail({ to: normalizedEmail, subject, body })
              return { welcomeEmailId: id }
            },
          ),
    },
    {
      name: 'undeliverable',
      default: (b) =>
        b.step('request-verification', async ({ normalizedEmail }) => ({
          verificationRequestId: await requestVerification(normalizedEmail),
        })),
    },
  )

await signup.run({ email: 'user@example.com' })
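
To make the merge rule concrete, here is a small library-free sketch of how each step's return value could be folded into the context. The names `runSteps`, `Step`, and `Ctx` are hypothetical and exist only for this illustration. This is not stepkit's implementation, just the shallow-merge idea, assuming later keys override earlier ones.

```typescript
// Hypothetical sketch: NOT stepkit's implementation, only the merge idea.
type Ctx = Record<string, unknown>
type Step = (ctx: Ctx) => Ctx | Promise<Ctx>

// Run steps in order, shallow-merging each returned object into the context.
async function runSteps(input: Ctx, steps: Step[]): Promise<Ctx> {
  let ctx: Ctx = { ...input }
  for (const step of steps) {
    const output = await step(ctx)
    ctx = { ...ctx, ...output } // later keys override earlier ones
  }
  return ctx
}

const demo = runSteps({ email: '  Ada@Example.com ' }, [
  (ctx) => ({ normalizedEmail: String(ctx.email).trim().toLowerCase() }),
  (ctx) => ({ isDeliverable: String(ctx.normalizedEmail).includes('@') }),
])

// The resolved context holds email, normalizedEmail, and isDeliverable.
demo.then((ctx) => console.log(ctx))
```

Unlike this sketch, the real builder tracks the accumulated context type at compile time, which is why later steps can destructure earlier outputs without casts.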

Examples

Parallel Execution + Conditional Steps

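import { stepkit } from 'stepkit'
import { openai } from '@ai-sdk/openai'
import { generateText } from 'ai'

// fetchMarketSize, fetchCompetitors, and forecastROI are app-specific helpers (not shown)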

const evaluator = stepkit<{ idea: string }>()
  // Run market signals in parallel
  .step(
    'gather-market-signals',
    async ({ idea }) => ({ marketSize: await fetchMarketSize(idea) }),
    async ({ idea }) => ({ competitors: await fetchCompetitors(idea) }),
  )
  // Conditional: only run forecasting when the market is large
  .step(
    { name: 'run-forecast', condition: ({ marketSize }) => marketSize === 'large' },
    async ({ idea }) => ({ forecast: await forecastROI(idea) }),
  )
  .step('evaluate', async ({ idea, marketSize, competitors, forecast }) => {
    const { text } = await generateText({
      model: openai('gpt-4.1'),
      prompt: `Rate this idea (1-10): "${idea}"\nMarket: ${marketSize}\nCompetitors: ${competitors.length}\nForecast: ${forecast ?? 'n/a'}`,
    })
    return { evaluation: text }
  })

await evaluator.run({ idea: 'AI-powered plant waterer' })

Tip: Use { parallelMode: 'settled' } on a step to continue merging successful parallel outputs even if some functions fail.
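
As a rough picture of what "settled" merging means, the sketch below uses `Promise.allSettled` to keep the outputs of the functions that succeeded and ignore the ones that threw. `mergeSettled` is a hypothetical helper written for this illustration, and stepkit's internals may differ.

```typescript
// Hypothetical sketch of "settled" merging; stepkit's internals may differ.
type Ctx = Record<string, unknown>
type Fn = (ctx: Ctx) => Promise<Ctx>

async function mergeSettled(ctx: Ctx, fns: Fn[]): Promise<Ctx> {
  // allSettled never rejects, so one failure cannot discard the other results
  const results = await Promise.allSettled(fns.map((fn) => fn(ctx)))
  let merged: Ctx = { ...ctx }
  for (const r of results) {
    if (r.status === 'fulfilled') merged = { ...merged, ...r.value }
  }
  return merged
}

const settledDemo = mergeSettled({ idea: 'plant waterer' }, [
  async () => ({ marketSize: 'large' }),
  async () => {
    throw new Error('competitor lookup failed')
  },
])

// marketSize is merged; competitors is simply absent from the context.
settledDemo.then((ctx) => console.log(Object.keys(ctx)))
```

A later step that reads a key produced by a failed function should treat it as optional, as the 'evaluate' step does with forecast.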

Logging & Stopwatch

Enable structured logs with per-step durations and a performance summary by passing { log: { stopwatch: true } } at runtime.

const sleep = (ms: number) => new Promise((r) => setTimeout(r, ms))

const pipeline = stepkit<{ userId: string }>()
  .step('fetch-user', async ({ userId }) => {
    await sleep(150)
    return { user: { id: userId, email: 'user@example.com' } }
  })
  .step(
    'fetch-data',
    async ({ user }) => {
      await sleep(120)
      return { orders: [{ id: 'o1' }, { id: 'o2' }, { id: 'o3' }] }
    },
    async ({ user }) => {
      await sleep(80)
      return { alerts: ['notice'] }
    }
  )
  .step({ name: 'maybe-slow', timeout: 200, onError: 'continue' }, async () => {
    await sleep(300) // will time out and continue
    return { slow: true }
  })
  .step('process', ({ orders }) => ({ orderCount: orders?.length ?? 0 }))
  .branchOn(
    'route',
    {
      name: 'has-orders',
      when: ({ orderCount }) => (orderCount ?? 0) > 0,
      then: (b) => b.step('compute-total', () => ({ total: 99.5 }))
    },
    { name: 'no-orders', default: (b) => b.step('show-empty', () => ({ total: 0 })) }
  )
  .transform('finalize', ({ user, total }) => ({ userId: user.id, total }))

await pipeline.run({ userId: '42' }, { log: { stopwatch: true } })

Output:

🚀 Starting pipeline with input: {
  userId: "42",
}

📍 Step: fetch-user
✅ fetch-user completed in 178ms
   Output: user

📍 Step: fetch-data
✅ fetch-data completed in 121ms
   Output: orders, alerts

📍 Step: maybe-slow
❌ maybe-slow failed after 201ms
   Error: ... Step 'maybe-slow' timed out after 200ms

📍 Step: process
✅ process completed in 0ms
   Output: orderCount

🔀 Branch: route
   ↳ Executing: has-orders

📍 Step: has-orders/compute-total
✅ has-orders/compute-total completed in 0ms
   Output: total
✅ route completed in 2ms
   Output: total

🔄 Transform: finalize
✅ finalize completed in 0ms
   Output: userId, total

⏱️  Performance Summary:
┌──────────────────────────────────────────────────────┐
│ ✅ fetch-user                                  178ms │
│ ✅ fetch-data                                  121ms │
│ ❌ maybe-slow                                  201ms │
│ ✅ process                                       0ms │
│ ✅ has-orders/compute-total                      0ms │
│ ✅ route                                         2ms │
│ ✅ finalize                                      0ms │
└──────────────────────────────────────────────────────┘

📊 Statistics:
   Average: 50ms
   Slowest: fetch-user (178ms)
   Fastest: process (0ms)

⏰ Total Pipeline Time: 511ms

✨ Pipeline completed successfully

Branching Logic

const moderator = stepkit<{ content: string; userId: string }>()
  .step('classify-content', async ({ content }) => {
    const { text } = await generateText({
      model: openai('gpt-4.1'),
      prompt: `Classify content as safe, suspicious, or dangerous.\n\n${content}`,
    })
    return { riskLevel: text.trim().toLowerCase() as 'safe' | 'suspicious' | 'dangerous' }
  })
  .branchOn(
    'policy-route',
    {
      name: 'safe',
      when: ({ riskLevel }) => riskLevel === 'safe',
      then: (b) =>
        b.step('publish', async () => ({ action: 'published' as const })),
    },
    {
      name: 'suspicious',
      when: ({ riskLevel }) => riskLevel === 'suspicious',
      then: (b) =>
        b
          .step('queue-review', async () => ({ reviewTicketId: await createReviewTicket() }))
          .step('notify-moderators', async ({ reviewTicketId }) => ({
            moderatorNotified: await notifyModerators(reviewTicketId),
          }))
          .step('hold', () => ({ action: 'held-for-review' as const })),
    },
    {
      name: 'dangerous',
      default: (b) =>
        b
          .step('block-user', async ({ userId }) => ({ blocked: await blockUser(userId) }))
          .step('send-user-email', async ({ blocked }) => ({
            userMessaged: blocked ? await sendUserEmail('Your content was blocked') : false,
          }))
          .step('notify-admin', async () => ({ adminNotified: await notifyAdmin() }))
          .step('finalize', () => ({ action: 'blocked' as const })),
    },
  )
  .transform('format', ({ action, reviewTicketId, moderatorNotified, adminNotified }) => ({
    status: action,
    reviewTicketId,
    moderatorNotified,
    adminNotified,
  }))

await moderator.run({ content: 'Check this out!' })
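
The routing in the example above can be pictured with the following sketch. It assumes the first case whose `when` returns true is chosen and the case without a `when` acts as the default. That ordering is an assumption rather than documented behavior, and `BranchCase` and `pickBranch` are hypothetical names.

```typescript
// Hypothetical branch selection: first matching `when` wins, else the default case.
interface BranchCase<C> {
  name: string
  when?: (ctx: C) => boolean
  run: (ctx: C) => Partial<C>
}

function pickBranch<C>(ctx: C, cases: BranchCase<C>[]): BranchCase<C> {
  const match = cases.find((c) => c.when?.(ctx))
  const fallback = cases.find((c) => c.when === undefined)
  const chosen = match ?? fallback
  if (!chosen) throw new Error('No branch matched and no default was provided')
  return chosen
}

type Mod = { riskLevel: string; action?: string }

const cases: BranchCase<Mod>[] = [
  { name: 'safe', when: (c) => c.riskLevel === 'safe', run: () => ({ action: 'published' }) },
  { name: 'suspicious', when: (c) => c.riskLevel === 'suspicious', run: () => ({ action: 'held-for-review' }) },
  { name: 'dangerous', run: () => ({ action: 'blocked' }) }, // default case
]

// An unexpected label falls through to the default branch.
const chosenCase = pickBranch<Mod>({ riskLevel: 'unknown' }, cases)
console.log(chosenCase.name) // dangerous
```

This also shows why a default matters when the classifier is a language model: any label outside the expected three still lands somewhere safe.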

Transform: Replace Context

Clean the context: drop intermediate or sensitive fields and keep only what the next steps need.

const cleaner = stepkit<{ token: string }>()
  .step('fetch-user', async ({ token }) => ({
    user: await getUser(token),
    token, // still present for now
    debugInfo: { fetchedAt: Date.now() },
  }))
  .step('fetch-settings', async ({ user }) => ({
    rawSettings: await getSettings(user.id),
    transient: 'will-be-removed',
  }))
  // Replace the entire context to remove clutter and sensitive data
  .transform('clean-context', ({ user, rawSettings }) => ({
    userId: user.id,
    email: user.email,
    theme: rawSettings.theme ?? 'system',
    isPro: rawSettings.plan === 'pro',
  }))
  .step('use-clean', ({ userId, theme, isPro }) => ({
    profileReady: true,
    message: `${isPro ? 'Pro' : 'Free'} user ${userId} prefers ${theme} theme`,
  }))

await cleaner.run({ token: 'secret' })
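
The difference between a regular step and a transform can be reduced to two one-liners. The helpers below, `mergeStep` and `transformStep`, are hypothetical and only contrast "merge into the context" with "replace the context"; they are not taken from stepkit.

```typescript
// Hypothetical helpers contrasting the two behaviors; not stepkit's code.
type Ctx = Record<string, unknown>

const mergeStep = (ctx: Ctx, out: Ctx): Ctx => ({ ...ctx, ...out }) // keeps old keys
const transformStep = (_ctx: Ctx, out: Ctx): Ctx => ({ ...out }) // keeps only new keys

const before: Ctx = { token: 'secret', user: { id: 'u1' } }
const merged = mergeStep(before, { userId: 'u1' })
const replaced = transformStep(before, { userId: 'u1' })

console.log(Object.keys(merged)) // [ 'token', 'user', 'userId' ]
console.log(Object.keys(replaced)) // [ 'userId' ]
```

After a replace, fields such as token are no longer reachable by later steps, which is the point when the context holds secrets.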

Composable Pipelines

import { openai } from '@ai-sdk/openai'
import { generateText } from 'ai'
import { stepkit, type StepOutput } from 'stepkit'

// Classify input
const classify = stepkit<{ prompt: string }>()
  .step('classify', async ({ prompt }) => {
    const { text } = await generateText({
      model: openai('gpt-4.1'),
      prompt: `Is this a question or statement? One word.\n\n${prompt}`,
    })
    return { type: text.trim().toLowerCase() }
  })

// Extract type for reusable branches
type Classified = StepOutput<typeof classify, 'classify'>

// Reusable pipelines (can live in separate files)
const handleQuestion = stepkit<Classified>()
  .step('answer', async ({ prompt }) => {
    const { text } = await generateText({
      model: openai('gpt-4.1'),
      prompt: `Answer: ${prompt}`,
    })
    return { response: text }
  })

const handleStatement = stepkit<Classified>()
  .step('acknowledge', () => ({ response: 'Thanks for sharing!' }))

// Compose with full type safety (branch)
const responder = classify
  .branchOn(
    {
      name: 'question',
      when: ({ type }) => type === 'question',
      then: handleQuestion,
    },
    { name: 'statement', default: handleStatement },
  )
  .step('finalize', ({ response }) => ({ done: true, response }))

await responder.run({ prompt: 'What is AI?' })

Nested Pipelines

// Session sub-pipeline: load session and permissions
const sessionPipeline = stepkit<{ sessionId: string }>()
  .step('fetch-session', async ({ sessionId }) => ({ session: await getSession(sessionId) }))
  .step('fetch-permissions', async ({ session }) => ({
    permissions: await getPermissions(session.userId),
  }))

// Main pipeline composes the session pipeline and continues
const main = stepkit<{ sessionId: string }>()
  .step('load-session', sessionPipeline)
  .step('use-permissions', ({ permissions }) => ({ canPublish: permissions.includes('publish') }))

await main.run({ sessionId: 'abc123' })

Notes:

  • Nested pipelines merge outputs using the wrapping step's mergePolicy (default: override).
  • Nested step names are prefixed with the wrapping step's name for typing, e.g. a sub-step named sub inside a wrapping step named some-other appears as some-other/sub in StepNames and StepOutput.

Error Handling & Retries

Let a step fail without breaking the pipeline, and retry transient errors.

const fetchWithRetry = stepkit()
  .step(
    {
      name: 'fetch-resource',
      onError: 'continue',
      retries: 2,
      retryDelayMs: 250,
      shouldRetry: (err) => /429|timeout/i.test(String(err?.message ?? err)),
    },
    async () => {
      // imagine a flaky network call
      const ok = Math.random() > 0.5
      if (!ok) throw new Error('429: too many requests')
      return { data: { id: '42' } }
    },
  )
  .step('continue-anyway', ({ data }) => ({ hasData: !!data }))

await fetchWithRetry.run({})
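
For readers who want to see the retry semantics spelled out, here is a standalone sketch. The option names mirror the ones used above, but the behavior is an assumption: `retries` is read as the number of extra attempts after the first, with a fixed delay between them. `withRetry` and `delay` are hypothetical helpers, not part of stepkit.

```typescript
// Hypothetical retry helper; option names mirror the README, behavior is assumed.
interface RetryOptions {
  retries: number
  retryDelayMs: number
  shouldRetry: (err: unknown) => boolean
}

const delay = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms))

async function withRetry<T>(fn: () => Promise<T>, opts: RetryOptions): Promise<T> {
  let attempt = 0
  for (;;) {
    try {
      return await fn()
    } catch (err) {
      // Give up when retries are exhausted or the error is not retryable
      if (attempt >= opts.retries || !opts.shouldRetry(err)) throw err
      attempt += 1
      await delay(opts.retryDelayMs)
    }
  }
}

// Fails twice with a 429, then succeeds on the third call.
let calls = 0
const retryDemo = withRetry(
  async () => {
    calls += 1
    if (calls < 3) throw new Error('429: too many requests')
    return { data: { id: '42' } }
  },
  { retries: 2, retryDelayMs: 10, shouldRetry: (err) => /429|timeout/i.test(String(err)) },
)

retryDemo.then((result) => console.log(result.data.id, calls)) // 42 3
```

With onError: 'continue', a step that still fails after its last retry would contribute nothing to the context, so downstream steps should treat its keys as optional.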

Timeouts & Abort

Guard slow steps and support cancelling the whole pipeline.

const ac = new AbortController()

const guarded = stepkit()
  .step(
    { name: 'third-party-api-request', timeout: 1500, onError: 'continue' },
    async () => {
      // Simulate an external API that may be slow
      await new Promise((r) => setTimeout(r, 2000))
      return { thirdPartyOk: true }
    },
  )
  .step('after', ({ thirdPartyOk }) => ({
    status: thirdPartyOk ? 'used-third-party' : 'skipped-third-party',
  }))

// ac.abort() would cancel; pass the signal at run time
await guarded.run({}, { signal: ac.signal })
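
A per-step timeout can be pictured as a race between the step's work and a timer. The sketch below covers only the timeout half of this section, not abort signals. `withTimeout` is a hypothetical helper, and the error message format is borrowed from the log output shown earlier rather than from stepkit's source.

```typescript
// Hypothetical timeout guard; not stepkit's code.
function withTimeout<T>(work: Promise<T>, ms: number, name: string): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error(`Step '${name}' timed out after ${ms}ms`)), ms)
  })
  // Whichever settles first wins; always clear the timer afterwards
  return Promise.race([work, timeout]).finally(() => clearTimeout(timer))
}

// Simulated slow call that resolves after 200ms
const slow = new Promise<{ thirdPartyOk: boolean }>((resolve) =>
  setTimeout(() => resolve({ thirdPartyOk: true }), 200),
)

// Emulate "continue on error": a failure contributes an empty object
const timeoutDemo = withTimeout(slow, 50, 'third-party-api-request').catch(
  () => ({} as { thirdPartyOk?: boolean }),
)

timeoutDemo.then((out) => console.log(out.thirdPartyOk ? 'used-third-party' : 'skipped-third-party'))
// skipped-third-party
```

Note that racing against a timer does not cancel the underlying work. Stopping the request itself requires passing an AbortSignal down to whatever performs it.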

Checkpoints & Resume

Resume a pipeline from any completed step using checkpoints emitted by onStepComplete. You can shallowly override fields when resuming.

import { stepkit } from 'stepkit'

const calc = stepkit<{ a: number; b?: number }>()
  .step('add-one', ({ a }) => ({ a: a + 1 }))
  .step('double', ({ a }) => ({ a: a * 2 }))
  .step('finish', ({ a, b }) => ({ sum: (a ?? 0) + (b ?? 0) }))

let checkpoint = ''
await calc.run(
  { a: 1 },
  {
    onStepComplete: (e) => {
      if (e.stepName === 'double') checkpoint = e.checkpoint
    },
  },
)

// Resume later with an override
const resumed = await calc.runCheckpoint({ checkpoint, overrideData: { b: 10 } })
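
One way to think about a checkpoint is "where to continue, plus the context at that point". The sketch below assumes a JSON string holding the next step index and the context. That format, and the names `runFrom` and `resume`, are hypothetical, and the actual checkpoint encoding used by stepkit is not specified here.

```typescript
// Hypothetical checkpoint format: the next step index plus the context, as JSON.
type Ctx = Record<string, number | undefined>
type Step = { name: string; run: (ctx: Ctx) => Ctx }

const steps: Step[] = [
  { name: 'add-one', run: ({ a }) => ({ a: (a ?? 0) + 1 }) },
  { name: 'double', run: ({ a }) => ({ a: (a ?? 0) * 2 }) },
  { name: 'finish', run: ({ a, b }) => ({ sum: (a ?? 0) + (b ?? 0) }) },
]

// Run from a given index, reporting a serialized checkpoint after every step.
function runFrom(ctx: Ctx, start: number, onStep?: (name: string, cp: string) => void): Ctx {
  let current: Ctx = { ...ctx }
  steps.slice(start).forEach((step, i) => {
    current = { ...current, ...step.run(current) }
    onStep?.(step.name, JSON.stringify({ next: start + i + 1, ctx: current }))
  })
  return current
}

// Resume: parse the checkpoint, shallowly apply overrides, continue from `next`.
function resume(checkpoint: string, overrideData: Ctx = {}): Ctx {
  const { next, ctx } = JSON.parse(checkpoint) as { next: number; ctx: Ctx }
  return runFrom({ ...ctx, ...overrideData }, next)
}

let saved = ''
runFrom({ a: 1 }, 0, (name, cp) => {
  if (name === 'double') saved = cp
})

const resumedCtx = resume(saved, { b: 10 })
console.log(resumedCtx.sum) // 14
```

In this sketch the checkpoint taken after 'double' holds a = 4, so resuming with b = 10 runs only 'finish' and yields a sum of 14.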

Human approval (mock flow)

Use a checkpoint to pause after generating a draft, store the checkpoint, and resume on approval.

import { stepkit } from 'stepkit'

// Mocks — replace with real services
const kv: Record<string, string> = {}
const save = async (id: string, cp: string) => (kv[id] = cp)
const get = async (id: string) => kv[id] ?? null
const del = async (id: string) => { delete kv[id] }
const sendEmail = async ({ to, body }: { to: string; body: string }) => {
  console.log('Sending email to', to, 'with body:', body)
}

const replyFlow = stepkit<{ body: string }>()
  .step('generate', async ({ body }) => ({ reply: `Reply: ${body}` }))
  .step('send', async ({ reply }) => {
    await sendEmail({ to: 'user@example.com', body: reply })
  })

export const start = async (body: string) => {
  let approvalId: string | null = null
  await replyFlow.run(
    { body },
    {
      async onStepComplete(e) {
        if (e.stepName.endsWith('generate')) {
          approvalId = `apr_${Date.now()}`
          await save(approvalId, e.checkpoint)
          e.stopPipeline()
        }
      },
    },
  )
  return { approvalId }
}

export const approve = async (approvalId: string) => {
  const checkpoint = await get(approvalId)
  if (!checkpoint) throw new Error('Not found')
  await replyFlow.runCheckpoint(checkpoint)
  await del(approvalId)
}

export const reject = async (approvalId: string) => {
  await del(approvalId)
}

Stop Pipeline Early

Use e.stopPipeline() from onStepComplete to end the run after a specific step.

await stepkit<{ n: number }>()
  .step('s1', ({ n }) => ({ n: n + 1 }))
  .step('s2', ({ n }) => ({ n: n + 1 }))
  .step('s3', ({ n }) => ({ n: n + 1 }))
  .run(
    { n: 0 },
    {
      onStepComplete: (e) => {
        if (e.stepName === 's2') e.stopPipeline()
      },
    },
  )
// => { n: 2 }

Type Helpers

Infer names, inputs, and outputs anywhere you need them.

import { stepkit, type StepNames, type StepInput, type StepOutput } from 'stepkit'

const simple = stepkit<{ id: string }>()
  .step('fetch-user', ({ id }) => ({ name: 'John', id }))
  .step('process', ({ name }) => ({ result: name.toUpperCase() }))

type Names = StepNames<typeof simple> // 'fetch-user' | 'process'
type ProcessInput = StepInput<typeof simple, 'process'> // { id: string; name: string }
type AfterFetch = StepOutput<typeof simple, 'fetch-user'> // { id: string; name: string }
type FinalOutput = StepOutput<typeof simple> // { id: string; name: string; result: string }
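
To show how a builder can accumulate context types in general, here is a tiny type-level sketch. `Pipe` and `Merge` are hypothetical and written only for this illustration. stepkit's real helper types may be defined quite differently, and this sketch ignores step names, async steps, and branching.

```typescript
// Hypothetical type-level sketch; stepkit's real helper types may differ.
type Merge<A, B> = Omit<A, keyof B> & B

class Pipe<In extends object, Ctx extends object> {
  private constructor(private readonly fns: ReadonlyArray<(ctx: any) => object>) {}

  static start<I extends object>(): Pipe<I, I> {
    return new Pipe<I, I>([])
  }

  // Each step widens the context type with the keys it returns
  step<Out extends object>(fn: (ctx: Ctx) => Out): Pipe<In, Merge<Ctx, Out>> {
    return new Pipe<In, Merge<Ctx, Out>>([...this.fns, fn])
  }

  run(input: In): Ctx {
    let ctx: object = { ...input }
    for (const fn of this.fns) ctx = { ...ctx, ...fn(ctx) }
    return ctx as Ctx
  }
}

const simplePipe = Pipe.start<{ id: string }>()
  .step(({ id }) => ({ name: 'John', id }))
  .step(({ name }) => ({ result: name.toUpperCase() })) // `name` is typed as string here

const out = simplePipe.run({ id: 'u1' })
console.log(out.result) // JOHN
```

The key idea is that every step call returns a new builder whose context type parameter already includes that step's output, so the next callback is checked against it.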

Features

  • Type-safe: types flow through each step automatically
  • Parallel execution: run several functions within one step concurrently
  • Branching: conditional logic with reusable pipeline branches
  • Composable: import and combine pipelines from separate files
  • Observable: opt-in logging with per-step timing and a performance summary
  • Zero dependencies: small enough to read and understand quickly

Why?

Built as a lightweight alternative to larger frameworks. No ceremony, just compose pipelines with full type safety.

License

MIT