npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2025 – Pkg Stats / Ryan Hefner

@codeforbreakfast/eventsourcing-aggregates

v0.9.3

Published

Type-safe aggregate root patterns for event sourcing with Effect - Build bulletproof domain models with functional composition and immutable state management

Readme

@codeforbreakfast/eventsourcing-aggregates

Type-safe aggregate root patterns for event sourcing with Effect integration. This package provides functional patterns for building aggregate roots that handle commands, emit events, and maintain consistency in your event-sourced applications.

Installation

npm install @codeforbreakfast/eventsourcing-aggregates effect
bun add @codeforbreakfast/eventsourcing-aggregates effect

Key Features

  • Functional Aggregate Patterns: Composable functions for building domain aggregates with Effect
  • Type-Safe Event Sourcing: Schema-validated events with automatic state reconstruction
  • Domain-Specific Command Processing: Generic command layer that maintains strong typing for each domain's events
  • Command Context Tracking: Built-in command tracking with user attribution
  • Event Metadata: Automatic timestamping and originator tracking for all events
  • Effect Integration: Built on Effect for composable, functional aggregate operations
  • Testing Support: Test-friendly APIs for unit testing aggregate behavior
  • Serialization Boundary Pattern: Events stay strongly-typed throughout domain layer, generic only at storage/wire boundaries

Core Exports

import {
  // Main aggregate creation function
  makeAggregateRoot,

  // Aggregate event store factory
  defineAggregateEventStore,

  // Core interfaces and types
  AggregateState,
  EventMetadata,

  // Command processing
  CommandHandler,
  CommandRouter,
  createCommandProcessingService,
  CommandProcessingService,
  CommandProcessingError,
  CommandRoutingError,

  // Command context services
  CommandContext,
  CommandContextService,

  // Helper functions
  eventMetadata,
  eventSchema,
} from '@codeforbreakfast/eventsourcing-aggregates';

Quick Start

import { Effect, Schema, Option, Chunk, Context, pipe } from 'effect';
import {
  makeAggregateRoot,
  AggregateState,
  CommandContext,
  CommandContextTest,
  eventSchema,
  eventMetadata,
  defineAggregateEventStore,
  type EventRecord,
} from '@codeforbreakfast/eventsourcing-aggregates';
import type { EventStore } from '@codeforbreakfast/eventsourcing-store';
import { makeInMemoryEventStore } from '@codeforbreakfast/eventsourcing-store-inmemory';

// 1. Define your aggregate ID schema
const UserId = Schema.String.pipe(Schema.brand('UserId'));
type UserId = typeof UserId.Type;

// 2. Define your initiator ID schema (who executes commands)
const InitiatorId = Schema.String.pipe(Schema.brand('InitiatorId'));
type InitiatorId = typeof InitiatorId.Type;

// 3. Define your domain events using eventSchema
const UserRegisteredEvent = eventSchema(Schema.Literal('UserRegistered'), {
  email: Schema.String,
  name: Schema.String,
});

const UserEmailUpdatedEvent = eventSchema(Schema.Literal('UserEmailUpdated'), {
  oldEmail: Schema.String,
  newEmail: Schema.String,
});

const UserEvent = Schema.Union(UserRegisteredEvent, UserEmailUpdatedEvent);
type UserEvent = typeof UserEvent.Type;

// 4. Define your aggregate state
// Note: Aggregate state doesn't need to store its own ID - that's implicit from the stream
interface UserState {
  email: string;
  name: string;
  isActive: boolean;
}

// 5. Create the event application function
// Events don't contain aggregate IDs - the ID comes from the event stream
const applyUserEvent: (
  state: Option.Option<UserState>
) => (event: UserEvent) => Effect.Effect<UserState, Error, never> = (state) => (event) =>
  pipe(
    event.type === 'UserRegistered'
      ? Effect.succeed({
          email: event.data.email,
          name: event.data.name,
          isActive: true,
        })
      : event.type === 'UserEmailUpdated'
        ? pipe(
            state,
            Option.match({
              onNone: () => Effect.fail(new Error('Cannot update email: user does not exist')),
              onSome: (currentState) =>
                Effect.succeed({
                  ...currentState,
                  email: event.data.newEmail,
                }),
            })
          )
        : Effect.fail(new Error(`Unknown event type: ${(event as any).type}`))
  );

// 6. Create event store tag using factory
const UserEventStore = defineAggregateEventStore<UserEvent, InitiatorId>('User');

// 7. Define command handlers that return functions taking state
// Commands return bare events - framework adds metadata automatically during commit
const registerUser = (email: string, name: string) => (currentState: AggregateState<UserState>) =>
  pipe(
    currentState.data,
    Option.match({
      onSome: () => Effect.fail(new Error('User already exists')),
      onNone: () =>
        Effect.succeed([
          {
            type: 'UserRegistered' as const,
            data: { email, name },
          } satisfies UserEvent,
        ]),
    })
  );

