@delta-base/toolkit
v0.0.4
Published
Application-level event sourcing toolkit for delta-base
Readme
@delta-base/toolkit
Application-level event sourcing toolkit for delta-base. This package provides high-level patterns and utilities for building event-sourced applications with delta-base.
Features
- 🏗️ Core Types - Strongly-typed events, commands, and projections
- 💾 In-Memory Event Store - Complete event store implementation for testing
- ⚡ Command Handlers - Simple and decider pattern-based command handling with retry logic
- 📊 Projection System - Interface-based projections with pluggable read model stores
- 🔧 Error Handling - Comprehensive error types with type guards
- 🛠️ Utility Functions - Event and command creation helpers
Installation
npm install @delta-base/toolkit
# or
pnpm add @delta-base/toolkit
Quick Start
Basic Event Store Usage
import { InMemoryEventStore, createEvent } from '@delta-base/toolkit';
const eventStore = new InMemoryEventStore();
// Create and append events
const event = createEvent('UserRegistered', {
userId: 'user-123',
email: 'john@example.com',
name: 'John Doe'
});
await eventStore.appendToStream('user-123', [event]);
// Read events back
const events = await eventStore.readFromStream('user-123');
console.log(events.events); // [{ type: 'UserRegistered', ... }]
Command Handling with Decider Pattern
import {
handleCommandWithDecider,
createCommand,
InMemoryEventStore,
type Decider,
type ReadEvent
} from '@delta-base/toolkit';
// Define your aggregate state
/**
 * Aggregate state that `userDecider` folds user events into.
 *
 * `id` and `email` stay unset until a `UserRegistered` event has been applied;
 * the decider treats a missing `id` as "user does not exist yet".
 */
interface UserState {
/** User identifier, copied from the `UserRegistered` event's `userId`. */
id?: string;
/** Current e-mail address; replaced when an `EmailUpdated` event is applied. */
email?: string;
/** `false` in the initial state, set to `true` when the user is registered. */
isActive: boolean;
}
// Define your events
/**
 * Events the user decider can emit, discriminated by `type`.
 * `UserRegistered` carries the full registration payload; `EmailUpdated`
 * carries only the user id and the replacement address.
 */
type UserEvent =
| { type: 'UserRegistered'; data: { userId: string; email: string; name: string } }
| { type: 'EmailUpdated'; data: { userId: string; newEmail: string } };
// Define your commands
/**
 * Commands accepted by the user decider, discriminated by `type`.
 * Each command's `data` has the same shape as the event it produces
 * (`RegisterUser` -> `UserRegistered`, `UpdateEmail` -> `EmailUpdated`).
 */
type UserCommand =
| { type: 'RegisterUser'; data: { userId: string; email: string; name: string } }
| { type: 'UpdateEmail'; data: { userId: string; newEmail: string } };
// Create your decider
/**
 * Decider for the user aggregate.
 *
 * - `initialState` yields an inactive user with no id or e-mail.
 * - `decide` turns a command into the events it produces, throwing when the
 *   command is not valid for the current state.
 * - `evolve` applies a single event to the state and returns the new state.
 */
