npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@classytic/streamline

v2.6.0

Published

MongoDB-native durable workflow orchestration engine. Like Temporal but simpler - supports sleep, wait, retry, parallel execution, human-in-the-loop, and crash recovery. Perfect for payment gateways, approval flows, and scheduled tasks.

Readme

@classytic/streamline

MongoDB-native durable workflow engine. Sleep / wait / retry / parallel / human-in-the-loop / event triggers / idempotency / concurrency / crash recovery — zero infrastructure beyond MongoDB.

draft → running → waiting ↔ running → done
                     ↓         ↓
                  failed   cancelled

Install

npm install @classytic/streamline @classytic/mongokit @classytic/primitives @classytic/repo-core mongoose

Peer deps: @classytic/mongokit >=3.14, @classytic/primitives >=0.6, @classytic/repo-core >=0.5, mongoose >=9.4.1. Optional: @opentelemetry/api >=1.0. (mongokit ≥3.14 ships Repository.claim() + MongoOperatorUpdate — both load-bearing in streamline ≥2.4.)

Design philosophy — what streamline is not

Streamline is a state-machine durable workflow engine, not a deterministic-replay one. Each step's input/output is checkpointed to MongoDB; on crash or restart the scheduler resumes from the last completed step. The orchestrator code is not re-executed from an event log to reconstruct state (the model used by Inngest, Temporal, and Vercel Workflow SDK).

What you get from this choice:

  • No compiler magic. Plain async functions. No SWC plugins, no sandboxed VM, no "use workflow" pragmas, no two-bundle splits.
  • No determinism constraints. Date.now(), Math.random(), crypto.randomUUID(), closures over mutable state — all fine inside step handlers. Replay engines forbid these or require step.run wrappers around them.
  • Smaller mental model. A run is a Mongo document; steps are entries in run.steps[]. You can read it, query it, index it, dump it.
  • Zero new infrastructure. Reuses your existing Mongo connection. No Redis, no Postgres, no separate server process.

What you give up:

  • No free time-travel debugging from an event log (the run document is the audit trail; stepLogs is the per-step log).
  • Long-running orchestrators with branchy logic are less memory-efficient than a replay engine — streamline keeps step outputs in the run doc rather than re-deriving them.
  • Cross-step transactional rollback isn't free — you write compensation steps yourself.

If your workload is "background jobs and multi-step workflows on a Mongo-backed app," this is the right trade. If you need deterministic replay across language boundaries or month-long orchestrators, reach for Temporal/Inngest.

Guarantees (Mongo-native)

streamline needs only MongoDB. Redis / Kafka / BullMQ are optional — for cross-process event wake and external fan-out — never required for correctness or durability.

| Property | Guarantee | |---|---| | Durable | Run + step state is persisted to MongoDB with { w: 'majority', j: true } before each transition is acknowledged. A crash/restart loses nothing committed; the scheduler resumes from the exact failed/incomplete step. | | At-least-once step execution | A step runs at least once. Crash recovery, retries, and stale-run reclaim can re-run a step whose side effects partially applied. The engine guarantees a step is claimed by exactly one worker at a time (atomic CAS) — not that its external side effects happen exactly once. | | Idempotent handlers required | Because execution is at-least-once, step + onCompensate handlers must be idempotent for any external side effect (charge, email, API write). Pass ctx.idempotencyKey(scope?) (stable across retries/recovery) as the provider's idempotency key. The engine gives effectively-once for same-cluster Mongo writes via atomic claims; external exactly-once is your handler + the key. | | Exactly-once start (dedup) | start({ idempotencyKey }) yields at most one active run per key (partial-unique index). Reusable once the prior run settles (incl. compensated/compensation_failed). | | Multiple workers | Safe. Every scheduler pickup is workflowId-scoped + a routing guard, and every transition is an atomic compare-and-set, so two workers never double-drive a run or double-resume a step. Recovery is scheduler-push (a polling worker re-drives stale runs) — there is no explicit worker-lease protocol or pulled task queue. | | Ordering | Steps within a run run in order (goto / parallel-join are explicit exceptions). No global cross-run ordering guarantee. | | Strict concurrency | concurrency: { limit, strict: true } is a hard cap (atomic counter); plain limit is best-effort. Counters are global per (workflowId, key) — put the tenant in key for per-tenant caps; repair drift with concurrencyCounterRepo.reconcile(workflowId, key?). |

