@beignet/provider-event-bus-memory
v0.0.3
Published
In-memory event bus provider for Beignet - implements EventBusPort for single-process applications
Downloads
412
Maintainers
Readme
@beignet/provider-event-bus-memory
In-memory EventBusPort adapter for Beignet applications.
Use it for tests, local development, and single-process apps. Distributed
systems should adapt a queue, stream, outbox, or message broker behind the same
EventBusPort interface.
Install
bun add @beignet/provider-event-bus-memoryDirect setup
import { defineEvent } from "@beignet/core/events";
import { createInMemoryEventBus } from "@beignet/provider-event-bus-memory";
import { z } from "zod";
// Define your domain events
const UserRegistered = defineEvent("user.registered", {
payload: z.object({
userId: z.string(),
email: z.string().email(),
}),
});
// Create the event bus
const eventBus = createInMemoryEventBus();
// Subscribe to events
const unsubscribe = eventBus.subscribe(UserRegistered, (payload) => {
console.log(`User registered: ${payload.email}`);
// Send welcome email, update analytics, etc.
});
// Publish events
await eventBus.publish(UserRegistered, {
userId: "123",
email: "[email protected]",
});
// Unsubscribe when done
unsubscribe();Framework setup
import { createNextServer } from "@beignet/next";
import { createInMemoryEventBusProvider } from "@beignet/provider-event-bus-memory";
import { appPorts } from "@/infra/app-ports";
import { routes } from "@/server/routes";
export const server = await createNextServer({
ports: appPorts,
providers: [createInMemoryEventBusProvider()],
createContext: ({ ports }) => ({
ports,
}),
routes,
});Use createInMemoryEventBus() directly when you want to manually assign an
event bus under ports.
Instrumentation
Pass a provider instrumentation target when creating the direct event bus to
record published events under the eventBus watcher:
const eventBus = createInMemoryEventBus({
instrumentation: ports,
});Using in use cases
import { defineEvent } from "@beignet/core/events";
import { z } from "zod";
const OrderPlaced = defineEvent("order.placed", {
payload: z.object({
orderId: z.string(),
total: z.number(),
}),
});
// Subscribe to events in your application setup
ctx.ports.eventBus.subscribe(OrderPlaced, async (payload) => {
// Send order confirmation email
await ctx.ports.mailer.send({
to: customer.email,
subject: "Order Confirmation",
text: `Your order ${payload.orderId} has been placed!`,
});
});
const placeOrder = useCase
.command("orders.place")
.input(PlaceOrderInput)
.output(OrderOutput)
.emits([OrderPlaced])
.run(async ({ ctx, input, events }) => {
return ctx.ports.uow.transaction(async (tx) => {
const order = await tx.orders.create(input);
await events.record(tx.events, OrderPlaced, {
orderId: order.id,
total: order.total,
});
return order;
});
});EventBus port API
publish<E>(event: E, payload: InferEventPayload<E>): Promise<void> | void
Publish a domain event with a typed payload.
await eventBus.publish(UserRegistered, {
userId: "123",
email: "[email protected]",
});By default, the in-memory bus awaits handlers so local development and tests are
deterministic. Handler errors are rethrown unless onHandlerError is provided.
Use delivery: "fire-and-forget" when you intentionally want detached
in-process delivery.
subscribe<E>(event: E, handler: (payload) => void | Promise<void>): () => void
Subscribe to a domain event. Returns an unsubscribe function.
const unsubscribe = eventBus.subscribe(UserRegistered, (payload) => {
console.log(`New user: ${payload.email}`);
});
// Later, when you want to stop listening:
unsubscribe();TypeScript support
The event bus provides full type safety:
import type { EventBusPort } from "@beignet/core/ports";
import { definePorts } from "@beignet/core/ports";
// Type-safe ports definition
const appPorts = definePorts({
eventBus: createInMemoryEventBus() as EventBusPort,
// ... other ports
});
type AppPorts = typeof appPorts;Testing
The in-memory event bus is perfect for testing:
import { describe, expect, it, mock } from "bun:test";
describe("User Registration", () => {
it("should publish UserRegistered event", async () => {
const eventBus = createInMemoryEventBus();
const handler = mock(() => {});
eventBus.subscribe(UserRegistered, handler);
// Perform registration
await registerUser(ctx, { email: "[email protected]" });
expect(handler).toHaveBeenCalledWith({
userId: expect.any(String),
email: "[email protected]",
});
});
});Behavior
- Awaited by default:
publish(...)waits for subscribed handlers unlessdelivery: "fire-and-forget"is configured - In-process: Events are only delivered within the same process
- Memory-only: No persistence - events are lost if the process crashes
- Order: Handlers are called in the order they were subscribed
- Multiple handlers: Multiple handlers can subscribe to the same event
- Error handling: Handler errors reject
publish(...)unlessonHandlerErroris configured
When to use
Good for:
- Single-process applications
- Development and testing
- Simple event-driven workflows
- Decoupling application components
Not suitable for:
- Distributed systems
- Event persistence requirements
- Guaranteed delivery needs
- Cross-service communication
For production distributed systems, implement EventBusPort with a proper message broker.
License
MIT
