npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2025 – Pkg Stats / Ryan Hefner

kylfil

v1.4.2

Published

A tiny database-agnostic Event Store

Readme

KylFil

All killer. No filler.
A tiny database-agnostic Event Store—clever by design, minimal by choice.

  • Optimistic Concurrency Control (OCC): no resource locks required.
  • Simple Functional API: for great composition and expressive code.
  • Atomic Guarantees: append multiple events in a single-transaction.
  • Storage Agnostic: supports virtually any Database or KV Store via a simple StoreProvider interface requiring only two functions: append() and read().
  • Reference Implementations: get started right away with Postgres, SQLite, MySQL (more coming soon)—or use them as a blueprint for integrating with your favourite store.
  • Lazy DB Connection & Deferred Query Execution: improves performance by avoiding premature resource allocation.
  • Flexible Querying: read events in any direction or from a specific version.
  • Immutable Operations: all functions return new objects without modifying original objects.
  • Flexible Event Payload: it's your event, put anything you want inside.

Contents

How to use:

const { append, createEvent, read, stream } = require("kylfil");
const postgres = require("kylfil/postgres")(dbConnection);

const myStream = stream ("be2ed21cad4b412f69c558510112262f") (postgres);

const sale = {
  airline: "Air Canada",
  tkt: "7712935657218",
  pax: "MRS HAZEL NUTT",
  saleDate: "2025-08-17",
};

const newEvent = createEvent ("AirTicketSold") (sale);
  
(async () => {
  const [ storedEvent ] = await append (newEvent) (myStream);

  const events = await read ({ maxCount: 5 }) (myStream);
})()

The append() function above will return an array with the newly appended event that now has a streamId, version at 0, since it's the 1st event in the stream and a sequence number seq indicating (Nth) position in the global event log:

[{
  seq: 15,
  id: "e2befa9a-141d-46fa-8f70-8fc48a7a7bbc",
  type: "AirTicketSold",
  streamId: "be2ed21cad4b412f69c558510112262f",
  version: 0,
  data: {
    airline: "AC",
    tkt: "7712935657218",
    pax: "MRS HAZEL NUTT",
    saleDate: "2025-08-17"
  }
}]

Simple API:

stream ( streamParams ) ( storeProvider )

stream :: String | StreamParams -> StoreProvider -> EventStream

Use this function to get a hold of a specific EventStream which can be supplied to other functions like append() or read() to work with events in a given stream.

Arguments:

  • streamParams argument can be either a String indicating a streamId or a StreamParams object with the following properties:

    | Name | Type | Default | Description | | ---- |:----:|:-------:| ----------- | | storageName | String | "events" | Indicates where events are stored. For RDBMS can be used as a table name. For Key-Value stores can be used as a key prefix. | | streamId | String | N/A | Unique stream ID |

  • storeProvider is an implementation of the StoreProvider interface for working with the underlying persistence storage engine.

Returns:

An instance of the EventStream function which accepts a single callback and invokes it with the following arguments:

(cb) => cb (StreamParams) (StoreProvider)

Examples:

1. Return a stream by a given ID:

stream("be2ed21cad4b412f69c558510112262f")

2. For streams that are stored in dedicated DB tables for example order_events and sales_events you can do the following:

const orderStream = stream({ 
  storageName: "order_events",
  streamId: "766b8aa93d71e3f460b0f2524e1d271c"
}) 

const saleStream = stream({ 
  storageName: "sale_events",
  streamId: "d1c61832169d7e053a96969e90f6b54b"
})

createEvent ( eventParams ) ( eventData )

createEvent :: String | EventParams -> a -> Event

This function creates events with a proper structure, suitable for appending to any stream in the event store.

Arguments:

  • eventParams can be a String indicating the eventType or an EventParams object with the following properties:

    | Name | Type | Default | Description | | ---- |:----:|:-------:| ----------- | | idGenerator | eventData -> String | randomUUID() | Function to generate event ID that receives eventData is an argument | | type | String | N/A | Event type | | version | Integer | 0 | Stream version |

  • eventData payload for the event data can be of any type.

Returns:

An Event object.

Examples:

1. Create an event representing airline ticket sale:

const event = createEvent ("AirTicketSold")({
  airline: "AC",
  tkt: "7712935657218",
  pax: "MRS HAZEL NUTT",
  saleDate: "2025-08-17"
})

// event will be:
{
  id: "6675e754-6dd9-4ac8-bfef-56532bfc9505",
  type: "AirTicketSold",
  version: 0,
  data: {
    airline: "AC",
    tkt: "7712935657218",
    pax: "MRS HAZEL NUTT",
    saleDate: "2025-08-17"
  }
}

2. Create an event at version 3 using a custom ID generator:

const event = createEvent ({
  idGenerator: ({ airline, tkt }) => `${airline}-${tkt}`,
  type: "AirTicketSold",
  version: 3
})({
  airline: "AC",
  tkt: "7712935657218",
  pax: "MRS HAZEL NUTT",
  saleDate: "2025-08-17"
})