Optional external infrastructure (and what each is for, not required for):

  • Redis / Kafka (signal store) — only for ctx.waitFor(event) wake across processes. Same-process events need zero extra infra. Plug in via the SignalStore interface.
  • BullMQ / external job queues — not used and not needed; the scheduler + MongoDB are the durable queue.
  • Monitoring / dashboards / DLQ UI — consume the event bus (step:* / workflow:*) + OpenTelemetry hooks into your own stack.

See docs/DISTRIBUTED-READINESS.md for operating at scale (timer opt-out, payload-size limits, per-tenant concurrency keys, BYO signal store).

Quick start

import mongoose from 'mongoose';
import { createWorkflow } from '@classytic/streamline';

await mongoose.connect('mongodb://localhost/myapp'); // reuse your existing connection

const scraper = createWorkflow('web-scraper', {
  steps: {
    fetch: async (ctx) => ({ html: await fetch(ctx.context.url).then(r => r.text()) }),
    parse: async (ctx) => ({ data: parseHTML(ctx.getOutput('fetch').html) }),
    save:  async (ctx) => ({ saved: await db.save(ctx.getOutput('parse').data) }),
  },
  context: (input: { url: string }) => ({ url: input.url }),
  version: '1.0.0',
});

const run = await scraper.start({ url: 'https://example.com' });
// Auto-executes. Use `await scraper.waitFor(run._id)` to block until done.

Typed step outputs (v2.6)

Declare an outputs interface once and every handler gets typed, typo-checked access to sibling step outputs via ctx.outputs — no getOutput<T>() casts:

interface Outputs {
  fetch: { html: string };
  parse: { data: Item[] };
  save:  { saved: boolean };
}

const scraper = createWorkflow<{ url: string }, { url: string }, Outputs>('web-scraper', {
  steps: {
    fetch: async (ctx) => ({ html: await fetch(ctx.context.url).then(r => r.text()) }),
    parse: async (ctx) => ({ data: parseHTML(ctx.outputs.fetch?.html ?? '') }), // ← typed
    save:  async (ctx) => ({ saved: await db.save(ctx.outputs.parse?.data) }),
  },
  context: (input) => ({ url: input.url }),
});

Declared steps are checked both ways (missing/extra step = compile error, wrong return shape = compile error). Skip the third generic and everything behaves exactly as before — ctx.outputs.x is unknown, getOutput remains for dynamic step IDs.

Step features

Retries, timeouts, conditions

createWorkflow('ci', {
  steps: {
    clone: async (ctx) => ({ repo: 'cloned' }),                 // plain handler
    build: {
      handler: async (ctx) => ({ artifact: 'build.tar.gz' }),
      timeout: 120_000,
      retries: 5,
      retryDelay: 1_000,
      retryBackoff: 'exponential',                               // or 'linear' | multiplier number
    },
    deploy: {
      handler: async (ctx) => ({ deployed: true }),
      skipIf: (ctx) => !ctx.shouldDeploy,                        // typed as your TContext
    },
  },
  context: (input: { deploy: boolean }) => ({ shouldDeploy: input.deploy }),
  defaults: { retries: 3, timeout: 30_000 },
});

Human-in-the-loop

steps: {
  wait: async (ctx) => { await ctx.wait('Please approve', { request: ctx.context.data }); },
  run:  async (ctx) => { const approval = ctx.getOutput('wait'); /* ... */ },
}
// later: await workflow.resume(runId, { approved: true, by: 'admin' });

Sleep (durable timer)

await ctx.sleep(3_600_000); // survives process restart; scheduler re-picks it up

Wait for event

const data = await ctx.waitFor('user-action');
// Resumes on: globalEventBus.emit('user-action', { runId, data }),
// container.eventBus.emit(...), or a cross-process signal store.

Parallel / scatter

import { executeParallel } from '@classytic/streamline';

await executeParallel([
  () => fetch('https://api1'),
  () => fetch('https://api2'),
], { mode: 'all' }); // 'all' | 'race' | 'any' | 'allSettled'

// Checkpointed scatter — resumes from where it crashed:
await ctx.scatter({ a: () => fetchA(), b: () => fetchB() }, { concurrency: 4 });

Durable loop (agent loop)

