@artworkdev/cqrs
v0.7.0
Standalone CQRS primitives for TypeScript applications.
@artworkdev/cqrs provides framework-agnostic command, query and event buses.
It is inspired by @nestjs/cqrs, but intentionally avoids Nest, decorators,
reflect-metadata, RxJS and runtime dependency injection.
Status
The current main branch targets 0.7.0.
Published package:
bun add @artworkdev/cqrs

Available primitives:
- CommandBus for application writes.
- QueryBus for application reads.
- EventBus for in-process event handlers.
- AggregateRoot and DomainEvent for domain event collection.
- TransactionPerformer and TransactionableAsync for explicit transaction boundaries.
- OutboxRepository, OutboxMessageFactory and OutboxProcessor for transaction-aware outbox workflows.
- DomainEventPublisher and EventBusDomainEventPublisher for post-outbox domain event publication.
Design Rules
- Commands and queries use execute.
- Events use publish and publishAll.
- Registration is explicit, not decorator-based.
- Commands and queries have one handler.
- Events can have zero, one or many handlers.
- A missing command/query handler is an error.
- A missing event handler is not an error.
- The library does not own transactions. Application code must keep aggregate persistence and outbox writes atomic.
- Outbox writes return transactionable operations. The use case decides which transaction executes them.
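The handler-count rules above can be pictured with two small registries keyed by message constructor. This is a minimal sketch, not the library's implementation: SingleHandlerRegistry, MultiHandlerRegistry and the sample classes are hypothetical names, and handlers are synchronous here only to keep the example short.

```typescript
type Ctor<T> = new (...args: any[]) => T;

// Commands and queries: exactly one handler per constructor.
class SingleHandlerRegistry<TMessage extends object, TResult> {
  private readonly handlers = new Map<Function, (message: TMessage) => TResult>();

  register<T extends TMessage>(ctor: Ctor<T>, handler: (message: T) => TResult): void {
    if (this.handlers.has(ctor)) {
      throw new Error(`Handler already registered for ${ctor.name}`);
    }
    this.handlers.set(ctor, handler as (message: TMessage) => TResult);
  }

  execute(message: TMessage): TResult {
    const handler = this.handlers.get(message.constructor);
    if (!handler) {
      // A missing command/query handler is an error.
      throw new Error(`No handler registered for ${message.constructor.name}`);
    }
    return handler(message);
  }
}

// Events: zero, one or many handlers per constructor.
class MultiHandlerRegistry<TEvent extends object> {
  private readonly handlers = new Map<Function, Array<(event: TEvent) => void>>();

  register<T extends TEvent>(ctor: Ctor<T>, handler: (event: T) => void): void {
    const existing = this.handlers.get(ctor) ?? [];
    existing.push(handler as (event: TEvent) => void);
    this.handlers.set(ctor, existing);
  }

  // Returns how many handlers ran; zero is a valid outcome.
  publish(event: TEvent): number {
    const handlers = this.handlers.get(event.constructor) ?? [];
    for (const handler of handlers) handler(event);
    return handlers.length;
  }
}

class RegisterUser {
  readonly userId: string;
  constructor(userId: string) {
    this.userId = userId;
  }
}

class UserRegistered {
  readonly userId: string;
  constructor(userId: string) {
    this.userId = userId;
  }
}

const log: string[] = [];
const commands = new SingleHandlerRegistry<object, void>();
const events = new MultiHandlerRegistry<object>();

commands.register(RegisterUser, (command) => {
  log.push(`registered ${command.userId}`);
});
events.register(UserRegistered, (event) => {
  log.push(`email ${event.userId}`);
});
events.register(UserRegistered, (event) => {
  log.push(`projection ${event.userId}`);
});

commands.execute(new RegisterUser('user-1'));
events.publish(new UserRegistered('user-1'));
```

Keying on the constructor rather than on a string keeps registration explicit and avoids decorators or metadata reflection.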
Commands
A command represents an application write intent. In strict CQRS, command
handlers should usually return void. Technical IDs such as commandId,
jobId or correlationId are acceptable when the caller needs them.
import { Command, CommandBus, type CommandHandler } from '@artworkdev/cqrs';

class RegisterUserCommand extends Command {
  constructor(
    readonly userId: string,
    readonly email: string,
  ) {
    super();
  }
}

class RegisterUserCommandHandler
  implements CommandHandler<RegisterUserCommand, void>
{
  async execute(command: RegisterUserCommand): Promise<void> {
    // Persist the aggregate here.
  }
}

const commandBus = new CommandBus();
commandBus.register(RegisterUserCommand, new RegisterUserCommandHandler());

await commandBus.execute(
  new RegisterUserCommand('user-1', 'user@example.com'),
);

Command Errors
CommandBus.execute() throws:
- CommandHandlerNotFoundError when no handler is registered.
- CommandHandlerAlreadyRegisteredError when registering a second handler for the same command constructor.
Queries
A query represents an application read intent. Query handlers should return a read model or view model, not a mutable domain aggregate.
import { Query, QueryBus, type QueryHandler } from '@artworkdev/cqrs';

type UserProfileViewModel = {
  id: string;
  email: string;
};

class RetrieveUserProfileQuery extends Query<UserProfileViewModel> {
  constructor(readonly userId: string) {
    super();
  }
}

class RetrieveUserProfileQueryHandler
  implements QueryHandler<RetrieveUserProfileQuery, UserProfileViewModel>
{
  async execute(
    query: RetrieveUserProfileQuery,
  ): Promise<UserProfileViewModel> {
    return {
      id: query.userId,
      email: 'user@example.com',
    };
  }
}

const queryBus = new QueryBus();
queryBus.register(
  RetrieveUserProfileQuery,
  new RetrieveUserProfileQueryHandler(),
);

const profile = await queryBus.execute(
  new RetrieveUserProfileQuery('user-1'),
);

Query Errors
QueryBus.execute() throws:
- QueryHandlerNotFoundError when no handler is registered.
- QueryHandlerAlreadyRegisteredError when registering a second handler for the same query constructor.
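The Query<UserProfileViewModel> type parameter in the example above suggests that the result type travels with the query class. One way such inference can work in TypeScript is sketched below. Everything here is a hypothetical stand-in (TinyQuery, ResultOf, TinyQueryBus); the library's real Query and QueryBus may be implemented differently.

```typescript
abstract class TinyQuery<TResult> {
  // Type-level marker only: it lets TypeScript recover TResult from an instance.
  readonly resultType?: TResult;
}

// Extracts the result type declared by a query class.
type ResultOf<TQuery> = TQuery extends TinyQuery<infer TResult> ? TResult : never;

interface TinyQueryHandler<TQuery extends TinyQuery<unknown>> {
  execute(query: TQuery): Promise<ResultOf<TQuery>>;
}

class TinyQueryBus {
  private readonly handlers = new Map<Function, TinyQueryHandler<any>>();

  register<TQuery extends TinyQuery<unknown>>(
    ctor: new (...args: any[]) => TQuery,
    handler: TinyQueryHandler<TQuery>,
  ): void {
    this.handlers.set(ctor, handler);
  }

  async execute<TQuery extends TinyQuery<unknown>>(
    query: TQuery,
  ): Promise<ResultOf<TQuery>> {
    const handler = this.handlers.get(query.constructor);
    if (!handler) {
      throw new Error(`No handler registered for ${query.constructor.name}`);
    }
    return (await handler.execute(query)) as ResultOf<TQuery>;
  }
}

type ProfileView = { id: string; email: string };

class RetrieveProfile extends TinyQuery<ProfileView> {
  readonly userId: string;
  constructor(userId: string) {
    super();
    this.userId = userId;
  }
}

const tinyQueryBus = new TinyQueryBus();
tinyQueryBus.register(RetrieveProfile, {
  async execute(query) {
    return { id: query.userId, email: 'user@example.com' };
  },
});

// The promise resolves to ProfileView without an explicit annotation.
const profilePromise = tinyQueryBus.execute(new RetrieveProfile('user-1'));
```

The benefit of this shape is that callers never repeat the view model type at the call site; the query class is the single place where it is declared.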
Events
An event represents something that already happened. Events can be observed by multiple handlers. Publishing an event with no registered handler is valid.
import {
  Event,
  EventBus,
  type EventHandler,
  EventHandlerExecutionError,
} from '@artworkdev/cqrs';

class UserRegisteredEvent extends Event {
  constructor(readonly userId: string) {
    super();
  }
}

class SendWelcomeEmailEventHandler
  implements EventHandler<UserRegisteredEvent>
{
  async handle(event: UserRegisteredEvent): Promise<void> {
    // Send a welcome email, enqueue a job, update a projection, etc.
  }
}

const eventBus = new EventBus();
eventBus.register(
  UserRegisteredEvent,
  new SendWelcomeEmailEventHandler(),
);

await eventBus.publish(new UserRegisteredEvent('user-1'));

Publish multiple events:
await eventBus.publishAll([
  new UserRegisteredEvent('user-1'),
  new UserRegisteredEvent('user-2'),
]);

Event Errors
EventBus does not fail when no handler is registered.
If one or more handlers throw, EventBus throws
EventHandlerExecutionError. The error exposes failures, so callers can
inspect which events failed and why.
try {
  await eventBus.publish(new UserRegisteredEvent('user-1'));
} catch (error) {
  if (error instanceof EventHandlerExecutionError) {
    for (const failure of error.failures) {
      // failure.event
      // failure.error
    }
  }
}

Domain Events
AggregateRoot records domain events produced by domain behavior. It does not
know about EventBus, outbox persistence, transactions or infrastructure.
It is intentionally not generic over the full list of event types: forcing
every aggregate to maintain a large event union makes real aggregates noisy.
Keep event-specific typing inside the aggregate methods and on<EventName>
handlers instead.
import { AggregateRoot, DomainEvent } from '@artworkdev/cqrs';

class UserRegisteredDomainEvent extends DomainEvent {
  constructor(
    readonly userId: string,
    readonly email: string,
  ) {
    super();
  }
}

class User extends AggregateRoot {
  id = '';
  email = '';

  register(params: { readonly userId: string; readonly email: string }): void {
    this.apply(
      new UserRegisteredDomainEvent(params.userId, params.email),
    );
  }

  protected onUserRegisteredDomainEvent(
    event: UserRegisteredDomainEvent,
  ): void {
    this.id = event.userId;
    this.email = event.email;
  }
}

const user = new User();
user.register({
  userId: 'user-1',
  email: 'user@example.com',
});

const events = user.pullDomainEvents();

apply() records new events and invokes a matching internal handler named
on<EventClassName> when it exists.
this.apply(new UserRegisteredDomainEvent('user-1', 'user@example.com'));

loadFromHistory() replays events into the aggregate without recording them as
new events.
user.loadFromHistory(previousEvents);

Use pullDomainEvents() after persistence to collect and clear pending events.
Use getDomainEvents() only for inspection.
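The apply, loadFromHistory and pullDomainEvents behaviour described above can be sketched in a few lines. This is an illustrative stand-in, not the library's AggregateRoot: TinyAggregateRoot, TinyDomainEvent, Account and AccountRenamed are hypothetical names, and looking up the handler through constructor.name is an assumption about how on<EventClassName> dispatch might work.

```typescript
abstract class TinyDomainEvent {}

abstract class TinyAggregateRoot {
  private pendingEvents: TinyDomainEvent[] = [];

  // Records a new event and routes it to on<EventClassName> if present.
  protected apply(event: TinyDomainEvent): void {
    this.pendingEvents.push(event);
    this.dispatch(event);
  }

  // Replays past events into state without recording them as new.
  loadFromHistory(history: readonly TinyDomainEvent[]): void {
    for (const event of history) this.dispatch(event);
  }

  // Read-only copy for inspection; the pending list is left untouched.
  getDomainEvents(): readonly TinyDomainEvent[] {
    return [...this.pendingEvents];
  }

  // Returns pending events and clears the list.
  pullDomainEvents(): TinyDomainEvent[] {
    const events = this.pendingEvents;
    this.pendingEvents = [];
    return events;
  }

  private dispatch(event: TinyDomainEvent): void {
    const handlerName = `on${event.constructor.name}`;
    const handler = (this as unknown as Record<string, unknown>)[handlerName];
    if (typeof handler === 'function') handler.call(this, event);
  }
}

class AccountRenamed extends TinyDomainEvent {
  readonly name: string;
  constructor(name: string) {
    super();
    this.name = name;
  }
}

class Account extends TinyAggregateRoot {
  name = '';

  rename(name: string): void {
    this.apply(new AccountRenamed(name));
  }

  protected onAccountRenamed(event: AccountRenamed): void {
    this.name = event.name;
  }
}

const account = new Account();
account.rename('Ada');
const pulled = account.pullDomainEvents();

const restored = new Account();
restored.loadFromHistory(pulled);
```

One caveat of name-based dispatch in this sketch: a minifier that renames classes would change constructor.name and silently skip the handler.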
Outbox
The outbox primitives define how domain events become durable messages. The library provides the contract and a memory implementation for tests; production storage belongs to your application.
import {
  InMemoryOutboxRepository,
  NoopTransactionPerformer,
  OutboxEventRegistry,
  OutboxMessageFactory,
  RegisteredOutboxMessageSerializer,
  type Clock,
  type IdGenerator,
} from '@artworkdev/cqrs';

const fixedClock: Clock = {
  now: () => new Date('2026-01-01T00:00:00.000Z'),
};

const idGenerator: IdGenerator = {
  generate: () => 'outbox-message-1',
};

const outboxEventRegistry = new OutboxEventRegistry();
outboxEventRegistry.register(UserRegisteredDomainEvent, {
  eventName: 'user.registered',
  eventVersion: 1,
  serialize: (event) => ({
    userId: event.userId,
    email: event.email,
  }),
  retrieveMetadata: (event) => ({
    aggregateId: event.userId,
    aggregateType: 'user',
  }),
  deserialize: (payload) =>
    new UserRegisteredDomainEvent(
      String(payload.userId),
      String(payload.email),
    ),
});

const outboxMessageFactory = new OutboxMessageFactory({
  clock: fixedClock,
  idGenerator,
  serializer: new RegisteredOutboxMessageSerializer(outboxEventRegistry),
});

const outboxRepository = new InMemoryOutboxRepository(fixedClock);
const transactionPerformer = new NoopTransactionPerformer();

const messages = outboxMessageFactory.createMany(events);
await transactionPerformer.perform(outboxRepository.appendMany(messages));

Outbox event identity is explicit. Do not use class names as durable event
names. eventName and eventVersion are stored on each OutboxMessage, so
renaming a class does not break old messages.
type OutboxMessage = {
  readonly id: string;
  readonly eventName: string;
  readonly eventVersion: number;
  readonly payload: Record<string, unknown>;
  readonly aggregateId?: string;
  readonly aggregateType?: string;
  readonly correlationId?: string;
  readonly causationId?: string;
};

Use retrieveMetadata for event-derived metadata such as aggregate identity.
Use OutboxMessageFactory.create({ event, metadata }) for request-level
metadata such as correlationId and causationId.
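To see why an explicit eventName and eventVersion survive a class rename, consider the sketch below. It is not the library's OutboxEventRegistry: TinyEventRegistry, StoredMessage and UserSignedUp are hypothetical, and the `name@version` lookup key is an assumption made for the example.

```typescript
type Payload = Record<string, unknown>;

type StoredMessage = {
  eventName: string;
  eventVersion: number;
  payload: Payload;
};

type Registration<TEvent> = {
  eventName: string;
  eventVersion: number;
  serialize: (event: TEvent) => Payload;
  deserialize: (payload: Payload) => TEvent;
};

class TinyEventRegistry {
  private readonly byConstructor = new Map<Function, Registration<any>>();
  private readonly byName = new Map<string, Registration<any>>();

  register<TEvent extends object>(
    ctor: new (...args: any[]) => TEvent,
    registration: Registration<TEvent>,
  ): void {
    this.byConstructor.set(ctor, registration);
    this.byName.set(
      `${registration.eventName}@${registration.eventVersion}`,
      registration,
    );
  }

  // Writing: the class is only used to find the durable name.
  toMessage(event: object): StoredMessage {
    const registration = this.byConstructor.get(event.constructor);
    if (!registration) {
      throw new Error(`Event not registered: ${event.constructor.name}`);
    }
    return {
      eventName: registration.eventName,
      eventVersion: registration.eventVersion,
      payload: registration.serialize(event),
    };
  }

  // Reading: the lookup never touches the class name.
  fromMessage(message: StoredMessage): object {
    const key = `${message.eventName}@${message.eventVersion}`;
    const registration = this.byName.get(key);
    if (!registration) throw new Error(`Event not registered: ${key}`);
    return registration.deserialize(message.payload);
  }
}

// The class was renamed at some point; the durable name stayed the same.
class UserSignedUp {
  readonly userId: string;
  constructor(userId: string) {
    this.userId = userId;
  }
}

const tinyRegistry = new TinyEventRegistry();
tinyRegistry.register(UserSignedUp, {
  eventName: 'user.registered',
  eventVersion: 1,
  serialize: (event) => ({ userId: event.userId }),
  deserialize: (payload) => new UserSignedUp(String(payload.userId)),
});

const storedJson = JSON.stringify(tinyRegistry.toMessage(new UserSignedUp('user-1')));
const revived = tinyRegistry.fromMessage(JSON.parse(storedJson));
```

Because the stored row carries only 'user.registered' and version 1, a message written before the rename still resolves to the current class.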
Repository Contract
OutboxRepository is intentionally small, but write operations are
transaction-aware:
type TransactionableAsync<TResult = void, TTransaction = void> = (
  transaction: TTransaction,
) => Promise<TResult> | TResult;

interface OutboxRepository<TTransaction = void> {
  append(message: OutboxMessage): TransactionableAsync<void, TTransaction>;
  appendMany(
    messages: readonly OutboxMessage[],
  ): TransactionableAsync<void, TTransaction>;
  findPending(params?: { readonly limit?: number }):
    | Promise<readonly OutboxMessage[]>
    | readonly OutboxMessage[];
  markPublished(id: string): TransactionableAsync<void, TTransaction>;
  markFailed(
    id: string,
    error: unknown,
  ): TransactionableAsync<void, TTransaction>;
}

InMemoryOutboxRepository is for tests and local development only. It is not a
durable outbox.
Transaction Boundary
The use case owns the transaction. This is the important bit:
await transactionPerformer.perform(async (transaction) => {
  await userRepository.save(user)(transaction);
  const events = user.pullDomainEvents();
  const messages = outboxMessageFactory.createMany(events);
  await outboxRepository.appendMany(messages)(transaction);
});

Do not hide transaction boundaries inside the bus. If aggregate persistence and outbox writes are not atomic, the outbox gives a false sense of reliability.
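The atomicity requirement can be demonstrated with a toy transaction that buffers writes and applies them only if the whole block succeeds. This is a sketch under stated assumptions: BufferedTransaction, BufferedTransactionPerformer, saveUser and appendToOutbox are hypothetical and only mimic the "operation returns a function of the transaction" shape of TransactionableAsync.

```typescript
type Transactionable<TTransaction> = (
  transaction: TTransaction,
) => Promise<void> | void;

// Toy transaction: collects writes, applies nothing until commit.
class BufferedTransaction {
  readonly writes: Array<() => void> = [];
}

class BufferedTransactionPerformer {
  async perform(
    block: (transaction: BufferedTransaction) => Promise<void> | void,
  ): Promise<void> {
    const transaction = new BufferedTransaction();
    await block(transaction); // a throw here skips the commit loop below
    for (const write of transaction.writes) write();
  }
}

const savedUsers: string[] = [];
const savedMessages: string[] = [];

const saveUser =
  (userId: string): Transactionable<BufferedTransaction> =>
  (transaction) => {
    transaction.writes.push(() => savedUsers.push(userId));
  };

const appendToOutbox =
  (messageId: string, fail = false): Transactionable<BufferedTransaction> =>
  (transaction) => {
    if (fail) throw new Error('outbox unavailable');
    transaction.writes.push(() => savedMessages.push(messageId));
  };

async function runUseCases(): Promise<void> {
  const performer = new BufferedTransactionPerformer();

  // Both writes succeed, so both are committed.
  await performer.perform(async (transaction) => {
    await saveUser('user-1')(transaction);
    await appendToOutbox('message-1')(transaction);
  });

  // The outbox write fails, so the user write is discarded as well.
  await performer
    .perform(async (transaction) => {
      await saveUser('user-2')(transaction);
      await appendToOutbox('message-2', true)(transaction);
    })
    .catch(() => undefined);
}

const useCasesDone = runUseCases();
```

After both use cases run, only user-1 and message-1 are stored. Had the two writes used separate transactions, user-2 would exist without a matching outbox message, which is exactly the failure mode the outbox is meant to prevent.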
SQLite Adapter
The SQLite adapter is exposed from a separate subpath:
import { Database } from 'bun:sqlite';
import {
  SqliteOutboxRepository,
  SqliteTransactionPerformer,
} from '@artworkdev/cqrs/sqlite';

const database = new Database('app.sqlite');
const transactionPerformer = new SqliteTransactionPerformer(database);
const outboxRepository = new SqliteOutboxRepository({ database });
outboxRepository.initialize();

Use it exactly like the in-memory repository, but execute writes through the SQLite transaction performer:
await transactionPerformer.perform((transaction) => {
  userRepository.save(user)(transaction);
  outboxRepository.appendMany(messages)(transaction);
});

SqliteTransactionPerformer uses BEGIN IMMEDIATE through Bun SQLite
transactions. Transaction blocks must be synchronous. Do async work before
opening the SQLite transaction.
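The "synchronous blocks only" rule can be enforced by checking whether the block returned a promise. The sketch below illustrates that idea without a real database: SyncTransactionPerformer and AsyncBlockError are hypothetical, the statements array merely records what a real adapter would send to SQLite, and nothing here is taken from the actual SqliteTransactionPerformer.

```typescript
class AsyncBlockError extends Error {}

function isThenable(value: unknown): value is PromiseLike<unknown> {
  return (
    typeof value === 'object' &&
    value !== null &&
    typeof (value as { then?: unknown }).then === 'function'
  );
}

class SyncTransactionPerformer {
  // Stand-in for SQL sent to the database, kept for inspection.
  readonly statements: string[] = [];

  perform<TResult>(block: () => TResult): TResult {
    this.statements.push('BEGIN IMMEDIATE');
    try {
      const result = block();
      if (isThenable(result)) {
        // An async block would keep writing after the transaction has ended.
        throw new AsyncBlockError('Transaction blocks must be synchronous');
      }
      this.statements.push('COMMIT');
      return result;
    } catch (error) {
      this.statements.push('ROLLBACK');
      throw error;
    }
  }
}

const syncPerformer = new SyncTransactionPerformer();
const storedEmails: string[] = [];

async function registerEmail(email: string): Promise<void> {
  // Async work (hashing, HTTP calls, ...) finishes before the transaction opens.
  const normalized = await Promise.resolve(email.trim().toLowerCase());

  // Only synchronous writes happen inside the block.
  syncPerformer.perform(() => {
    storedEmails.push(normalized);
  });
}

const emailRegistered = registerEmail(' Ada@Example.com ');
```

Keeping the write lock held only for synchronous statements also keeps the lock window short, which matters with a single-writer database.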
Processing Pending Messages
OutboxProcessor reads pending messages, deserializes them into domain events,
publishes them, then marks the message as published or failed.
import {
  EventBus,
  EventBusDomainEventPublisher,
  OutboxProcessor,
  RegisteredOutboxMessageDeserializer,
} from '@artworkdev/cqrs';

const eventBus = new EventBus();
const processor = new OutboxProcessor({
  outboxRepository,
  transactionPerformer,
  domainEventPublisher: new EventBusDomainEventPublisher(eventBus),
  deserializer: new RegisteredOutboxMessageDeserializer(outboxEventRegistry),
});

const result = await processor.processPending({ limit: 100 });

OutboxProcessor publishes messages sequentially. That is deliberate for v1:
it keeps retry and failure semantics simple. Parallel processing belongs in an
application-level worker once locking, leases and concurrency limits are known.
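Sequential processing amounts to a plain loop that awaits each publication before moving on. The following sketch shows that shape only: processSequentially, PendingMessage and SequentialResult are hypothetical, and continuing with the next message after a failure is an assumption of this example, not a documented guarantee of OutboxProcessor.

```typescript
type PendingMessage = { id: string; eventName: string };

type SequentialResult = {
  published: string[];
  failed: Array<{ id: string; error: unknown }>;
};

async function processSequentially(
  pending: readonly PendingMessage[],
  publish: (message: PendingMessage) => Promise<void>,
): Promise<SequentialResult> {
  const result: SequentialResult = { published: [], failed: [] };

  for (const message of pending) {
    try {
      // The next message starts only after this one has settled.
      await publish(message);
      result.published.push(message.id); // a real processor would mark it published
    } catch (error) {
      result.failed.push({ id: message.id, error }); // and mark it failed here
    }
  }

  return result;
}

const publishOrder: string[] = [];

const sequentialRun = processSequentially(
  [
    { id: 'a', eventName: 'user.registered' },
    { id: 'b', eventName: 'user.registered' },
    { id: 'c', eventName: 'user.registered' },
  ],
  async (message) => {
    publishOrder.push(message.id);
    if (message.id === 'b') throw new Error('handler failed');
  },
);
```

With one message in flight at a time, a failure is always attributable to a single message and ordering follows the pending list, which is what keeps retry semantics simple.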
EventBus vs DomainEventPublisher
EventBus is the low-level in-process publication mechanism. It finds
registered handlers and calls them.
DomainEventPublisher is a port for publishing a domain event after it has
left the outbox. EventBusDomainEventPublisher is the built-in adapter that
publishes through EventBus.
The correct shape is:
await transactionPerformer.perform(async (transaction) => {
  await userRepository.save(user)(transaction);
  await outboxRepository.appendMany(messages)(transaction);
});

await outboxProcessor.processPending();

Public API
export {
  Command,
  CommandBus,
  CommandHandlerAlreadyRegisteredError,
  CommandHandlerNotFoundError,
  CommandRegistry,
  Event,
  EventBus,
  EventBusDomainEventPublisher,
  EventHandlerExecutionError,
  EventRegistry,
  InMemoryOutboxRepository,
  NoopTransactionPerformer,
  OutboxEventAlreadyRegisteredError,
  OutboxEventNotRegisteredError,
  OutboxEventRegistry,
  OutboxMessageFactory,
  OutboxProcessor,
  OutboxMessageStatus,
  AggregateRoot,
  DomainEvent,
  Query,
  QueryBus,
  QueryHandlerAlreadyRegisteredError,
  QueryHandlerNotFoundError,
  QueryRegistry,
  RegisteredOutboxMessageDeserializer,
  RegisteredOutboxMessageSerializer,
};

export type {
  CommandConstructor,
  CommandHandler,
  EventConstructor,
  EventHandler,
  EventHandlerExecutionFailure,
  Clock,
  DomainEventPublisher,
  DomainEventSerializer,
  DomainEventConstructor,
  FindPendingOutboxMessagesParams,
  IdGenerator,
  OutboxMessage,
  OutboxMessageDeserializer,
  OutboxEventRegistration,
  OutboxMessageFactoryDependencies,
  OutboxMessageMetadata,
  OutboxMessagePayload,
  OutboxProcessFailure,
  OutboxProcessResult,
  OutboxProcessorDependencies,
  OutboxRepository,
  ProcessPendingOutboxMessagesParams,
  QueryConstructor,
  QueryHandler,
  RetrieveOutboxEventRegistrationParams,
  RegisteredOutboxEvent,
  SerializedOutboxEvent,
  TransactionableAsync,
  TransactionPerformer,
};

SQLite adapter:
import {
  SqliteAsyncTransactionBlockError,
  SqliteOutboxPayloadParseError,
  SqliteOutboxRepository,
  SqliteOutboxStatusParseError,
  SqliteTransactionPerformer,
  type SqliteBinding,
  type SqliteDatabase,
  type SqliteStatement,
  type SqliteTransactionContext,
  type SqliteTransactionRunner,
} from '@artworkdev/cqrs/sqlite';

Development
Use Bun for local development.
bun install
bun run validate
bun run build
bun run pack:dry-run

Useful scripts:
- bun test
- bun run check
- bun run typecheck
- bun run build
- bun run publish:dry-run
