@l-etabli/events
v0.15.0
Published
The purpose of this repository is to make it easy to setup event driven architecture using outbox pattern
Readme
@l-etabli/events
TypeScript library for event-driven architecture with the outbox pattern.
Events are persisted in the same transaction as your domain changes, then published asynchronously with retry support.
Installation
bun add @l-etabli/eventsFor Kysely/PostgreSQL:
bun add @l-etabli/events kysely pgRecommended API
The recommended way to model events is now a single canonical definitions object.
import {
defineEvent,
defineEvents,
type InferEventsFromDefinitions,
} from "@l-etabli/events";
type Project = { id: string; name: string };
type ProjectContext = { projectId: string };
type ProjectMemberContext = { projectId: string; memberId: string };
type ProjectRole = "admin" | "editor";
const eventDefinitions = defineEvents({
ProjectCreated: defineEvent<{ project: Project }, ProjectContext>({
priority: 1,
}),
ProjectUpdated: defineEvent<{ project: Project }, ProjectContext>(),
UserAddedToProject: defineEvent<
{ projectId: string; userId: string; role: ProjectRole },
ProjectMemberContext
>({
priority: 20,
}),
PingSent: defineEvent<{ at: Date }>(),
});
type AppEvents = InferEventsFromDefinitions<typeof eventDefinitions>;Benefits:
- one object to maintain
- topics derived from object keys
prioritylives in the same canonical definition- no handwritten
GenericEvent<...> | ...union in the app
Quick Start
1. Setup infrastructure
import {
createUserActor,
createWorkerActor,
createEventCrawler,
createInMemoryEventBus,
createInMemoryEventRepositoryAndQueries,
} from "@l-etabli/events";
const { eventQueries, withUow } =
createInMemoryEventRepositoryAndQueries<AppEvents>();
const { eventBus, createNewEvent } = createInMemoryEventBus({
eventDefinitions,
});
const crawler = createEventCrawler({
withUow,
eventQueries,
eventBus,
});2. Subscribe to events
eventBus.subscribe({
topic: "ProjectCreated",
subscriptionId: "send-project-created-email",
callBack: async (event) => {
await emailService.send(event.payload.project.id);
},
});3. Emit events
await withUow(async (uow) => {
await projectRepository.save(project);
await uow.eventRepository.saveNewEventsBatch([
createNewEvent({
topic: "ProjectCreated",
payload: { project },
context: { projectId: project.id },
triggeredByActor: createUserActor(currentUserId),
flowId: requestId,
}),
]);
});4. Process events
Traditional server:
crawler.start();Serverless:
await withUow(
async (uow) => {
await uow.eventRepository.saveNewEventsBatch([event]);
},
{
afterCommit: async () => {
await crawler.triggerProcessing();
},
},
);Event shape
GenericEvent remains the base type.
type GenericEvent<Topic, Payload, Context = undefined> = {
id: EventId;
occurredAt: Date;
topic: Topic;
payload: Payload;
status: EventStatus;
publications: EventPublication[];
triggeredByActor: Actor;
flowId: string;
causedByEventId?: EventId;
priority?: number;
context?: Context;
};
type Actor =
| UserActor
| SystemActor
| WorkerActor
| ApiKeyActor
| AnonymousActor;
type UserActor<Id extends string = string> = { kind: "user"; id: Id };
type SystemActor = { kind: "system" };
type WorkerActor<Id extends string = string> = { kind: "worker"; id?: Id };
type ApiKeyActor<Id extends string = string> = { kind: "api-key"; id: Id };
type AnonymousActor = { kind: "anonymous" };Rules:
payloadis always requiredcontextis only required at the call site for topics that declare onepriorityis injected automatically fromeventDefinitionswhen providedflowIdis required — it is the correlation id that ties a chain of cause→effect events together. From inside an Effect computation usemakeCreateEventEffectso it is stamped fromFlowContextServiceautomatically (see Flow correlation below).
Creating events
From event definitions
import { createUserActor, makeCreateNewEvent } from "@l-etabli/events";
const createNewEvent = makeCreateNewEvent({
eventDefinitions,
});
createNewEvent({
topic: "ProjectCreated",
payload: { project },
context: { projectId: project.id },
triggeredByActor: createUserActor(currentUserId),
});
createNewEvent({
topic: "PingSent",
payload: { at: new Date() },
triggeredByActor: createWorkerActor("nightly-sync"),
});From an existing union
The previous union-based approach still works.
import { type GenericEvent, makeCreateNewEvent } from "@l-etabli/events";
type LegacyEvents =
| GenericEvent<"UserCreated", { userId: string; email: string }>
| GenericEvent<"OrderPlaced", { orderId: string }, { tenantId: string }>;
const createNewEvent = makeCreateNewEvent<LegacyEvents>();Event lifecycle
never-published -> in-process -> published
\-> failed-but-will-retry -> published
\-> quarantinedStatuses:
never-publishedto-republishin-processpublishedfailed-but-will-retryquarantined
Kysely migration helper
import type { Kysely } from "kysely";
export async function up(db: Kysely<unknown>): Promise<void> {
await db.schema
.createTable("events")
.addColumn("id", "text", (col) => col.primaryKey())
.addColumn("topic", "text", (col) => col.notNull())
.addColumn("payload", "jsonb", (col) => col.notNull())
.addColumn("context", "jsonb")
.addColumn("status", "text", (col) => col.notNull())
.addColumn("triggeredByActor", "jsonb", (col) => col.notNull())
.addColumn("flowId", "text")
.addColumn("causedByEventId", "text")
.addColumn("occurredAt", "timestamptz", (col) => col.notNull())
.addColumn("publications", "jsonb", (col) => col.notNull().defaultTo("[]"))
.addColumn("priority", "integer")
.addColumn("traceContext", "jsonb")
.execute();
}If you upgrade and already have an events table, add the column with:
ALTER TABLE events ADD COLUMN "traceContext" jsonb;Effect v4 support
Effect-native ports and adapters are available via dedicated subpath exports. Non-Effect users are not impacted — effect is an optional peer dependency.
bun add @l-etabli/events effectSetup
import { Effect } from "effect";
import type { InferEventsFromDefinitions } from "@l-etabli/events";
import {
createEffectInMemoryEventBus,
createEffectInMemoryEventRepositoryAndQueries,
createEffectEventCrawler,
} from "@l-etabli/events/effect";
const { eventQueries, withUow } =
createEffectInMemoryEventRepositoryAndQueries<AppEvents>();
const { eventBus, createNewEvent } = createEffectInMemoryEventBus({
eventDefinitions,
});
const crawler = createEffectEventCrawler({
withUow,
eventQueries,
eventBus,
});Subscribe
eventBus.subscribe({
topic: "ProjectCreated",
subscriptionId: "send-project-created-email",
callBack: (event) =>
Effect.promise(() => emailService.send(event.payload.project.id)),
});Emit
From inside a use case use makeCreateEventEffect and let
FlowContextService stamp flowId for you:
import {
FlowContextService,
makeCreateEventEffect,
} from "@l-etabli/events/effect";
const createEvent = makeCreateEventEffect({ eventDefinitions });
const placeOrder = (params: { project: Project; currentUserId: string }) =>
Effect.gen(function* () {
const event = yield* createEvent({
topic: "ProjectCreated",
payload: { project: params.project },
context: { projectId: params.project.id },
triggeredByActor: createUserActor(params.currentUserId),
});
yield* withUow((uow) => uow.eventRepository.save(event));
});
// Provide FlowContextService at the entry point — once per request/job:
await Effect.runPromise(
placeOrder(params).pipe(
Effect.provideService(FlowContextService, { flowId: requestId }),
),
);The lib does not own the transaction — withUow stays exactly where it
is. createEvent only builds the event, eventRepository.save persists it.
createEvent accepts both narrow and union-typed params with the same
signature, so a generic dispatcher (HTTP route, queue worker, …) can
forward a discriminated-union payload without casts. Use
CreateEventEffectInputFromDefinitions<D> as the canonical input type:
import {
type CreateEventEffectInputFromDefinitions,
makeCreateEventEffect,
} from "@l-etabli/events/effect";
const createEvent = makeCreateEventEffect({ eventDefinitions });
// Generic handler: the topic is only known at runtime.
const dispatchFromHttp = (
params: CreateEventEffectInputFromDefinitions<typeof eventDefinitions>,
) =>
Effect.gen(function* () {
const event = yield* createEvent(params);
yield* withUow((uow) => uow.eventRepository.save(event));
});The result is the matching event variant when params is a literal, and
the event union when params is a union — no as never anywhere. A typoed
topic, a wrong payload shape, or a flowId override all still fail to
compile.
Flow correlation
flowId is required on every event. It survives the whole cause→effect
chain so a single request id can be traced through every event it spawned.
Two pieces wire it up automatically:
FlowContextService— provide once at every Effect entry point. HTTP handlers map their request id toflowId. CLIs, crons and tests useadhocFlow():import { adhocFlow, FlowContextService } from "@l-etabli/events/effect"; Effect.provideService(program, FlowContextService, adhocFlow());instrumentEventHandler— wrap every event handler with it. The wrapper re-providesFlowContextServicefrom the incoming event so any child event emitted inside the handler inheritsflowIdand getscausedByEventId = event.idfor free:import { instrumentEventHandler } from "@l-etabli/events/effect"; eventBus.subscribe({ topic: "ProjectCreated", subscriptionId: "send-welcome-email", callBack: instrumentEventHandler((event) => Effect.gen(function* () { // any createEvent here automatically inherits the flow }), ), });
Use flowAttrs(event) for canonical observability keys (flow_id,
app.event.id, app.event.topic, app.event.caused_by_event_id) so log
lines stay consistent across services.
Sync (non-Effect) callers
Callers outside an Effect computation must pass flowId explicitly:
const event = createNewEvent({
topic: "ProjectCreated",
payload: { project },
context: { projectId: project.id },
triggeredByActor: createUserActor(currentUserId),
flowId: requestId,
});Process
// Traditional server
crawler.start();
// Serverless / on-demand
await Effect.runPromise(crawler.triggerProcessing());Pass an optional runtime so scheduler ticks inherit your app's Layer
(Logger, tracing, etc.). When omitted, ticks fall back to bare
Effect.runPromise.
import { Layer, ManagedRuntime } from "effect";
const runtime = ManagedRuntime.make(Layer.mergeAll(LoggerLayer, TracingLayer));
const crawler = createEffectEventCrawler({
withUow,
eventQueries,
eventBus,
runtime,
});
crawler.start();Only start() uses the runtime — triggerProcessing() still returns a
plain Effect you run yourself.
Kysely adapters
For PostgreSQL with Effect-native Kysely:
import {
createEffectKyselyEventRepository,
createEffectKyselyEventQueries,
} from "@l-etabli/events/effect-kysely";
const eventRepository = createEffectKyselyEventRepository<AppEvents, Db>(db);
const eventQueries = createEffectKyselyEventQueries<AppEvents, Db>(db);These adapters wrap Kysely queries with Effect.promise() — they work with any standard Kysely instance.
Subpath exports
| Import path | What | Requires |
|---|---|---|
| @l-etabli/events | Promise-based ports, in-memory adapters, crawler | — |
| @l-etabli/events/kysely | Promise-based Kysely adapters | kysely |
| @l-etabli/events/effect | Effect-native ports, in-memory adapters, crawler | effect |
| @l-etabli/events/effect-kysely | Effect-native Kysely adapters | effect, kysely |
Tracing
Every span this library opens is tagged with app.span.category: "event_mechanics",
so consumers can filter or aggregate outbox-related spans without listing
individual span names. Spans emitted on the Effect path:
EventCrawler.poll/EventCrawler.retry— one per non-empty cycleEventCrawler.markInProcess— batch transition before publish (poll only)EventBus.publish ${topic}— one per event dispatchedSubscription: ${subscriptionId}— one per subscriber invocation
EventCrawler.markProcessed— batch persistence of successfully published eventsEventCrawler.markFailed— batch persistence of failed/quarantined events
markProcessed and markFailed are siblings of EventBus.publish under the
poll/retry span — not children of publish. Empty cycles emit no spans.
Examples
See examples/ for Kysely integration, cascading events and serverless usage.