ctx.loop runs a body until it reports done, durably checkpointing state after every iteration (each checkpoint also heartbeats, so long loops never trip the stale detector). Crash/retry recovery resumes from the last committed iteration — completed iterations never re-run.

steps: {
  agent: async (ctx) => {
    const final = await ctx.loop(
      { messages: [seedPrompt] },                       // durable state
      async (state, i) => {
        const reply = await llm.chat(state.messages, {
          idempotencyKey: ctx.idempotencyKey(`iter:${i}`), // effectively-once per iteration
        });
        return {
          state: { messages: [...state.messages, reply] },
          done: reply.stopReason === 'end_turn',
        };
      },
      { maxIterations: 50 },  // hard cap — exceeding it fails NON-retriably
    );
    return { answer: final.messages.at(-1) };
  },
}

Each iteration is at-least-once (an interrupted iteration re-runs from its start) — pass ctx.idempotencyKey('iter:N') to external side effects. ctx.loop owns the step's checkpoint slot; don't mix it with ctx.checkpoint() / ctx.scatter() in the same step.

Goto & rewind

await ctx.goto('retry-step');                    // jump inside a step handler
await workflow.rewindTo(runId, 'some-earlier');  // external rewind

Long-running steps

Heartbeats fire automatically every 30 s. Use ctx.heartbeat() inside tight loops to push a beat between batches. After TIMING.HEARTBEAT_FAILURE_ABORT_THRESHOLD (default 5) consecutive heartbeat-write failures, the executor aborts the step so the stale-detector can't cause a double execution.

Step middleware (observability seam)

Cross-cutting hooks around every step — metrics, tracing, token metering, structured logging — without wrapping handlers. Observability-only: a hook that throws is logged and swallowed; middleware can never veto, fail, or suspend a step.

createWorkflow('agent', {
  steps: { /* ... */ },
  middleware: [
    {
      name: 'token-meter',
      beforeStep: ({ runId, stepId }) => tracer.startSpan(stepId),
      afterStep: ({ stepId, output, durationMs }) => {
        metrics.timing(stepId, durationMs);
        const usage = (output as { usage?: { totalTokens?: number } })?.usage;
        if (usage?.totalTokens) meter.record(stepId, usage.totalTokens);
      },
      onStepError: ({ stepId, error }) => alerting.report(stepId, error),
      onWait: ({ stepId, waitType }) => metrics.inc(`wait.${waitType}`),
    },
  ],
});

Streaming progress (ctx.stream)

Non-durable, at-most-once frames for live UIs — LLM tokens, percent-complete. Emitted as step:stream on the container bus (+ cross-process signal store); never persisted, never load-bearing. Arc's SSE endpoint delivers them to browsers out of the box.

steps: {
  generate: async (ctx) => {
    for await (const token of llm.stream(prompt)) {
      ctx.stream({ token }); // { runId, stepId, seq, frame } on 'step:stream'
    }
    return { text: full };
  },
}

Payload limits (16MB doc safety)

Outputs, checkpoints, context and history live inline on the run document (Mongo caps documents at 16MB BSON). Outputs/checkpoints over 1MB log a warning naming the run + step. Opt into a hard cap to fail loudly instead of dying later at the BSON limit:

createWorkflow('render', {
  steps: { /* ... */ },
  maxPayloadBytes: 2_000_000, // over this → step fails NON-retriably
});
// For large artifacts store a reference (S3 key, file path, doc id), not the payload.

Non-retriable failures

import { NonRetriableError } from '@classytic/streamline';
if (!valid(ctx.input)) throw new NonRetriableError('Bad input — no retry');

Distributed primitives

Idempotent start

// Only one active run per key. Reusable after completion/failure.
await workflow.start(input, { idempotencyKey: `order:${id}` });

Concurrency limit

createWorkflow('charge', {
  steps: { /* ... */ },
  concurrency: { limit: 5, key: (input) => input.userId },
  // Excess queued as draft; auto-promoted when a slot frees (no scheduler wait).
});

Throttle (best-effort start-rate smoothing — not a strict rate limiter)

createWorkflow('send-receipt', {
  steps: { /* ... */ },
  concurrency: {
    key: (input) => input.userId,
    throttle: { limit: 10, windowMs: 60_000 }, // smooth starts toward 10/user/min
  },
});
// First `limit` starts in any rolling window fire immediately. Excess starts
// queue as scheduled drafts and are spread by `windowMs / limit` (1 every 6 s
// in this example) so a burst of 100 doesn't all land on a single future slot.
// The scheduler picks them up — no dropped calls.

