npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@gravito/flux

v2.0.0

Published

Platform-agnostic workflow engine for Gravito

Readme

@gravito/flux

Gravito 的高效能工作流程引擎,跨平台、型別安全的狀態機,強調可追蹤、可重播與可靠重試。

定位與價值

  • 工作流標準化:在框架內用同一套模型描述流程,而不是每個服務自行手寫流程。
  • 可靠性與可觀測:狀態機 + 重試 + trace sink,讓流程可追蹤、可審計、可恢復。
  • 可抽離與可複用:Flux 可獨立於框架使用,讓 workflow 本身成為可移植的 workflow as code。

API 語意(input / step / commit)

  • input<T>():定義輸入資料型別,提供型別推斷與編輯器提示。
  • step(name, handler, options):一般步驟,會依序執行,可設定重試/逾時/條件。
  • commit(name, handler, options):具副作用的步驟(寫庫、扣款、通知),在重播或重跑時語意更明確。

最佳實務與注意事項

  • 儲存介面:Memory 僅適合開發與測試,正式環境請使用持久化 storage。
  • 版本變更:workflow 定義改動時,既有流程的重跑需評估相容性。
  • 重試策略:外部依賴錯誤才重試,業務邏輯錯誤建議直接 fail。
  • Step vs Commit:有副作用的操作應放在 commit,確保重播一致性。

核心特色

  • 純狀態機模型 - 以明確狀態描述流程,讓每一步清晰可控
  • 鏈式 Builder API - 型別安全的流程定義方式
  • 儲存介面 - Memory、Bun SQLite,其他儲存可自訂
  • 重試與逾時 - Step 可設定 retries/timeout/when,面對不穩定依賴也能自動恢復
  • 同步/非同步 - 同時支援同步流程與非同步長流程
  • 事件 Hooks - 監聽流程與步驟生命週期
  • 事件 Hooks - 監聽流程與步驟生命周期
  • 雙平台支援 - Bun 與 Node.js 均可使用

安裝

bun add @gravito/flux

# npm
npm install @gravito/flux

快速開始

import { FluxEngine, createWorkflow } from '@gravito/flux'

const orderFlow = createWorkflow('order-process')
  .input<{ orderId: string }>()
  .step('fetch', async (ctx) => {
    ctx.data.order = await db.orders.find(ctx.input.orderId)
  })
  .step('validate', async (ctx) => {
    if (!ctx.data.order.isPaid) throw new Error('Unpaid order')
  })
  .commit('fulfill', async (ctx) => {
    await fulfillment.ship(ctx.data.order)
  })

const engine = new FluxEngine()
const result = await engine.execute(orderFlow, { orderId: '123' })

範例

訂單履約

const orderWorkflow = createWorkflow('order-fulfillment')
  .input<{ orderId: string; items: Item[] }>()
  .step('validate', async (ctx) => {
    for (const item of ctx.input.items) {
      const stock = await db.products.getStock(item.productId)
      if (stock < item.qty) throw new Error(`Out of stock: ${item.productId}`)
    }
  })
  .step(
    'payment',
    async (ctx) => {
      ctx.data.payment = await payment.charge(ctx.input.orderId)
    },
    { retries: 3, timeout: 30_000 }
  )
  .commit('deduct', async (ctx) => {
    await inventory.deduct(ctx.data.reservationIds)
  })

影像處理

const uploadWorkflow = createWorkflow('image-processing')
  .input<{ fileBuffer: Buffer; fileName: string; userId: string }>()
  .step('validate', async (ctx) => {
    if (ctx.input.fileBuffer.length > 10 * 1024 * 1024) {
      throw new Error('File size exceeds 10MB')
    }
  })
  .step('resize', async (ctx) => {
    ctx.data.thumbnail = await sharp(ctx.input.fileBuffer).resize(200).toBuffer()
  })
  .commit('upload', async (ctx) => {
    ctx.data.url = await s3.upload(ctx.input.fileName, ctx.data.thumbnail)
  })

報表生成

const reportWorkflow = createWorkflow('generate-report')
  .input<{ reportType: string; dateRange: DateRange; requestedBy: string }>()
  .step('fetch-data', async (ctx) => {
    ctx.data.sales = await db.orders.aggregate(ctx.input.dateRange)
  }, { timeout: 60_000 })
  .step('calculate', async (ctx) => {
    ctx.data.metrics = { revenue: 0, orders: 0 }
  })
  .commit('upload', async (ctx) => {
    ctx.data.url = await s3.upload(`reports/${ctx.id}.pdf`, ctx.data.pdf)
  })