const updateUserEmail = (newEmail: string) => (currentState: AggregateState<UserState>) =>
  pipe(
    currentState.data,
    Option.match({
      onNone: () => Effect.fail(new Error('User not found')),
      onSome: (state) =>
        Effect.succeed([
          {
            type: 'UserEmailUpdated' as const,
            data: {
              oldEmail: state.email,
              newEmail,
            },
          } satisfies UserEvent,
        ]),
    })
  );

// 8. Create the aggregate root
const UserAggregate = makeAggregateRoot(
  UserId,
  InitiatorId,
  applyUserEvent as any,
  UserEventStore,
  {
    registerUser,
    updateUserEmail,
  }
);

// 9. Usage example
const userId = 'user-123' as UserId; // The aggregate ID is only used for loading/committing

const program = pipe(
  // Load an existing user (returns empty state if not found)
  UserAggregate.load(userId),
  Effect.tap((state) => Effect.log(`Loaded user state: ${JSON.stringify(state)}`)),
  Effect.flatMap((existingUser) => {
    // Create a new user if one doesn't exist
    if (Option.isNone(existingUser.data)) {
      return pipe(
        // Generate registration events using the command handler
        UserAggregate.commands.registerUser('[email protected]', 'John Doe')(existingUser as any),
        Effect.flatMap((events: any) =>
          // Commit events to store with the aggregate ID
          UserAggregate.commit({
            id: userId,
            eventNumber: existingUser.nextEventNumber,
            events,
          })
        ),
        Effect.flatMap(() => UserAggregate.load(userId)),
        Effect.tap((state) => Effect.log(`User after registration: ${JSON.stringify(state)}`))
      );
    }
    return Effect.succeed(existingUser);
  }),
  Effect.flatMap((userState) =>
    // Update the user's email
    pipe(
      UserAggregate.commands.updateUserEmail('[email protected]')(userState as any),
      Effect.flatMap((events: any) =>
        UserAggregate.commit({
          id: userId,
          eventNumber: userState.nextEventNumber,
          events,
        })
      ),
      Effect.flatMap(() => UserAggregate.load(userId)),
      Effect.tap((state) => Effect.log(`User after email update: ${JSON.stringify(state)}`))
    )
  )
);

// Run with dependencies
(async () => {
  const eventStoreLayer: any = makeInMemoryEventStore(UserEventStore as any);
  const contextLayer: any = CommandContextTest<string>('system-user');

  const runnable: Effect.Effect<any, any, never> = pipe(
    program,
    Effect.provide(eventStoreLayer),
    Effect.provide(contextLayer)
  ) as any;

  await Effect.runPromise(runnable);
})();

Core Concepts

AggregateState

Represents the current state of an aggregate at a point in time:

import { Option } from 'effect';

interface AggregateState<TData> {
  readonly nextEventNumber: number;
  readonly data: Option.Option<TData>;
}

makeAggregateRoot

The main function for creating aggregate roots with event sourcing capabilities:

import { Schema } from 'effect';
import { makeAggregateRoot } from '@codeforbreakfast/eventsourcing-aggregates';

declare const IdSchema: any;
declare const applyEventFunction: any;
declare const EventStoreTag: any;
declare const commandHandlers: any;

const MyAggregate = makeAggregateRoot(
  IdSchema,
  Schema.String,
  applyEventFunction,
  EventStoreTag,
  commandHandlers
);

Event Schema Creation

Use eventSchema to create properly structured domain events:

import { Schema } from 'effect';
import { eventSchema } from '@codeforbreakfast/eventsourcing-aggregates';

const MyEvent = eventSchema(Schema.Literal('MyEventType'), {
  field1: Schema.String,
  field2: Schema.Number,
});

Command Context

Track command execution metadata:

import { Effect, pipe } from 'effect';
import { CommandContext, CommandContextTest } from '@codeforbreakfast/eventsourcing-aggregates';

const myCommand = pipe(
  CommandContext<string>(),
  Effect.flatMap((context) => context.getInitiator),
  Effect.map((initiatorId) => initiatorId)
);

const testLayer = CommandContextTest<string>('test-user-id');

Event Metadata Generation

Automatically generate event metadata with timestamps and originator:

import { Effect, pipe } from 'effect';
import { eventMetadata } from '@codeforbreakfast/eventsourcing-aggregates';

const createEvent = pipe(
  eventMetadata<string>(),
  Effect.map((metadata) => ({
    type: 'SomethingHappened' as const,
    metadata,
    data: {},
  }))
);

Advanced Patterns

Business Rule Validation

