npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@classytic/streamline

v2.3.3

Published

MongoDB-native durable workflow orchestration engine. Like Temporal but simpler - supports sleep, wait, retry, parallel execution, human-in-the-loop, and crash recovery. Perfect for payment gateways, approval flows, and scheduled tasks.

Readme

@classytic/streamline

MongoDB-native durable workflow engine. Sleep / wait / retry / parallel / human-in-the-loop / event triggers / idempotency / concurrency / crash recovery — zero infrastructure beyond MongoDB.

draft → running → waiting ↔ running → done
                     ↓         ↓
                  failed   cancelled

Install

npm install @classytic/streamline @classytic/mongokit @classytic/primitives mongoose

Peer deps: @classytic/mongokit >=3.13, @classytic/primitives >=0.1, mongoose >=9.4.1. Optional: @opentelemetry/api >=1.0. (3.13 ships Repository.claim() + MongoOperatorUpdate — both load-bearing in streamline ≥2.3.)

Design philosophy — what streamline is not

Streamline is a state-machine durable workflow engine, not a deterministic-replay one. Each step's input/output is checkpointed to MongoDB; on crash or restart the scheduler resumes from the last completed step. The orchestrator code is not re-executed from an event log to reconstruct state (the model used by Inngest, Temporal, and Vercel Workflow SDK).

What you get from this choice:

  • No compiler magic. Plain async functions. No SWC plugins, no sandboxed VM, no "use workflow" pragmas, no two-bundle splits.
  • No determinism constraints. Date.now(), Math.random(), crypto.randomUUID(), closures over mutable state — all fine inside step handlers. Replay engines forbid these or require step.run wrappers around them.
  • Smaller mental model. A run is a Mongo document; steps are entries in run.steps[]. You can read it, query it, index it, dump it.
  • Zero new infrastructure. Reuses your existing Mongo connection. No Redis, no Postgres, no separate server process.

What you give up:

  • No free time-travel debugging from an event log (the run document is the audit trail; stepLogs is the per-step log).
  • Long-running orchestrators with branchy logic are less memory-efficient than a replay engine — streamline keeps step outputs in the run doc rather than re-deriving them.
  • Cross-step transactional rollback isn't free — you write compensation steps yourself.

If your workload is "background jobs and multi-step workflows on a Mongo-backed app," this is the right trade. If you need deterministic replay across language boundaries or month-long orchestrators, reach for Temporal/Inngest.

Quick start

import mongoose from 'mongoose';
import { createWorkflow } from '@classytic/streamline';

await mongoose.connect('mongodb://localhost/myapp'); // reuse your existing connection

const scraper = createWorkflow('web-scraper', {
  steps: {
    fetch: async (ctx) => ({ html: await fetch(ctx.context.url).then(r => r.text()) }),
    parse: async (ctx) => ({ data: parseHTML(ctx.getOutput('fetch').html) }),
    save:  async (ctx) => ({ saved: await db.save(ctx.getOutput('parse').data) }),
  },
  context: (input: { url: string }) => ({ url: input.url }),
  version: '1.0.0',
});

const run = await scraper.start({ url: 'https://example.com' });
// Auto-executes. Use `await scraper.waitFor(run._id)` to block until done.

Step features

Retries, timeouts, conditions

createWorkflow('ci', {
  steps: {
    clone: async (ctx) => ({ repo: 'cloned' }),                 // plain handler
    build: {
      handler: async (ctx) => ({ artifact: 'build.tar.gz' }),
      timeout: 120_000,
      retries: 5,
      retryDelay: 1_000,
      retryBackoff: 'exponential',                               // or 'linear' | multiplier number
    },
    deploy: {
      handler: async (ctx) => ({ deployed: true }),
      skipIf: (ctx) => !ctx.shouldDeploy,                        // typed as your TContext
    },
  },
  context: (input: { deploy: boolean }) => ({ shouldDeploy: input.deploy }),
  defaults: { retries: 3, timeout: 30_000 },
});

Human-in-the-loop

steps: {
  wait: async (ctx) => { await ctx.wait('Please approve', { request: ctx.context.data }); },
  run:  async (ctx) => { const approval = ctx.getOutput('wait'); /* ... */ },
}
// later: await workflow.resume(runId, { approved: true, by: 'admin' });

