@l-etabli/events
v0.8.1
The purpose of this repository is to make it easy to set up an event-driven architecture using the outbox pattern.
TypeScript library for event-driven architecture with the outbox pattern.
Events are persisted in the same transaction as your domain changes, then published asynchronously with retry support.
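To make the outbox idea concrete, here is a minimal in-memory sketch. It does not use the library: `Store`, `runInTransaction`, and `publishPending` are hypothetical names invented for this illustration, and the "transaction" is simulated by working on a copy that is swapped in only on success.

```typescript
// Minimal in-memory sketch of the outbox pattern (NOT this library's implementation).
type OutboxEvent = {
  id: number;
  topic: string;
  payload: unknown;
  status: "never-published" | "published";
};

type Store = { projects: string[]; outbox: OutboxEvent[] };

const store: Store = { projects: [], outbox: [] };

// Hypothetical transaction helper: mutate a draft copy, commit only if `work` does not throw.
function runInTransaction(work: (draft: Store) => void): void {
  const draft: Store = {
    projects: [...store.projects],
    outbox: store.outbox.map((event) => ({ ...event })),
  };
  work(draft); // a throw here leaves `store` untouched
  store.projects = draft.projects;
  store.outbox = draft.outbox;
}

// Separate, later step: deliver everything that was committed but not yet published.
function publishPending(publish: (event: OutboxEvent) => void): void {
  for (const event of store.outbox) {
    if (event.status !== "never-published") continue;
    publish(event);
    event.status = "published";
  }
}

// The domain change and its event are committed together.
runInTransaction((draft) => {
  draft.projects.push("demo");
  draft.outbox.push({
    id: 1,
    topic: "ProjectCreated",
    payload: { name: "demo" },
    status: "never-published",
  });
});

// A failing transaction persists neither the project nor the event.
try {
  runInTransaction((draft) => {
    draft.projects.push("broken");
    draft.outbox.push({
      id: 2,
      topic: "ProjectCreated",
      payload: { name: "broken" },
      status: "never-published",
    });
    throw new Error("domain validation failed");
  });
} catch {
  // rolled back: nothing to clean up
}

const delivered: string[] = [];
publishPending((event) => delivered.push(event.topic));
```

The point of the design is that an event can never exist without its domain change, or the other way round; publishing happens afterwards and can be retried independently.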
Installation
bun add @l-etabli/events
For Kysely/PostgreSQL:
bun add @l-etabli/events kysely pg
Recommended API
The recommended way to model events is now a single canonical definitions object.
import {
defineEvent,
defineEvents,
type InferEventsFromDefinitions,
} from "@l-etabli/events";
type Project = { id: string; name: string };
type ProjectContext = { projectId: string };
type ProjectMemberContext = { projectId: string; memberId: string };
type ProjectRole = "admin" | "editor";
const eventDefinitions = defineEvents({
ProjectCreated: defineEvent<{ project: Project }, ProjectContext>({
priority: 1,
}),
ProjectUpdated: defineEvent<{ project: Project }, ProjectContext>(),
UserAddedToProject: defineEvent<
{ projectId: string; userId: string; role: ProjectRole },
ProjectMemberContext
>({
priority: 20,
}),
PingSent: defineEvent<{ at: Date }>(),
});
type AppEvents = InferEventsFromDefinitions<typeof eventDefinitions>;
Benefits:
- one object to maintain
- topics derived from object keys
- priority lives in the same canonical definition
- no handwritten GenericEvent<...> | ... union in the app
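The benefits above can be illustrated with a standalone sketch of how a single definitions object can yield both the topic union and the per-topic priority. `defineEventSketch`, `EventDefinition`, `PayloadOf`, and `priorityOf` are hypothetical stand-ins written for this example; the real `defineEvent` and `InferEventsFromDefinitions` may be implemented differently.

```typescript
// Hypothetical stand-in for an event definition; not the library's actual type.
type EventDefinition<Payload, Context = undefined> = {
  priority?: number;
  // Phantom fields: they only carry the type parameters and are never set at runtime.
  _payload?: Payload;
  _context?: Context;
};

function defineEventSketch<Payload, Context = undefined>(
  options: { priority?: number } = {},
): EventDefinition<Payload, Context> {
  return options.priority === undefined ? {} : { priority: options.priority };
}

const definitions = {
  ProjectCreated: defineEventSketch<{ projectId: string }, { projectId: string }>({
    priority: 1,
  }),
  PingSent: defineEventSketch<{ at: Date }>(),
};

// The topic union comes straight from the object keys.
type Topic = keyof typeof definitions;

// The payload type for a topic is recovered from its definition.
type PayloadOf<T extends Topic> =
  (typeof definitions)[T] extends EventDefinition<infer P, any> ? P : never;

// Runtime list of topics, in declaration order.
const topics = Object.keys(definitions) as Topic[];

// Priority is looked up from the same object, so there is no second place to maintain it.
function priorityOf(topic: Topic): number | undefined {
  return definitions[topic].priority;
}

const pingPayload: PayloadOf<"PingSent"> = { at: new Date(0) };
```

Adding a new topic in this sketch means adding one key to `definitions`; the `Topic` union and the priority lookup follow automatically.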
Quick Start
1. Setup infrastructure
import {
createUserActor,
createWorkerActor,
createEventCrawler,
createInMemoryEventBus,
createInMemoryEventRepositoryAndQueries,
} from "@l-etabli/events";
const { eventQueries, withUow } =
createInMemoryEventRepositoryAndQueries<AppEvents>();
const { eventBus, createNewEvent } = createInMemoryEventBus(withUow, {
eventDefinitions,
});
const crawler = createEventCrawler({
withUow,
eventQueries,
eventBus,
});
2. Subscribe to events
eventBus.subscribe({
topic: "ProjectCreated",
subscriptionId: "send-project-created-email",
callBack: async (event) => {
await emailService.send(event.payload.project.id);
},
});
3. Emit events
await withUow(async (uow) => {
await projectRepository.save(project);
await uow.eventRepository.saveNewEventsBatch([
createNewEvent({
topic: "ProjectCreated",
payload: { project },
context: { projectId: project.id },
triggeredByActor: createUserActor(currentUserId),
flowId: requestId,
}),
]);
});
4. Process events
Traditional server:
crawler.start();
Serverless:
await withUow(
async (uow) => {
await uow.eventRepository.saveNewEventsBatch([event]);
},
{
afterCommit: async () => {
await crawler.triggerProcessing();
},
},
);
Event shape
GenericEvent remains the base type.
type GenericEvent<Topic, Payload, Context = undefined> = {
id: EventId;
occurredAt: Date;
topic: Topic;
payload: Payload;
status: EventStatus;
publications: EventPublication[];
triggeredByActor: Actor;
flowId?: string;
causedByEventId?: EventId;
priority?: number;
context?: Context;
};
type Actor =
| UserActor
| SystemActor
| WorkerActor
| ApiKeyActor
| AnonymousActor;
type UserActor<Id extends string = string> = { kind: "user"; id: Id };
type SystemActor = { kind: "system" };
type WorkerActor<Id extends string = string> = { kind: "worker"; id?: Id };
type ApiKeyActor<Id extends string = string> = { kind: "api-key"; id: Id };
type AnonymousActor = { kind: "anonymous" };
Rules:
- payload is always required
- context is only required at the call site for topics that declare one
- priority is injected automatically from eventDefinitions when provided
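The three rules above can be sketched at the type level without the library. Everything here (`EventMap`, `NewEventInput`, `definitionsByTopic`, `createNewEventSketch`) is hypothetical and only illustrates one way such rules could be enforced; it is not how `makeCreateNewEvent` is actually written.

```typescript
// Hypothetical event map: each topic declares a payload and, optionally, a context.
type EventMap = {
  ProjectCreated: { payload: { projectId: string }; context: { projectId: string } };
  PingSent: { payload: { at: Date }; context: undefined };
};

// One input shape per topic: `payload` is always required,
// `context` is required only when the topic declares one.
type NewEventInput = {
  [T in keyof EventMap]: { topic: T; payload: EventMap[T]["payload"] } &
    (EventMap[T]["context"] extends undefined
      ? { context?: undefined }
      : { context: EventMap[T]["context"] });
}[keyof EventMap];

// Priorities live next to the topic names (assumed values, for illustration only).
const definitionsByTopic: Record<keyof EventMap, { priority?: number }> = {
  ProjectCreated: { priority: 1 },
  PingSent: {},
};

function createNewEventSketch(input: NewEventInput) {
  const priority = definitionsByTopic[input.topic].priority;
  return {
    ...input,
    status: "never-published" as const,
    // Inject priority only when the definition provides one.
    ...(priority === undefined ? {} : { priority }),
  };
}

const created = createNewEventSketch({
  topic: "ProjectCreated",
  payload: { projectId: "p1" },
  context: { projectId: "p1" },
});

// No context needed: PingSent does not declare one.
const ping = createNewEventSketch({
  topic: "PingSent",
  payload: { at: new Date(0) },
});

// Omitting `context` for "ProjectCreated" would be rejected by the type checker.
```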
Creating events
From event definitions
import {
createUserActor,
createWorkerActor,
makeCreateNewEvent,
} from "@l-etabli/events";
const createNewEvent = makeCreateNewEvent({
eventDefinitions,
});
createNewEvent({
topic: "ProjectCreated",
payload: { project },
context: { projectId: project.id },
triggeredByActor: createUserActor(currentUserId),
});
createNewEvent({
topic: "PingSent",
payload: { at: new Date() },
triggeredByActor: createWorkerActor("nightly-sync"),
});
From an existing union
The previous union-based approach still works.
import { type GenericEvent, makeCreateNewEvent } from "@l-etabli/events";
type LegacyEvents =
| GenericEvent<"UserCreated", { userId: string; email: string }>
| GenericEvent<"OrderPlaced", { orderId: string }, { tenantId: string }>;
const createNewEvent = makeCreateNewEvent<LegacyEvents>();
Event lifecycle
never-published -> in-process -> published
                      \-> failed-but-will-retry -> published
                                    \-> quarantined
Statuses:
- never-published
- to-republish
- in-process
- published
- failed-but-will-retry
- quarantined
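The lifecycle can be read as a small state machine. The sketch below is an interpretation, not the crawler's actual logic: `MAX_ATTEMPTS`, `nextStatus`, and `simulate` are hypothetical, the retry budget of 3 is an arbitrary assumption, and a retry is assumed to pass through in-process again. The to-republish status is left out because this README does not say when it is set.

```typescript
type EventStatus =
  | "never-published"
  | "to-republish"
  | "in-process"
  | "published"
  | "failed-but-will-retry"
  | "quarantined";

// Hypothetical retry budget; the library's real limit is not stated in this README.
const MAX_ATTEMPTS = 3;

// Decide the status after one processing attempt.
function nextStatus(outcome: "success" | "failure", attempts: number): EventStatus {
  if (outcome === "success") return "published";
  return attempts < MAX_ATTEMPTS ? "failed-but-will-retry" : "quarantined";
}

// Replay a sequence of attempt outcomes and record every status the event goes through.
function simulate(outcomes: Array<"success" | "failure">): EventStatus[] {
  const history: EventStatus[] = ["never-published"];
  let attempts = 0;
  for (const outcome of outcomes) {
    history.push("in-process");
    attempts += 1;
    const status = nextStatus(outcome, attempts);
    history.push(status);
    // published and quarantined are terminal in this sketch.
    if (status === "published" || status === "quarantined") break;
  }
  return history;
}

const recovered = simulate(["failure", "success"]);
const givenUp = simulate(["failure", "failure", "failure"]);
```

In this sketch `recovered` ends in published after one retry, while `givenUp` ends in quarantined once the assumed budget is exhausted.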
Kysely migration helper
import type { Kysely } from "kysely";
export async function up(db: Kysely<unknown>): Promise<void> {
await db.schema
.createTable("events")
.addColumn("id", "text", (col) => col.primaryKey())
.addColumn("topic", "text", (col) => col.notNull())
.addColumn("payload", "jsonb", (col) => col.notNull())
.addColumn("context", "jsonb")
.addColumn("status", "text", (col) => col.notNull())
.addColumn("triggeredByActor", "jsonb", (col) => col.notNull())
.addColumn("flowId", "text")
.addColumn("causedByEventId", "text")
.addColumn("occurredAt", "timestamptz", (col) => col.notNull())
.addColumn("publications", "jsonb", (col) => col.notNull().defaultTo("[]"))
.addColumn("priority", "integer")
.execute();
}
Effect v4 support
Effect-native ports and adapters are available via dedicated subpath exports. Non-Effect users are not impacted — effect is an optional peer dependency.
bun add @l-etabli/events effect
Setup
import { Effect } from "effect";
import type { InferEventsFromDefinitions } from "@l-etabli/events";
import {
createEffectInMemoryEventBus,
createEffectInMemoryEventRepositoryAndQueries,
createEffectEventCrawler,
} from "@l-etabli/events/effect";
const { eventQueries, withUow } =
createEffectInMemoryEventRepositoryAndQueries<AppEvents>();
const { eventBus, createNewEvent } = createEffectInMemoryEventBus(withUow, {
eventDefinitions,
});
const crawler = createEffectEventCrawler({
withUow,
eventQueries,
eventBus,
});
Subscribe
eventBus.subscribe({
topic: "ProjectCreated",
subscriptionId: "send-project-created-email",
callBack: (event) =>
Effect.promise(() => emailService.send(event.payload.project.id)),
});
Emit
await Effect.runPromise(
withUow((uow) =>
uow.eventRepository.saveNewEventsBatch([
createNewEvent({
topic: "ProjectCreated",
payload: { project },
context: { projectId: project.id },
triggeredByActor: createUserActor(currentUserId),
}),
]),
),
);
Process
// Traditional server
crawler.start();
// Serverless / on-demand
await Effect.runPromise(crawler.triggerProcessing());
Kysely adapters
For PostgreSQL with Effect-native Kysely:
import {
createEffectKyselyEventRepository,
createEffectKyselyEventQueries,
} from "@l-etabli/events/effect-kysely";
const eventRepository = createEffectKyselyEventRepository<AppEvents, Db>(db);
const eventQueries = createEffectKyselyEventQueries<AppEvents, Db>(db);
These adapters wrap Kysely queries with Effect.promise(); they work with any standard Kysely instance.
Subpath exports
| Import path | What | Requires |
|---|---|---|
| @l-etabli/events | Promise-based ports, in-memory adapters, crawler | — |
| @l-etabli/events/kysely | Promise-based Kysely adapters | kysely |
| @l-etabli/events/effect | Effect-native ports, in-memory adapters, crawler | effect |
| @l-etabli/events/effect-kysely | Effect-native Kysely adapters | effect, kysely |
Examples
See examples/ for Kysely integration, cascading events and serverless usage.