Honest contract. This is best-effort smoothing, not a distributed strict rate limiter:

  • Sequential safety: ✅ Within one process, sequential start() calls produce strictly-staggered queued slots — tail.executionTime + windowMs / limit. Bursts smooth correctly.
  • Parallel races: ⚠️ Two concurrent start() calls can read the same tail before either persists, so they reserve the same future slot. Burst correctness is bounded by parallelism, not by limit.
  • What you get: "starts will be smoothed toward limit / windowMs under typical load."
  • What you don't get: "at most limit starts will ever fire in any windowMs window."

If you need a strict distributed rate limit (payment captures, partner API quotas), wrap start() in your own atomic counter / Redis token-bucket / semaphore — streamline's throttle is for "don't overload the embedding API," not "exactly N or fail."

Debounce (collapse rapid bursts)

createWorkflow('rebuild-search-index', {
  steps: { /* ... */ },
  concurrency: {
    key: (input) => input.tenantId,
    debounce: { windowMs: 30_000 }, // fire once 30s after the last start call
  },
});
// Trailing-edge: each start atomically pushes the timer forward and overwrites
// `input` / `context` with the latest values. Lodash semantics. The single run
// that eventually fires sees the most recent input. Use a global key
// (`key: () => 'global'`) for workflow-wide debounce.

throttle and debounce are mutually exclusive — debounce already collapses bursts to one fire per quiet window. Both require key. key: () => 'global' gives you a workflow-wide bucket.

Event triggers & auto-cancel

createWorkflow('onboard', {
  steps: { /* ... */ },
  trigger:  { event: 'user.created' },         // auto-start
  cancelOn: [{ event: 'user.cancelled' }],     // auto-cancel
});

Priority

await workflow.start(input, { priority: 10 }); // higher = picked up first

Child workflows

await ctx.startChildWorkflow('billing', { orderId });
// Parent waits for child. Child's output becomes the step's output.

Race semantics — what's closed, what isn't

Distributed primitives have specific guarantees. Honest summary:

| Primitive | Guarantee | Mechanism | |---|---|---| | idempotencyKey | Race-safe. Concurrent starts with the same key produce exactly one active run. | Partial unique index on idempotencyKey (filtered to non-terminal statuses) + E11000 catch in repository.create(). The losing insert returns the winning run instead of throwing. | | Scheduler claim (waiting → running, draft → running) | Race-safe. A given run is claimed by exactly one worker. | Atomic findOneAndUpdate with status guard in the filter. Plugin hooks fire on the claim. | | concurrency.limit | Best-effort, advisory. Steady-state correct; under bursts of concurrent starts/promotions the limit can briefly oversubscribe. | Count-then-create (and count-then-promote on slot release). Not wrapped in a transaction or counter doc. | | concurrency.throttle | Best-effort smoothing, not a strict distributed rate limiter. Sequential bursts get strictly-staggered slots (tail + windowMs/limit). Parallel concurrent starts can reserve the same slot — bounded by parallelism, not by limit. Use an external token-bucket / counter for strict guarantees. | | concurrency.debounce | Race-safe for the bump path. Each start is a single atomic findOneAndUpdate against (workflowId, concurrencyKey); the timer push is serialized by Mongo. The fall-through "no draft existed yet → create one" can race two creates if the first start in a quiet window arrives twice within the same millisecond — bounded to two drafts max, both at the same executionTime. |

If your workload is concurrency-sensitive in the strict sense — payment captures, ticket reservations, license-seat allocation — use concurrency: { limit, strict: true } (atomic counter; repair drift with concurrencyCounterRepo.reconcile(workflowId, key?)), or wrap the start call in your own gate (Redis lock, partial-unique resource index per PACKAGE_RULES §32). The best-effort concurrency.limit is fine for "don't overload the embedding API" but not for "exactly N seats."

Running at scale / multi-worker / multi-tenant? Read docs/DISTRIBUTED-READINESS.md — it covers per-workflow scheduler scoping (v2.4), strict vs best-effort concurrency, tenant-scoped concurrency keys, the scheduler.inMemoryTimers opt-out, inline payload size limits, and bring-your-own signal-store/monitoring. streamline is durable and multi-worker-capable, but not Temporal-grade distributed by default — that doc states each boundary plainly.