Sleep (durable timer)

await ctx.sleep(3_600_000); // survives process restart; scheduler re-picks it up

Wait for event

const data = await ctx.waitFor('user-action');
// Resumes on: globalEventBus.emit('user-action', { runId, data }),
// container.eventBus.emit(...), or a cross-process signal store.

Parallel / scatter

import { executeParallel } from '@classytic/streamline';

await executeParallel([
  () => fetch('https://api1'),
  () => fetch('https://api2'),
], { mode: 'all' }); // 'all' | 'race' | 'any' | 'allSettled'

// Checkpointed scatter — resumes from where it crashed:
await ctx.scatter({ a: () => fetchA(), b: () => fetchB() }, { concurrency: 4 });

Goto & rewind

await ctx.goto('retry-step');                    // jump inside a step handler
await workflow.rewindTo(runId, 'some-earlier');  // external rewind

Long-running steps

Heartbeats fire automatically every 30 s. Use ctx.heartbeat() inside tight loops to push a beat between batches. After TIMING.HEARTBEAT_FAILURE_ABORT_THRESHOLD (default 5) consecutive heartbeat-write failures, the executor aborts the step so the stale-detector can't cause a double execution.

Non-retriable failures

import { NonRetriableError } from '@classytic/streamline';
if (!valid(ctx.input)) throw new NonRetriableError('Bad input — no retry');

Distributed primitives

Idempotent start

// Only one active run per key. Reusable after completion/failure.
await workflow.start(input, { idempotencyKey: `order:${id}` });

Concurrency limit

createWorkflow('charge', {
  steps: { /* ... */ },
  concurrency: { limit: 5, key: (input) => input.userId },
  // Excess queued as draft; auto-promoted when a slot frees (no scheduler wait).
});

Throttle (best-effort start-rate smoothing — not a strict rate limiter)

createWorkflow('send-receipt', {
  steps: { /* ... */ },
  concurrency: {
    key: (input) => input.userId,
    throttle: { limit: 10, windowMs: 60_000 }, // smooth starts toward 10/user/min
  },
});
// First `limit` starts in any rolling window fire immediately. Excess starts
// queue as scheduled drafts and are spread by `windowMs / limit` (1 every 6 s
// in this example) so a burst of 100 doesn't all land on a single future slot.
// The scheduler picks them up — no dropped calls.

Honest contract. This is best-effort smoothing, not a distributed strict rate limiter:

  • Sequential safety: ✅ Within one process, sequential start() calls produce strictly-staggered queued slots — tail.executionTime + windowMs / limit. Bursts smooth correctly.
  • Parallel races: ⚠️ Two concurrent start() calls can read the same tail before either persists, so they reserve the same future slot. Burst correctness is bounded by parallelism, not by limit.
  • What you get: "starts will be smoothed toward limit / windowMs under typical load."
  • What you don't get: "at most limit starts will ever fire in any windowMs window."

If you need a strict distributed rate limit (payment captures, partner API quotas), wrap start() in your own atomic counter / Redis token-bucket / semaphore — streamline's throttle is for "don't overload the embedding API," not "exactly N or fail."

Debounce (collapse rapid bursts)

createWorkflow('rebuild-search-index', {
  steps: { /* ... */ },
  concurrency: {
    key: (input) => input.tenantId,
    debounce: { windowMs: 30_000 }, // fire once 30s after the last start call
  },
});
// Trailing-edge: each start atomically pushes the timer forward and overwrites
// `input` / `context` with the latest values. Lodash semantics. The single run
// that eventually fires sees the most recent input. Use a global key
// (`key: () => 'global'`) for workflow-wide debounce.

throttle and debounce are mutually exclusive — debounce already collapses bursts to one fire per quiet window. Both require key. key: () => 'global' gives you a workflow-wide bucket.

Event triggers & auto-cancel

createWorkflow('onboard', {
  steps: { /* ... */ },
  trigger:  { event: 'user.created' },         // auto-start
  cancelOn: [{ event: 'user.cancelled' }],     // auto-cancel
});