const userDecider: Decider<UserState, UserCommand, UserEvent> = {
  initialState: () => ({ isActive: false }),

  decide: (state: UserState, command: UserCommand) => {
    if (command.type === 'RegisterUser') {
      // A populated id means a UserRegistered event was already applied.
      if (state.id) throw new Error('User already exists');
      return [{ type: 'UserRegistered', data: command.data }];
    }

    if (command.type === 'UpdateEmail') {
      if (!state.id) throw new Error('User does not exist');
      const { userId, newEmail } = command.data;
      return [{ type: 'EmailUpdated', data: { userId, newEmail } }];
    }

    // Unrecognised commands produce no events.
    return [];
  },

  evolve: (state: UserState, event: ReadEvent<UserEvent>) => {
    if (event.type === 'UserRegistered') {
      const { userId, email } = event.data;
      return { ...state, id: userId, email, isActive: true };
    }

    if (event.type === 'EmailUpdated') {
      return { ...state, email: event.data.newEmail };
    }

    // Unrecognised events leave the state untouched (same object is returned).
    return state;
  }
};
// Use the command handler
const eventStore = new InMemoryEventStore();
const command = createCommand('RegisterUser', {
userId: '123',
email: 'john@example.com',
name: 'John Doe'
});
const result = await handleCommandWithDecider(
eventStore,
'user-123',
command,
userDecider
);
console.log(result.newState); // { id: '123', email: 'john@example.com', isActive: true }
console.log(result.newEvents); // [UserRegistered event]
Building Projections with the New Interface System
The new projection system provides a powerful interface-based approach with pluggable read model stores:
import {
BaseProjection,
InMemoryReadModelStore,
KVReadModelStore,
type ReadEvent,
type IReadModelStore
} from '@delta-base/toolkit';
// Define your read model structure
/**
 * Read-model record that `UserProjection` stores under the key `user:<id>`.
 */
interface UserReadModel {
/** User identifier taken from the event payload's `userId`. */
id: string;
email: string;
name: string;
/** Set to 'active' on registration; replaced by status-change events. */
status: 'active' | 'inactive';
/** `createdAt` of the registration event. */
registeredAt: string;
/** `createdAt` of the most recent event applied to this record. */
lastUpdated: string;
/** `streamPosition` of the most recent event applied to this record. */
revision: number;
}
// Create a projection class extending BaseProjection
/** Payload read from a `UserRegisteredEvent`. */
interface UserRegisteredData {
  userId: string;
  email: string;
  name: string;
}

/** Payload read from a `UserUpdatedEvent`; `email` and `name` may be omitted. */
interface UserUpdatedData {
  userId: string;
  email?: string;
  name?: string;
}

/** Payload read from a `UserStatusChangedEvent`. */
interface UserStatusChangedData {
  userId: string;
  status: UserReadModel['status'];
}

/**
 * Projection that maintains one `UserReadModel` per user, keyed `user:<userId>`,
 * from registration, update and status-change events.
 *
 * Event payloads are narrowed to explicit interfaces rather than `any`, so a
 * misspelled field is caught by the compiler.
 *
 * NOTE(review): the decider example in this README emits events typed
 * 'UserRegistered' (no 'Event' suffix), which is not listed in
 * `supportedEventTypes` below. Confirm which naming the event store delivers.
 */
class UserProjection extends BaseProjection {
  /** Event types this projection declares it handles. */
  readonly supportedEventTypes = [
    'UserRegisteredEvent',
    'UserUpdatedEvent',
    'UserStatusChangedEvent'
  ];

  constructor(store: IReadModelStore) {
    super(store);
  }

  /** Routes an event to its handler by `event.type`; other types are ignored. */
  protected async processEvent(event: ReadEvent): Promise<void> {
    switch (event.type) {
      case 'UserRegisteredEvent':
        await this.handleUserRegistered(event);
        break;
      case 'UserUpdatedEvent':
        await this.handleUserUpdated(event);
        break;
      case 'UserStatusChangedEvent':
        await this.handleUserStatusChanged(event);
        break;
    }
  }

  /** Creates (or overwrites) the read model for a newly registered user. */
  private async handleUserRegistered(event: ReadEvent): Promise<void> {
    const eventData = event.data as UserRegisteredData;
    const userId = `user:${eventData.userId}`;

    // Check if user already exists (idempotency).
    // Presumably shouldProcessEvent returns false for an event that was
    // already applied at the stored revision; verify against BaseProjection.
    const existingUser = await this.store.get<UserReadModel>(userId);
    if (existingUser && !(await this.shouldProcessEvent(event, existingUser.revision))) {
      return;
    }

    const userReadModel: UserReadModel = {
      id: eventData.userId,
      email: eventData.email,
      name: eventData.name,
      status: 'active',
      registeredAt: event.createdAt,
      lastUpdated: event.createdAt,
      revision: event.streamPosition
    };
    await this.store.put(userId, userReadModel);
  }