API

createWorkflow(name)

const flow = createWorkflow('my-workflow')
  .input<{ value: number }>()
  .step('step1', handler)
  .step('step2', handler, { retries: 3 })
  .commit('save', handler)

FluxEngine

const engine = new FluxEngine({
  storage: new MemoryStorage(),
  defaultRetries: 3,
  defaultTimeout: 30_000,
  logger: new FluxConsoleLogger(),
  on: {
    stepStart: (step, ctx) => {},
    stepComplete: (step, ctx, result) => {},
    stepError: (step, ctx, error) => {},
    workflowComplete: (ctx) => {},
    workflowError: (ctx, error) => {},
  },
})

Step Options

.step('name', handler, {
  retries: 5,
  timeout: 60_000,
  when: (ctx) => ctx.data.x > 0,
})

暫停與信號 (Async Signals)

流程可以暫停並等待外部事件(如人工審核、Webhook 回調)。

.step('wait-approval', async () => {
    // 掛起流程,狀態轉為 suspended,釋放資源
    return Flux.wait('approval-signal')
})

// 喚醒流程
await engine.signal(workflow, id, 'approval-signal', { approved: true })

Saga Pattern (自動補償)

支援分散式事務的最終一致性。當流程失敗時,引擎會自動反向執行已定義的 compensate 邏輯。

.step('reserve-flight', 
  async (ctx) => {
    ctx.data.flightId = await api.bookFlight()
  },
  { 
    // 若後續步驟失敗,會自動執行此回滾邏輯
    compensate: async (ctx) => {
      await api.cancelFlight(ctx.data.flightId)
    }
  }
)

重跑指定步驟

const first = await engine.execute(flow, { orderId: 'ORD-001' })
await engine.retryStep(flow, first.id, 'charge')

與隊列搭配的應用

隊列消費者只負責觸發 Flux,Flux 會把任務拆成多步驟 workflow。當某一步失敗時,只重試該步,不會重跑已完成的步驟。

queue.on('message', async (job) => {
  await flux.execute(orderFlow, job.data)
})

搭配持久化 storage,已完成的步驟會被保留,避免重複執行。

Kafka 完整案例(事件 → 消費者 → Flux)

1) 事件觸發:把任務丟進 Kafka

import { Kafka } from 'kafkajs'

const kafka = new Kafka({ clientId: 'orders', brokers: ['localhost:9092'] })
const producer = kafka.producer()

await producer.connect()
await producer.send({
  topic: 'order.created',
  messages: [
    { key: 'ORD-001', value: JSON.stringify({ orderId: 'ORD-001', userId: 'u_1' }) },
  ],
})

2) 消費者接到任務後交給 Flux

import { Kafka } from 'kafkajs'
import { createWorkflow, FluxEngine, MemoryStorage } from '@gravito/flux'

const orderFlow = createWorkflow('order-flow')
  .input<{ orderId: string; userId: string }>()
  .step('validate', async (ctx) => {
    ctx.data.validated = true
  })
  .step('reserve', async (ctx) => {
    ctx.data.reserved = true
  })
  .commit('notify', async () => {})

const flux = new FluxEngine({ storage: new MemoryStorage() })

const kafka = new Kafka({ clientId: 'orders-worker', brokers: ['localhost:9092'] })
const consumer = kafka.consumer({ groupId: 'order-workers' })

await consumer.connect()
await consumer.subscribe({ topic: 'order.created' })
await consumer.run({
  eachMessage: async ({ message }) => {
    const payload = JSON.parse(message.value?.toString() ?? '{}')
    await flux.execute(orderFlow, payload)
  },
})

分支流程(多支點)

const flow = createWorkflow('event-routing')
  .input<{ payload: EventPayload }>()
  .step('classify', async (ctx) => {
    ctx.data.route = classify(ctx.input.payload)
  })
  .step(
    'auto-handle',
    async (ctx) => {
      ctx.data.result = await autoProcess(ctx.input.payload)
    },
    { when: (ctx) => ctx.data.route === 'auto' }
  )
  .step(
    'manual-review',
    async (ctx) => {
      ctx.data.ticketId = await ticketing.create(ctx.input.payload)
    },
    { when: (ctx) => ctx.data.route === 'manual' }
  )
  .step(
    'risk-audit',
    async (ctx) => {
      ctx.data.auditId = await auditQueue.enqueue(ctx.input.payload)
    },
    { when: (ctx) => ctx.data.route === 'risk' }
  )
  .commit('notify', async (ctx) => {
    await notifier.send(ctx.data)
  })

