@pickle-packs/journaling
v1.0.0
Support for journals
Journaling
Append only journaling for application state. Every change is an entry. Current state is the result of replaying entries, optionally starting from a snapshot. Storage is not included. You plug in persistence.
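The replay idea can be sketched without the library. This is a minimal illustration only: the DepositEntry type, the Balance state, and the replay function are hypothetical names, not part of this package.

```typescript
// Hypothetical entry and state types, for illustration only (not part of the package).
type DepositEntry = { readonly entryType: "deposit"; readonly amount: number };
type Balance = number;

// A pure handler: takes the current state and an entry, returns the next state.
function applyDeposit(state: Balance, entry: DepositEntry): Balance {
  return state + entry.amount;
}

// Replaying is a left fold over the ordered entries, starting from a base state.
// The base is either the initial state or the state stored in a snapshot.
function replay(base: Balance, entries: ReadonlyArray<DepositEntry>): Balance {
  return entries.reduce(applyDeposit, base);
}

const entries: DepositEntry[] = [
  { entryType: "deposit", amount: 10 },
  { entryType: "deposit", amount: 5 },
  { entryType: "deposit", amount: 7 },
];

// Full replay from the initial state.
const fromScratch = replay(0, entries);

// Replay from a snapshot taken after the first two entries (state 15).
const fromSnapshot = replay(15, entries.slice(2));

// Time travel: replay only the first entry.
const afterFirst = replay(0, entries.slice(0, 1));
```

Both fromScratch and fromSnapshot reach the same state, which is why a snapshot can shorten loading without changing the result.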
Why use this
- Full history for audits and debugging
- Time travel by replaying to any point
- Deterministic rebuilds for reproducibility
- Safer writes since updates are append only
- Easy evolution by adding new entry types
- Fast loads with periodic snapshots
- Portable because you bring your own storage
Core model
- Entry: a small fact with an entryType.
- Snapshot: a point-in-time materialized state with an entryNumber.
- Journal: identity, state, ordered entries, and handler maps.
- Handlers: pure functions that apply an entry or a snapshot to state.
- Numbers and IDs: strong brands for EntryNumber, EntryType, and JournalId.
- Outcomes: all operations return Outcome<T> to signal success or failure.
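To make branding and outcomes concrete, here is a rough sketch. Every definition below is an assumption written for illustration: the package's real EntryNumber, JournalId, and Outcome types may be shaped differently, and toEntryNumber, toJournalId, and describeEntry are hypothetical helpers.

```typescript
// Hypothetical definitions for illustration; the package's real types may differ.
type Brand<T, B extends string> = T & { readonly __brand: B };
type EntryNumber = Brand<number, "EntryNumber">;
type JournalId = Brand<string, "JournalId">;

// A simple success-or-failure result (assumed shape, not the package's Outcome).
type Outcome<T> =
  | { readonly ok: true; readonly value: T }
  | { readonly ok: false; readonly code: string; readonly detail: string };

// Smart constructors: the only places a plain value becomes a branded one.
function toEntryNumber(n: number): Outcome<EntryNumber> {
  return Number.isInteger(n) && n >= 0
    ? { ok: true, value: n as EntryNumber }
    : { ok: false, code: "INVALID_ENTRY_NUMBER", detail: `Not a non-negative integer: ${n}` };
}

function toJournalId(s: string): Outcome<JournalId> {
  return s.trim().length > 0
    ? { ok: true, value: s as JournalId }
    : { ok: false, code: "INVALID_JOURNAL_ID", detail: "Journal id must not be empty" };
}

// Because the brands differ, swapping the two arguments is a compile error.
function describeEntry(id: JournalId, n: EntryNumber): string {
  return `${id}#${n}`;
}

const id = toJournalId("orders-42");
const num = toEntryNumber(3);
const label = id.ok && num.ok ? describeEntry(id.value, num.value) : "invalid";
const rejected = toEntryNumber(-1);
```

The brand exists only at compile time. At runtime the values are ordinary numbers and strings.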
Quick start
- Define your entry types and validation.
- Implement line entry handlers that update state from an entry.
- Optionally implement snapshot handlers that restore state from a snapshot.
- Provide two persistence functions: loadEntries and saveEntries.
- Load a journal, apply entries, then save.
API surface
applyLineEntry(journal, entryOrEntries) -> Outcome<Journal>
- Applies one or many entries in order.
- Increments effectiveEntryNumber for each applied entry.
- Appends the entry to journal.lineEntries.
- Uses the handler mapped by entry.entryType.
- Failure when no handler exists.
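The behavior listed above can be approximated with a small stand-in. This is a sketch under assumptions, not the package's implementation: MiniJournal, Result, applyOne, and applyMany are hypothetical names, and stopping at the first failing entry in a batch is a guess at reasonable behavior.

```typescript
// Simplified stand-ins for the package's types (assumptions, not the real definitions).
type LineEntry = { readonly entryType: string };
type Handler<S> = (state: S, entry: LineEntry) => S;
type MiniJournal<S> = {
  readonly state: S;
  readonly effectiveEntryNumber: number;
  readonly lineEntries: ReadonlyArray<LineEntry>;
  readonly handlers: Readonly<Record<string, Handler<S>>>;
};
type Result<T> =
  | { readonly ok: true; readonly value: T }
  | { readonly ok: false; readonly code: string };

// Apply a single entry: look up the handler, compute the next state,
// bump the entry number, and append the entry to the pending list.
function applyOne<S>(journal: MiniJournal<S>, entry: LineEntry): Result<MiniJournal<S>> {
  const handler = journal.handlers[entry.entryType];
  if (handler === undefined) {
    return { ok: false, code: "ENTRY_HANDLER_NOT_SPECIFIED" };
  }
  return {
    ok: true,
    value: {
      ...journal,
      state: handler(journal.state, entry),
      effectiveEntryNumber: journal.effectiveEntryNumber + 1,
      lineEntries: [...journal.lineEntries, entry],
    },
  };
}

// Apply many entries in order, stopping at the first failure (an assumption).
function applyMany<S>(
  journal: MiniJournal<S>,
  entries: ReadonlyArray<LineEntry>
): Result<MiniJournal<S>> {
  let current: Result<MiniJournal<S>> = { ok: true, value: journal };
  for (const entry of entries) {
    if (!current.ok) break;
    current = applyOne(current.value, entry);
  }
  return current;
}

const start: MiniJournal<number> = {
  state: 0,
  effectiveEntryNumber: 0,
  lineEntries: [],
  handlers: { inc: (s) => s + 1 },
};
const applied = applyMany(start, [{ entryType: "inc" }, { entryType: "inc" }]);
const unhandled = applyMany(start, [{ entryType: "unknown" }]);
```

The input journal is never modified. Each step returns a new value, so a failed batch leaves the caller holding the journal it started with.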
loadJournal(id, initialState, lineEntryHandlers, snapshotEntryHandlers, snapshotEntryInterval, loadEntries) -> Promise<Outcome<Journal>>
- Calls your loadEntries to fetch { lineEntries, maybeSnapshotEntry }.
- If a snapshot exists, restores from it, then replays remaining entries.
- Returns a journal with empty lineEntries ready for new work.
saveJournal(journal, createSnapshot, saveEntries) -> Promise<Outcome<Journal>>
- If effectiveEntryNumber - basisEntryNumber > snapshotEntryInterval, calls createSnapshot(journal) and includes it in Entries.
- Calls your saveEntries.
- On success, advances basisEntryNumber to effectiveEntryNumber and clears lineEntries.
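The snapshot rule is a strict comparison, so a gap exactly equal to the interval does not trigger a snapshot. The sketch below restates that rule in isolation. SaveCounters and planSave are hypothetical names, and the code models only the counters, not persistence.

```typescript
// Hypothetical model of the two counters involved in a save (not the package's code).
type SaveCounters = {
  readonly effectiveEntryNumber: number;
  readonly basisEntryNumber: number;
};

// Decide whether a save would include a snapshot, and what the counters
// look like after a successful save.
function planSave(
  counters: SaveCounters,
  snapshotEntryInterval: number
): { includeSnapshot: boolean; afterSuccess: SaveCounters } {
  const gap = counters.effectiveEntryNumber - counters.basisEntryNumber;
  return {
    // Strictly greater than: the interval must be exceeded, not just reached.
    includeSnapshot: gap > snapshotEntryInterval,
    // On success the basis catches up with the effective entry number.
    afterSuccess: {
      effectiveEntryNumber: counters.effectiveEntryNumber,
      basisEntryNumber: counters.effectiveEntryNumber,
    },
  };
}

const atInterval = planSave({ effectiveEntryNumber: 5, basisEntryNumber: 0 }, 5);
const pastInterval = planSave({ effectiveEntryNumber: 6, basisEntryNumber: 0 }, 5);
```

With an interval of 5, atInterval.includeSnapshot is false and pastInterval.includeSnapshot is true.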
Persistence contracts
type Entries<TState> = { lineEntries: Array<ILineEntry>, maybeSnapshotEntry: Maybe<ISnapshotEntry<TState>> }
type LoadEntries<TState> = (id: JournalId) => Promise<Entries<TState>>
type SaveEntries<TState> = (id: JournalId, entries: Entries<TState>, state: TState) => Promise<number>
You decide where and how to store data. Files, databases, object stores, or anything else are valid.
Snapshotting
Use SnapshotEntryInterval to bound rebuild time. When the interval is exceeded, saveJournal requests a snapshot via your createSnapshot function. Snapshots keep loads fast while the journal stays append only.
Failures
- ENTRY_HANDLER_NOT_SPECIFIED
- LOAD_JOURNAL_FAILURE
- SAVE_JOURNAL_FAILURE
Failures carry a detail message and an optional maybeError.
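One way a caller might react to these codes is sketched below. The Failure and Maybe shapes are assumptions inferred from the fields named above (code, detail, maybeError), and describeFailure is a hypothetical helper. Treating persistence failures as possibly retryable is also an assumption.

```typescript
// Assumed shapes for illustration; the package's Maybe and failure types may differ.
type Maybe<T> = { readonly kind: "some"; readonly value: T } | { readonly kind: "none" };
type FailureCode =
  | "ENTRY_HANDLER_NOT_SPECIFIED"
  | "LOAD_JOURNAL_FAILURE"
  | "SAVE_JOURNAL_FAILURE";
type Failure = {
  readonly code: FailureCode;
  readonly detail: string;
  readonly maybeError: Maybe<Error>;
};

// Turn a failure into a log-friendly message, branching on the code.
function describeFailure(f: Failure): string {
  const cause = f.maybeError.kind === "some" ? ` (cause: ${f.maybeError.value.message})` : "";
  switch (f.code) {
    case "ENTRY_HANDLER_NOT_SPECIFIED":
      return `Programming error, register a handler: ${f.detail}${cause}`;
    case "LOAD_JOURNAL_FAILURE":
    case "SAVE_JOURNAL_FAILURE":
      return `Persistence problem, possibly retryable: ${f.detail}${cause}`;
  }
}

const message = describeFailure({
  code: "SAVE_JOURNAL_FAILURE",
  detail: "disk full",
  maybeError: { kind: "some", value: new Error("ENOSPC") },
});
```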
Design notes
- Pure, functional handlers
- Append only updates
- Deterministic projections
- In memory engine with user supplied persistence
- Strongly branded identifiers to avoid mixups
Counter Journal Example
A minimal step by step guide that mirrors the CounterJournal test module. This explains how to implement a simple journal using this library.
1) Define your line entry and entry type
const counterValueAddedV1 = "acme.counter-value-added.v1";
type CounterValue = number & { readonly __brand: "CounterValue" };
interface ICounterValueAddedV1LineEntry extends ILineEntry {
  readonly entryType: EntryType; // should be counterValueAddedV1
  readonly value: CounterValue;
}
2) Define the handler that computes the next state when the entry is processed
Handlers are pure. They receive the current state and a line entry, and they return the next state without modifying their inputs. This is where state changes as entries are replayed.
type CounterState = Readonly<{
  average: CounterValue;
  maximum: CounterValue;
  minimum: CounterValue;
  valueCount: number;
}>;
function handleCounterValueAddedV1(
  state: Readonly<CounterState>,
  entryLike: Readonly<ILineEntry>
): CounterState {
  const entry = entryLike as ICounterValueAddedV1LineEntry;
  const nextCount = state.valueCount + 1;
  return {
    average: Math.round(((state.average * state.valueCount) + entry.value) / nextCount) as CounterValue,
    maximum: entry.value > state.maximum ? entry.value : state.maximum,
    minimum: entry.value < state.minimum ? entry.value : state.minimum,
    valueCount: nextCount
  };
}
Register the handler:
const lineEntryHandlers: Record<EntryType, LineEntryHandler<CounterState>> = {
  [counterValueAddedV1]: handleCounterValueAddedV1
};
3) Expose an action that decides whether to append an entry
This is the action boundary. It runs business rules against the current state to decide if an entry should be appended. It returns Outcome<Journal> and never mutates state directly. Invalid actions return a failure outcome.
function addValue(
  journal: CounterJournal,
  value: CounterValue
): Outcome<CounterJournal> {
  // Business rules first
  if (value < 0) {
    return failure({
      code: "NEGATIVE_COUNTER_VALUE",
      detail: "Value must be non-negative",
      maybeError: none
    });
  }
  // Build the entry
  const lineEntry: ICounterValueAddedV1LineEntry = {
    entryType: counterValueAddedV1 as EntryType,
    value
  };
  // Optional validation layer can run here before append
  // Append via applyLineEntry which will call the handler
  return pipe(
    success(journal),
    (j) => applyLineEntry<CounterState, CounterJournal, ILineEntry>(j, lineEntry)
  );
}
4) Snapshots
A snapshot is a persisted aggregate used as a starting point to reduce load time when journals grow large. Choose an interval that keeps hydration within your target. Smaller is not always better. Tune based on acceptable load time.
Define a snapshot type and interval, and a function to create a snapshot from a journal:
const counterJournalSnapshotV1 = "acme.counter-journal-snapshot.v1";
const snapshotInterval: SnapshotEntryInterval = 5 as SnapshotEntryInterval;
function createSnapshot(j: Journal<CounterState>): ISnapshotEntry<CounterState> {
  return {
    entryNumber: j.effectiveEntryNumber,
    entryType: counterJournalSnapshotV1 as EntryType,
    state: j.state
  };
}
const snapshotEntryHandlers: Record<EntryType, SnapshotEntryHandler<CounterState, ISnapshotEntry<CounterState>>> = {
  [counterJournalSnapshotV1 as EntryType]: (snap) => snap.state
};
Snapshots are requested during saveJournal only when the interval is exceeded. Only one snapshot is included per save.
5) Persistence hooks
You provide two functions. The persistence mechanism must preserve ordering and should assign monotonically increasing entry numbers. The library replays in the order returned.
async function loadEntries(id: JournalId): Promise<Entries<CounterState>> {
  // Choose the latest snapshot
  const latestSnapshot = maybe(
    [...(snapshotEntryStore.get(id) ?? [])]
      .sort((a, b) => b.entryNumber - a.entryNumber)[0]
  );
  // Fetch entries after the snapshot, or all if none
  const lineEntries = match(
    latestSnapshot,
    (snap) => [...(lineEntryStore.get(id) ?? [])].slice(snap.entryNumber),
    () => [...(lineEntryStore.get(id) ?? [])]
  );
  return {
    lineEntries,
    maybeSnapshotEntry: latestSnapshot
  };
}
async function saveEntries(
  id: JournalId,
  entries: Entries<CounterState>,
  _state: Readonly<CounterState>
): Promise<number> {
  // Persist optional snapshot
  const snapshots = match(
    entries.maybeSnapshotEntry,
    (snap) => [ ...(snapshotEntryStore.get(id) ?? []), snap ],
    // No new snapshot: keep the stored ones instead of overwriting them
    () => [ ...(snapshotEntryStore.get(id) ?? []) ]
  );
  snapshotEntryStore.set(id, snapshots);
  // Append line entries in order
  const prior = lineEntryStore.get(id) ?? [];
  lineEntryStore.set(id, [ ...prior, ...entries.lineEntries ]);
  return Promise.resolve(1 + entries.lineEntries.length);
}
6) Wiring load and save
const initialCounterState: CounterState = {
  average: 0 as CounterValue,
  maximum: 0 as CounterValue,
  minimum: 99999 as CounterValue,
  valueCount: 0
};
export type CounterJournal = Journal<CounterState> & { readonly __brand: "CounterJournal" };
async function load(id: JournalId): Promise<Outcome<CounterJournal>> {
  return loadJournal<CounterState, CounterJournal>(
    id,
    initialCounterState,
    lineEntryHandlers,
    snapshotEntryHandlers,
    snapshotInterval,
    loadEntries
  );
}
async function save(journal: CounterJournal): Promise<Outcome<CounterJournal>> {
  return saveJournal<CounterState, CounterJournal>(
    journal,
    createSnapshot,
    saveEntries
  );
}
7) Multiple handlers and forward only change
Handler maps support many entry types. Use versions like v1, v2, v3 in the type name to evolve behavior without rewriting history. New behavior is introduced by new entry types. Old entries remain valid. This supports forward only change and backward compatibility.
const handlers = {
  "acme.counter-value-added.v1": handleCounterValueAddedV1,
  // add future types here
};
You now have a complete loop:
- An action function decides whether to append an entry.
- applyLineEntry records the entry and updates state via the handler.
- saveJournal persists entries and optionally a snapshot depending on the interval.
- loadJournal restores from an optional snapshot and replays entries in order.
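As a follow-up to step 7, here is a sketch of two versions of an entry type living in one handler map. The v2 entry type and its weight field are invented for illustration, and the types here are simplified stand-ins rather than the package's ILineEntry and LineEntryHandler.

```typescript
// Simplified stand-ins; the v2 entry and its weight field are hypothetical.
type AnyEntry = { readonly entryType: string };
type AddedV1 = { readonly entryType: "acme.counter-value-added.v1"; readonly value: number };
type AddedV2 = {
  readonly entryType: "acme.counter-value-added.v2";
  readonly value: number;
  readonly weight: number;
};
type Total = number;

// One handler per entry type. The v1 handler is left untouched when v2 is added,
// so entries already in the journal keep their original meaning.
const versionedHandlers: Record<string, (state: Total, entry: AnyEntry) => Total> = {
  "acme.counter-value-added.v1": (state, entry) => state + (entry as AddedV1).value,
  "acme.counter-value-added.v2": (state, entry) => {
    const e = entry as AddedV2;
    return state + e.value * e.weight;
  },
};

// A history that mixes old and new entry types, in the order they were appended.
const history: AnyEntry[] = [
  { entryType: "acme.counter-value-added.v1", value: 2 } as AddedV1,
  { entryType: "acme.counter-value-added.v2", value: 3, weight: 2 } as AddedV2,
];

// Replay: dispatch each entry to the handler registered for its type.
const total = history.reduce((state, entry) => {
  const handler = versionedHandlers[entry.entryType];
  if (handler === undefined) {
    throw new Error(`No handler for ${entry.entryType}`);
  }
  return handler(state, entry);
}, 0);
```

Replaying this history gives 2 from the v1 entry plus 3 * 2 from the v2 entry, for a total of 8.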
