@ossy/event-store
v1.8.0
Ossy Event Store — Aggregate, EventStore, and MongoDB client
Event sourcing primitives for the Ossy platform. Provides Aggregate (stream interaction), EventStore (raw event I/O), and AggregateRebuild (startup snapshot rebuilding) on top of MongoDB.
Core concepts
The event store keeps two MongoDB collections:
- eventstore: every event ever appended. Immutable. Each document is one domain event with aggregateId, aggregateType, aggregateVersion, type, payload, created, and createdBy.
- aggregates: denormalized state snapshots, rebuilt from the event stream. Used for fast queries. Rebuilt automatically at startup and kept up-to-date as new events arrive.
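To make the two collections concrete, here is a sketch of what a pair of event documents and the resulting snapshot might look like. The field names come from the list above; every value, the version numbering starting at 1, the timestamp format, and the snapshot shape are illustrative assumptions, not output from the library.

```javascript
// Two hypothetical documents from the eventstore collection for one order.
// Field names follow the list above; every value is made up for illustration.
const events = [
  {
    aggregateId: 'order-1',
    aggregateType: 'Order',
    aggregateVersion: 1, // assumption: versions start at 1 and increase by 1
    type: 'OrderPlaced',
    payload: { total: 42 },
    created: 1700000000000, // assumption: stored as a millisecond timestamp
    createdBy: 'user-1',
  },
  {
    aggregateId: 'order-1',
    aggregateType: 'Order',
    aggregateVersion: 2,
    type: 'OrderShipped',
    payload: {},
    created: 1700000060000,
    createdBy: 'user-2',
  },
]

// A snapshot in the aggregates collection is whatever folding the stream yields.
// This reducer is a stand-in for an aggregate's View function.
const snapshot = events
  .slice()
  .sort((a, b) => a.aggregateVersion - b.aggregateVersion)
  .reduce((state, event) => {
    if (event.type === 'OrderPlaced') {
      return { ...state, id: event.aggregateId, status: 'pending', ...event.payload }
    }
    if (event.type === 'OrderShipped') {
      return { ...state, status: 'shipped' }
    }
    return state
  }, {})

console.log(snapshot)
```

The events stay in the list unchanged; only the derived snapshot is replaced when new events arrive.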
Aggregate
The main entry point for reading and writing event streams. Aggregate.Of returns a Promise, and Add, Save, View, and Validate each return a function to pass to .then(), so operations compose into a chain.
Reading the current state
import { Aggregate } from '@ossy/event-store'
import { Order } from '@acme/orders'
// Fold all events for `orderId` through Order.View() and return the result
const state = await Aggregate.Of(Order, orderId).then(Aggregate.View())
Creating a new aggregate
Pass an event object instead of an id. The event is appended and the new aggregate is returned.
const newOrderEvent = { type: 'OrderPlaced', createdBy: userId, payload: { ... } }
const orderView = await Aggregate.Of(Order, newOrderEvent).then(Aggregate.View())
An aggregateId inside the event is used as the aggregate's id; if absent, a nanoid() is generated.
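The id rule above can be sketched as a small helper. Both resolveAggregateId and its generateId parameter are hypothetical names used only for illustration. The library is described as calling nanoid(); this sketch takes the generator as an argument so it runs without that dependency.

```javascript
// Hypothetical helper mirroring the rule above: keep the event's own
// aggregateId when present, otherwise generate one.
function resolveAggregateId(event, generateId) {
  return event.aggregateId ?? generateId()
}

// Stand-in generator; the library is described as using nanoid() instead.
const fakeId = () => 'generated-id'

console.log(resolveAggregateId({ type: 'OrderPlaced', aggregateId: 'order-1' }, fakeId)) // order-1
console.log(resolveAggregateId({ type: 'OrderPlaced' }, fakeId)) // generated-id
```

Supplying your own aggregateId is useful when the id must be known before the first event is written, for example when it comes from an upstream system.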
Appending an event to an existing aggregate
await Aggregate.Of(Order, orderId)
  .then(Aggregate.Add({ type: 'OrderShipped', createdBy: userId, payload: { shippedAt: Date.now() } }))
  .then(Aggregate.Save())
API reference
| Method | Signature | Description |
|---|---|---|
| Aggregate.Of(Root, id) | (class, string) => Promise<Aggregate> | Load an existing aggregate from the event store. |
| Aggregate.Of(Root, event) | (class, object) => Promise<Aggregate> | Create a new aggregate by appending the first event. |
| Aggregate.Add(event) | (event) => (aggregate) => Promise<Aggregate> | Append an event to the aggregate. Chainable. |
| Aggregate.Save() | () => (aggregate) => Promise<void> | Persist the current state snapshot to the aggregates collection. |
| Aggregate.View(fn?) | (fn?) => (aggregate) => state | Call the aggregate's View(events, savedState) and return the result. Pass a custom view function to override. |
| Aggregate.Validate(fn) | (fn) => (aggregate) => Promise<Aggregate> | Run a validation function against (events, savedState). Reject if the function throws. |
| Aggregate.Find(id) | (string) => Promise<object \| null> | Low-level: find the snapshot document in aggregates by id. |
| Aggregate.Collection | Collection | Direct access to the MongoDB aggregates collection. |
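The curried signatures in the table are what make the .then() chains work: a call such as Aggregate.Add(event) returns a function that receives the aggregate resolved by the previous step. The sketch below reproduces that pattern with in-memory stand-ins. The lowercase helpers of, add, validate, and view are hypothetical, written only for this example, and do not touch MongoDB or the real library.

```javascript
// In-memory stand-ins that mimic the curried shape of the documented methods.
// None of this is the library's implementation.
const of = (Root, id) => Promise.resolve({ Root, id, events: [], savedState: {} })

// (event) => (aggregate) => Promise<aggregate>
const add = (event) => (aggregate) =>
  Promise.resolve({ ...aggregate, events: [...aggregate.events, event] })

// (fn) => (aggregate) => Promise<aggregate>; a throwing fn becomes a rejection
const validate = (fn) => (aggregate) =>
  new Promise((resolve) => {
    fn(aggregate.events, aggregate.savedState)
    resolve(aggregate)
  })

// (fn?) => (aggregate) => state
const view = (fn) => (aggregate) =>
  (fn ?? aggregate.Root.View)(aggregate.events, aggregate.savedState)

// Minimal aggregate class: counts the events it has seen.
class Counter {
  static AggregateType = 'Counter'
  static View(events, savedState = {}) {
    return { count: (savedState.count ?? 0) + events.length }
  }
}

// Validation function: throws when the stream is empty.
const mustHaveEvents = (events) => {
  if (events.length === 0) throw new Error('no events')
}

const result = of(Counter, 'counter-1')
  .then(add({ type: 'Incremented' }))
  .then(add({ type: 'Incremented' }))
  .then(validate(mustHaveEvents))
  .then(view())

result.then((state) => console.log(state)) // { count: 2 }
```

Because each step takes and returns the aggregate, steps can be reordered or reused across chains, and a failing validation skips every later step until a rejection handler is reached.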
Writing an aggregate class
An aggregate class is a pure reducer — no side effects, no I/O. It declares a static AggregateType string and a static View(events, savedState) function.
// src/order.aggregate.js (inside an installable package)
export class Order {
  static AggregateType = 'Order'
  /**
   * Folds an array of events into the current state.
   * @param {object[]} events - Event documents in ascending version order.
   * @param {object} savedState - Last saved snapshot (can be empty object).
   * @returns {object}
   */
  static View(events, savedState = {}) {
    return events.reduce((state, event) => {
      switch (event.type) {
        case 'OrderPlaced':
          return { ...state, id: event.aggregateId, status: 'pending', ...event.payload }
        case 'OrderShipped':
          return { ...state, status: 'shipped', shippedAt: event.created }
        case 'OrderCancelled':
          return { ...state, status: 'cancelled' }
        default:
          return state
      }
    }, savedState)
  }
}
// Required exports for the *.aggregate.js primitive
export { Order as Aggregate }
export const id = 'Order' // must equal Order.AggregateType
Aggregate files must be in an installable package (a node_modules/ package with "ossy": { "src": "./src" } in package.json). The platform never loads aggregates from the local app's src/, to avoid duplicates.
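Because View is a pure function, it can be exercised without MongoDB. The sketch below uses a trimmed-down copy of the Order class and checks that folding a full stream gives the same result as folding the newer events on top of a saved snapshot. The event values are made up for illustration.

```javascript
// Trimmed copy of the Order aggregate, so the example is self-contained.
class Order {
  static AggregateType = 'Order'
  static View(events, savedState = {}) {
    return events.reduce((state, event) => {
      switch (event.type) {
        case 'OrderPlaced':
          return { ...state, id: event.aggregateId, status: 'pending', ...event.payload }
        case 'OrderShipped':
          return { ...state, status: 'shipped', shippedAt: event.created }
        default:
          return state
      }
    }, savedState)
  }
}

// Hypothetical events; values are illustrative only.
const placed = { type: 'OrderPlaced', aggregateId: 'order-1', payload: { total: 42 }, created: 1 }
const shipped = { type: 'OrderShipped', aggregateId: 'order-1', payload: {}, created: 2 }

// Fold the whole stream from an empty state...
const full = Order.View([placed, shipped])

// ...or fold only the newer events on top of an earlier snapshot.
const snapshot = Order.View([placed])
const incremental = Order.View([shipped], snapshot)

console.log(full)
console.log(JSON.stringify(full) === JSON.stringify(incremental)) // true
```

Note that in the OrderPlaced case the payload is spread last, so a payload field named id or status would overwrite the values set before it.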
EventStore
Low-level access to the eventstore collection. Prefer using Aggregate for most work.
import { EventStore } from '@ossy/event-store'
| Method | Signature | Description |
|---|---|---|
| EventStore.AppendEvent(event) | (object) => Promise<object> | Insert an event document. Rejects on failure. |
| EventStore.GetEventStream({ aggregateId, fromVersion? }) | ({ aggregateId, fromVersion? }) => Promise<object[]> | Return all events for an aggregate id, optionally after a version. |
| EventStore.GetEventStreams() | () => Promise<{ aggregateId, aggregateType }[]> | Return all known (aggregateId, aggregateType) pairs. |
| EventStore.FindEvent(query) | (MongoQuery) => Promise<object> | Find a single event. Rejects when not found. |
| EventStore.FindEvents(query) | (MongoQuery) => Promise<object[]> | Find multiple events. Rejects when none found. |
| EventStore.Aggregate(pipeline) | (Pipeline) => Promise<object[]> | Run a MongoDB aggregation pipeline against the event store. |
| EventStore.Collection | Collection | Direct access to the MongoDB eventstore collection. |
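The table describes GetEventStream as returning the events for one aggregate id, optionally after a version. One plausible way to express that as a MongoDB filter is sketched below. buildStreamFilter is a hypothetical helper, and treating fromVersion as exclusive (via $gt) is an assumption, since the table only says "after a version".

```javascript
// Hypothetical helper: builds a MongoDB filter for one aggregate's stream.
// Assumption: fromVersion is exclusive, so only later versions match.
function buildStreamFilter({ aggregateId, fromVersion }) {
  const filter = { aggregateId }
  if (fromVersion !== undefined) {
    filter.aggregateVersion = { $gt: fromVersion }
  }
  return filter
}

console.log(buildStreamFilter({ aggregateId: 'order-1' }))
console.log(buildStreamFilter({ aggregateId: 'order-1', fromVersion: 3 }))
```

A filter of this shape could be passed to EventStore.FindEvents(query), keeping in mind that FindEvents is documented to reject when nothing matches.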
AggregateRebuild
Rebuilds state snapshots from scratch at startup. Called automatically by @ossy/platform — you rarely need to use this directly.
import { AggregateRebuild } from '@ossy/event-store'
// Register an aggregate class (done automatically by the platform)
AggregateRebuild.registerAggregate({ id: 'Order', Aggregate: Order })
// Rebuild and save every known aggregate of every registered type
await AggregateRebuild.BuildAndSaveAll()
// Rebuild a single aggregate
await AggregateRebuild.BuildAndSave('Order', orderId)
*.aggregate.js primitive
The platform auto-discovers aggregate files from installed packages. See PRIMITIVES.md for the full specification.
MongoDB setup
Connect to MongoDB before using any Aggregate or EventStore methods:
import { Mongo } from '@ossy/event-store'
await Mongo.connect(process.env.DB_URL)
The platform server calls Mongo.connect automatically when DB_URL is present in the environment.
Indexes recommended for the eventstore collection:
{ aggregateId: 1, aggregateVersion: 1 } // stream queries
{ aggregateType: 1 } // type-level queries
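With the official MongoDB Node.js driver, indexes like these are created with collection.createIndex. The sketch below wraps the two calls in a hypothetical ensureEventStoreIndexes helper that accepts any object exposing createIndex, so it can be tried without a database. Passing EventStore.Collection to it assumes that property is a regular driver Collection, as the EventStore table suggests.

```javascript
// Hypothetical helper: creates the two recommended indexes on a collection.
// `collection` is anything with a driver-style createIndex(spec) method.
async function ensureEventStoreIndexes(collection) {
  return [
    await collection.createIndex({ aggregateId: 1, aggregateVersion: 1 }), // stream queries
    await collection.createIndex({ aggregateType: 1 }), // type-level queries
  ]
}

// Stub collection that records the requested index specs, for a dry run.
const requested = []
const stubCollection = {
  createIndex: async (spec) => {
    requested.push(spec)
    return Object.keys(spec).join('_')
  },
}

const done = ensureEventStoreIndexes(stubCollection)
done.then((names) => console.log(names, requested))
```

In an application this would run once after Mongo.connect, for example as await ensureEventStoreIndexes(EventStore.Collection). MongoDB treats a createIndex call for an index that already exists with the same specification as a no-op, so repeating it at startup is safe.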