  /** Applies an e-mail/name update to an existing user; warns and skips if the user is unknown. */
  private async handleUserUpdated(event: ReadEvent): Promise<void> {
    const eventData = event.data as UserUpdatedData;
    const userId = `user:${eventData.userId}`;

    const existingUser = await this.store.get<UserReadModel>(userId);
    if (!existingUser) {
      console.warn(`User ${eventData.userId} not found for update`);
      return;
    }
    if (!this.validateRevision(event, existingUser.revision)) {
      return;
    }

    const updatedUser: UserReadModel = {
      ...existingUser,
      // `||` keeps the stored value when the payload field is missing or empty.
      email: eventData.email || existingUser.email,
      name: eventData.name || existingUser.name,
      lastUpdated: event.createdAt,
      revision: event.streamPosition
    };
    await this.store.put(userId, updatedUser);
  }

  /** Replaces the stored status; silently skips unknown users and events that fail revision validation. */
  private async handleUserStatusChanged(event: ReadEvent): Promise<void> {
    const eventData = event.data as UserStatusChangedData;
    const userId = `user:${eventData.userId}`;

    const existingUser = await this.store.get<UserReadModel>(userId);
    if (!existingUser || !this.validateRevision(event, existingUser.revision)) {
      return;
    }

    const updatedUser: UserReadModel = {
      ...existingUser,
      status: eventData.status,
      lastUpdated: event.createdAt,
      revision: event.streamPosition
    };
    await this.store.put(userId, updatedUser);
  }
}
// Usage with different store implementations
const inMemoryStore = new InMemoryReadModelStore();
const userProjection = new UserProjection(inMemoryStore);
// Process events
await userProjection.processEvents([userRegisteredEvent, userUpdatedEvent]);
// Query the read model
const user = await inMemoryStore.get<UserReadModel>('user:123');
console.log(user); // { id: '123', email: 'john@example.com', ... }
Read Model Store Implementations
The toolkit provides multiple store implementations:
In-Memory Store (for testing)
import { InMemoryReadModelStore } from '@delta-base/toolkit';
const store = new InMemoryReadModelStore();
// Supports all IReadModelStore operations
await store.put('key', { data: 'value' });
const value = await store.get('key');
const allItems = await store.getAll({ prefix: 'user:' });
// Advanced features
await store.batchPut([
{ key: 'user:1', value: { name: 'John' } },
{ key: 'user:2', value: { name: 'Jane' } }
]);
const users = await store.query({
filter: { status: 'active' }
});
Cloudflare KV Store
import { KVReadModelStore } from '@delta-base/toolkit';
// In a Cloudflare Worker
const store = new KVReadModelStore(env.MY_KV_NAMESPACE);
// Supports native batch operations for performance
const users = await store.batchGet(['user:1', 'user:2', 'user:3']);
// TTL and metadata support
await store.put('session:123', sessionData, {
expirationTtl: 3600, // 1 hour
metadata: { userId: '123' }
});
HTTP Store (for external APIs)
import { HttpReadModelStore } from '@delta-base/toolkit';
const store = new HttpReadModelStore('https://api.example.com/readmodels', {
'Authorization': 'Bearer token'
});
// Works with any HTTP API that follows REST conventions
await store.put('user:123', userData);
const user = await store.get('user:123');
Webhook Projections
import {
  BaseProjection,
  createWebhookProjectionHandler,
  KVReadModelStore,
  type ReadEvent
} from '@delta-base/toolkit';

// Create projection for webhook deployment
/**
 * Projection that keeps a single aggregate counter record under the key
 * 'stats': the total number of registered users and how many are active.
 */
class UserStatsProjection extends BaseProjection {
  /** Event types this projection declares it handles. */
  readonly supportedEventTypes = ['UserRegisteredEvent', 'UserStatusChangedEvent'];