Multi-tenant

import { createContainer, createWorkflow } from '@classytic/streamline';

const container = createContainer({
  repository: { multiTenant: { tenantField: 'context.tenantId', strict: true } },
});

const wf = createWorkflow('my-flow', { steps: { /* ... */ }, container });
// Every read/write/updateMany/deleteMany is tenant-scoped automatically.
// `bypassTenant: true` for admin ops; `staticTenantId` for single-tenant deployments.

Works with the MongoKit plugin surface (cache, audit, observability, field-filter, etc.). Every read and write — including atomic claims like the scheduler's findOneAndUpdate race — routes through inherited Repository<TDoc> methods, so plugin hooks fire on engine writes, not just user writes. Tenant scope applies to updateMany / deleteMany and to the atomic-claim path; bypassTenant: true is the explicit opt-out for _id-scoped admin operations.

Indexing

import { WorkflowRunModel } from '@classytic/streamline';

await WorkflowRunModel.collection.createIndexes([
  { key: { 'context.tenantId': 1, status: 1 } },
  { key: { 'context.url': 1, workflowId: 1 } },
  // TTL: auto-delete finished runs after 30 days
  { key: { updatedAt: 1 }, expireAfterSeconds: 30 * 86400,
    partialFilterExpression: { status: { $in: ['done', 'failed', 'cancelled'] } } },
]);

Retention — TTL, tenant compounds, stale sweeper

createContainer({ retention }) consolidates the three operational knobs hosts previously had to wire by hand: a TTL on terminal runs, the tenant-prefixed compound multi-tenant deployments need, and a sweep that terminates runs whose worker crashed.

import { createContainer } from '@classytic/streamline';

const container = createContainer({
  repository: { multiTenant: { tenantField: 'context.organizationId', strict: true } },
  retention: {
    terminalRunsTtlSeconds: 30 * 24 * 60 * 60,   // GC done/failed/cancelled after 30d
    staleHeartbeatThresholdMs: 30 * 60 * 1000,   // mark as failed after 30 min no heartbeat
    staleRunSweepIntervalMs: 60_000,             // sweep every minute
    staleRunAction: 'fail',                      // or 'cancel'
  },
});

// Deploy-time — call ONCE after mongoose connects (PACKAGE_RULES §32):
await mongoose.connect(uri);
await container.syncRetentionIndexes();          // idempotent, drop+rebuild on TTL change

// Graceful shutdown — sweeper timer is unref()'d, so this is optional:
container.dispose();

| Knob | Default | Effect | | ----------------------------- | ------- | ---------------------------------------------------------------------------------------------- | | terminalRunsTtlSeconds | off | TTL index {endedAt:1} with partialFilterExpression: {endedAt:{$exists:true}, status:{$in:[done,failed,cancelled]}}. | | multiTenantIndexes | true when repo is multi-tenant | Builds {<tenantField>:1, workflowId:1, createdAt:-1} so org-scoped lists hit a covering index. | | staleHeartbeatThresholdMs | off | Setting this auto-starts StaleRunSweeper. Pick well above the engine heartbeat AND any step's max execution time. | | staleRunSweepIntervalMs | 60_000 | Self-rescheduling setTimeout (no overlap), unref()'d. | | staleRunAction | 'fail'| 'fail' ⇒ status failed + error.code === 'stale_heartbeat'; 'cancel' ⇒ status cancelled. | | staleRunBatchSize | 100 | Cap per sweep cycle. | | maxStaleRecoveries | 5 | After N recoveries, sweeper marks the run failed with error.code === 'dead_lettered' instead of recycling it. |

The sweeper terminates via the repository's atomic markStaleAsFailed() CAS, so it can't race the engine's recoverStale() (which re-executes from the last heartbeat). Run both with different thresholds — recover at 5 min for transient crashes, terminate at 30 min as a backstop — and the longer threshold acts as a give-up signal so the scheduler can move on.

Both paths bump WorkflowRun.recoveryAttempts atomically. Once it hits maxStaleRecoveries, the next sweep cycle dead-letters the run with error.code === 'dead_lettered' instead of recycling it — closes the crash-recover-crash-recover loop bug for permanently broken runs. Hosts build their "stuck runs" dashboard off this field directly.

In-flight version pinning

