cqrskit

v0.0.1

CQRSKit

A lightweight TypeScript CQRS (Command Query Responsibility Segregation) and Event Sourcing framework with pluggable database adapters.

Features

  • Full CQRS Implementation - Separate command and query models with clear boundaries
  • Event Sourcing - Rebuild aggregate state from historical events
  • Pluggable Database Adapters - Easy integration with any event store (Genesis DB, EventStoreDB, PostgreSQL, etc.)
  • TypeScript-First - Full type safety and excellent IDE support
  • Decorator-Based API - Clean, intuitive syntax for defining handlers
  • State Caching - Optional LRU cache for reconstructed aggregates
  • Event Upcasting - Handle event schema evolution gracefully
  • Async Event Processing - Background event handlers with partitioning and progress tracking
  • Testing Utilities - Fluent Given-When-Then API for testing command handlers
  • Zero Dependencies (except database adapters)

Installation

npm install cqrskit

For Genesis DB support:

npm install cqrskit genesisdb

Key Concepts & Terminology

Before diving in, here are the core concepts used throughout this framework:

CQRS (Command Query Responsibility Segregation)

A pattern that separates read and write operations into different models:

  • Commands modify state (writes)
  • Queries read state (reads)
  • This separation allows independent scaling and optimization of reads vs writes

Event Sourcing

Instead of storing current state, store a sequence of events that led to that state:

  • Events are immutable facts that have occurred
  • State Rebuilding reconstructs current state by replaying events
  • Provides a complete audit trail and time-travel capabilities

Aggregate

A cluster of domain objects treated as a single unit:

  • Enforces business rules and invariants
  • All state changes happen through events
  • Example: A Task aggregate with properties like id, title, status

Command

An instruction to perform an action that may change state:

  • Immutable data structure
  • Examples: CreateTaskCommand, AssignTaskCommand
  • Can succeed or fail based on business rules

Event

A fact that has already happened and cannot be changed:

  • Immutable data structure
  • Past tense naming: TaskCreatedEvent, TaskAssignedEvent
  • Contains all data needed to update state

Command Handler

Business logic that processes a command:

  • Validates the command
  • Checks business rules
  • Publishes one or more events if valid
  • Example: handleCreate(), handleAssign()

State Rebuilding Handler

Logic that applies an event to recreate aggregate state:

  • Takes previous state and an event
  • Returns new state
  • Pure function (no side effects)
  • Example: onTaskCreated(), onTaskAssigned()
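
Replaying events through pure handlers amounts to a left fold over the event history. The sketch below is framework-independent; the `TaskEvent`, `TaskState`, `applyEvent`, and `rebuild` names are hypothetical and are not part of CQRSKit's API.

```typescript
// Hypothetical event and state shapes, for illustration only.
type TaskEvent =
  | { type: 'TaskCreated'; taskId: string; title: string }
  | { type: 'TaskAssigned'; taskId: string; assignee: string };

interface TaskState {
  id: string;
  title: string;
  assignee: string | null;
}

// Pure function: takes the previous state and one event, returns the new state.
function applyEvent(state: TaskState | null, event: TaskEvent): TaskState | null {
  switch (event.type) {
    case 'TaskCreated':
      return { id: event.taskId, title: event.title, assignee: null };
    case 'TaskAssigned':
      // An assignment that arrives before creation is ignored in this sketch.
      return state ? { ...state, assignee: event.assignee } : state;
  }
}

// Rebuilding is a fold over the history, starting from "no state".
function rebuild(events: TaskEvent[]): TaskState | null {
  return events.reduce<TaskState | null>(applyEvent, null);
}

const history: TaskEvent[] = [
  { type: 'TaskCreated', taskId: 'task-1', title: 'Implement login feature' },
  { type: 'TaskAssigned', taskId: 'task-1', assignee: 'alice' },
];
const current = rebuild(history);
```

Because `applyEvent` has no side effects, replaying the same history always yields the same state, which is what makes caching and time-travel safe.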

Event Handler

Asynchronous logic that reacts to events:

  • Updates read models (projections)
  • Sends notifications
  • Triggers external actions
  • Runs in background, separate from commands

Subject

A unique identifier for an aggregate instance:

  • Hierarchical path format: /task/task-1, /user/user-123
  • Used to query and store events
  • Supports recursive queries (e.g., /task/*)

Decorator

TypeScript annotation that adds metadata to classes/methods:

  • @CommandHandling() - Marks command handler methods
  • @StateRebuilding() - Marks state rebuilding methods
  • @EventHandling() - Marks event handler methods
  • Simplifies configuration and registration

Sourcing Mode

Strategy for loading events when executing a command:

  • NONE - Don't load any events (for stateless commands)
  • LOCAL - Load events for exact subject only
  • RECURSIVE - Load events for subject and all children (for parent-child relationships)
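
To make the three modes concrete, here is a minimal sketch of how each one might select events from a flat log. It assumes that "children" means subjects nested below the given path; the `Mode`, `StoredEvent`, and `selectEvents` names and the `/task/task-1/comment/c-1` subject are hypothetical stand-ins, not CQRSKit's own types or data.

```typescript
// Local stand-ins for illustration only; not CQRSKit's own types.
type Mode = 'NONE' | 'LOCAL' | 'RECURSIVE';

interface StoredEvent {
  subject: string;
  type: string;
}

// True when `subject` equals `root` or lies below it in the path hierarchy.
// Appending '/' keeps '/task/task-10' from matching '/task/task-1'.
function isSameOrChild(root: string, subject: string): boolean {
  return subject === root || subject.startsWith(root + '/');
}

function selectEvents(all: StoredEvent[], subject: string, mode: Mode): StoredEvent[] {
  switch (mode) {
    case 'NONE':
      return [];
    case 'LOCAL':
      return all.filter((e) => e.subject === subject);
    case 'RECURSIVE':
      return all.filter((e) => isSameOrChild(subject, e.subject));
  }
}

const eventLog: StoredEvent[] = [
  { subject: '/task/task-1', type: 'TaskCreated' },
  { subject: '/task/task-1/comment/c-1', type: 'CommentAdded' },
  { subject: '/task/task-10', type: 'TaskCreated' },
];
const localEvents = selectEvents(eventLog, '/task/task-1', 'LOCAL');
const recursiveEvents = selectEvents(eventLog, '/task/task-1', 'RECURSIVE');
```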

Upcasting

Converting old event versions to new versions:

  • Handles event schema evolution
  • Example: Renaming fields, adding required fields
  • Allows changing event structure without breaking old data
  • Applied automatically when reading events

Precondition

Optimistic locking mechanism:

  • Checks expected last event ID before publishing
  • Prevents concurrent modification conflicts
  • Example: Ensure task hasn't been modified since we loaded it
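
The check can be pictured with a small in-memory log that refuses an append when the subject's last event ID is not the one the caller expected. This is a sketch of the general optimistic-locking idea only; `InMemoryLog` and its methods are hypothetical and do not reflect how CQRSKit or any adapter implements preconditions.

```typescript
interface AppendedEvent {
  id: string;
  subject: string;
  type: string;
}

// Hypothetical in-memory log, for illustration only.
class InMemoryLog {
  private events: AppendedEvent[] = [];
  private nextId = 1;

  // ID of the most recent event for the subject, or null if there is none.
  lastEventId(subject: string): string | null {
    let last: string | null = null;
    for (const event of this.events) {
      if (event.subject === subject) last = event.id;
    }
    return last;
  }

  // Appends only if nobody else has written to the subject in the meantime.
  append(subject: string, type: string, expectedLastEventId: string | null): AppendedEvent {
    const actual = this.lastEventId(subject);
    if (actual !== expectedLastEventId) {
      throw new Error(`Conflict: expected last event ${expectedLastEventId}, found ${actual}`);
    }
    const event: AppendedEvent = { id: String(this.nextId++), subject, type };
    this.events.push(event);
    return event;
  }
}

const store = new InMemoryLog();
const created = store.append('/task/task-1', 'TaskCreated', null);
store.append('/task/task-1', 'TaskAssigned', created.id);

// A second writer still holding the stale ID is rejected.
let conflictDetected = false;
try {
  store.append('/task/task-1', 'TaskCompleted', created.id);
} catch {
  conflictDetected = true;
}
```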

Event Store

Database that stores events:

  • Append-only (events never modified/deleted)
  • Supports querying by subject and timestamp
  • Examples: Genesis DB, PostgreSQL, MySQL
  • CQRSKit supports custom adapters

Read Model (Projection)

Denormalized view of data optimized for queries:

  • Built from events
  • Can be rebuilt anytime from event history
  • Example: Task list, user profile view
  • Updated by event handlers
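
As an illustration, a task-list read model can be kept in a map that an event handler updates, and rebuilt at any time by replaying the history into a fresh instance. The `TaskListProjection` class, the event shapes, and the `project` helper below are hypothetical and independent of CQRSKit's handler API.

```typescript
type TaskEvent =
  | { type: 'TaskCreated'; taskId: string; title: string }
  | { type: 'TaskCompleted'; taskId: string };

interface TaskRow {
  id: string;
  title: string;
  completed: boolean;
}

// Hypothetical projection: a denormalized list of tasks, keyed by task ID.
class TaskListProjection {
  private rows = new Map<string, TaskRow>();

  handle(event: TaskEvent): void {
    if (event.type === 'TaskCreated') {
      this.rows.set(event.taskId, { id: event.taskId, title: event.title, completed: false });
    } else {
      const row = this.rows.get(event.taskId);
      if (row) row.completed = true;
    }
  }

  list(): TaskRow[] {
    return Array.from(this.rows.values());
  }
}

// Rebuilding the view is just replaying all events into an empty projection.
function project(events: TaskEvent[]): TaskRow[] {
  const projection = new TaskListProjection();
  events.forEach((event) => projection.handle(event));
  return projection.list();
}

const taskEvents: TaskEvent[] = [
  { type: 'TaskCreated', taskId: 'task-1', title: 'Implement login feature' },
  { type: 'TaskCreated', taskId: 'task-2', title: 'Write documentation' },
  { type: 'TaskCompleted', taskId: 'task-1' },
];
const taskList = project(taskEvents);
```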

Partition

Splitting event processing across multiple workers:

  • Enables parallel processing
  • Each partition handles a subset of events
  • Ensures events for same subject stay in order
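
One common way to get this ordering guarantee is to derive the partition from a deterministic hash of the subject, so every event for a given subject lands on the same worker. The sketch below assumes that approach; `hashSubject` and `partitionFor` are hypothetical helpers, and CQRSKit's `DefaultPartitionKeyResolver` may work differently.

```typescript
// Deterministic 32-bit hash in the style of FNV-1a, applied to UTF-16 code units.
function hashSubject(input: string): number {
  let hash = 0x811c9dc5;
  for (let i = 0; i < input.length; i++) {
    hash ^= input.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193);
  }
  // Convert to an unsigned 32-bit integer so the modulo below is never negative.
  return hash >>> 0;
}

// Maps a subject to a partition index in the range [0, partitionCount).
function partitionFor(subject: string, partitionCount: number): number {
  return hashSubject(subject) % partitionCount;
}

const partitionCount = 10;
const first = partitionFor('/task/task-1', partitionCount);
const second = partitionFor('/task/task-1', partitionCount);
```

A worker responsible for partition `n` would then skip any event whose subject maps to a different index.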

Progress Tracker

Tracks position in event stream:

  • Remembers last processed event
  • Enables resuming after restart
  • Per partition and event handler group
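
A minimal sketch of the idea, assuming progress is stored as the last processed event ID per group and partition. The `InMemoryProgress` class and `pendingEvents` helper are hypothetical; they are not CQRSKit's `ProgressTracker` interface.

```typescript
interface StreamEvent {
  id: string;
  subject: string;
}

// Hypothetical tracker keyed by handler group and partition number.
class InMemoryProgress {
  private positions = new Map<string, string>();

  private key(group: string, partition: number): string {
    return `${group}#${partition}`;
  }

  // Last processed event ID, or null if nothing has been processed yet.
  current(group: string, partition: number): string | null {
    return this.positions.get(this.key(group, partition)) ?? null;
  }

  proceed(group: string, partition: number, eventId: string): void {
    this.positions.set(this.key(group, partition), eventId);
  }
}

// Events that still need processing, given the last processed event ID.
// If the ID is not found in the stream, this sketch replays everything.
function pendingEvents(stream: StreamEvent[], lastId: string | null): StreamEvent[] {
  if (lastId === null) return stream;
  const index = stream.findIndex((event) => event.id === lastId);
  return stream.slice(index + 1);
}

const stream: StreamEvent[] = [
  { id: '1', subject: '/task/task-1' },
  { id: '2', subject: '/task/task-1' },
  { id: '3', subject: '/task/task-2' },
];
const progress = new InMemoryProgress();
progress.proceed('my-projector', 0, '2');

// After a restart, only events after ID '2' are picked up again.
const remaining = pendingEvents(stream, progress.current('my-projector', 0));
```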

Metadata

Additional context attached to events:

  • Not part of event data itself
  • Examples: correlationId, userId, timestamp
  • Can be propagated from commands to events

Cache

In-memory storage of reconstructed aggregate state:

  • Improves performance
  • LRU eviction policy (removes least recently used)
  • Optional optimization
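
LRU eviction can be sketched with a `Map`, which iterates keys in insertion order: re-inserting a key on every access keeps the least recently used key at the front. The `LruCache` class below is a hypothetical illustration and not the implementation behind `InMemoryStateCache`.

```typescript
// Hypothetical LRU cache, for illustration only.
class LruCache<V> {
  private entries = new Map<string, V>();
  private capacity: number;

  constructor(capacity: number) {
    this.capacity = capacity;
  }

  get(key: string): V | undefined {
    if (!this.entries.has(key)) return undefined;
    const value = this.entries.get(key) as V;
    // Re-insert to mark the key as most recently used.
    this.entries.delete(key);
    this.entries.set(key, value);
    return value;
  }

  set(key: string, value: V): void {
    this.entries.delete(key);
    this.entries.set(key, value);
    if (this.entries.size > this.capacity) {
      // The first key in iteration order is the least recently used one.
      const oldest = this.entries.keys().next().value as string;
      this.entries.delete(oldest);
    }
  }
}

const stateCache = new LruCache<{ title: string }>(2);
stateCache.set('/task/task-1', { title: 'A' });
stateCache.set('/task/task-2', { title: 'B' });
stateCache.get('/task/task-1'); // task-1 is now the most recently used entry
stateCache.set('/task/task-3', { title: 'C' }); // evicts task-2
```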

Quick Start

1. Define Your Domain

// domain/Task.ts
export enum TaskStatus {
  TODO = 'TODO',
  IN_PROGRESS = 'IN_PROGRESS',
  COMPLETED = 'COMPLETED'
}

export class Task {
  constructor(
    public id: string,
    public title: string,
    public assignee: string | null,
    public status: TaskStatus
  ) {}
}

2. Create Commands

// commands/CreateTaskCommand.ts
import { Command, SubjectCondition } from 'cqrskit';

export class CreateTaskCommand implements Command {
  constructor(
    public taskId: string,
    public title: string
  ) {}

  getSubject(): string {
    return `/task/${this.taskId}`;
  }

  getSubjectCondition(): SubjectCondition {
    return SubjectCondition.NEW; // Must not exist
  }
}

3. Define Events

// events/TaskCreatedEvent.ts
export class TaskCreatedEvent {
  constructor(
    public taskId: string,
    public title: string,
    public createdAt: string
  ) {}
}

4. Create Command Handlers

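// handlers/TaskHandlers.ts
import { CommandEventPublisher, CommandHandling, SourcingMode, StateRebuilding } from 'cqrskit';
// Relative paths follow the file names used in steps 1-3
import { CreateTaskCommand } from '../commands/CreateTaskCommand';
import { Task, TaskStatus } from '../domain/Task';
import { TaskCreatedEvent } from '../events/TaskCreatedEvent';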

export class TaskHandlers {
  @CommandHandling({ sourcingMode: SourcingMode.LOCAL })
  async handleCreate(
    command: CreateTaskCommand,
    publisher: CommandEventPublisher<Task>
  ): Promise<string> {
    publisher.publish(
      new TaskCreatedEvent(command.taskId, command.title, new Date().toISOString())
    );
    return command.taskId;
  }

  @StateRebuilding()
  onTaskCreated(task: Task | null, event: TaskCreatedEvent): Task {
    return new Task(event.taskId, event.title, null, TaskStatus.TODO);
  }
}

5. Set Up the Framework

import {
  CommandRouter,
  GenesisDBAdapter,
  ConfiguredEventTypeResolver,
  JsonEventDataMarshaller,
  InMemoryStateCache
} from 'cqrskit';

// Configure database adapter
const eventStore = new GenesisDBAdapter({
  apiUrl: 'http://localhost:8080',
  authToken: 'your-token',
  source: 'my-app'
});

// Configure event type resolver
const eventTypeResolver = new ConfiguredEventTypeResolver()
  .register('app.TaskCreatedEvent', TaskCreatedEvent);

// Create command router
const commandRouter = new CommandRouter({
  eventStore,
  eventTypeResolver,
  eventDataMarshaller: new JsonEventDataMarshaller(),
  commandHandlers: [/* your handler definitions */],
  stateRebuildingHandlers: [/* your state rebuilding definitions */],
  eventSource: 'my-app',
  cache: new InMemoryStateCache(1000)
});

// Send commands
const taskId = await commandRouter.send(
  new CreateTaskCommand('task-1', 'Implement login feature')
);

Core Concepts

Commands

Commands are immutable instructions to change aggregate state. They implement the Command interface:

interface Command {
  getSubject(): string;
  getSubjectCondition?(): SubjectCondition;
}

Subject Conditions:

  • NONE - No validation (default)
  • NEW - Subject must not exist (for creation)
  • EXISTS - Subject must exist (for updates)
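
One way to read these conditions is as a check against the number of events already stored for the subject. The sketch below assumes that "exists" means at least one event has been recorded; `Condition`, `checkSubjectCondition`, and `passes` are hypothetical names, and the actual validation in CQRSKit may differ.

```typescript
// Local stand-in for illustration only; CQRSKit exports its own SubjectCondition.
type Condition = 'NONE' | 'NEW' | 'EXISTS';

// Throws when the subject's existing event count violates the condition.
function checkSubjectCondition(condition: Condition, existingEventCount: number): void {
  if (condition === 'NEW' && existingEventCount > 0) {
    throw new Error('Subject already exists');
  }
  if (condition === 'EXISTS' && existingEventCount === 0) {
    throw new Error('Subject does not exist');
  }
}

// Convenience wrapper that reports the outcome as a boolean.
function passes(condition: Condition, existingEventCount: number): boolean {
  try {
    checkSubjectCondition(condition, existingEventCount);
    return true;
  } catch {
    return false;
  }
}

const canCreateFresh = passes('NEW', 0);    // no events yet, creation is allowed
const canCreateTwice = passes('NEW', 3);    // events already exist, creation is rejected
const canUpdateMissing = passes('EXISTS', 0); // nothing to update
```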

Events

Events represent facts that have occurred in the system. They are plain TypeScript classes:

class TaskCreatedEvent {
  constructor(
    public taskId: string,
    public title: string,
    public createdAt: string
  ) {}
}

Command Handlers

Command handlers contain business logic. They receive commands, validate them, and publish events:

@CommandHandling({ sourcingMode: SourcingMode.RECURSIVE })
async handleCommand(
  instance: MyAggregate,        // Reconstructed state
  command: MyCommand,            // The command
  publisher: CommandEventPublisher<MyAggregate>
): Promise<Result> {
  // Validate
  if (!instance.canDoSomething()) {
    throw new Error('Cannot do something');
  }

  // Publish events
  publisher.publish(new SomethingDoneEvent(...));

  return result;
}

Sourcing Modes:

  • NONE - Don't load any events (for stateless commands)
  • LOCAL - Load events for exact subject only
  • RECURSIVE - Load events for subject and children

State Rebuilding

State rebuilding handlers reconstruct aggregate state from events:

@StateRebuilding()
onEventOccurred(
  instance: MyAggregate | null,
  event: EventOccurred
): MyAggregate {
  // Return new state
  return new MyAggregate(...);
}

Event Handlers

Event handlers process events asynchronously (for read models, notifications, etc.):

@EventHandling({ group: 'my-projector' })
async onEventOccurred(event: EventOccurred): Promise<void> {
  // Update read model
  await database.save(...);
}

Pluggable Database Adapters

CQRSKit supports any event store through the EventStoreAdapter interface:

interface EventStoreAdapter {
  streamEvents(subject: string, options?: StreamOptions, recursive?: boolean): AsyncIterable<RawEvent>;
  observeEvents(subject: string, options?: StreamOptions, recursive?: boolean): AsyncIterable<RawEvent>;
  publishEvents(events: EventToPublish[], preconditions?: Precondition[]): Promise<RawEvent[]>;
  ping?(): Promise<void>;
}

Built-in Adapters

Genesis DB

import { GenesisDBAdapter } from 'cqrskit';

const adapter = new GenesisDBAdapter({
  apiUrl: process.env.GENESISDB_API_URL,
  authToken: process.env.GENESISDB_AUTH_TOKEN,
  source: 'my-app'
});

Creating Custom Adapters

See the examples/custom-adapter/ directory for complete examples:

  • InMemoryAdapter.ts - Simple in-memory implementation
  • MySQLAdapter.ts - MySQL database adapter with polling
  • PostgreSQLAdapter.ts - PostgreSQL adapter with LISTEN/NOTIFY

Example custom adapter:

export class MyCustomAdapter implements EventStoreAdapter {
  async *streamEvents(subject: string, options?: StreamOptions, recursive?: boolean): AsyncIterable<RawEvent> {
    // Fetch historical events from your database
    const events = await myDatabase.query(...);
    for (const event of events) {
      yield mapToRawEvent(event);
    }
  }

  async *observeEvents(subject: string, options?: StreamOptions, recursive?: boolean): AsyncIterable<RawEvent> {
    // Stream historical events first
    for await (const event of this.streamEvents(subject, options, recursive)) {
      yield event;
    }
    // Then subscribe to new events and yield them as they arrive
    // (assumes myDatabase.subscribe returns an async iterable)
    for await (const event of myDatabase.subscribe(...)) {
      yield mapToRawEvent(event);
    }
  }

  async publishEvents(events: EventToPublish[], preconditions?: Precondition[]): Promise<RawEvent[]> {
    // Store events atomically; a real adapter should also enforce `preconditions` here
    return await myDatabase.insertEvents(events);
  }
}

Testing

CQRSKit provides a fluent Given-When-Then API for testing:

import { CommandHandlingTestFixture } from 'cqrskit';

it('should create a task', async () => {
  await CommandHandlingTestFixture
    .create<Task, CreateTaskCommand, string>()
    .withStateRebuildingHandlers([...])
    .using(Task, handlers.handleCreate)
    .given()
    .givenNothing()
    .when(new CreateTaskCommand('task-1', 'Implement login feature'))
    .then(expect => {
      expect
        .expectSuccessfulExecution()
        .expectSingleEvent(new TaskCreatedEvent(...))
        .expectResult('task-1');
    });
});

Test API:

  • givenNothing() - Start with no state
  • givenEvents(...) - Start with historical events
  • givenState(instance) - Start with explicit state
  • when(command) - Execute command
  • expectSuccessfulExecution() - Assert no errors
  • expectFailure(message?) - Assert error occurred
  • expectResult(value) - Assert return value
  • expectEventCount(n) - Assert event count
  • expectEvent(event) - Assert next event matches
  • expectNoEvents() - Assert no events published

Advanced Features

Event Upcasting

Handle event schema evolution:

import { EventUpcaster, RawEvent, UpcasterResult } from 'cqrskit';

class TaskEventUpcaster implements EventUpcaster {
  canUpcast(event: RawEvent): boolean {
    return event.type === 'app.TaskAddedEvent.v1';
  }

  upcast(event: RawEvent): UpcasterResult[] {
    // Rename `timestamp` to `createdAt`, dropping the old field
    const { timestamp, ...rest } = event.data;
    return [{
      type: 'app.TaskCreatedEvent',
      data: {
        ...rest,
        createdAt: timestamp || new Date().toISOString() // Fallback if the old event had no timestamp
      }
    }];
  }
}

State Caching

Improve performance with in-memory caching:

import { InMemoryStateCache } from 'cqrskit';

const cache = new InMemoryStateCache(1000); // Cache 1000 aggregates

const commandRouter = new CommandRouter({
  // ...
  cache
});

Metadata Propagation

Propagate metadata (correlation IDs, user context, etc.):

import { MetadataPropagationMode } from 'cqrskit';

const commandRouter = new CommandRouter({
  // ...
  metadataPropagation: {
    mode: MetadataPropagationMode.SHALLOW,
    keys: ['correlationId', 'userId']
  }
});

await commandRouter.send(
  new MyCommand(),
  { correlationId: '123', userId: 'john' }
);

Propagation Modes:

  • NONE - Don't propagate
  • SHALLOW - Propagate specific keys
  • DEEP - Propagate all metadata
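
The effect of each mode can be sketched as a function from command metadata to event metadata. This is an illustration of the behavior described above, not CQRSKit's implementation; `propagate`, the string-literal `PropagationMode` type, and the `traceFlags` key are hypothetical.

```typescript
// Local stand-ins for illustration only.
type PropagationMode = 'NONE' | 'SHALLOW' | 'DEEP';
type Metadata = Record<string, unknown>;

// Returns the metadata that would be copied from a command onto its events.
function propagate(source: Metadata, mode: PropagationMode, keys: string[] = []): Metadata {
  switch (mode) {
    case 'NONE':
      return {};
    case 'SHALLOW': {
      // Copy only the explicitly listed keys that are present on the command.
      const picked: Metadata = {};
      for (const key of keys) {
        if (key in source) picked[key] = source[key];
      }
      return picked;
    }
    case 'DEEP':
      return { ...source };
  }
}

const commandMetadata: Metadata = { correlationId: '123', userId: 'john', traceFlags: 'debug' };
const shallow = propagate(commandMetadata, 'SHALLOW', ['correlationId', 'userId']);
const deep = propagate(commandMetadata, 'DEEP');
```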

Event Partitioning

Process events in parallel with partitioning:

import {
  EventHandlingProcessor,
  DefaultPartitionKeyResolver,
  PerSubjectEventSequenceResolver
} from 'cqrskit';

const processor = new EventHandlingProcessor({
  group: 'my-group',
  partition: 0, // Process partition 0
  partitionKeyResolver: new DefaultPartitionKeyResolver(10), // 10 partitions
  eventSequenceResolver: new PerSubjectEventSequenceResolver(),
  // ...
});

processor.start();

Examples

See the examples/ directory for complete working examples:

  • examples/tasks/ - Full task management system
  • examples/custom-adapter/ - Custom adapters (in-memory, MySQL, PostgreSQL)
  • examples/testing/ - Test examples with Given-When-Then

Run the task management example:

cd examples/tasks
npm install
npx tsx app.ts

Architecture

CQRSKit follows the CQRS and Event Sourcing patterns:

┌─────────────┐
│   Command   │
└──────┬──────┘
       │
       v
┌─────────────────┐      ┌──────────────┐
│ CommandRouter   │─────→│ Event Store  │
└─────────────────┘      └──────┬───────┘
       │                         │
       │ Rebuild State           │ Observe
       v                         v
┌─────────────────┐      ┌──────────────────┐
│ State Handlers  │      │ Event Processors │
└─────────────────┘      └──────────────────┘
                                 │
                                 v
                         ┌──────────────┐
                         │  Read Model  │
                         └──────────────┘

API Reference

Core Types

  • Command - Base interface for commands
  • SubjectCondition - Validation for aggregate existence
  • SourcingMode - Event loading strategy
  • RawEvent - Event as stored in database
  • EventToPublish - Event to be published
  • Precondition - Conditional event publishing

Command Handling

  • CommandRouter - Routes commands to handlers
  • CommandHandler - Processes commands
  • StateRebuildingHandler - Rebuilds state from events
  • CommandEventPublisher - Publishes events during command execution

Event Handling

  • EventHandler - Processes events asynchronously
  • EventHandlingProcessor - Background event processing
  • ProgressTracker - Tracks processing position
  • PartitionKeyResolver - Maps events to partitions

Persistence

  • EventStoreAdapter - Abstract database interface
  • GenesisDBAdapter - Genesis DB implementation
  • StreamOptions - Options for streaming events

Utilities

  • EventTypeResolver - Maps classes to event types
  • EventDataMarshaller - Serializes events
  • EventUpcaster - Migrates event schemas
  • StateRebuildingCache - Caches reconstructed state

Contributing

Contributions welcome! Please open an issue or PR.

Related Projects

  • Genesis DB - The GDPR-ready event sourcing database

License

MIT