import { Effect, Option, Chunk, pipe } from 'effect';
import { AggregateState, eventMetadata } from '@codeforbreakfast/eventsourcing-aggregates';

interface BankAccountState {
  balance: number;
}

const transferMoney =
  (toAccountId: string, amount: number) => (currentState: AggregateState<BankAccountState>) =>
    pipe(
      currentState.data,
      Option.match({
        onNone: () => Effect.fail(new Error('Account not found')),
        onSome: (account) => {
          if (account.balance < amount) {
            return Effect.fail(new Error('Insufficient funds'));
          }
          if (amount <= 0) {
            return Effect.fail(new Error('Transfer amount must be positive'));
          }

          return pipe(
            eventMetadata<string>(),
            Effect.map((metadata) =>
              Chunk.of({
                type: 'MoneyTransferred' as const,
                metadata,
                data: {
                  toAccountId,
                  amount,
                },
              })
            )
          );
        },
      })
    );

Complex Event Application

import { Effect, Option, pipe } from 'effect';

declare const BankAccountEvent: any;
type BankAccountEvent = typeof BankAccountEvent;

interface BankAccountState {
  balance: number;
  isActive: boolean;
  transactions: Array<{
    type: string;
    amount: number;
    timestamp: Date;
  }>;
}

const applyBankAccountEvent =
  (state: Option.Option<BankAccountState>) => (event: BankAccountEvent) => {
    switch (event.type) {
      case 'AccountOpened':
        return pipe(
          state,
          Option.match({
            onSome: () => Effect.fail(new Error('Account already exists')),
            onNone: () =>
              Effect.succeed({
                balance: event.data.initialDeposit,
                isActive: true,
                transactions: [],
              }),
          })
        );

      case 'MoneyTransferred':
        return pipe(
          state,
          Option.match({
            onNone: () => Effect.fail(new Error('Account does not exist')),
            onSome: (currentState) =>
              Effect.succeed({
                ...currentState,
                balance: currentState.balance - event.data.amount,
                transactions: [
                  ...currentState.transactions,
                  {
                    type: 'transfer' as const,
                    amount: event.data.amount,
                    timestamp: event.metadata.occurredAt,
                  },
                ],
              }),
          })
        );

      default:
        return Effect.fail(new Error(`Unknown event: ${(event as any).type}`));
    }
  };

Testing Aggregates

Test aggregate behavior in isolation:

import { describe, it, expect } from 'bun:test';
import { Effect, Option, Chunk, pipe } from 'effect';
import { AggregateState, CommandContextTest } from '@codeforbreakfast/eventsourcing-aggregates';

declare const UserAggregate: any;
declare const applyUserEvent: any;

interface UserState {
  email: string;
  name: string;
  isActive: boolean;
}

describe('UserAggregate', () => {
  it('should register a new user', async () => {
    const program: Effect.Effect<any, any, never> = pipe(
      Effect.succeed(UserAggregate.new()),
      Effect.flatMap((state: any) =>
        pipe(
          UserAggregate.commands.registerUser('[email protected]', 'Test User')(state),
          Effect.tap((events: any) => {
            const event: any = Chunk.unsafeHead(events);
            expect(event.type).toBe('UserRegistered');
            expect(event.data.email).toBe('[email protected]');
            return Effect.void;
          }),
          Effect.flatMap((events: any) =>
            pipe(
              Chunk.unsafeHead(events),
              applyUserEvent(state.data),
              Effect.tap((newState: any) => {
                expect(newState.email).toBe('[email protected]');
                expect(newState.isActive).toBe(true);
                return Effect.void;
              })
            )
          )
        )
      )
    ) as any;

    await Effect.runPromise(
      pipe(program, Effect.provide(CommandContextTest<string>('test-initiator')))
    );
  });

  it('should prevent duplicate registration', async () => {
    const existingState: AggregateState<UserState> = {
      nextEventNumber: 1,
      data: Option.some({
        email: '[email protected]',
        name: 'Existing User',
        isActive: true,
      }),
    };

    const program: Effect.Effect<any, any, never> = pipe(
      UserAggregate.commands.registerUser('[email protected]', 'Duplicate User')(existingState),
      Effect.either,
      Effect.tap((result: any) => {
        expect(result._tag).toBe('Left');
        return Effect.void;
      })
    ) as any;

    await Effect.runPromise(
      pipe(program, Effect.provide(CommandContextTest<string>('test-initiator')))
    );
  });
});

Error Handling

Handle domain-specific errors with Effect's error management:

import { Data, Effect, pipe } from 'effect';

declare const TransferCommand: any;
type TransferCommand = typeof TransferCommand;
declare const processTransfer: (command: TransferCommand) => Effect.Effect<any, any, any>;

class InsufficientFundsError extends Data.TaggedError('InsufficientFundsError')<{
  required: number;
  available: number;
}> {}

