
@schemeless/event-store

v5.0.1


@schemeless/event-store

The runtime core that orchestrates event flow execution, persistence, observer notification, and side-effect retries. It glues your EventFlow definitions together with an IEventStoreRepo implementation and emits every lifecycle outcome through an observable stream.

Installation

yarn add @schemeless/event-store

You will also need one of the persistence adapters (for example @schemeless/event-store-adapter-typeorm or @schemeless/event-store-adapter-dynamodb).

Define event flows

An event flow describes how a domain event is received, validated, applied, and enriched with consequent events or side effects. Each handler is optional, so you can start with a bare minimum and grow behaviour over time.

import { EventFlow } from '@schemeless/event-store';

export const userRegisteredFlow: EventFlow = {
  domain: 'user',
  type: 'registered',
  validate: async (event) => {
    if (!event.payload.email) {
      throw new Error('email is required');
    }
  },
  apply: async (event) => {
    // Update projections, write to read models, etc.
  },
};

You can attach additional hooks such as sideEffect (retryable asynchronous work) or createConsequentEvents (fan out new root events) by setting the corresponding properties on the flow object.
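For example, a retryable sideEffect hook could be attached like this (a minimal sketch: the types are simplified stand-ins for the library's EventFlow, and sendWelcomeEmail is a hypothetical helper):

```typescript
// Types simplified for illustration; the real flow type is EventFlow from
// @schemeless/event-store. sendWelcomeEmail is a hypothetical helper.
type DomainEvent = { domain: string; type: string; payload: { email: string } };

const sentEmails: string[] = [];
async function sendWelcomeEmail(to: string): Promise<void> {
  sentEmails.push(to); // stand-in for a real mailer call
}

export const welcomeEmailFlow = {
  domain: 'user',
  type: 'registered',
  // Retryable asynchronous work: if this rejects, the framework re-queues it.
  sideEffect: async (event: DomainEvent): Promise<void> => {
    await sendWelcomeEmail(event.payload.email);
  },
};
```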

Creating consequent events

When an event spawns additional events via createConsequentEvents, the framework automatically maintains the causal relationship:

export const orderPlacedFlow: EventFlow = {
  domain: 'order',
  type: 'placed',
  createConsequentEvents: (parentEvent) => [
    {
      domain: 'account',
      type: 'transfer',
      payload: {
        fromAccountId: parentEvent.payload.buyerAccountId,
        toAccountId: parentEvent.payload.sellerAccountId,
        amount: parentEvent.payload.total,
      },
      // No need to set causationId or correlationId
      // The framework handles this automatically:
      // - causationId = parentEvent.id
      // - correlationId = parentEvent.correlationId
    },
  ],
};

This ensures all derived events share the same correlationId (for grouping) while each maintains a causationId pointer to its immediate parent (for chain traversal).
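The metadata described above is enough to implement both operations; a simplified sketch (event shape reduced to just the relevant metadata fields):

```typescript
// Event shape simplified for illustration; only the metadata fields matter here.
type EventMeta = { id: string; correlationId: string; causationId?: string };

// Grouping: every event in one logical transaction shares a correlationId.
function eventsInTransaction(events: EventMeta[], correlationId: string): EventMeta[] {
  return events.filter((e) => e.correlationId === correlationId);
}

// Chain traversal: causationId points at the immediate parent event.
function parentOf(events: EventMeta[], child: EventMeta): EventMeta | undefined {
  return events.find((e) => e.id === child.causationId);
}
```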

Aggregate Event Flows

If a flow manages aggregate state, declare it with AggregateEventFlow. The framework will load the current state automatically before validate and apply, and it will pass the computed state through replay as well.

import type { AggregateEventFlow, AggregateEventObserver } from '@schemeless/event-store';

type StockPayload = { amount: number };
type StockState = { count: number };

export const stockFlow: AggregateEventFlow<StockPayload, StockState> = {
  domain: 'stock',
  type: 'updated',
  aggregate: {
    initialState: { count: 0 },
    reducer: (state, event) => ({ count: state.count + event.payload.amount }),
  },
  validate: (event, state) => {
    if (state.count < 0) {
      throw new Error('stock cannot go below zero');
    }
  },
  apply: (_event, state) => state,
};

export const stockObserver: AggregateEventObserver<StockPayload, StockState> = {
  aggregate: true,
  filters: [{ domain: 'stock', type: 'updated' }],
  priority: 0,
  apply: async (event, state) => {
    console.log(event.id, state.count);
  },
};
  • aggregate.reducer is the pure fold function used by getAggregate().
  • validate(event, state) receives the current aggregate state.
  • apply(event, state) returns the next state and must not perform side effects.
  • Aggregate observers opt in with aggregate: true; regular observers still receive (event) only.

Use the existing getAggregate() helper when you need to read aggregate state manually from application code. Inside aggregate flows, the framework does that work for you.

Optional Runtime Validation (Recommended)

@schemeless/event-store does not require any specific validation library. The framework only calls your validate hook, so you can choose the tool that fits your stack.

For TypeScript-first projects, we recommend Zod because it gives clear runtime errors and strong type inference.

import { z } from 'zod';
import type { EventFlow } from '@schemeless/event-store';

const userRegisteredPayload = z.object({
  id: z.string().uuid(),
  email: z.string().email(),
  name: z.string().min(1),
});

type UserRegisteredPayload = z.infer<typeof userRegisteredPayload>;

export const userRegisteredFlow: EventFlow<UserRegisteredPayload> = {
  domain: 'user',
  type: 'registered',
  validate: async (event) => {
    const result = userRegisteredPayload.safeParse(event.payload);
    if (!result.success) {
      throw new Error(result.error.issues.map((i) => `${i.path.join('.')}: ${i.message}`).join('; '));
    }
  },
  apply: async (event) => {
    // event.payload is now validated at runtime
  },
};

Alternative options:

  • Valibot: similar DX to Zod, smaller footprint.
  • TypeBox + Ajv: best when you need JSON Schema output and ecosystem tooling.
  • class-validator: useful in decorator/class-based DTO stacks.

Recommendation:

  • Keep validation in event flows (validate / preApply).
  • Keep the core event-store package library-agnostic.

Build the store

makeEventStore wires your repository and flows together, returning a submit helper, a replay function, and lifecycle utilities.

import { makeEventStore } from '@schemeless/event-store';
import { EventStoreRepo as TypeOrmRepo } from '@schemeless/event-store-adapter-typeorm';

const repo = new TypeOrmRepo({ type: 'sqlite', database: ':memory:' });
const buildStore = makeEventStore(repo);

const eventStore = await buildStore([userRegisteredFlow]);

const [created] = await eventStore.submit(userRegisteredFlow, {
  payload: { id: '123', email: '[email protected]' },
});

// `submit` resolves after persistence and observer execution.
// Side effects run asynchronously; call `shutdown()` for a full flush before teardown.

Performance & Concurrency

You can configure the concurrency level for the internal queues to optimize throughput. By default, all queues run sequentially (concurrent: 1) to guarantee strict ordering.

Configurable Concurrency

Pass EventStoreOptions to makeEventStore to enable parallel processing:

const eventStore = await makeEventStore(repo, {
  mainQueueConcurrent: 5,        // 5 parallel partition workers
  sideEffectQueueConcurrent: 10, // 10 parallel side-effect workers
})([userRegisteredFlow]);

Note: mainQueueConcurrent controls the number of partition workers. Events routed to the same partition (via getShardKey) are always processed sequentially; events on different partitions run in parallel. Set getShardKey on your flows to control routing.

Key-Based Partitioning (Sharding)

To solve race conditions with global parallelism, we support Key-Based Partitioning. This ensures events for the same entity (e.g., userId) are processed sequentially, while different entities are processed in parallel.

  1. Define Shard Key: Implement getShardKey in your EventFlow:

    const UserFlow: EventFlow = {
      // ...
      getShardKey: (event) => event.payload.userId,
    };
  2. Enable Concurrency:

    const store = await makeEventStore(..., {
      mainQueueConcurrent: 10, // 10 parallel shards
    });

👉 Read the Full Migration Guide for detailed implementation steps.

Snapshot Support (Performance)

When Do You NEED This?

| Your Use Case | Solution | Need getAggregate? |
| ------------- | -------- | ------------------ |
| Prevent concurrent writes to same stream | expectedSequence (OCC) | ❌ No |
| Update projections/read models | apply hook (incremental) | ❌ No |
| Validate based on current aggregate state (e.g., "balance >= amount") | getAggregate | ✅ Yes |

OCC (Optimistic Concurrency Control) is O(1) – it only checks a version number, NOT by replaying events. If your validation logic only needs the event payload itself, you don't need snapshots.
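Conceptually, the OCC check amounts to a single comparison, sketched here with illustrative names (the real check lives inside the store/adapter):

```typescript
// A constant-time version check: compare sequence numbers, no event replay.
// Illustrative only; not the library's actual internal function.
function assertExpectedSequence(currentSequence: number, expectedSequence: number): void {
  if (currentSequence !== expectedSequence) {
    throw new Error(
      `Concurrency conflict: expected ${expectedSequence}, stream is at ${currentSequence}`
    );
  }
}
```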

Snapshots are for reconstructing aggregate state – When your validate or preApply hook needs the current state of an aggregate (calculated from all past events), getAggregate helps. Snapshots optimize this by caching intermediate state.

Example: When You Need getAggregate

// In your EventFlow's validate hook:
validate: async (event) => {
  // You need the current account balance to validate
  const { state } = await eventStore.getAggregate('account', event.payload.accountId, accountReducer, { balance: 0 });

  if (state.balance < event.payload.amount) {
    throw new Error('Insufficient funds');
  }
},

Using getAggregate

const { state, sequence } = await eventStore.getAggregate(
  'user', // domain
  '123', // identifier
  userReducer, // your reducer function
  { balance: 0 } // initial state
);

The helper automatically:

  1. Tries to load a snapshot (if adapter supports it).
  2. Fetches only events after the snapshot.
  3. Reduces them to get the final state.
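A simplified, in-memory sketch of that three-step procedure (the real helper talks to the adapter; names here are illustrative):

```typescript
// Sketch of the snapshot-then-fold procedure the helper performs.
type LoadedSnapshot<S> = { state: S; sequence: number } | undefined;

function foldFromSnapshot<S, E>(
  snapshot: LoadedSnapshot<S>,
  initialState: S,
  eventsAfter: (fromSequence: number) => E[], // conceptually, the adapter's stream read
  reducer: (state: S, event: E) => S
): { state: S; sequence: number } {
  let state = snapshot ? snapshot.state : initialState; // 1. start from the snapshot
  let sequence = snapshot ? snapshot.sequence : 0;
  for (const event of eventsAfter(sequence)) { // 2. fetch only events after it
    state = reducer(state, event); // 3. reduce to the final state
    sequence += 1;
  }
  return { state, sequence };
}
```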

Adapter Requirements

[!WARNING] As of v3.0.0, most adapters do NOT implement these methods yet. Check your adapter's documentation.

To use getAggregate, your IEventStoreRepo adapter must implement:

  1. getStreamEvents(domain, identifier, fromSequence) (Required)
  2. getSnapshot / saveSnapshot (Optional, for performance)
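For orientation, getStreamEvents could be backed by something as simple as this in-memory sketch (field names simplified; a real adapter implements the full IEventStoreRepo interface):

```typescript
// A minimal in-memory illustration of the required method.
type StreamEvent = { domain: string; identifier: string; sequence: number; payload: unknown };

class InMemoryStreamRepo {
  private events: StreamEvent[] = [];

  append(event: StreamEvent): void {
    this.events.push(event);
  }

  // Required for getAggregate: the stream's events strictly after fromSequence.
  async getStreamEvents(domain: string, identifier: string, fromSequence: number): Promise<StreamEvent[]> {
    return this.events
      .filter((e) => e.domain === domain && e.identifier === identifier && e.sequence > fromSequence)
      .sort((a, b) => a.sequence - b.sequence);
  }
}
```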

Capability Detection

eventStore now exposes capabilities.aggregate so callers can branch safely before invoking getAggregate:

if (!eventStore.capabilities.aggregate) {
  // Fallback to projection/CQRS read model validation
}

Adapters can also declare support explicitly with repo.capabilities.aggregate. If omitted, support is inferred from the presence of repo.getStreamEvents.

Adapter Capability Matrix (v3.x)

| Adapter | capabilities.aggregate | getAggregate Support |
| ------- | ---------------------- | -------------------- |
| @schemeless/event-store-adapter-dynamodb | false (declared) | ❌ No |
| Other adapters | inferred / adapter-defined | Check adapter docs or eventStore.capabilities.aggregate at runtime |

EventStoreOptions Reference

| property | type | default | description |
| -------- | ---- | ------- | ----------- |
| mainQueueConcurrent | number | 1 | Number of partition workers for main event processing. Events sharing the same shard key are always sequential; events on different partitions run in parallel. |
| sideEffectQueueConcurrent | number | 1 | Number of partition workers for side-effect execution. Safe to increase; side effects are retryable and independent of the main pipeline. |

Fire-and-Forget Observers

For observers that perform non-critical, time-consuming tasks (like sending analytics or notifications) where you don't want to block the main event processing flow, use fireAndForget: true.

const analyticsObserver: SuccessEventObserver = {
  filters: [{ domain: 'user', type: 'registered' }],
  priority: 1,
  fireAndForget: true, // Run asynchronously, do not wait
  apply: async (event) => {
    await sendAnalytics(event);
  },
};
  • Non-blocking: submit() resolves without waiting for this observer.
  • Error Isolation: If this observer throws an error, it is logged but does not fail the main event flow.

The returned object exposes:

  • submit(flow, input) – validates, applies, persists, runs side effects, and notifies observers. Resolves with all created events.
  • on('processed', handler) – subscribe to processed event notifications without RxJS. Returns an unsubscribe function.
  • replay – streams stored events back through the flows and success observers, enabling projection rebuilds.
  • shutdown(timeout?) – drains all in-flight work and closes the repository connection.
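The on('processed', …) contract -- register a handler, get back an unsubscribe function -- can be illustrated with a minimal stand-in emitter (not the store's actual implementation):

```typescript
// A minimal stand-in emitter showing the subscribe/unsubscribe contract.
type Handler<E> = (event: E) => void;

function makeEmitter<E>() {
  const handlers = new Set<Handler<E>>();
  return {
    on(handler: Handler<E>): () => void {
      handlers.add(handler);
      return () => { handlers.delete(handler); }; // the unsubscribe function
    },
    emit(event: E): void {
      handlers.forEach((h) => h(event));
    },
  };
}
```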

Observers and replay

Register success observers when constructing the store to react to committed events. Observers run inline after each event is persisted, in priority order. During replay, the same lifecycle is used for upcast / validate / preApply / apply, then observers are called with the resulting event and aggregate state.

const logObserver = {
  filters: [{ domain: 'user', type: 'registered' }],
  priority: 100,
  apply: async (event) => {
    console.log('User registered:', event.payload.email);
  },
};

const eventStore = await buildStore([userRegisteredFlow], [logObserver]);
await eventStore.replay();

replay batches historical records and re-applies each event in chronological order. Observers are called inline after each event, so read models stay consistent after deployments or migrations. Replay uses the same lifecycle as live processing: upcast, validate, preApply, apply, then observers. Aggregate observers marked with aggregate: true receive (event, state) during replay; regular observers receive only (event).

Graceful Shutdown

To prevent data loss or resource leaks during application shutdown (or testing), use the shutdown method. This stops accepting new events, drains existing queues, and closes repository connections.

// 30 second timeout
await eventStore.shutdown(30000);

This is particularly useful for:

  • Jest Tests: Preventing "Store is not a constructor" or "import a file after the Jest environment has been torn down" errors.
  • Kubernetes: Handling SIGTERM signals gracefully.
  • Updates: Ensuring queues are empty before deploying new versions.
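For the Kubernetes case, the wiring might look like this sketch (eventStore is a stub standing in for the store built by makeEventStore; only the shutdown signature matters):

```typescript
// `eventStore` is a stub standing in for the real store.
const eventStore = {
  async shutdown(timeoutMs: number): Promise<void> {
    // Real store: stop intake, drain queues, close the repository,
    // giving up after timeoutMs milliseconds.
  },
};

async function handleSigterm(): Promise<void> {
  await eventStore.shutdown(30_000); // 30 second drain budget
}

process.on('SIGTERM', () => {
  handleSigterm().finally(() => process.exit(0));
});
```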

Export and Import Events

The core package ships built-in utilities to snapshot the entire repository into a plain array and restore it later. This is adapter-agnostic — it works with every IEventStoreRepo implementation.

import { exportEventsToArray, importEventsFromArray, createSnapshot, parseSnapshot } from '@schemeless/event-store';

// ── Export ──────────────────────────────────────────────────────────────────
const events = await exportEventsToArray(eventStore.eventStoreRepo, {
  pageSize: 200, // optional, default 200
  onProgress: (n) => console.log(`exported ${n}`),
});
const json = JSON.stringify(createSnapshot(events));

// ── Import (restore) ────────────────────────────────────────────────────────
const { events: restored } = parseSnapshot(json); // Date fields automatically restored
await importEventsFromArray(eventStore.eventStoreRepo, restored, {
  replace: true, // call resetStore() before writing (optional)
  batchSize: 100, // optional, default 100
  onProgress: (n) => console.log(`imported ${n}`),
});

React Native / Expo integration example:

import * as FileSystem from 'expo-file-system';
import * as Sharing from 'expo-sharing';

// Backup
const json = JSON.stringify(createSnapshot(await exportEventsToArray(store.eventStoreRepo)));
const path = FileSystem.documentDirectory + 'backup.json';
await FileSystem.writeAsStringAsync(path, json);
await Sharing.shareAsync(path, { mimeType: 'application/json' });

// Restore
const backupJson = await FileSystem.readAsStringAsync(backupPath);
await importEventsFromArray(store.eventStoreRepo, parseSnapshot(backupJson).events, { replace: true });

| Function | Description |
| -------- | ----------- |
| exportEventsToArray(repo, opts?) | Pages through getAllEvents and returns IEventStoreEntity[] |
| importEventsFromArray(repo, events, opts?) | Writes in batches; coerces date strings to Date |
| createSnapshot(events) | Adds exportedAt + count metadata |
| parseSnapshot(json) | Parses JSON; restores created as Date |

Cascading Revert

Sometimes you need to undo an event and all its derived effects. The framework provides a built-in revert mechanism that traverses the causal chain and generates compensating events.

Define the compensate hook

Each event flow can define a compensate hook that returns one or more compensating events:

export const orderPlacedFlow: EventFlow = {
  domain: 'order',
  type: 'placed',
  compensate: (originalEvent) => ({
    domain: 'order',
    type: 'voided',
    payload: {
      orderId: originalEvent.payload.orderId,
      reason: 'Reverted via framework',
    },
  }),
};

The framework adds metadata to each compensating event (meta.isCompensating = true, meta.compensatesEventId).

Check if an event can be reverted

Before attempting a revert, check if the event tree is fully covered:

const result = await eventStore.canRevert(eventId);

if (!result.canRevert) {
  console.log('Cannot revert:', result.reason);
  console.log('Missing hooks:', result.missingCompensateEvents);
}

Strict mode: If any event in the tree (including descendants) lacks a compensate hook, the revert will fail. This ensures data consistency.

Preview a revert

See what would be affected without making changes:

const preview = await eventStore.previewRevert(eventId);

console.log('Root event:', preview.rootEvent.id);
console.log('Descendants:', preview.descendantEvents.length);
console.log('Total affected:', preview.totalEventCount);

Execute a revert

Revert the event and all its descendants:

const result = await eventStore.revert(eventId);

console.log('Reverted:', result.revertedEventId);
console.log('Compensation events:', result.compensatingEvents);
console.log('Child results:', result.childResults); // Nested results

The revert processes events depth-first, starting from the leaves and working up to the root. This ensures that dependent events are compensated before their parents.
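That leaves-first ordering is a post-order traversal of the causal tree, sketched here with a simplified tree shape:

```typescript
// Post-order traversal: every node is visited only after all its descendants.
type EventNode = { id: string; children: EventNode[] };

function revertOrder(root: EventNode): string[] {
  const order: string[] = [];
  const walk = (node: EventNode): void => {
    node.children.forEach(walk);
    order.push(node.id); // a parent is compensated only after all descendants
  };
  walk(root);
  return order;
}
```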

Root events only: Only events without a causationId (root events) can be reverted. Attempting to revert intermediate events will throw an error.

Schema Versioning & Upcasting

To support evolving event schemas over time (e.g., changing a price field from a number to an object), you can define a schemaVersion and an upcast hook in your EventFlow.

How it works

  1. Tagging: New events are automatically tagged with the flow's current schemaVersion in event.meta.schemaVersion.
  2. Upcasting: When an older event (lower version) is processed (during receive or replay), the upcast hook is called to migrate it to the current structure.
  3. Pipeline: Upcasting happens before validation and pre-application, so your validate and apply logic only ever needs to handle the current schema version.

Example

interface OrderPlacedPayloadV1 {
  price: number;
}

interface OrderPlacedPayloadV2 {
  price: { amount: number; currency: string };
}

// The flow definition uses the LATEST payload type
export const orderPlacedFlow: EventFlow<OrderPlacedPayloadV2> = {
  domain: 'order',
  type: 'placed',

  // 1. Set the current version
  schemaVersion: 2,

  // 2. Define how to migrate from older versions
  upcast: (event, fromVersion) => {
    if (fromVersion < 2) {
      // Migrate V1 -> V2
      return {
        ...event,
        payload: {
          ...event.payload,
          price: { amount: event.payload.price, currency: 'USD' }, // Default to USD
        },
      };
    }
    // Return void if no changes needed
  },

  // 3. Validation and Application logic only sees V2
  validate: async (event) => {
    // event.payload.price is guaranteed to be { amount, currency }
    if (event.payload.price.amount < 0) throw new Error('Negative price');
  },
};