@artworkdev/cqrs

v0.7.0

Standalone CQRS primitives for TypeScript applications.

@artworkdev/cqrs provides framework-agnostic command, query and event buses. It is inspired by @nestjs/cqrs, but intentionally avoids Nest, decorators, reflect-metadata, RxJS and runtime dependency injection.

Status

The current main branch targets 0.7.0.

Install the published package:

bun add @artworkdev/cqrs

Available primitives:

  • CommandBus for application writes.
  • QueryBus for application reads.
  • EventBus for in-process event handlers.
  • AggregateRoot and DomainEvent for domain event collection.
  • TransactionPerformer and TransactionableAsync for explicit transaction boundaries.
  • OutboxRepository, OutboxMessageFactory and OutboxProcessor for transaction-aware outbox workflows.
  • DomainEventPublisher and EventBusDomainEventPublisher for post-outbox domain event publication.

Design Rules

  • Commands and queries use execute.
  • Events use publish and publishAll.
  • Registration is explicit, not decorator-based.
  • Commands and queries have one handler.
  • Events can have zero, one or many handlers.
  • A missing command/query handler is an error.
  • A missing event handler is not an error.
  • The library does not own transactions. Application code must keep aggregate persistence and outbox writes atomic.
  • Outbox writes return transactionable operations. The use case decides which transaction executes them.

Commands

A command represents an application write intent. In strict CQRS, command handlers should usually return void. Technical IDs such as commandId, jobId or correlationId are acceptable when the caller needs them.

import { Command, CommandBus, type CommandHandler } from '@artworkdev/cqrs';

class RegisterUserCommand extends Command {
	constructor(
		readonly userId: string,
		readonly email: string,
	) {
		super();
	}
}

class RegisterUserCommandHandler
	implements CommandHandler<RegisterUserCommand, void>
{
	async execute(command: RegisterUserCommand): Promise<void> {
		// Persist the aggregate here.
	}
}

const commandBus = new CommandBus();

commandBus.register(RegisterUserCommand, new RegisterUserCommandHandler());

await commandBus.execute(
	new RegisterUserCommand('user-1', 'user@example.com'),
);

Command Errors

CommandBus throws:

  • CommandHandlerNotFoundError from execute() when no handler is registered for the command.
  • CommandHandlerAlreadyRegisteredError from register() when a second handler is registered for the same command constructor.
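
Both error cases follow naturally from a registry keyed by command constructor. The block below is a minimal standalone sketch of that idea, not the library's implementation: SketchCommandBus, SketchHandler and both error classes are hypothetical stand-ins, and handlers are synchronous to keep it short.

```typescript
// Hypothetical stand-ins; the real error classes come from the library.
class SketchHandlerNotFoundError extends Error {}
class SketchHandlerAlreadyRegisteredError extends Error {}

type SketchConstructor<T> = new (...args: any[]) => T;
type SketchHandler<TCommand> = { execute(command: TCommand): void };

class SketchCommandBus {
	// One entry per command class, keyed by the constructor itself.
	private readonly handlers = new Map<Function, SketchHandler<any>>();

	register<TCommand extends object>(
		commandClass: SketchConstructor<TCommand>,
		handler: SketchHandler<TCommand>,
	): void {
		if (this.handlers.has(commandClass)) {
			throw new SketchHandlerAlreadyRegisteredError(commandClass.name);
		}
		this.handlers.set(commandClass, handler);
	}

	execute(command: object): void {
		// Look the handler up through the instance's constructor.
		const handler = this.handlers.get(command.constructor);
		if (handler === undefined) {
			throw new SketchHandlerNotFoundError(command.constructor.name);
		}
		handler.execute(command);
	}
}

class RenameUserCommand {
	readonly name: string;
	constructor(name: string) {
		this.name = name;
	}
}
class ArchiveUserCommand {}

const bus = new SketchCommandBus();
const renamed: string[] = [];

bus.register(RenameUserCommand, {
	execute: (command) => {
		renamed.push(command.name);
	},
});
bus.execute(new RenameUserCommand('Ada'));
```

Executing an ArchiveUserCommand on this bus would hit the not-found branch, and registering RenameUserCommand a second time would hit the already-registered branch.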

Queries

A query represents an application read intent. Query handlers should return a read model or view model, not a mutable domain aggregate.

import { Query, QueryBus, type QueryHandler } from '@artworkdev/cqrs';

type UserProfileViewModel = {
	id: string;
	email: string;
};

class RetrieveUserProfileQuery extends Query<UserProfileViewModel> {
	constructor(readonly userId: string) {
		super();
	}
}

class RetrieveUserProfileQueryHandler
	implements QueryHandler<RetrieveUserProfileQuery, UserProfileViewModel>
{
	async execute(
		query: RetrieveUserProfileQuery,
	): Promise<UserProfileViewModel> {
		return {
			id: query.userId,
			email: 'user@example.com',
		};
	}
}

const queryBus = new QueryBus();

queryBus.register(
	RetrieveUserProfileQuery,
	new RetrieveUserProfileQueryHandler(),
);

const profile = await queryBus.execute(
	new RetrieveUserProfileQuery('user-1'),
);
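
The type argument on Query<UserProfileViewModel> suggests the query class carries its own result type. One way such typing can be made to work is sketched below; SketchQuery, ResultOf and runQuery are hypothetical names, and nothing here is confirmed to match how the library infers results.

```typescript
abstract class SketchQuery<TResult> {
	// Never assigned; it only ties TResult to the class so it can be inferred.
	readonly resultType?: TResult;
}

// Extracts the result type a query class declares.
type ResultOf<TQuery> = TQuery extends SketchQuery<infer TResult>
	? TResult
	: never;

function runQuery<TQuery extends SketchQuery<unknown>>(
	query: TQuery,
	handler: (query: TQuery) => ResultOf<TQuery>,
): ResultOf<TQuery> {
	return handler(query);
}

type Profile = { id: string; email: string };

class FindProfile extends SketchQuery<Profile> {
	readonly userId: string;
	constructor(userId: string) {
		super();
		this.userId = userId;
	}
}

// `found` is typed as Profile without an explicit annotation.
const found = runQuery(new FindProfile('user-1'), (query) => ({
	id: query.userId,
	email: 'user@example.com',
}));

const emailDomain: string = found.email.split('@')[1] ?? '';
```

The handler's return value is checked against Profile at compile time, so a handler that returned a different shape would fail type checking.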

Query Errors

QueryBus throws:

  • QueryHandlerNotFoundError from execute() when no handler is registered for the query.
  • QueryHandlerAlreadyRegisteredError from register() when a second handler is registered for the same query constructor.

Events

An event represents something that already happened. Events can be observed by multiple handlers. Publishing an event with no registered handler is valid.

import {
	Event,
	EventBus,
	type EventHandler,
	EventHandlerExecutionError,
} from '@artworkdev/cqrs';

class UserRegisteredEvent extends Event {
	constructor(readonly userId: string) {
		super();
	}
}

class SendWelcomeEmailEventHandler
	implements EventHandler<UserRegisteredEvent>
{
	async handle(event: UserRegisteredEvent): Promise<void> {
		// Send a welcome email, enqueue a job, update a projection, etc.
	}
}

const eventBus = new EventBus();

eventBus.register(
	UserRegisteredEvent,
	new SendWelcomeEmailEventHandler(),
);

await eventBus.publish(new UserRegisteredEvent('user-1'));

Publish multiple events:

await eventBus.publishAll([
	new UserRegisteredEvent('user-1'),
	new UserRegisteredEvent('user-2'),
]);

Event Errors

EventBus does not fail when no handler is registered.

If one or more handlers throw, EventBus throws EventHandlerExecutionError. The error exposes failures, so callers can inspect which events failed and why.

try {
	await eventBus.publish(new UserRegisteredEvent('user-1'));
} catch (error) {
	if (error instanceof EventHandlerExecutionError) {
		for (const failure of error.failures) {
			// failure.event
			// failure.error
		}
	}
}
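
To make the failure-collection behavior concrete, here is a standalone sketch of a publish loop that gathers handler failures and reports them together. It is not the library's code: SketchFailure, SketchHandlerExecutionError and publishToAll are hypothetical, and it is an assumption that handlers run one after another and that later handlers still run after an earlier one throws.

```typescript
// Hypothetical shape of one collected failure.
type SketchFailure = { readonly event: object; readonly error: unknown };

class SketchHandlerExecutionError extends Error {
	readonly failures: readonly SketchFailure[];
	constructor(failures: readonly SketchFailure[]) {
		super(`${failures.length} event handler(s) failed`);
		this.failures = failures;
	}
}

type SketchEventHandler = (event: object) => void | Promise<void>;

// Runs every handler, remembers each failure, and throws once at the end.
async function publishToAll(
	event: object,
	handlers: readonly SketchEventHandler[],
): Promise<void> {
	const failures: SketchFailure[] = [];
	for (const handle of handlers) {
		try {
			await handle(event);
		} catch (error) {
			failures.push({ event, error });
		}
	}
	if (failures.length > 0) {
		throw new SketchHandlerExecutionError(failures);
	}
}

const delivered: string[] = [];

// Resolves to the list of failures instead of rejecting, for easy inspection.
const report: Promise<readonly SketchFailure[]> = publishToAll(
	{ userId: 'user-1' },
	[
		() => {
			throw new Error('smtp down');
		},
		() => {
			delivered.push('projection updated');
		},
	],
).then(
	() => [],
	(error: unknown) => {
		if (error instanceof SketchHandlerExecutionError) return error.failures;
		throw error;
	},
);
```

With no handlers at all, publishToAll resolves without error, which mirrors the rule that a missing event handler is not a failure.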

Domain Events

AggregateRoot records domain events produced by domain behavior. It does not know about EventBus, outbox persistence, transactions or infrastructure. It is intentionally not generic over the full list of event types: forcing every aggregate to maintain a large event union makes real aggregates noisy. Keep event-specific typing inside the aggregate methods and on<EventClassName> handlers instead.

import { AggregateRoot, DomainEvent } from '@artworkdev/cqrs';

class UserRegisteredDomainEvent extends DomainEvent {
	constructor(
		readonly userId: string,
		readonly email: string,
	) {
		super();
	}
}

class User extends AggregateRoot {
	id = '';
	email = '';

	register(params: { readonly userId: string; readonly email: string }): void {
		this.apply(
			new UserRegisteredDomainEvent(params.userId, params.email),
		);
	}

	protected onUserRegisteredDomainEvent(
		event: UserRegisteredDomainEvent,
	): void {
		this.id = event.userId;
		this.email = event.email;
	}
}

const user = new User();

user.register({
	userId: 'user-1',
	email: 'user@example.com',
});

const events = user.pullDomainEvents();

apply() records new events and invokes a matching internal handler named on<EventClassName> when it exists.

this.apply(new UserRegisteredDomainEvent('user-1', 'user@example.com'));

loadFromHistory() replays events into the aggregate without recording them as new events.

user.loadFromHistory(previousEvents);

Use pullDomainEvents() after persistence to collect and clear pending events. Use getDomainEvents() only for inspection.
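
The relationship between apply(), loadFromHistory() and pullDomainEvents() can be sketched with a small standalone base class. SketchAggregate is hypothetical and only approximates the behavior described above; dispatching on constructor.name is an assumption about how the on<EventClassName> lookup might be done.

```typescript
abstract class SketchAggregate {
	private pending: object[] = [];

	// Record a new event, then route it to on<EventClassName> if it exists.
	protected apply(event: object): void {
		this.pending.push(event);
		this.dispatch(event);
	}

	// Replay stored events without recording them as new.
	loadFromHistory(history: readonly object[]): void {
		for (const event of history) this.dispatch(event);
	}

	// Return the pending events and clear the list.
	pullDomainEvents(): object[] {
		const events = this.pending;
		this.pending = [];
		return events;
	}

	private dispatch(event: object): void {
		const members = this as unknown as Record<string, unknown>;
		const handler = members[`on${event.constructor.name}`];
		if (typeof handler === 'function') handler.call(this, event);
	}
}

class AccountOpened {
	readonly owner: string;
	constructor(owner: string) {
		this.owner = owner;
	}
}

class Account extends SketchAggregate {
	owner = '';

	open(owner: string): void {
		this.apply(new AccountOpened(owner));
	}

	protected onAccountOpened(event: AccountOpened): void {
		this.owner = event.owner;
	}
}

const account = new Account();
account.open('ada');
const pulled = account.pullDomainEvents();

// Rebuild a second instance from the stored events.
const restored = new Account();
restored.loadFromHistory(pulled);
```

After this runs, both instances have the same owner, but only the first one ever had a pending event to pull.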

Outbox

The outbox primitives define how domain events become durable messages. The library provides the contract and an in-memory implementation for tests; production storage belongs to your application.

import {
	InMemoryOutboxRepository,
	NoopTransactionPerformer,
	OutboxEventRegistry,
	OutboxMessageFactory,
	RegisteredOutboxMessageSerializer,
	type Clock,
	type IdGenerator,
} from '@artworkdev/cqrs';

const fixedClock: Clock = {
	now: () => new Date('2026-01-01T00:00:00.000Z'),
};

const idGenerator: IdGenerator = {
	generate: () => 'outbox-message-1',
};

const outboxEventRegistry = new OutboxEventRegistry();

outboxEventRegistry.register(UserRegisteredDomainEvent, {
	eventName: 'user.registered',
	eventVersion: 1,
	serialize: (event) => ({
		userId: event.userId,
		email: event.email,
	}),
	retrieveMetadata: (event) => ({
		aggregateId: event.userId,
		aggregateType: 'user',
	}),
	deserialize: (payload) =>
		new UserRegisteredDomainEvent(
			String(payload.userId),
			String(payload.email),
		),
});

const outboxMessageFactory =
	new OutboxMessageFactory({
		clock: fixedClock,
		idGenerator,
		serializer: new RegisteredOutboxMessageSerializer(outboxEventRegistry),
	});

const outboxRepository = new InMemoryOutboxRepository(fixedClock);
const transactionPerformer = new NoopTransactionPerformer();
const messages = outboxMessageFactory.createMany(events);

await transactionPerformer.perform(outboxRepository.appendMany(messages));

Outbox event identity is explicit. Do not use class names as durable event names. eventName and eventVersion are stored on each OutboxMessage, so renaming a class does not break old messages.
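
To illustrate why a stored name and version survive a class rename, here is a standalone sketch of a two-way registry. SketchOutboxRegistry and its method names are hypothetical and only loosely mirror the register() call above; the lookup key format is an assumption.

```typescript
type Payload = Record<string, unknown>;
type StoredEvent = { eventName: string; eventVersion: number; payload: Payload };

type SketchRegistration<T> = {
	readonly eventName: string;
	readonly eventVersion: number;
	readonly serialize: (event: T) => Payload;
	readonly deserialize: (payload: Payload) => T;
};

class SketchOutboxRegistry {
	private readonly byClass = new Map<Function, SketchRegistration<any>>();
	private readonly byName = new Map<string, SketchRegistration<any>>();

	register<T extends object>(
		eventClass: new (...args: any[]) => T,
		registration: SketchRegistration<T>,
	): void {
		this.byClass.set(eventClass, registration);
		this.byName.set(
			`${registration.eventName}@${registration.eventVersion}`,
			registration,
		);
	}

	// Writing: class -> durable name, version and payload.
	toStored(event: object): StoredEvent {
		const registration = this.byClass.get(event.constructor);
		if (registration === undefined) throw new Error('event class is not registered');
		const { eventName, eventVersion } = registration;
		return { eventName, eventVersion, payload: registration.serialize(event) };
	}

	// Reading: durable name and version -> whatever class is registered today.
	fromStored(stored: StoredEvent): object {
		const key = `${stored.eventName}@${stored.eventVersion}`;
		const registration = this.byName.get(key);
		if (registration === undefined) throw new Error('stored event name is not registered');
		return registration.deserialize(stored.payload);
	}
}

// The class has been renamed, but it keeps its durable name.
class MemberSignedUp {
	readonly userId: string;
	constructor(userId: string) {
		this.userId = userId;
	}
}

const registry = new SketchOutboxRegistry();
registry.register(MemberSignedUp, {
	eventName: 'user.registered',
	eventVersion: 1,
	serialize: (event) => ({ userId: event.userId }),
	deserialize: (payload) => new MemberSignedUp(String(payload.userId)),
});

// A row written before the rename still deserializes.
const oldRow: StoredEvent = {
	eventName: 'user.registered',
	eventVersion: 1,
	payload: { userId: 'user-1' },
};
const revived = registry.fromStored(oldRow);
```

Had the class name been used as the key, the old row would no longer match anything after the rename.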

type OutboxMessage = {
	readonly id: string;
	readonly eventName: string;
	readonly eventVersion: number;
	readonly payload: Record<string, unknown>;
	readonly aggregateId?: string;
	readonly aggregateType?: string;
	readonly correlationId?: string;
	readonly causationId?: string;
};

Use retrieveMetadata for event-derived metadata such as aggregate identity. Use OutboxMessageFactory.create({ event, metadata }) for request-level metadata such as correlationId and causationId.

Repository Contract

OutboxRepository is intentionally small, but write operations are transaction-aware:

type TransactionableAsync<TResult = void, TTransaction = void> = (
	transaction: TTransaction,
) => Promise<TResult> | TResult;

interface OutboxRepository<TTransaction = void> {
	append(message: OutboxMessage): TransactionableAsync<void, TTransaction>;
	appendMany(
		messages: readonly OutboxMessage[],
	): TransactionableAsync<void, TTransaction>;
	findPending(params?: { readonly limit?: number }):
		| Promise<readonly OutboxMessage[]>
		| readonly OutboxMessage[];
	markPublished(id: string): TransactionableAsync<void, TTransaction>;
	markFailed(
		id: string,
		error: unknown,
	): TransactionableAsync<void, TTransaction>;
}

InMemoryOutboxRepository is for tests and local development only. It is not a durable outbox.
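
The contract's key property is that write methods return an operation instead of writing immediately. The standalone sketch below implements a subset of the contract against a hypothetical transaction handle (StagedWrites, a list of deferred writes). The status tracking in Row is an assumption, since the README does not show how status is stored.

```typescript
// Types copied from the contract above, trimmed to what the sketch needs.
type OutboxMessage = {
	readonly id: string;
	readonly eventName: string;
	readonly eventVersion: number;
	readonly payload: Record<string, unknown>;
};
type TransactionableAsync<TResult = void, TTransaction = void> = (
	transaction: TTransaction,
) => Promise<TResult> | TResult;

// Hypothetical transaction handle: writes that are applied together on commit.
type StagedWrites = Array<() => void>;
type Row = { message: OutboxMessage; status: 'pending' | 'published' };

class MapOutboxRepository {
	private readonly rows = new Map<string, Row>();

	append(message: OutboxMessage): TransactionableAsync<void, StagedWrites> {
		return (transaction) => {
			transaction.push(() => {
				this.rows.set(message.id, { message, status: 'pending' });
			});
		};
	}

	findPending(params?: { readonly limit?: number }): readonly OutboxMessage[] {
		const pending = Array.from(this.rows.values())
			.filter((row) => row.status === 'pending')
			.map((row) => row.message);
		return pending.slice(0, params?.limit ?? pending.length);
	}

	markPublished(id: string): TransactionableAsync<void, StagedWrites> {
		return (transaction) => {
			transaction.push(() => {
				const row = this.rows.get(id);
				if (row !== undefined) row.status = 'published';
			});
		};
	}
}

const repository = new MapOutboxRepository();
const staged: StagedWrites = [];

const operation = repository.append({
	id: 'm-1',
	eventName: 'user.registered',
	eventVersion: 1,
	payload: {},
});

// Handing the operation a transaction only stages the write.
operation(staged);
const beforeCommit = repository.findPending().length;

// "Commit": apply every staged write.
for (const write of staged) write();
const afterCommit = repository.findPending().length;
```

Because the caller supplies the transaction, the same append operation can join whatever transaction the use case has already opened for the aggregate.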

Transaction Boundary

The use case owns the transaction. This is the important bit:

await transactionPerformer.perform(async (transaction) => {
	await userRepository.save(user)(transaction);

	const events = user.pullDomainEvents();
	const messages = outboxMessageFactory.createMany(events);

	await outboxRepository.appendMany(messages)(transaction);
});

Do not hide transaction boundaries inside the bus. If aggregate persistence and outbox writes are not atomic, the outbox gives a false sense of reliability.
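
A toy model shows what atomicity buys here. The sketch below uses a hypothetical in-memory "database" (Tables) and a hypothetical performAtomically helper that works on a copy and only swaps it in when the block succeeds; it stands in for a real transaction and is not part of the library.

```typescript
// Two tables that must change together.
type Tables = { users: string[]; outbox: string[] };

// Runs the block against a copy; a throw leaves the original untouched.
function performAtomically(
	tables: Tables,
	block: (draft: Tables) => void,
): Tables {
	const draft: Tables = {
		users: [...tables.users],
		outbox: [...tables.outbox],
	};
	block(draft);
	return draft;
}

let tables: Tables = { users: [], outbox: [] };

// Both writes succeed, so both become visible.
tables = performAtomically(tables, (draft) => {
	draft.users.push('user-1');
	draft.outbox.push('user.registered:user-1');
});

// The outbox write fails, so the user write is discarded as well.
try {
	tables = performAtomically(tables, (draft) => {
		draft.users.push('user-2');
		throw new Error('outbox insert failed');
	});
} catch {
	// `tables` still holds the last committed state.
}
```

Without the shared boundary, user-2 would exist with no outbox message, and the event would never be published.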

SQLite Adapter

The SQLite adapter is exposed from a separate subpath:

import { Database } from 'bun:sqlite';
import {
	SqliteOutboxRepository,
	SqliteTransactionPerformer,
} from '@artworkdev/cqrs/sqlite';

const database = new Database('app.sqlite');
const transactionPerformer = new SqliteTransactionPerformer(database);
const outboxRepository = new SqliteOutboxRepository({ database });

outboxRepository.initialize();

Use it like the in-memory repository, but run writes through the SQLite transaction performer and keep the transaction block synchronous:

await transactionPerformer.perform((transaction) => {
	userRepository.save(user)(transaction);
	outboxRepository.appendMany(messages)(transaction);
});

SqliteTransactionPerformer uses BEGIN IMMEDIATE through Bun SQLite transactions. Transaction blocks must be synchronous. Do async work before opening the SQLite transaction.
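
The synchronous-block rule can be illustrated with a small guard that rejects blocks returning a promise. This is a standalone sketch with hypothetical names (AsyncBlockError, runSyncBlock); how the adapter actually detects async blocks is not shown in this README.

```typescript
class AsyncBlockError extends Error {}

function isThenable(value: unknown): boolean {
	return (
		typeof value === 'object' &&
		value !== null &&
		typeof (value as { then?: unknown }).then === 'function'
	);
}

// Runs the block and refuses a promise as its result.
function runSyncBlock<T>(block: () => T): T {
	const result = block();
	if (isThenable(result)) {
		// A real transaction would roll back at this point.
		throw new AsyncBlockError('transaction blocks must be synchronous');
	}
	return result;
}

// A synchronous block passes through.
const committed = runSyncBlock(() => 'saved user and outbox rows');

// An async block is rejected, even though its promise would resolve.
let rejected = false;
try {
	runSyncBlock(async () => 'too late');
} catch (error) {
	rejected = error instanceof AsyncBlockError;
}
```

In practice this means awaiting things like password hashing or remote lookups first, then passing only the already-computed values into the block.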

Processing Pending Messages

OutboxProcessor reads pending messages, deserializes them into domain events, publishes them, then marks the message as published or failed.

import {
	EventBus,
	EventBusDomainEventPublisher,
	OutboxProcessor,
	RegisteredOutboxMessageDeserializer,
} from '@artworkdev/cqrs';

const eventBus = new EventBus();

const processor = new OutboxProcessor({
	outboxRepository,
	transactionPerformer,
	domainEventPublisher: new EventBusDomainEventPublisher(eventBus),
	deserializer: new RegisteredOutboxMessageDeserializer(outboxEventRegistry),
});

const result = await processor.processPending({ limit: 100 });

OutboxProcessor publishes messages sequentially. That is deliberate for v1: it keeps retry and failure semantics simple. Parallel processing belongs in an application-level worker once locking, leases and concurrency limits are known.
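
A sequential processing loop of the kind described can be sketched in a few lines. Everything below is hypothetical (PendingMessage, SketchResult, processSequentially), and it is an assumption that a failed message is recorded and processing continues with the next one.

```typescript
type PendingMessage = { readonly id: string; readonly eventName: string };

// Hypothetical result shape; the library's OutboxProcessResult is not documented here.
type SketchResult = { published: string[]; failed: string[] };

async function processSequentially(
	pending: readonly PendingMessage[],
	publish: (message: PendingMessage) => Promise<void>,
): Promise<SketchResult> {
	const result: SketchResult = { published: [], failed: [] };
	for (const message of pending) {
		try {
			// Wait for each publish before starting the next one.
			await publish(message);
			result.published.push(message.id);
		} catch {
			// Record the failure and move on to the next message.
			result.failed.push(message.id);
		}
	}
	return result;
}

const order: string[] = [];

const run = processSequentially(
	[
		{ id: 'm-1', eventName: 'user.registered' },
		{ id: 'm-2', eventName: 'user.registered' },
		{ id: 'm-3', eventName: 'user.registered' },
	],
	async (message) => {
		order.push(message.id);
		if (message.id === 'm-2') throw new Error('handler failed');
	},
);
```

Because only one publish is in flight at a time, the outcome of each message is known before the next begins, which keeps per-message status updates straightforward.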

EventBus vs DomainEventPublisher

EventBus is the low-level in-process publication mechanism. It finds registered handlers and calls them.

DomainEventPublisher is a port for publishing a domain event after it has left the outbox. EventBusDomainEventPublisher is the built-in adapter that publishes through EventBus.

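The intended flow is to commit the aggregate and its outbox messages in one transaction, then let the processor publish the pending messages through the DomainEventPublisher: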

await transactionPerformer.perform(async (transaction) => {
	await userRepository.save(user)(transaction);
	await outboxRepository.appendMany(messages)(transaction);
});

await outboxProcessor.processPending();

Public API

export {
	Command,
	CommandBus,
	CommandHandlerAlreadyRegisteredError,
	CommandHandlerNotFoundError,
	CommandRegistry,
	Event,
	EventBus,
	EventBusDomainEventPublisher,
	EventHandlerExecutionError,
	EventRegistry,
	InMemoryOutboxRepository,
	NoopTransactionPerformer,
	OutboxEventAlreadyRegisteredError,
	OutboxEventNotRegisteredError,
	OutboxEventRegistry,
	OutboxMessageFactory,
	OutboxProcessor,
	OutboxMessageStatus,
	AggregateRoot,
	DomainEvent,
	Query,
	QueryBus,
	QueryHandlerAlreadyRegisteredError,
	QueryHandlerNotFoundError,
	QueryRegistry,
	RegisteredOutboxMessageDeserializer,
	RegisteredOutboxMessageSerializer,
};

export type {
	CommandConstructor,
	CommandHandler,
	EventConstructor,
	EventHandler,
	EventHandlerExecutionFailure,
	Clock,
	DomainEventPublisher,
	DomainEventSerializer,
	DomainEventConstructor,
	FindPendingOutboxMessagesParams,
	IdGenerator,
	OutboxMessage,
	OutboxMessageDeserializer,
	OutboxEventRegistration,
	OutboxMessageFactoryDependencies,
	OutboxMessageMetadata,
	OutboxMessagePayload,
	OutboxProcessFailure,
	OutboxProcessResult,
	OutboxProcessorDependencies,
	OutboxRepository,
	ProcessPendingOutboxMessagesParams,
	QueryConstructor,
	QueryHandler,
	RetrieveOutboxEventRegistrationParams,
	RegisteredOutboxEvent,
	SerializedOutboxEvent,
	TransactionableAsync,
	TransactionPerformer,
};

SQLite adapter:

import {
	SqliteAsyncTransactionBlockError,
	SqliteOutboxPayloadParseError,
	SqliteOutboxRepository,
	SqliteOutboxStatusParseError,
	SqliteTransactionPerformer,
	type SqliteBinding,
	type SqliteDatabase,
	type SqliteStatement,
	type SqliteTransactionContext,
	type SqliteTransactionRunner,
} from '@artworkdev/cqrs/sqlite';

Development

Use Bun for local development.

bun install
bun run validate
bun run build
bun run pack:dry-run

Useful scripts:

  • bun test
  • bun run check
  • bun run typecheck
  • bun run build
  • bun run publish:dry-run