npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@robinmalfait/event-source

v0.0.21

Published

A TypeScript library for building event-sourced applications in Node.js. This library provides the foundational building blocks for implementing CQRS (Command Query Responsibility Segregation) and Event Sourcing patterns.

Downloads

92

Readme

Event Source

A TypeScript library for building event-sourced applications in Node.js. This library provides the foundational building blocks for implementing CQRS (Command Query Responsibility Segregation) and Event Sourcing patterns.

Installation

npm install @robinmalfait/event-source
# or
pnpm add @robinmalfait/event-source

Core Concepts

Events

Events are immutable records of something that happened in your domain. They contain an aggregate ID, payload, metadata, and version information.

import { Event } from '@robinmalfait/event-source'

function accountOpened(id: string, owner: string) {
  return Event('ACCOUNT_OPENED', id, { owner })
}

function moneyDeposited(id: string, amount: number) {
  return Event('MONEY_DEPOSITED', id, { amount })
}

Commands

Commands represent an intent to perform an action. They carry a type and payload.

import { Command } from '@robinmalfait/event-source'

function openAccount(id: string, owner: string) {
  return Command('OPEN_ACCOUNT', { id, owner })
}

function depositMoney(accountId: string, amount: number) {
  return Command('DEPOSIT_MONEY', { accountId, amount })
}

Aggregates

Aggregates are domain entities that emit events and rebuild their state from event history. They extend the base Aggregate class and define apply handlers for each event type.

import { Aggregate, type ApplyEvents, abort } from '@robinmalfait/event-source'

class Account extends Aggregate {
  private owner: string = ''
  private balance: number = 0
  private closed: boolean = false

  // Static factory methods for creating new aggregates
  static open(id: string, owner: string) {
    return new Account().recordThat(accountOpened(id, owner))
  }

  // Instance methods for operations on existing aggregates
  deposit(amount: number) {
    if (this.closed) {
      abort('Cannot deposit to a closed account')
    }
    return this.recordThat(moneyDeposited(this.aggregateId, amount))
  }

  // Apply handlers rebuild state from events
  apply: ApplyEvents<
    typeof accountOpened | typeof moneyDeposited | typeof accountClosed
  > = {
    ACCOUNT_OPENED: (event) => {
      this.owner = event.payload.owner
    },
    MONEY_DEPOSITED: (event) => {
      this.balance += event.payload.amount
    },
    ACCOUNT_CLOSED: () => {
      this.closed = true
    },
  }
}

Setting Up EventSource

The EventSource class is the central coordinator that connects everything together. Use the builder pattern to configure it:

import { EventSource } from '@robinmalfait/event-source'

const eventSource = EventSource.builder(myEventStore)
  .addCommandHandler('OPEN_ACCOUNT', openAccountHandler)
  .addCommandHandler('DEPOSIT_MONEY', depositMoneyHandler)
  .addProjector(new BalanceProjector())
  .addEventHandler(sendNotificationHandler)
  .metadata(() => ({ userId: getCurrentUserId() }))
  .build()

// Dispatch commands
await eventSource.dispatch(openAccount('acc-123', 'John Doe'))
await eventSource.dispatch(depositMoney('acc-123', 1000))

Command Handlers

Command handlers receive a command and the event source instance, and return an aggregate to persist:

import type { CommandHandler } from '@robinmalfait/event-source'

const openAccountHandler: CommandHandler<
  ReturnType<typeof openAccount>
> = async (command, es) => {
  return es.persist(Account.open(command.payload.id, command.payload.owner))
}

const depositMoneyHandler: CommandHandler<
  ReturnType<typeof depositMoney>
> = async (command, es) => {
  let account = await es.load(Account, command.payload.accountId)
  return es.persist(account.deposit(command.payload.amount))
}

EventStore Interface

To use the library, implement the EventStore interface with your preferred storage:

import type { EventStore, EventType } from '@robinmalfait/event-source'

class MyEventStore implements EventStore {
  async persist(events: EventType[]): Promise<void> {
    // Store events in your database
  }

  async load(aggregateId: string): Promise<EventType[]> {
    // Load all events for an aggregate
  }

  async loadEvents(): Promise<EventType[]> {
    // Load all events (for rebuilding projections)
  }
}

See the examples/mysql-event-store directory for a MySQL implementation using Knex.js.

Projectors

Projectors build read models from events. They process events sequentially and maintain derived state.

Lifecycle:

  • Normal operation: When events are persisted via es.persist(), each projector's apply handlers are called for the new events only.
  • Full rebuild: When es.resetProjections() is called, reset() is called first (to clear existing state), then all events are replayed through apply.

This means projectors work well in serverless environments - projections are persisted to your database and don't need rebuilding on every cold start.