Priority

await workflow.start(input, { priority: 10 }); // higher = picked up first

Child workflows

await ctx.startChildWorkflow('billing', { orderId });
// Parent waits for child. Child's output becomes the step's output.

Race semantics — what's closed, what isn't

Distributed primitives have specific guarantees. Honest summary:

| Primitive | Guarantee | Mechanism | |---|---|---| | idempotencyKey | Race-safe. Concurrent starts with the same key produce exactly one active run. | Partial unique index on idempotencyKey (filtered to non-terminal statuses) + E11000 catch in repository.create(). The losing insert returns the winning run instead of throwing. | | Scheduler claim (waiting → running, draft → running) | Race-safe. A given run is claimed by exactly one worker. | Atomic findOneAndUpdate with status guard in the filter. Plugin hooks fire on the claim. | | concurrency.limit | Best-effort, advisory. Steady-state correct; under bursts of concurrent starts/promotions the limit can briefly oversubscribe. | Count-then-create (and count-then-promote on slot release). Not wrapped in a transaction or counter doc. | | concurrency.throttle | Best-effort smoothing, not a strict distributed rate limiter. Sequential bursts get strictly-staggered slots (tail + windowMs/limit). Parallel concurrent starts can reserve the same slot — bounded by parallelism, not by limit. Use an external token-bucket / counter for strict guarantees. | | concurrency.debounce | Race-safe for the bump path. Each start is a single atomic findOneAndUpdate against (workflowId, concurrencyKey); the timer push is serialized by Mongo. The fall-through "no draft existed yet → create one" can race two creates if the first start in a quiet window arrives twice within the same millisecond — bounded to two drafts max, both at the same executionTime. |

If your workload is concurrency-sensitive in the strict sense — payment captures, ticket reservations, license-seat allocation — wrap the start call in your own gate (Redis lock, partial-unique resource index per PACKAGE_RULES §32) instead of relying on concurrency.limit. The streamline gate is fine for "don't overload the embedding API" but not for "exactly N seats."

Multi-tenant

import { createContainer, createWorkflow } from '@classytic/streamline';

const container = createContainer({
  repository: { multiTenant: { tenantField: 'context.tenantId', strict: true } },
});

const wf = createWorkflow('my-flow', { steps: { /* ... */ }, container });
// Every read/write/updateMany/deleteMany is tenant-scoped automatically.
// `bypassTenant: true` for admin ops; `staticTenantId` for single-tenant deployments.

Works with the MongoKit plugin surface (cache, audit, observability, field-filter, etc.). Every read and write — including atomic claims like the scheduler's findOneAndUpdate race — routes through inherited Repository<TDoc> methods, so plugin hooks fire on engine writes, not just user writes. Tenant scope applies to updateMany / deleteMany and to the atomic-claim path; bypassTenant: true is the explicit opt-out for _id-scoped admin operations.

Indexing

import { WorkflowRunModel } from '@classytic/streamline';

await WorkflowRunModel.collection.createIndexes([
  { key: { 'context.tenantId': 1, status: 1 } },
  { key: { 'context.url': 1, workflowId: 1 } },
  // TTL: auto-delete finished runs after 30 days
  { key: { updatedAt: 1 }, expireAfterSeconds: 30 * 86400,
    partialFilterExpression: { status: { $in: ['done', 'failed', 'cancelled'] } } },
]);

Retention — TTL, tenant compounds, stale sweeper

createContainer({ retention }) consolidates the three operational knobs hosts previously had to wire by hand: a TTL on terminal runs, the tenant-prefixed compound multi-tenant deployments need, and a sweep that terminates runs whose worker crashed.

import { createContainer } from '@classytic/streamline';

const container = createContainer({
  repository: { multiTenant: { tenantField: 'context.organizationId', strict: true } },
  retention: {
    terminalRunsTtlSeconds: 30 * 24 * 60 * 60,   // GC done/failed/cancelled after 30d
    staleHeartbeatThresholdMs: 30 * 60 * 1000,   // mark as failed after 30 min no heartbeat
    staleRunSweepIntervalMs: 60_000,             // sweep every minute
    staleRunAction: 'fail',                      // or 'cancel'
  },
});

