KylFil
All killer. No filler.
A tiny database-agnostic Event Store—clever by design, minimal by choice.
- Optimistic Concurrency Control (OCC): no resource locks required.
- Simple Functional API: for great composition and expressive code.
- Atomic Guarantees: append multiple events in a single transaction.
- Storage Agnostic: supports virtually any Database or KV Store via a simple StoreProvider interface requiring only two functions: append() and read().
- Reference Implementations: get started right away with Postgres, SQLite, MySQL (more coming soon)—or use them as a blueprint for integrating with your favourite store.
- Lazy DB Connection & Deferred Query Execution: improves performance by avoiding premature resource allocation.
- Flexible Querying: read events in any direction or from a specific version.
- Immutable Operations: all functions return new objects without modifying the original objects.
- Flexible Event Payload: it's your event, put anything you want inside.
Contents
- How to use
- Simple API
- Optimistic Concurrency Control (OCC)
- Concurrency with append() and read()
- Event Sourcing helpers
- Snapshots
- StoreProvider Interface
How to use:
const { append, createEvent, read, stream } = require("kylfil");
const postgres = require("kylfil/postgres")(dbConnection);
const myStream = stream ("be2ed21cad4b412f69c558510112262f") (postgres);
const sale = {
airline: "Air Canada",
tkt: "7712935657218",
pax: "MRS HAZEL NUTT",
saleDate: "2025-08-17",
};
const newEvent = createEvent ("AirTicketSold") (sale);
(async () => {
const [ storedEvent ] = await append (newEvent) (myStream);
const events = await read ({ maxCount: 5 }) (myStream);
})()

The append() call above returns an array containing the newly appended event, which now has a streamId, a version of 0 (since it's the first event in the stream), and a sequence number seq indicating its position in the global event log:
[{
seq: 15,
id: "e2befa9a-141d-46fa-8f70-8fc48a7a7bbc",
type: "AirTicketSold",
streamId: "be2ed21cad4b412f69c558510112262f",
version: 0,
data: {
airline: "Air Canada",
tkt: "7712935657218",
pax: "MRS HAZEL NUTT",
saleDate: "2025-08-17"
}
}]

Simple API:
stream ( streamParams ) ( storeProvider )
stream :: String | StreamParams -> StoreProvider -> EventStream
Use this function to obtain a specific EventStream, which can then be supplied to other functions like append() or read() to work with the events in that stream.
Arguments:
streamParams can be either a String indicating a streamId or a StreamParams object with the following properties:

| Name | Type | Default | Description |
| ---- |:----:|:-------:| ----------- |
| storageName | String | "events" | Indicates where events are stored. For an RDBMS it can be used as a table name; for Key-Value stores, as a key prefix. |
| streamId | String | N/A | Unique stream ID |

storeProvider is an implementation of the StoreProvider interface for working with the underlying persistence storage engine.
Returns:
An instance of the EventStream function which accepts a single callback and invokes it with the following arguments:
(cb) => cb (StreamParams) (StoreProvider)

Examples:
1. Return a stream by a given ID:
stream("be2ed21cad4b412f69c558510112262f")2. For streams that are stored in dedicated DB tables for example order_events and sales_events you can do the following:
const orderStream = stream({
storageName: "order_events",
streamId: "766b8aa93d71e3f460b0f2524e1d271c"
})
const saleStream = stream({
storageName: "sale_events",
streamId: "d1c61832169d7e053a96969e90f6b54b"
})

createEvent ( eventParams ) ( eventData )
createEvent :: String | EventParams -> a -> Event
This function creates events with a proper structure, suitable for appending to any stream in the event store.
Arguments:
eventParams can be a String indicating the eventType or an EventParams object with the following properties:

| Name | Type | Default | Description |
| ---- |:----:|:-------:| ----------- |
| idGenerator | eventData -> String | randomUUID() | Function to generate the event ID; receives eventData as an argument |
| type | String | N/A | Event type |
| version | Integer | 0 | Stream version |

eventData is the payload for the event and can be of any type.
Returns:
An Event object.
Examples:
1. Create an event representing an airline ticket sale:
const event = createEvent ("AirTicketSold")({
airline: "AC",
tkt: "7712935657218",
pax: "MRS HAZEL NUTT",
saleDate: "2025-08-17"
})
// event will be:
{
id: "6675e754-6dd9-4ac8-bfef-56532bfc9505",
type: "AirTicketSold",
version: 0,
data: {
airline: "AC",
tkt: "7712935657218",
pax: "MRS HAZEL NUTT",
saleDate: "2025-08-17"
}
}

2. Create an event at version 3 using a custom ID generator:
const event = createEvent ({
idGenerator: ({ airline, tkt }) => `${airline}-${tkt}`,
type: "AirTicketSold",
version: 3
})({
airline: "AC",
tkt: "7712935657218",
pax: "MRS HAZEL NUTT",
saleDate: "2025-08-17"
})
// event will be:
{
id: "AC-7712935657218",
type: "AirTicketSold",
version: 3,
data: {
airline: "AC",
tkt: "7712935657218",
pax: "MRS HAZEL NUTT",
saleDate: "2025-08-17"
}
}

3. Generate a sequence of events for one sale:
[
{ type: "ProductSold", version: 0 },
{ type: "ReceiptPrinted", version: 1 },
{ type: "ProductShipped", version: 2 }
].map(params => createEvent(params)(sale))
4. Generate events of the same type for a list of entities:
sales.map(createEvent("ProductSold"))

append ( events ) ( stream )
append :: Event | Array Event -> EventStream -> Promise Array StoredEvent
Use this function to append events to a given stream.
Arguments:
- events can be either a single Event object or an Array of Event objects to append to a stream.
- stream is an EventStream to append events to.

NOTE: This function will store the events using the ID of the stream regardless of the streamId of the events. The original event objects won't be changed because the library performs all operations immutably.
Returns:
A Promise containing an array of StoredEvent objects with the following properties.
| Name | Type | Description |
| ---- |:----:| ----------- |
| seq | Integer | Event's position in the global event log |
| id | String | Event ID |
| type | String | Event type |
| streamId | String | Stream ID |
| version | Integer | Stream version |
| data | Any | Event payload |
Throws an OccError that includes the current streamVersion if it fails due to an OCC conflict.
Examples:
1. Append two events to the stream:
const [storedEvent1, storedEvent2] = await append ([ event1, event2 ]) (myStream)

2. Append the same events to multiple streams concurrently:
const results = await Promise.allSettled(
[myStream, andYourStream].map( append ([ event1, event2 ]) )
)

read ( readParams ) ( stream )
read :: ReadParams -> EventStream -> Promise Array StoredEvent
Use this function to read events from a stream in any direction and/or from a specific version. You can also set the max number of events to return.
Arguments:
readParams is of type ReadParams with the following properties:

| Name | Type | Default | Description |
| ---- |:----:|:-------:| ----------- |
| direction | ReadDirection | FORWARD | Indicates the direction of the read operation. Can be FORWARD or BACKWARD |
| fromVersion | Integer >= 0 | 0 | Indicates the inclusive version number to start reading from |
| maxCount | Integer > 0 | N/A | Indicates the max number of events to return from the stream |

stream is an EventStream to read events from.
Returns:
A Promise containing an array of StoredEvent objects with the following properties.
| Name | Type | Description |
| ---- |:----:| ----------- |
| seq | Integer | Event's position in the global event log |
| id | String | Event ID |
| type | String | Event type |
| streamId | String | Stream ID |
| version | Integer | Stream version |
| data | Any | Event payload |
⚡ TIP: Partial application of the read() function creates reusable readers that can be applied to many streams.
Examples:
1. Read all events from a stream:
const results = await read () (myStream)
// results will be:
[
{
seq: 15,
id: "e2befa9a-141d-46fa-8f70-8fc48a7a7bbc",
type: "AirTicketSold",
streamId: "be2ed21cad4b412f69c558510112262f",
version: 0,
data: {
airline: "AC",
tkt: "7712935657218",
pax: "MRS HAZEL NUTT",
saleDate: "2025-08-17"
}
}
... // other events in this stream
]

2. Read events from version 12 (inclusive):
await read ({ fromVersion: 12 }) (myStream)

3. Read the latest 5 events from the stream:
await read ({ direction: ReadDirection.BACKWARD, maxCount: 5 }) (myStream)

4. Create reusable readers via partial application to read events from different streams concurrently:
const readLast5 = read ({
direction: ReadDirection.BACKWARD,
maxCount: 5
})
const results = await Promise.allSettled(
[myStream, andYourStream].map(readLast5)
)

Optimistic Concurrency Control (OCC)
Optimistic Concurrency Control prevents appending events with a conflicting id or a conflicting composite key of (streamId + version). A conflict throws an OccError that includes the stream's current streamVersion. Because the reference implementations for storage engines provided with this library use "multi-value" inserts, the entire append is an atomic transaction: if one event fails to append, none will be stored. The following example shows the difference:
// no events will be stored due to OccError
await append ([ sameEvent, sameEvent ]) (myStream)
// here the first append will succeed
await append (sameEvent) (myStream)
// the second will fail due to OccError
await append (sameEvent) (myStream)

In the following simulation of a race condition, where two different events have the same version 3, only one will succeed and the other will throw an OccError:
const results = await Promise.allSettled([
eventVersion3,
anotherEventVersion3
].map(event => append(event)(myStream)))
results
.forEach((res, index) => {
if (res.status === 'rejected') {
const err = res.reason
console.error(
"Failed appending event #", index+1,
"at streamVersion: ", err.streamVersion,
"due to:", err.name
)
// will print:
// "Failed appending event #X at streamVersion: 3 due to: OccError"
}
})

Concurrency with append() and read()
- Calling append() or read() concurrently can consume multiple DB connections, depending on how you provide these connections to the StoreProvider.
- If you're using a StoreProvider implementation that supports a connection Pool, make sure you have set a connection limit (see the sketch after this list).
- When the connection pool limit is reached, concurrent calls to append() or read() are processed sequentially.
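For example, here is a minimal sketch that assumes the Postgres reference implementation accepts a node-postgres Pool as its dbConnection (that assumption, and the pool options shown, are illustrative; check the provider you actually use):

// Sketch only: assumes kylfil/postgres accepts a node-postgres Pool as its dbConnection
const { Pool } = require("pg");

const pool = new Pool({
connectionString: process.env.DATABASE_URL,
max: 5, // connection limit; concurrent append()/read() calls beyond this queue up
});

const postgres = require("kylfil/postgres")(pool);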
Event Sourcing helpers
aggregate (aggregateParams) (stream)
aggregate :: AggregateParams -> EventStream -> Promise (Integer, State)
Use this function to get the current state of the entity represented by the stream. By default it reads all events in the stream to rebuild the entity state. However, if you provide a version in AggregateParams, it will only read events that have occurred since that version (i.e. version + 1). You can pass an evolve function in AggregateParams which reduces the list of events into the resulting entity state. By default it uses a NoFold evolve, which appends each event to the state, so the state is simply the list of events.
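Conceptually, the default NoFold evolve behaves like the following reducer (an illustrative sketch, not the library's actual export):

// Illustration only: the state is just the list of events seen so far
const noFold = state => event => [...state, event];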
Arguments:
aggregateParams is of type AggregateParams with the following properties:

| Name | Type | Default | Description |
| ---- |:----:|:-------:| ----------- |
| evolve | State -> Event -> State | NoFold | Reducer function to build the stream state |
| initialState | Any | [ ] | Initial state of the entity |
| version | Integer >= 0 | N/A | Current version of the entity |

stream is an EventStream to read events from.
Returns:
A Promise containing a Pair (Integer, State) (i.e. a two-element array) where the 1st element is the entity version and the 2nd is the entity state.
Examples:
1. Aggregate events to rebuild the state of an order:
const evolve = order => event =>
event.type === "OrderItemAdded"
? { ...order, items: [...order.items, event.data] }
: event.type === "OrderItemRemoved"
? removeItemById(event.data.id)(order)
: order;
const initialState = {
id: "my-order",
createdDate: "2025-04-01",
items: []
}
const [version, order] = await aggregate({
evolve,
initialState,
version: 0
})(stream)
// assuming events were:
// 1) OrderItemAdded: "First Item"
// 2) OrderItemAdded: "Second Item"
// 3) OrderItemRemoved: "First Item"
// version will be 3 and order will be:
{
id: "my-order",
createdDate: "2025-04-01",
items: [
{ id: "item-2-id", name: "Second Item" },
]
}

evolveWith (evolvers) (state) (event)
evolveWith :: StrMap (State -> Event -> State) -> State -> Event -> State
This is a convenience function that helps you quickly set up an evolve function in a declarative way. It maps specific event types to their respective handlers via the evolvers argument. The resulting evolve function can be passed into aggregate for handling events.
Arguments:
- evolvers is an object that maps event types to their handlers with the signature State -> Event -> State.
- state is the current state of the aggregate.
- event is the event to be processed.
Returns:
A new evolve function with the signature State -> Event -> State, which produces the next state by applying the event to the current state.
Examples:
1. Create evolve function for order event handling:
// create evolve function
const evolveOrder = evolveWith({
OrderCreated: () => (event) => Order(event.data),
OrderItemAdded: (order) => (event) => ({
...order,
items: [...order.items, event.data],
}),
OrderItemRemoved: (order) => (event) => ({
...order,
items: order.items.filter(({ id }) => id !== event.data.id),
}),
});
const order = { id: "my-order", items: [] }
const itemAddedEvent = createEvent("OrderItemAdded")({
id: "some-item",
name: "Some Item"
})
const updatedOrder = evolveOrder(order)(itemAddedEvent);
// updatedOrder will be:
{
id: "my-order",
items: [
{ id: "some-item", name: "Some Item" }
]
}
// can be passed into aggregate:
await aggregate({ evolve: evolveOrder, initialState: order })(stream)

Snapshots
The aggregate function can be used for Snapshotting as follows:
- Load the last known entity state from your "Read View".
- Pass it as the initialState along with the version in the AggregateParams. aggregate will load all the events from the stream since that version and replay them using the evolve function, producing the updated snapshot of the entity.
- Save the new snapshot along with its version back to your "Read View".
Assuming the aggregate example above, we can do the following:
const snapshot = await findSnapshotById(orderId)
const [version, order] = await aggregate({
evolve,
initialState: snapshot.doc,
version: snapshot.version
})(stream)
await saveSnapshot({ id: orderId, version, doc: order })

*Since the implementation of snapshot loading/saving is highly dependent on the user's choices and the storage engine used, these details have been omitted.
StoreProvider Interface
append (streamParams) (events)
append :: StreamParams -> Array Event -> Promise Array StoredEvent
This function stores events in the underlying storage engine. It's partially applied by the library, deferring the database connection and query execution until the main append() function of the library is invoked. This helps to avoid premature resource allocation.
Arguments:
- streamParams is of type StreamParams and is supplied by the stream() function of the library.
- events is an Array of events and is provided by the append() function of the library.
Returns:
A Promise containing an array of StoredEvent objects.
read (streamParams) (readParams)
read :: StreamParams -> ReadParams -> Promise Array StoredEvent
This function retrieves events from the underlying storage engine. It's partially applied by the library, deferring the database connection and query execution until the main read() function of the library is called. This helps to avoid premature resource allocation.
Arguments:
- streamParams is of type StreamParams and is supplied by the stream() function of the library.
- readParams is of type ReadParams and is provided by the read() function of the library.
Returns:
A Promise containing an array of StoredEvent objects.
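To illustrate the shape of these two functions, here is a rough in-memory StoreProvider sketch. It is an illustration only: the exact OccError shape, seq assignment, the handling of storageName, and the use of plain strings for ReadDirection are assumptions, not the behaviour of the bundled reference implementations.

// Rough in-memory sketch of the StoreProvider interface shape (illustration only)
const memoryProvider = () => {
const log = []; // global event log; storageName is ignored in this sketch

// append :: StreamParams -> Array Event -> Promise Array StoredEvent
const append = ({ streamId }) => async (events) => {
const batch = [];
for (const event of events) {
// simplified OCC check on (streamId + version) only
const conflict = [...log, ...batch].some(
(e) => e.streamId === streamId && e.version === event.version
);
if (conflict) {
const err = new Error("version conflict");
err.name = "OccError"; // a real provider would also attach streamVersion
throw err; // nothing from this batch is stored
}
batch.push({ ...event, streamId, seq: log.length + batch.length + 1 });
}
log.push(...batch); // all or nothing, mimicking a single transaction
return batch;
};

// read :: StreamParams -> ReadParams -> Promise Array StoredEvent
const read = ({ streamId }) => async ({ direction = "FORWARD", fromVersion = 0, maxCount } = {}) => {
let events = log
.filter((e) => e.streamId === streamId && e.version >= fromVersion)
.sort((a, b) => a.version - b.version);
if (direction === "BACKWARD") events = events.reverse();
return maxCount ? events.slice(0, maxCount) : events;
};

return { append, read };
};

A provider like this could then be passed to stream() in place of the bundled Postgres, SQLite, or MySQL implementations.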