class AccountNotFoundError extends Data.TaggedError('AccountNotFoundError')<{
  accountId: string;
}> {}

const handleTransferCommand = (command: TransferCommand) =>
  pipe(
    processTransfer(command),
    Effect.catchTag('InsufficientFundsError', (error) =>
      Effect.logError(`Insufficient funds: need ${error.required}, have ${error.available}`)
    ),
    Effect.catchTag('AccountNotFoundError', (error) =>
      Effect.logError(`Account not found: ${error.accountId}`)
    )
  );

Command Processing

The package provides a generic command processing layer that maintains strong typing for domain events.

Type-Safe Command Handlers

Command handlers are generic over your domain event types:

import { Effect, Schema, Context, Layer, pipe } from 'effect';
import {
  CommandHandler,
  CommandRouter,
  createCommandProcessingService,
  CommandProcessingService,
  CommandRoutingError,
} from '@codeforbreakfast/eventsourcing-aggregates';
import type { EventStore } from '@codeforbreakfast/eventsourcing-store';
import type { WireCommand } from '@codeforbreakfast/eventsourcing-commands';

declare const userEventStoreLayer: any;

const UserCreated = Schema.Struct({
  type: Schema.Literal('UserCreated'),
  data: Schema.Struct({
    name: Schema.String,
    email: Schema.String,
  }),
});

const UserUpdated = Schema.Struct({
  type: Schema.Literal('UserUpdated'),
  data: Schema.Struct({
    name: Schema.String,
  }),
});

const UserEvent = Schema.Union(UserCreated, UserUpdated);
type UserEvent = typeof UserEvent.Type;

const UserEventStore = Context.GenericTag<EventStore<UserEvent>, EventStore<UserEvent>>(
  'UserEventStore'
);

const createUserHandler: CommandHandler<UserEvent> = {
  execute: (command: Readonly<WireCommand>) =>
    Effect.succeed([
      {
        type: 'UserCreated' as const,
        data: {
          name: (command.payload as any).name,
          email: (command.payload as any).email,
        },
      },
    ]),
};

const createUserRouter = (): CommandRouter<UserEvent> => ({
  route: (command: Readonly<WireCommand>) => {
    if (command.target === 'user' && command.name === 'CreateUser') {
      return Effect.succeed(createUserHandler);
    }
    return Effect.fail(
      new CommandRoutingError({
        target: command.target,
        message: `No handler found for ${command.target}:${command.name}`,
      })
    );
  },
});

const UserCommandProcessingService = Layer.effect(
  CommandProcessingService,
  createCommandProcessingService(UserEventStore)(createUserRouter())
);

const processCommand = (command: WireCommand) =>
  pipe(
    CommandProcessingService,
    Effect.flatMap((service) => service.processCommand(command)),
    Effect.provide(UserCommandProcessingService),
    Effect.provide(userEventStoreLayer)
  );

Key Principles

⚠️ IMPORTANT: Never use generic Event types in domain code

The Event type from @codeforbreakfast/eventsourcing-store has data: Schema.Unknown and is only for serialization boundaries (storage, wire protocol).

import { Effect } from 'effect';
import { CommandHandler } from '@codeforbreakfast/eventsourcing-aggregates';
import type { Event } from '@codeforbreakfast/eventsourcing-store';

declare const UserEvent: any;
type UserEvent = typeof UserEvent;

const wrongHandler: CommandHandler<Event> = {
  execute: () => Effect.succeed([{ type: 'UserCreated', data: { name: 'John' } } as Event]),
};

const correctHandler: CommandHandler<UserEvent> = {
  execute: () =>
    Effect.succeed([{ type: 'UserCreated', data: { name: 'John', email: '[email protected]' } }]),
};

Each aggregate or bounded context should:

  1. Define its own event union type
  2. Create a named event store tag
  3. Use typed command handlers and routers

See ARCHITECTURE.md for detailed design rationale.

Integration with Event Stores

This package works with any event store implementation that matches the EventStore interface:

import { Effect, pipe } from 'effect';
import { makeInMemoryEventStore } from '@codeforbreakfast/eventsourcing-store-inmemory';

declare const myAggregateOperation: Effect.Effect<any, any, any>;
declare const MyEventStoreTag: any;
declare const postgresEventStore: any;

const eventStoreLayer: any = makeInMemoryEventStore(MyEventStoreTag as any);
const testProgram = pipe(myAggregateOperation, Effect.provide(eventStoreLayer));

const productionProgram = pipe(myAggregateOperation, Effect.provide(postgresEventStore));

Related Packages

API Reference

For detailed TypeScript definitions, see the source code included with this package.

Contributing

This package is part of the @codeforbreakfast/eventsourcing monorepo. Please see the main repository for contributing guidelines.

License

MIT