// Deploy-time — call ONCE after mongoose connects (PACKAGE_RULES §32):
await mongoose.connect(uri);
await container.syncRetentionIndexes();          // idempotent, drop+rebuild on TTL change

// Graceful shutdown — sweeper timer is unref()'d, so this is optional:
container.dispose();

| Knob | Default | Effect | | ----------------------------- | ------- | ---------------------------------------------------------------------------------------------- | | terminalRunsTtlSeconds | off | TTL index {endedAt:1} with partialFilterExpression: {endedAt:{$exists:true}, status:{$in:[done,failed,cancelled]}}. | | multiTenantIndexes | true when repo is multi-tenant | Builds {<tenantField>:1, workflowId:1, createdAt:-1} so org-scoped lists hit a covering index. | | staleHeartbeatThresholdMs | off | Setting this auto-starts StaleRunSweeper. Pick well above the engine heartbeat AND any step's max execution time. | | staleRunSweepIntervalMs | 60_000 | Self-rescheduling setTimeout (no overlap), unref()'d. | | staleRunAction | 'fail'| 'fail' ⇒ status failed + error.code === 'stale_heartbeat'; 'cancel' ⇒ status cancelled. | | staleRunBatchSize | 100 | Cap per sweep cycle. | | maxStaleRecoveries | 5 | After N recoveries, sweeper marks the run failed with error.code === 'dead_lettered' instead of recycling it. |

The sweeper terminates via the repository's atomic markStaleAsFailed() CAS, so it can't race the engine's recoverStale() (which re-executes from the last heartbeat). Run both with different thresholds — recover at 5 min for transient crashes, terminate at 30 min as a backstop — and the longer threshold acts as a give-up signal so the scheduler can move on.

Both paths bump WorkflowRun.recoveryAttempts atomically. Once it hits maxStaleRecoveries, the next sweep cycle dead-letters the run with error.code === 'dead_lettered' instead of recycling it — closes the crash-recover-crash-recover loop bug for permanently broken runs. Hosts build their "stuck runs" dashboard off this field directly.

In-flight version pinning

Long-running workflows survive engine deploys. Every run carries definitionVersion (snapshotted from WorkflowDefinition.version at create-time); on resume the engine routes execution to the version-pinned engine via workflowRegistry.lookupVersion(workflowId, version).

// Deploy v1 — registers automatically.
const v1 = createWorkflow('billing', {
  version: '1.0.0',
  steps: { charge: chargeV1 },
});

const run = await v1.start({ orderId: '123' });   // run.definitionVersion === '1.0.0'

// Later: deploy v2 in the SAME process (rolling node).
const v2 = createWorkflow('billing', {
  version: '2.0.0',
  steps: { charge: chargeV2, refund: refundV2 },
});

await v1.engine.execute(run._id);   // routes to v1 — chargeV1 runs
await v2.start({ orderId: '456' }); // run.definitionVersion === '2.0.0'

When the original engine isn't registered (you stopped registering v1 because no v1 runs should exist), provide migrateRun:

createWorkflow('billing', {
  version: '2.0.0',
  steps: { charge: chargeV2 },
  migrateRun: async (run) => {
    if (run.definitionVersion === '1.0.0' && run.currentStepId === 'oldCharge') {
      return {
        currentStepId: 'charge',
        steps: run.steps.map((s) =>
          s.stepId === 'oldCharge' ? { ...s, stepId: 'charge' } : s,
        ),
      };
    }
    return null; // fall through — engine fails the run with VERSION_MISMATCH
  },
});

Runs created before v2.3.3 have no definitionVersion; the engine falls through to the active definition (back-compat preserved).

Query & cleanup

// Query directly via the Mongoose model:
await WorkflowRunModel.find({ workflowId: 'web-scraper', status: 'running' })
  .sort({ createdAt: -1 }).limit(50).lean();

// Or via the repository (v2.2 class primitives, tenant-scoped):
import { workflowRunRepository as repo } from '@classytic/streamline';
await repo.deleteMany({ status: { $in: ['done', 'failed'] },
                       updatedAt: { $lt: cutoff } });