import {
  Projector,
  type ApplyProjectorEvents,
} from '@robinmalfait/event-source'

class BalanceProjector extends Projector {
  name = 'balance-projector'
  private balances = new Map<string, number>()

  async reset() {
    // Called only during resetProjections() - clear state before full rebuild
    this.balances.clear()
  }

  apply: ApplyProjectorEvents<typeof accountOpened | typeof moneyDeposited> = {
    ACCOUNT_OPENED: (event, es) => {
      this.balances.set(event.aggregateId, 0)
    },
    MONEY_DEPOSITED: (event, es) => {
      let current = this.balances.get(event.aggregateId) ?? 0
      this.balances.set(event.aggregateId, current + event.payload.amount)
    },
  }

  getBalance(accountId: string) {
    return this.balances.get(accountId) ?? 0
  }
}

Accessing Aggregate State in Projectors

Each apply handler receives the EventSource instance as the second argument. This allows projectors to reconstruct aggregate state when needed — useful when an event doesn't contain all the data required for the projection.

class TransactionHistoryProjector extends Projector {
  name = 'transaction-history-projector'

  apply: ApplyProjectorEvents<typeof accountClosed> = {
    ACCOUNT_CLOSED: async (event, es) => {
      // Reconstruct the aggregate to access its full state
      let account = await es.load(new Account(), event.aggregateId)

      await db.transactionHistory.insert({
        accountId: event.aggregateId,
        type: 'CLOSED',
        finalBalance: account.balance, // Data not in the event
        closedAt: event.recordedAt,
      })
    },
  }
}

Testing

The library provides BDD-style testing utilities with a given/when/then pattern:

import { createTestEventStore } from '@robinmalfait/event-source'

describe('deposit money', () => {
  let { given, when, then, ___ } = createTestEventStore({
    DEPOSIT_MONEY: depositMoneyHandler,
  })

  it('should deposit money to an open account', async () => {
    await given([accountOpened('acc-123', 'John Doe')])

    await when(depositMoney('acc-123', 500))

    await then([moneyDeposited('acc-123', 500)])
  })

  it('should fail when depositing to a closed account', async () => {
    await given([
      accountOpened('acc-123', 'John Doe'),
      accountClosed('acc-123'),
    ])

    await when(depositMoney('acc-123', 500))

    await then(new Error('Cannot deposit to a closed account'))
  })

  it('should use placeholders for values we do not care about', async () => {
    await given([accountOpened('acc-123', 'John Doe')])

    await when(depositMoney('acc-123', 500))

    // Use ___ as a placeholder for any value
    await then([Event('MONEY_DEPOSITED', ___, { amount: 500 })])
  })
})

Utilities

abort

Throw errors with clean stack traces and custom attributes:

import { abort } from '@robinmalfait/event-source'

if (balance < amount) {
  abort('Insufficient funds', { balance, requested: amount })
}

Type Utilities

ApplyEvents / ApplyProjectorEvents

Type-safe handlers for applying events to aggregates and projectors:

import type {
  ApplyEvents,
  ApplyProjectorEvents,
} from '@robinmalfait/event-source'

// For Aggregates - handlers receive only the event
apply: ApplyEvents<typeof accountOpened | typeof moneyDeposited> = {
  ACCOUNT_OPENED: (event) => {
    /* ... */
  },
  MONEY_DEPOSITED: (event) => {
    /* ... */
  },
}

// For Projectors - handlers receive the event and EventSource
apply: ApplyProjectorEvents<typeof accountOpened | typeof moneyDeposited> = {
  ACCOUNT_OPENED: (event, es) => {
    /* ... */
  },
  MONEY_DEPOSITED: (event, es) => {
    /* ... */
  },
}

PayloadOf / TypeOf

Extract types from events and commands:

import type { PayloadOf, TypeOf } from '@robinmalfait/event-source'

type AccountOpenedPayload = PayloadOf<ReturnType<typeof accountOpened>>
// { owner: string }

type AccountOpenedType = TypeOf<ReturnType<typeof accountOpened>>
// 'ACCOUNT_OPENED'

Examples

See the examples/bank directory for a complete bank account domain implementation demonstrating:

  • Domain events and commands
  • Account aggregate with business rules
  • Command handlers
  • Test cases using the given/when/then pattern

Local Development

Prerequisites

  • Node.js 24+ (see .nvmrc)
  • pnpm

Commands

| Command | Description | | ------------- | ----------------------------------------------------- | | pnpm start | Build in watch mode for development | | pnpm build | Production build (ESM + CJS + TypeScript definitions) | | pnpm test | Run all tests | | pnpm tdd | Run tests in watch mode | | pnpm format | Format code with Prettier |

License

MIT