  /**
   * Loads the current counters (starting from zero when none are stored),
   * adjusts them for the event, and writes them back.
   */
  protected async processEvent(event: ReadEvent): Promise<void> {
    const stats =
      (await this.store.get<{ totalUsers: number; activeUsers: number }>('stats')) ||
      { totalUsers: 0, activeUsers: 0 };

    switch (event.type) {
      case 'UserRegisteredEvent':
        // A newly registered user counts as active.
        stats.totalUsers++;
        stats.activeUsers++;
        break;
      case 'UserStatusChangedEvent': {
        // Braces give the declaration its own scope instead of leaking it
        // into the other case clauses.
        const eventData = event.data as { status?: string };
        if (eventData.status === 'inactive') stats.activeUsers--;
        else if (eventData.status === 'active') stats.activeUsers++;
        break;
      }
    }

    await this.store.put('stats', stats);
  }
}
// In Cloudflare Worker
const projection = new UserStatsProjection(new KVReadModelStore(env.KV_NAMESPACE));
const webhookHandler = createWebhookProjectionHandler(projection);
export default {
async fetch(request: Request): Promise<Response> {
return await webhookHandler(request);
}
};
Store Capabilities and Runtime Detection
// Check store capabilities at runtime
const capabilities = store.getCapabilities();
if (capabilities.features.ttl) {
// Store supports TTL
await store.put('temp-data', data, { expirationTtl: 300 });
}
if (capabilities.features.advancedQueries) {
// Store supports complex queries
const results = await store.query({
filter: {
status: 'active',
lastLogin: { $gte: new Date('2024-01-01') }
}
});
}
console.log(`Store type: ${capabilities.storeType}`);
console.log(`Max batch size: ${capabilities.limits.maxBatchSize}`);
Core Concepts
Events
Events represent something that happened in the past. They are immutable and contain:
- eventId - Unique identifier (added by event store)
- type - Event type (e.g., "UserRegistered")
- streamId - Stream where the event belongs (added by event store)
- streamPosition - Position within the stream (added by event store)
- globalPosition - Global position across all streams (added by event store)
- data - Event payload
- metadata - Additional metadata (optional)
- schemaVersion - Version of the event schema (added by event store)
- transactionId - Transaction identifier (added by event store)
- createdAt - When the event occurred (added by event store)
Commands
Commands represent an intention to do something. They contain:
- commandId - Unique identifier (added automatically)
- type - Command type (e.g., "RegisterUser")
- data - Command payload
- metadata - Additional metadata (optional)
- createdAt - When the command was created (added automatically)
Decider Pattern
The decider pattern separates:
- Decide - Given current state and a command, what events should be produced?
- Evolve - Given current state and an event, what is the new state?
- Initial State - What is the starting state?
This pattern makes business logic pure and testable.
Read Model Projections
The new projection system provides a clean, interface-based approach:
IReadModelStore Interface
- Unified API: Same interface works with in-memory, KV, HTTP, and other stores
- Flexible Operations: get, put, delete, getAll, batchGet, batchPut, query
- Store Capabilities: Runtime feature detection for optimal performance
- Multi-table Support: Optional table/namespace isolation
Projection Interface
- Event Filtering: Declare supported event types for automatic filtering
- Batch Processing: Process multiple events efficiently in order
- Type Safety: Strongly typed event handling
BaseProjection Class
- Common Patterns: Built-in revision tracking and idempotency checking
- Event Ordering: Sequential processing maintains consistency
- Error Handling: Graceful handling of out-of-order events
Benefits
- Developer Experience: Clean APIs with comprehensive TypeScript support
- Performance: Native batch operations and store-optimized queries
- Scalability: Pluggable stores from in-memory to distributed systems
- Testing: Easy to test with in-memory stores
- Production: Deploy to any platform with appropriate store implementation
Error Handling
The toolkit provides comprehensive error types with type guards:
import {
handleCommandWithDecider,
isDeltaBaseError,
isVersionConflictError,
StreamVersionConflictError
} from '@delta-base/toolkit';
try {
await handleCommandWithDecider(eventStore, streamId, command, decider);
} catch (error) {
if (isVersionConflictError(error)) {
// Handle concurrency conflict
console.log('Version conflict, retrying...');
} else if (isDeltaBaseError(error)) {
// Handle other toolkit errors
console.log('Toolkit error:', error.message);
} else {
// Handle unexpected errors
console.log('Unexpected error:', error);
}
}
API Reference
Core Types
- Event<TType, TData, TMetadata?> - Event interface
- ReadEvent<TEvent> - Event as read from event store with system fields
- Command<TType, TData, TMetadata?> - Command interface
- StreamId - Stream identifier type
- EventStore - Event persistence interface
Command Handling
- handleCommand() - Handle command without decider pattern
- handleCommandWithDecider() - Handle command with decider pattern
- handleCommandWithRetry() - Handle command with automatic retry
- handleCommandWithDeciderAndRetry() - Handle command with decider and retry
- Decider<State, Command, Event> - Decider pattern interface
Read Model Projections
- IReadModelStore - Unified interface for read model storage
- InMemoryReadModelStore - In-memory implementation for testing
- KVReadModelStore - Cloudflare KV implementation with native batch operations
- HttpReadModelStore - HTTP API implementation for external services
- Projection - Interface for event-driven projections
- BaseProjection - Abstract base class with common projection patterns
- createWebhookProjectionHandler() - Create HTTP webhook handlers
Database
- InMemoryEventStore - Full event store implementation for testing
  - Implements all EventStore interface methods
  - Includes utility methods like getAllStreamIds(), clear(), etc.