多節點任務(類似 n8n)

const flow = createWorkflow('multi-node')
  .input<{ type: 'email' | 'slack' | 'webhook'; payload: unknown }>()
  .step('classify', async (ctx) => {
    ctx.data.route = ctx.input.type
  })
  .step(
    'send-email',
    async (ctx) => {
      await email.send(ctx.input.payload)
    },
    { when: (ctx) => ctx.data.route === 'email' }
  )
  .step(
    'send-slack',
    async (ctx) => {
      await slack.send(ctx.input.payload)
    },
    { when: (ctx) => ctx.data.route === 'slack' }
  )
  .step(
    'call-webhook',
    async (ctx) => {
      await webhook.post(ctx.input.payload)
    },
    { when: (ctx) => ctx.data.route === 'webhook' }
  )
  .commit('audit', async (ctx) => {
    await audit.save(ctx.data)
  })

上圖對應 when 條件:只會走符合條件的分支,未符合者會被標記為 skipped。

可執行分支範例

bun run examples/branching.ts

本地開發視覺化

import { FluxEngine, JsonFileTraceSink } from '@gravito/flux'

const engine = new FluxEngine({
  trace: new JsonFileTraceSink({ path: './.flux/trace.ndjson', reset: true }),
})
flux dev --trace ./.flux/trace.ndjson --port 4280

驗證流程(本 repo)

bun run examples/trace-viewer.ts
bun run dev:viewer

企業級追蹤

透過 FluxTraceSink 可以把事件流送到你自己的監控、排程或分析模組,建立完整的執行查詢、重播與告警能力。

以微服務或 AWS Lambda 部署

Flux 可抽離成無狀態 workflow runner。把 workflow 定義與 input 交給函式執行,再將狀態與 trace 寫入外部儲存即可:

import { FluxEngine, JsonFileTraceSink, MemoryStorage, createWorkflow } from '@gravito/flux'

const workflow = createWorkflow('lambda-flow')
  .input<{ orderId: string }>()
  .step('prepare', async (ctx) => {
    ctx.data.ready = true
  })
  .commit('notify', async () => {})

const engine = new FluxEngine({
  storage: new MemoryStorage(),
  trace: new JsonFileTraceSink({ path: '/tmp/flux-trace.ndjson', reset: false }),
})

export const handler = async (event: { orderId: string }) => {
  const result = await engine.execute(workflow, { orderId: event.orderId })
  return { status: result.status, id: result.id }
}

其他雲端函式範例

GCP Cloud Functions(HTTP,需安裝對應套件並確認最新版本):

import type { HttpFunction } from '@google-cloud/functions-framework'

export const runFlux: HttpFunction = async (req, res) => {
  const payload = req.body ?? {}
  const result = await engine.execute(workflow, payload)
  res.json({ status: result.status, id: result.id })
}

Azure Functions(HTTP Trigger,需安裝對應套件並確認最新版本):

import type { AzureFunction, Context, HttpRequest } from '@azure/functions'

const httpTrigger: AzureFunction = async (context: Context, req: HttpRequest) => {
  const payload = req.body ?? {}
  const result = await engine.execute(workflow, payload)
  context.res = { status: 200, body: { status: result.status, id: result.id } }
}

export default httpTrigger

儲存介面

MemoryStorage

import { FluxEngine, MemoryStorage } from '@gravito/flux'
const engine = new FluxEngine({ storage: new MemoryStorage() })

BunSQLiteStorage (Bun only)

import { FluxEngine } from '@gravito/flux'
import { BunSQLiteStorage } from '@gravito/flux/bun'

const engine = new FluxEngine({
  storage: new BunSQLiteStorage({ path: './data/workflows.db' })
})

Gravito 整合

import { OrbitFlux } from '@gravito/flux'

const core = await PlanetCore.boot({
  orbits: [
    new OrbitFlux({
      storage: 'sqlite',
      dbPath: './data/workflows.db',
    })
  ]
})

const flux = core.services.get<FluxEngine>('flux')
await flux.execute(myWorkflow, input)

平台支援

| Feature | Bun | Node.js | |---------|-----|---------| | FluxEngine | ✅ | ✅ | | MemoryStorage | ✅ | ✅ | | BunSQLiteStorage | ✅ | ❌ | | OrbitFlux | ✅ | ✅ |