Long-running workflows survive engine deploys. Every run carries definitionVersion (snapshotted from WorkflowDefinition.version at create-time); on resume the engine routes execution to the version-pinned engine via workflowRegistry.lookupVersion(workflowId, version).

// Deploy v1 — registers automatically.
const v1 = createWorkflow('billing', {
  version: '1.0.0',
  steps: { charge: chargeV1 },
});

const run = await v1.start({ orderId: '123' });   // run.definitionVersion === '1.0.0'

// Later: deploy v2 in the SAME process (rolling node).
const v2 = createWorkflow('billing', {
  version: '2.0.0',
  steps: { charge: chargeV2, refund: refundV2 },
});

await v1.engine.execute(run._id);   // routes to v1 — chargeV1 runs
await v2.start({ orderId: '456' }); // run.definitionVersion === '2.0.0'

When the original engine isn't registered (you stopped registering v1 because no v1 runs should exist), provide migrateRun:

createWorkflow('billing', {
  version: '2.0.0',
  steps: { charge: chargeV2 },
  migrateRun: async (run) => {
    if (run.definitionVersion === '1.0.0' && run.currentStepId === 'oldCharge') {
      return {
        currentStepId: 'charge',
        steps: run.steps.map((s) =>
          s.stepId === 'oldCharge' ? { ...s, stepId: 'charge' } : s,
        ),
      };
    }
    return null; // fall through — engine fails the run with VERSION_MISMATCH
  },
});

Runs created before v2.3.3 have no definitionVersion; the engine falls through to the active definition (back-compat preserved).

Query & cleanup

// Query directly via the Mongoose model:
await WorkflowRunModel.find({ workflowId: 'web-scraper', status: 'running' })
  .sort({ createdAt: -1 }).limit(50).lean();

// Or via the repository (v2.2 class primitives, tenant-scoped):
import { workflowRunRepository as repo } from '@classytic/streamline';
await repo.deleteMany({ status: { $in: ['done', 'failed'] },
                       updatedAt: { $lt: cutoff } });
await repo.updateMany({ status: 'waiting' }, { $set: { status: 'cancelled' } });

Webhooks

import { createHook, resumeHook } from '@classytic/streamline';

// In a step — MUST pass `hookToken` to ctx.wait so resumeHook can validate it.
// Without it, resumeHook fails closed (no token in the run = no resume).
const hook = createHook(ctx, 'awaiting-approval');
console.log(hook.path); // /hooks/<token>
await ctx.wait('Awaiting approval', { hookToken: hook.token });

// In your HTTP handler:
app.post('/hooks/:token', async (req, res) => {
  const { runId, run } = await resumeHook(req.params.token, req.body);
  res.json({ runId, status: run.status });
});

Observability

Legacy event bus (in-process)

import { globalEventBus } from '@classytic/streamline';

globalEventBus.on('workflow:started',   ({ runId }) => metrics.inc('wf.start', { runId }));
globalEventBus.on('workflow:completed', ({ runId }) => metrics.inc('wf.done', { runId }));
globalEventBus.on('workflow:failed',    ({ runId, data }) => alert({ runId, error: data.error }));
globalEventBus.on('engine:error',       ({ runId, error, context }) => log.error({ runId, context, error }));

Events: workflow:started|completed|failed|waiting|resumed|cancelled|recovered|retry|compensating, step:started|completed|failed|waiting|skipped|retry-scheduled|compensated, engine:error, scheduler:error|circuit-open.

Arc-compatible transport (cross-process)

import { createContainer, createWorkflow } from '@classytic/streamline';
import { RedisEventTransport } from '@classytic/arc/events';

const container = createContainer({
  eventTransport: new RedisEventTransport({ url: process.env.REDIS_URL }),
});

await container.eventTransport.subscribe('streamline:workflow.*', async (event) => {
  // { type: 'streamline:workflow.completed', payload: { runId }, meta: { id, timestamp, resourceId, userId, ... } }
  await auditLog.write(event);
});

const wf = createWorkflow('order', { steps: { /* ... */ }, container });

Canonical names: streamline:<resource>.<verb>. Without eventTransport, streamline uses an in-process InProcessStreamlineBus (same matchEventPattern as arc). The legacy globalEventBus keeps working unchanged — the transport is an additive bridge.

OpenTelemetry

Import from @classytic/streamline/telemetry; see package exports.

Logger