Error Types
All errors inherit from DeltaBaseError and include specific types for:
- VersionConflictError / StreamVersionConflictError - Concurrency conflicts
- ValidationError - Request validation failures
- AuthenticationError / AuthorizationError - Auth failures
- NotFoundError / StreamNotFoundError - Resource not found
- TimeoutError / RateLimitError - Operational failures
Utilities
- createEvent(type, data, metadata?) - Create events with type inference
- createCommand(type, data, metadata?) - Create commands with type inference
- createReadEvent() - Create read events for testing
Testing
The toolkit is designed for easy testing:
import {
InMemoryEventStore,
InMemoryReadModelStore,
createEvent,
createCommand,
handleCommandWithDecider,
BaseProjection,
type ReadEvent
} from '@delta-base/toolkit';
describe('User Registration with Projections', () => {
let eventStore: InMemoryEventStore;
let readModelStore: InMemoryReadModelStore;
let userProjection: UserProjection;
beforeEach(() => {
eventStore = new InMemoryEventStore();
readModelStore = new InMemoryReadModelStore();
userProjection = new UserProjection(readModelStore);
});
it('should register user and update read model', async () => {
// Given - command
const command = createCommand('RegisterUser', {
userId: '123',
email: 'john@example.com',
name: 'John Doe'
});
// When - command is handled
const result = await handleCommandWithDecider(
eventStore,
'user-123',
command,
userDecider
);
// And - projection processes the events
await userProjection.processEvents(result.newEvents);
// Then - verify command result
expect(result.newState.id).toBe('123');
expect(result.newEvents).toHaveLength(1);
expect(result.newEvents[0].type).toBe('UserRegistered');
// And - verify read model
const userReadModel = await readModelStore.get('user:123');
expect(userReadModel).toEqual({
id: '123',
email: 'john@example.com',
name: 'John Doe',
status: 'active',
registeredAt: expect.any(String),
lastUpdated: expect.any(String),
revision: 1
});
});
});
Examples
See the /examples directory for complete working examples:
- Basic User Management - Complete user lifecycle with commands and projections
Best Practices
- Keep Business Logic Pure: Use the decider pattern to separate decisions from effects
- Design for Idempotency: Commands and projections should be safe to retry
- Version Your Events: Include schema version in event metadata
- Test with Events: Write tests that verify event streams and read models
- Handle Errors Gracefully: Use provided error types and type guards
- Choose the Right Store: Use InMemory for testing, KV for serverless, HTTP for existing APIs
- Leverage Batch Operations: Use batchGet/batchPut for better performance
- Monitor Store Capabilities: Check capabilities at runtime for optimal behavior