await repo.updateMany({ status: 'waiting' }, { $set: { status: 'cancelled' } });

Webhooks

import { createHook, resumeHook } from '@classytic/streamline';

// In a step — MUST pass `hookToken` to ctx.wait so resumeHook can validate it.
// Without it, resumeHook fails closed (no token in the run = no resume).
const hook = createHook(ctx, 'awaiting-approval');
console.log(hook.path); // /hooks/<token>
await ctx.wait('Awaiting approval', { hookToken: hook.token });

// In your HTTP handler:
app.post('/hooks/:token', async (req, res) => {
  const { runId, run } = await resumeHook(req.params.token, req.body);
  res.json({ runId, status: run.status });
});

Observability

Legacy event bus (in-process)

import { globalEventBus } from '@classytic/streamline';

globalEventBus.on('workflow:started',   ({ runId }) => metrics.inc('wf.start', { runId }));
globalEventBus.on('workflow:completed', ({ runId }) => metrics.inc('wf.done', { runId }));
globalEventBus.on('workflow:failed',    ({ runId, data }) => alert({ runId, error: data.error }));
globalEventBus.on('engine:error',       ({ runId, error, context }) => log.error({ runId, context, error }));

Events: workflow:started|completed|failed|waiting|resumed|cancelled|recovered|retry|compensating, step:started|completed|failed|waiting|skipped|retry-scheduled|compensated, engine:error, scheduler:error|circuit-open.

Arc-compatible transport (cross-process)

import { createContainer, createWorkflow } from '@classytic/streamline';
import { RedisEventTransport } from '@classytic/arc/events';

const container = createContainer({
  eventTransport: new RedisEventTransport({ url: process.env.REDIS_URL }),
});

await container.eventTransport.subscribe('streamline:workflow.*', async (event) => {
  // { type: 'streamline:workflow.completed', payload: { runId }, meta: { id, timestamp, resourceId, userId, ... } }
  await auditLog.write(event);
});

const wf = createWorkflow('order', { steps: { /* ... */ }, container });

Canonical names: streamline:<resource>.<verb>. Without eventTransport, streamline uses an in-process InProcessStreamlineBus (same matchEventPattern as arc). The legacy globalEventBus keeps working unchanged — the transport is an additive bridge.

OpenTelemetry

Import from @classytic/streamline/telemetry; see package exports.

Logger

import { configureStreamlineLogger } from '@classytic/streamline';
configureStreamlineLogger({ enabled: false });          // silence
configureStreamlineLogger({ level: 'debug' });          // verbose
configureStreamlineLogger({ transport: pinoAdapter });  // pino/winston/etc.

Scheduler

Adaptive polling (10 s–5 min by load), circuit breaker, stale-running recovery, priority-ordered pickup, timezone-aware scheduling with DST handling (luxon).

workflow.engine.configure({ scheduler: { maxConcurrentExecutions: 10 } });
// Cap concurrent runs. All slots full → scheduler skips the poll cycle.

API summary

Workflow (from createWorkflow): start(input, opts?) · execute(runId) · resume(runId, payload?) · get(runId) · cancel(runId) · pause(runId) · rewindTo(runId, stepId) · waitFor(runId, opts?) · shutdown()

StepContext (inside handlers): ctx.set · ctx.getOutput · ctx.wait · ctx.waitFor · ctx.sleep · ctx.heartbeat · ctx.emit · ctx.log · ctx.checkpoint · ctx.getCheckpoint · ctx.scatter · ctx.goto · ctx.startChildWorkflow · ctx.signal (AbortSignal)

Public exports (main entry): createWorkflow, WorkflowEngine, createContainer, createWorkflowRepository, WorkflowRunModel, WorkflowRunRepository, CommonQueries, executeParallel, createHook / resumeHook, globalEventBus, WorkflowEventBus, createEventSink, tenantFilterPlugin / singleTenantPlugin, SchedulingService, TimezoneHandler. Update-doc builders: MongoUpdate, normalizeUpdate, runSet, runSetUnset, buildStepUpdateOps. Arc-compatible events: InProcessStreamlineBus, createEvent, bridgeBusToTransport, STREAMLINE_EVENTS, LEGACY_TO_CANONICAL, DomainEvent, EventTransport, EventHandler. Errors: WorkflowError, WorkflowNotFoundError, StepNotFoundError, StepTimeoutError, InvalidStateError, DataCorruptionError, MaxRetriesExceededError, NonRetriableError, ErrorCode.