import { configureStreamlineLogger } from '@classytic/streamline';
configureStreamlineLogger({ enabled: false });          // silence
configureStreamlineLogger({ level: 'debug' });          // verbose
configureStreamlineLogger({ transport: pinoAdapter });  // pino/winston/etc.

Scheduler

Adaptive polling (10 s–5 min by load), circuit breaker, stale-running recovery, priority-ordered pickup, timezone-aware scheduling with DST handling (luxon).

Recurring schedules (v2.6)

SchedulingService schedules one-shot AND recurring runs; the engine drives the chain — after the scheduler claims a recurring occurrence, the claim winner spawns the next one as an idempotency-keyed draft (crash- and multi-worker-safe; no double-spawn).

import { SchedulingService } from '@classytic/streamline';

const scheduler = new SchedulingService(workflow.definition, handlers);
await scheduler.schedule({
  scheduledFor: '2026-07-01T09:00:00',          // local wall-clock time
  timezone: 'America/New_York',                  // stays 9am across DST
  input: { report: 'weekly' },
  recurrence: { pattern: 'weekly', daysOfWeek: [1] },  // Mondays
  // or: { pattern: 'daily' } · { pattern: 'monthly', dayOfMonth: 31 } (clamped)
  // or: { pattern: 'custom', cronExpression: '0 9 * * 1' } (tz-aware cron)
  // end the chain with `until: Date` or `count: N`
});

Semantics: no catch-up (occurrences missed while down are skipped, not replayed); occurrences are independent runs — add concurrency: { limit: 1 } to prevent overlap of long jobs. Malformed patterns throw at schedule() time. Preview the next firing with computeNextOccurrence(run.scheduling).

workflow.engine.configure({ scheduler: { maxConcurrentExecutions: 10 } });
// Cap concurrent runs. All slots full → scheduler skips the poll cycle.

API summary

Workflow (from createWorkflow): start(input, opts?) · execute(runId) · resume(runId, payload?) · get(runId) · cancel(runId) · pause(runId) · rewindTo(runId, stepId) · waitFor(runId, opts?) · shutdown()

StepContext (inside handlers): ctx.outputs (typed) · ctx.set · ctx.getOutput · ctx.wait · ctx.waitFor · ctx.sleep · ctx.loop · ctx.stream · ctx.heartbeat · ctx.emit · ctx.log · ctx.checkpoint · ctx.getCheckpoint · ctx.scatter · ctx.joinBranches · ctx.goto · ctx.startChildWorkflow · ctx.outputHistory · ctx.pinOutput · ctx.idempotencyKey · ctx.signal (AbortSignal)

Public exports (main entry): createWorkflow, WorkflowEngine, createContainer, createWorkflowRepository, WorkflowRunModel, WorkflowRunRepository, CommonQueries, executeParallel, createHook / resumeHook, globalEventBus, WorkflowEventBus, createEventSink, tenantFilterPlugin / singleTenantPlugin, SchedulingService, TimezoneHandler, computeNextOccurrence / validateRecurrence, WorkflowSteps (type). Update-doc builders: MongoUpdate, normalizeUpdate, runSet, runSetUnset, buildStepUpdateOps. Arc-compatible events: InProcessStreamlineBus, createEvent, bridgeBusToTransport, STREAMLINE_EVENTS, LEGACY_TO_CANONICAL, DomainEvent, EventTransport, EventHandler. Errors: WorkflowError, WorkflowNotFoundError, StepNotFoundError, StepTimeoutError, InvalidStateError, DataCorruptionError, MaxRetriesExceededError, NonRetriableError, ErrorCode.

Subpath entries: @classytic/streamline/fastify, @classytic/streamline/telemetry.

Error handling

WorkflowError (and every subclass) implements HttpError from @classytic/repo-core/errors. Three equivalent ways to identify an error — pick whichever matches your codebase:

import { WorkflowError, ErrorCode, ErrorCodeHierarchical } from '@classytic/streamline';