// event will be:
{
  id: "AC-7712935657218",
  type: "AirTicketSold",
  version: 3,
  data: {
    airline: "AC",
    tkt: "7712935657218",
    pax: "MRS HAZEL NUTT",
    saleDate: "2025-08-17"
  }
}

3. Generate sequence of events for one sale:

[
  { type: "ProductSold", version: 0 },
  { type: "ReceiptPrinted", version: 1 },
  { type: "ProductShipped", version: 2 }
].map(params => createEvent(params)(sale))

4. Generate a list of a certain event type for a list of entities:

sales.map(createEvent("ProductSold"))

append ( events ) ( stream )

append :: Event | Array Event -> EventStream -> Promise Array StoredEvent

Use this function to append events to a given stream.

Arguments:

  • events can be either a single Event object or an Array of Event objects to append to a stream.
  • stream is an EventStream to append events to.

 NOTE: This function will store the events using the ID of the stream regardless of the streamId of the events. The original event objects won't be changed because the library performs all operations immutably.

Returns:

A Promise containing an array of StoredEvent objects with the following properties.

| Name | Type | Description | | ---- |:----:| ----------- | | seq | Integer | Event's position in the global event log | | id | String | Event ID | | type | String | Event type | | streamId | String | Stream ID | | version | Integer | Stream version | | data | Any | Event payload |

Throws an OccError that includes the current streamVersion if it fails due to an OCC conflict.

Examples:

1. Append two events to the stream:

const [storedEvent1, storedEvent2] = await append ([ event1, event2 ]) (myStream)

2. Append same events to multiple streams concurrently:

const results = await Promise.allSettled(
  [myStream, andYourStream].map( append ([ event1, event2 ]) )
)

read ( readParams ) ( stream )

read :: ReadParams -> EventStream -> Promise Array StoredEvent

Use this function to read events from a stream in any direction and/or from a specific version. You can also set the max number of events to return.

Arguments:

  • readParams is of type ReadParams with the following properties:

    | Name | Type | Default | Description | | ---- |:----:|:-------:| ----------- | | direction | ReadDirection | FORWARD | Indicates the direction of the read operation. Can be FORWARD or BACKWARD | | fromVersion | Integer >= 0 | 0 | Indicates an inclusive version number of the events to read from. | | maxCount | Integer > 0 | N/A | Indicates the max number of events to return from stream |

  • stream is an EventStream to read events from.

Returns:

A Promise containing an array of StoredEvent objects with the following properties.

| Name | Type | Description | | ---- |:----:| ----------- | | seq | Integer | Event's position in the global event log | | id | String | Event ID | | type | String | Event type | | streamId | String | Stream ID | | version | Integer | Stream version | | data | Any | Event payload |

⚡ TIP: Partial application of read() function creates reusable readers that can be applied to many streams

Examples:

1. Read all events from a stream:

const results = await read () (myStream)
// results will be:
[
  {
    seq: 15,
    id: "e2befa9a-141d-46fa-8f70-8fc48a7a7bbc",
    type: "AirTicketSold",
    streamId: "be2ed21cad4b412f69c558510112262f",
    version: 0,
    data: {
      airline: "AC",
      tkt: "7712935657218",
      pax: "MRS HAZEL NUTT",
      saleDate: "2025-08-17"
    }
  }
  ... // other events in this stream
]

2. Read events after version 12 (inclusive):

await read ({ fromVersion: 12 }) (myStream)

3. Read latest 5 events from the stream:

await read ({ direction: ReadDirection.BACKWARD, maxCount: 5 }) (myStream)

4. Create reusable readers via partial application to read events from different streams concurrently:

const readLast5 = read ({
  direction: ReadDirection.BACKWARD,
  maxCount: 5
})

const results = await Promise.allSettled(
  [myStream, andYourStream].map(readLast5)
)

Optimistic Concurrency Control (OCC)

Optimistic Concurrency Control prevents appending events with conflicting id or a composite key of (streamId + version). A conflict throws an OccError that includes the stream's current streamVersion. Because reference implementations for storage engines provided with this library use "multi-value" inserts, the entire append process is an atomic transaction: if one event fails to append, none will be stored. Following example shows the difference:

// no events will be stored due to OccError
await append ([ sameEvent, sameEvent ]) (myStream)

// here the first append will succeed 
await append (sameEvent) (myStream)
// the second will fail due to OccError
await append (sameEvent) (myStream)

In the following simulation of a racing condition where different events have the same version 3 only one will succeed and the other will throw OccError:

cons results = await Promise.allSettled([
  eventVersion3,
  anotherEventVersion3
].map(event => append(event)(myStream)))

results
  .map((res, index) => {
    if (res.status === 'rejected') {
      const err = res.reason
      console.error(
        "Failed appending event #", index+1,
        "at streamVersion: ", err.streamVersion,
        "due to:", err.name
      )
      // will print:
      // "Failed appending event #X at streamVersion: 3 due to: OccError"
    }
  })

Concurrency with append() and read()

  • Calling append() or read() concurrently can consume multiple DB connections, depending on how you provide these connections to the StoreProvider.
  • If you're using a StoreProvider implementation that supports a connection Pool make sure you have set a conection limit.
  • When the connection pool limit is reached, concurrent calls to append() or read() are processed sequentially.