Subpath entries: @classytic/streamline/fastify, @classytic/streamline/telemetry.

Error handling

WorkflowError (and every subclass) implements HttpError from @classytic/repo-core/errors. Three equivalent ways to identify an error — pick whichever matches your codebase:

import { WorkflowError, ErrorCode, ErrorCodeHierarchical } from '@classytic/streamline';

try { await workflow.resume(runId, payload); }
catch (err) {
  if (!(err instanceof WorkflowError)) throw err;

  // (A) Cleanest in HTTP layers — the status is right on the error.
  return res.status(err.status).json({ code: err.code, message: err.message, meta: err.meta });

  // (B) Hierarchical (HttpError-canonical) code — switch on `'workflow.X'` ids.
  switch (err.code) {
    case ErrorCodeHierarchical.WORKFLOW_NOT_FOUND: return res.status(404).end();
    case ErrorCodeHierarchical.INVALID_STATE:      return res.status(400).end();
    case ErrorCodeHierarchical.STEP_TIMEOUT:       return res.status(408).end();
    default: throw err;
  }

  // (C) Legacy SCREAMING_SNAKE — kept for backwards compat. New code should
  // prefer (A) or (B). The legacy value lives on `legacyCode` post-migration.
  switch (err.legacyCode) {
    case ErrorCode.WORKFLOW_NOT_FOUND: return res.status(404).end();
    // ...
  }
}

Status mapping (also exported as ERROR_STATUS_MAP):

| Code | Status | Hierarchical id | |---|---|---| | WORKFLOW_NOT_FOUND | 404 | workflow.not_found | | STEP_NOT_FOUND | 404 | workflow.step.not_found | | STEP_TIMEOUT | 408 | workflow.step.timeout | | WORKFLOW_CANCELLED / WORKFLOW_ALREADY_COMPLETED / EXECUTION_ABORTED | 409 | workflow.cancelled / | | INVALID_STATE / INVALID_TRANSITION / VALIDATION_ERROR | 400 | workflow.invalid_state / | | DATA_CORRUPTION / STEP_FAILED / MAX_RETRIES_EXCEEDED | 500 | workflow.data_corruption / |

Type-safe exports

import type {
  Workflow, WorkflowConfig, StepConfig, WaitForOptions,
  WorkflowRun, WorkflowEventName, EventPayloadMap,
} from '@classytic/streamline';

export const myWorkflow: Workflow<MyCtx, MyInput> = createWorkflow('my', { /* ... */ });

Examples

See docs/examples/: hello-world, wait, sleep, parallel, conditional, newsletter automation, AI pipeline.

Testing your workflows

npm test                # unit + integration (fast)
npm run test:e2e        # full scenarios (slow)
npm run test:all        # everything

See TESTING.md for tier conventions + helpers.

What's new in 2.2

  • Arc-compatible event transport — drop any arc transport (Memory / Redis / Kafka / BullMQ) into createContainer({ eventTransport }) and subscribe glob-style.
  • mongokit 3.11 / repo-core 0.2 surfaceupdateMany / deleteMany as class primitives, portable Update IR, UpdatePatch<TDoc> rename.
  • ctx.waitFor() resumes from globalEventBus (was broken in 2.1).
  • Tenant scope on bulk opstenantFilterPlugin now hooks updateMany / deleteMany. Security-relevant if you were on 2.1 with bulk writes.
  • Heartbeat backpressure — aborts a step after N consecutive heartbeat failures to prevent double execution via stale re-claim.
  • updateOne() guardrail — rejects mixed { $set, rawField } updates instead of letting Mongo silently drop keys.
  • Exhaustive LEGACY_TO_CANONICAL map — adding a new event without mapping it is a compile error.

See CHANGELOG.md for the full list.

License

MIT · Issues + PRs welcome at github.com/classytic/streamline