@contract-kit/provider-event-bus-memory
v1.0.0
Published
In-memory event bus provider for contract-kit - implements EventBusPort for single-process applications
Maintainers
Readme
@contract-kit/provider-event-bus-memory
In-memory event bus provider for contract-kit that implements the EventBusPort interface. Suitable for single-process applications, testing, and development.
For distributed systems, consider implementing EventBusPort with a message broker like RabbitMQ, Kafka, or Redis Pub/Sub.
Installation
npm install @contract-kit/provider-event-bus-memory
# or
bun add @contract-kit/provider-event-bus-memoryTypeScript requirements
This package requires TypeScript 5.0 or higher for proper type inference.
Usage
Basic setup
import { defineEvent } from "@contract-kit/events";
import { createInMemoryEventBus } from "@contract-kit/provider-event-bus-memory";
import { z } from "zod";
// Define your domain events
const UserRegistered = defineEvent("user.registered", {
payload: z.object({
userId: z.string(),
email: z.string().email(),
}),
});
// Create the event bus
const eventBus = createInMemoryEventBus();
// Subscribe to events
const unsubscribe = eventBus.subscribe(UserRegistered, (payload) => {
console.log(`User registered: ${payload.email}`);
// Send welcome email, update analytics, etc.
});
// Publish events
eventBus.publish(UserRegistered, {
userId: "123",
email: "[email protected]",
});
// Unsubscribe when done
unsubscribe();Integration with Contract Kit
import { createServer } from "@contract-kit/server";
import { definePorts } from "@contract-kit/ports";
import { createInMemoryEventBus } from "@contract-kit/provider-event-bus-memory";
const basePorts = definePorts({
eventBus: createInMemoryEventBus(),
// ... other ports
});
const app = await createServer({
ports: basePorts,
createContext: ({ ports }) => ({
ports,
// ... other context
}),
routes: [
// ... your routes
],
});Devtools
Pass ctx.ports.devtools or a devtools port when creating the event bus to
record published events under the eventBus watcher:
const eventBus = createInMemoryEventBus(ctx.ports.devtools);Using in use cases
import { defineEvent } from "@contract-kit/events";
import { z } from "zod";
const OrderPlaced = defineEvent("order.placed", {
payload: z.object({
orderId: z.string(),
total: z.number(),
}),
});
// Subscribe to events in your application setup
ctx.ports.eventBus.subscribe(OrderPlaced, async (payload) => {
// Send order confirmation email
await ctx.ports.mailer.send({
to: customer.email,
subject: "Order Confirmation",
text: `Your order ${payload.orderId} has been placed!`,
});
});
// Publish validated events from your use cases
import { publishEvent } from "@contract-kit/events";
async function placeOrder(ctx: AppCtx, order: Order) {
// Save order to database
await ctx.ports.db.orders.create(order);
// Publish domain event
await publishEvent(ctx.ports.eventBus, OrderPlaced, {
orderId: order.id,
total: order.total,
});
}EventBus port API
publish<E>(event: E, payload: InferEventPayload<E>): void
Publish a domain event with a typed payload.
eventBus.publish(UserRegistered, {
userId: "123",
email: "[email protected]",
});Note: Event handlers are executed fire-and-forget. They are not awaited, and errors are not propagated. Wrap your handlers in try/catch for error handling.
subscribe<E>(event: E, handler: (payload) => void | Promise<void>): () => void
Subscribe to a domain event. Returns an unsubscribe function.
const unsubscribe = eventBus.subscribe(UserRegistered, (payload) => {
console.log(`New user: ${payload.email}`);
});
// Later, when you want to stop listening:
unsubscribe();TypeScript support
The event bus provides full type safety:
import type { EventBusPort } from "@contract-kit/ports";
import { definePorts } from "@contract-kit/ports";
// Type-safe ports definition
const ports = definePorts({
eventBus: createInMemoryEventBus() as EventBusPort,
// ... other ports
});
type AppPorts = typeof ports;Testing
The in-memory event bus is perfect for testing:
import { describe, expect, it, mock } from "bun:test";
describe("User Registration", () => {
it("should publish UserRegistered event", () => {
const eventBus = createInMemoryEventBus();
const handler = mock(() => {});
eventBus.subscribe(UserRegistered, handler);
// Perform registration
registerUser(ctx, { email: "[email protected]" });
expect(handler).toHaveBeenCalledWith({
userId: expect.any(String),
email: "[email protected]",
});
});
});Behavior
- Fire-and-forget: Event handlers are executed asynchronously and not awaited
- In-process: Events are only delivered within the same process
- Memory-only: No persistence - events are lost if the process crashes
- Order: Handlers are called in the order they were subscribed
- Multiple handlers: Multiple handlers can subscribe to the same event
- Error handling: Errors in handlers do not affect other handlers or the publisher
When to use
✅ Good for:
- Single-process applications
- Development and testing
- Simple event-driven workflows
- Decoupling application components
❌ Not suitable for:
- Distributed systems
- Event persistence requirements
- Guaranteed delivery needs
- Cross-service communication
For production distributed systems, implement EventBusPort with a proper message broker.
License
MIT