Event Sourcing helpers

aggregate (aggregateParams) (stream)

aggregate :: AggregateParams -> EventStream -> Promise (Integer, State)

Use this function to get the current state of the entity represented by the stream. By default it reads all events in the stream to rebuild the entity state. However, if you provide a version in AggregateParams it will only read events that have occured since that version (i.e. version + 1). You can pass an evolve function to AggregateParams which reduces a list of events into a resulting entity state. By default it will use a NoFold evolve which appends each event to the state where the state is a list of events.

Arguments:

  • aggregateParams is of type AggregateParams with the following properties:

    | Name | Type | Default | Description | | ---- |:----:|:-------:| ----------- | | evolve | State -> Event -> State | NoFold | Reducer function to build the stream state | | initialState | Any | [ ] | Initial state of the entity | | version | Integer >= 0 | N/A | Current version of the entity |

  • stream is an EventStream to read events from.

Returns:

A Promise containing a Pair (Integer, State) (i.e. array) where the 1st element is an entity version and 2nd is an entity state.

Examples:

1. Aggregate events to rebuild the state of an order:

const evolve = order => event =>
  event.type === "OrderItemAdded"
    ? { ...order, items: [...order.items, event.data] }
    : event.type === "OrderItemRemoved"
      ? removeItemById(event.data.id)(order)
      : order;

const initialState = {
  id: "my-order",
  createdDate: "2025-04-01",
  items: []
}

const [version, order] = await aggregate({
  evolve,
  initialState,
  version: 0
})(stream)

// assuming events were:
// 1) OrderItemAdded: "First Item"
// 2) OrderItemAdded: "Second Item"
// 3) OrderItemRemoved: "First Item"
// version will be 3 and order will be:
{
  id: "my-order",
  createdDate: "2025-04-01",
  items: [
    { id: "item-2-id", name: "Second Item" },
  ]
}

evolveWith (evolvers) (state) (event)

evolveWith :: StrMap (State -> Event -> State) -> State -> Event -> State

This is a convenience function that helps to quickly setup an evolve function in an easy and declarative way. It allows mapping specific event types to their respective handlers using the evolvers argument. The resulting evolve function can be passed into an aggregate for handling events.

Arguments:

  • evolvers is an object that maps event types to their handlers with a signature State -> Event -> State.
  • state is a current state of the aggregate
  • event is an event to be processed

Returns:

A new evolve function with the following signature State -> Evolve -> State which produces next state by applying the event to the current state.

Examples:

1. Create evolve function for order event handling:

// create evolve function
const evolveOrder = evolveWith({
  OrderCreated: () => (event) => Order(event.data),
  OrderItemAdded: (order) => (event) => ({
    ...order,
    items: [...order.items, event.data],
  }),
  OrderItemRemoved: (order) => (event) => ({
    ...order,
    items: order.items.filter(({ id }) => id !== event.data.id),
  }),
});

const order = { id: "my-order", items: [] }
const itemAddedEvent = createEvent("OrderItemAdded")({
  id: "some-item",
  name: "Some Item"
})

const updatedOrder = evolveOrder(order)(itemAddedEvent);
// updatedOrder will be:
{
  id: "my-order",
  items: [
    { id: "some-item", name: "Some Item" }
  ]
}

// can be passed into aggregate:
await aggregate({ evolve: evolveOrder, initialState: order })

Snapshots

The aggregate function can be used for Snapshotting as follows:

  • Load the last known entity state from your "Read View".
  • Pass it as the initialState along with the version in the AggregateParams.
  • aggregate will load all the events from the stream since that version, and replay them using the evolve function producing the updated snapshot of the entity.
  • Save the new snapshot along with its version back to your "Read View".

Assuming the aggregate example above we can do the following:

const snapshot = await findSnapshotById(orderId)
const [version, order] = await aggregate({
  evolve,
  initialState: snapshot.doc,
  version: snapshot.version
})(stream)
await saveSnapshot({ id: orderId, version, doc: order })

*Since the implementation of snapshot loading/saving is highly dependent on the user's choices and the storage engine used, these details have been omitted.

StoreProvider Interface

append (streamParams) (events)

append :: StreamParams -> Array Event -> Promise Array StoredEvent

This function stores events in the underlying storage engine. It's partially applied by the library, deferring the database connection and query execution until the main append() function of the library is invoked. This helps to avoid premature resource allocation.

Arguments:

  • streamParams is of type StreamParams and supplied by the stream() function of the library.
  • events is an Array of events and provided by the append() function of the library.

Returns:

A Promise containing an array of StoredEvent objects.

read (streamParams) (readParams)

read :: StreamParams -> ReadParams -> Promise Array StoredEvent

This function retrieves events from the underlying storage engine. It's partially applied by the library deferring the database connection and query execution until the main read() function of the library is called. This helps to avoid premature resource allocation.

Arguments:

  • streamParams is of type StreamParams and supplied by the stream() function of the library.
  • readParams is of type ReadParams and provided by the read() function of the library.

Returns:

A Promise containing an array of StoredEvent objects.