try { await workflow.resume(runId, payload); }
catch (err) {
  if (!(err instanceof WorkflowError)) throw err;

  // (A) Cleanest in HTTP layers — the status is right on the error.
  return res.status(err.status).json({ code: err.code, message: err.message, meta: err.meta });

  // (B) Hierarchical (HttpError-canonical) code — switch on `'workflow.X'` ids.
  switch (err.code) {
    case ErrorCodeHierarchical.WORKFLOW_NOT_FOUND: return res.status(404).end();
    case ErrorCodeHierarchical.INVALID_STATE:      return res.status(400).end();
    case ErrorCodeHierarchical.STEP_TIMEOUT:       return res.status(408).end();
    default: throw err;
  }

  // (C) Legacy SCREAMING_SNAKE — kept for backwards compat. New code should
  // prefer (A) or (B). The legacy value lives on `legacyCode` post-migration.
  switch (err.legacyCode) {
    case ErrorCode.WORKFLOW_NOT_FOUND: return res.status(404).end();
    // ...
  }
}

Status mapping (also exported as ERROR_STATUS_MAP):

| Code | Status | Hierarchical id | |---|---|---| | WORKFLOW_NOT_FOUND | 404 | workflow.not_found | | STEP_NOT_FOUND | 404 | workflow.step.not_found | | STEP_TIMEOUT | 408 | workflow.step.timeout | | WORKFLOW_CANCELLED / WORKFLOW_ALREADY_COMPLETED / EXECUTION_ABORTED | 409 | workflow.cancelled / | | INVALID_STATE / INVALID_TRANSITION / VALIDATION_ERROR | 400 | workflow.invalid_state / | | DATA_CORRUPTION / STEP_FAILED / MAX_RETRIES_EXCEEDED | 500 | workflow.data_corruption / |

Type-safe exports

import type {
  Workflow, WorkflowConfig, StepConfig, WaitForOptions,
  WorkflowRun, WorkflowEventName, EventPayloadMap,
} from '@classytic/streamline';

export const myWorkflow: Workflow<MyCtx, MyInput> = createWorkflow('my', { /* ... */ });

Examples

See docs/examples/: hello-world, wait, sleep, parallel, conditional, newsletter automation, AI pipeline.

Testing your workflows

npm test            # dev loop — unit + integration (fast)
npm run test:all    # before publish — unit + integration + long (full gate; prepublishOnly runs this)
npm run test:long   # nightly / slow CI — long-running scenarios only

See TESTING.md for tier conventions + helpers, and docs/contributing/PUBLISH_CHECKLIST.md for the release flow.

What's new in 2.6

  • Typed step outputs — declare a TOutputs interface once; ctx.outputs.stepName is fully typed and typo-checked, returns checked against the declaration. Zero change when undeclared.
  • ctx.loop — durable agent-loop primitive: per-iteration checkpoint + heartbeat, crash-resume from the last committed iteration, non-retriable maxIterations cap.
  • Recurring schedulesdaily/weekly/monthly/cron recurrence is now driven by the engine (idempotency-keyed next-occurrence spawn; DST-aware; no catch-up; until/count).
  • maxPayloadBytes — opt-in hard cap on step output/checkpoint size (non-retriable failure with a clear message); 1MB warn-only by default.
  • Heartbeat resilience — in-tick retry + overlap guard; a single transient DB blip no longer counts toward the abort threshold.
  • Step middleware — observability-only beforeStep/afterStep/onStepError/onWait hooks (metrics, tracing, token metering); hook errors are swallowed, never affecting execution.
  • ctx.stream — non-durable, at-most-once progress frames (step:stream event) for live UIs; abort-safe (post-cancel frames are silently dropped).

See CHANGELOG.md for details.

What's new in 2.2

  • Arc-compatible event transport — drop any arc transport (Memory / Redis / Kafka / BullMQ) into createContainer({ eventTransport }) and subscribe glob-style.
  • mongokit 3.11 / repo-core 0.2 surfaceupdateMany / deleteMany as class primitives, portable Update IR, UpdatePatch<TDoc> rename.
  • ctx.waitFor() resumes from globalEventBus (was broken in 2.1).
  • Tenant scope on bulk opstenantFilterPlugin now hooks updateMany / deleteMany. Security-relevant if you were on 2.1 with bulk writes.
  • Heartbeat backpressure — aborts a step after N consecutive heartbeat failures to prevent double execution via stale re-claim.
  • updateOne() guardrail — rejects mixed { $set, rawField } updates instead of letting Mongo silently drop keys.
  • Exhaustive LEGACY_TO_CANONICAL map — adding a new event without mapping it is a compile error.

See CHANGELOG.md for the full list.

License

MIT · Issues + PRs welcome at github.com/classytic/streamline