@aimf/event-mesh
v0.1.1
Real-time Event Mesh for Pub/Sub Messaging in the AI-MCP Framework.
Overview
The Event Mesh package provides a high-performance, topic-based publish/subscribe messaging system with advanced features like pattern matching, typed channels, request/reply, and event aggregation.
Features
- Topic-based Pub/Sub: Flexible topic patterns with wildcards (* and #)
- Typed Channels: Type-safe event channels with TypeScript generics
- Request/Reply: RPC-style communication over the event bus
- Event Aggregation: Batch and window events by count or time
- Dead Letter Queue: Capture and handle failed events
- Priority Handling: Execute handlers in priority order
- Event Filtering: Filter events before they reach handlers
Installation
pnpm add @aimf/event-mesh
Quick Start
import { createEventBus, createChannel } from "@aimf/event-mesh";
// Create an event bus
const bus = createEventBus();
// Simple pub/sub
bus.subscribe("users.created", (data) => {
  console.log("User created:", data);
});
await bus.publish("users.created", { id: 1, name: "Alice" });
// Pattern matching with wildcards
bus.subscribe("users.*", (data) => {
  console.log("User event:", data);
});
bus.subscribe("users.#", (data) => {
  console.log("Any user event:", data);
});
// Clean up
await bus.destroy();
Topic Patterns
The event mesh supports two types of wildcards:
- *: Matches exactly one segment
- #: Matches zero or more segments
// Match any direct child topic
bus.subscribe("users.*", handler);
// Matches: users.created, users.updated, users.deleted
// Does NOT match: users.profile.updated
// Match any descendant topic
bus.subscribe("users.#", handler);
// Matches: users, users.created, users.profile.updated
// Complex patterns
bus.subscribe("*.events.#", handler);
// Matches: users.events.login, orders.events.created.success
Typed Channels
Create type-safe channels for specific event types:
import { createChannel, createChannelRegistry } from "@aimf/event-mesh";
interface UserCreatedEvent {
  id: number;
  name: string;
  email: string;
}
// Create a typed channel
const userChannel = createChannel<UserCreatedEvent>(bus, "users.created");
// Subscribe with type safety
userChannel.subscribe((user) => {
  console.log(user.name); // TypeScript knows this is a string
});
// Publish with type checking
await userChannel.publish({
  id: 1,
  name: "Alice",
  email: "[email protected]"
});
// Channel Registry for managing multiple channels
const registry = createChannelRegistry(bus);
registry.register<UserCreatedEvent>("users.created");
registry.register<{ orderId: string }>("orders.created");
const channel = registry.get("users.created");
Request/Reply Pattern
Implement RPC-style communication:
import { createRequestClient, createRequestResponder } from "@aimf/event-mesh";
// Server side - create responder
const responder = createRequestResponder(bus, "math.add");
responder.handle<{ a: number; b: number }, number>(({ a, b }) => a + b);
// Client side - send requests
const client = createRequestClient(bus);
const result = await client.request<{ a: number; b: number }, number>(
  "math.add",
  { a: 5, b: 3 }
);
console.log(result); // 8
// Clean up
responder.stop();
Event Aggregation
Batch events by count or time window:
import { createEventAggregator, aggregators } from "@aimf/event-mesh";
const aggregator = createEventAggregator(bus);
// Count-based aggregation
aggregator.addRule({
  id: "batch-orders",
  pattern: "orders.#",
  windowType: "count",
  windowSize: 10, // Emit after 10 events
  outputTopic: "orders.batch",
  aggregator: aggregators.collect,
});
// Time-based aggregation
aggregator.addRule({
  id: "metrics-window",
  pattern: "metrics.#",
  windowType: "time",
  windowSize: 5000, // Emit every 5 seconds (value in milliseconds)
  outputTopic: "metrics.batch",
  aggregator: aggregators.sum,
});
// Subscribe to batched events
bus.subscribe("orders.batch", (batch) => {
  console.log(`Processing ${batch.length} orders`);
});
// Built-in aggregators
// - aggregators.collect - Collect into array
// - aggregators.sum - Sum numeric values
// - aggregators.average - Average numeric values
// - aggregators.count - Count events
// - aggregators.first - Get first value
// - aggregators.last - Get last value
// - aggregators.min - Get minimum value
// - aggregators.max - Get maximum value
// - aggregators.groupBy(key) - Group by object key
// Clean up
aggregator.destroy();
Event Filtering
Filter events before they reach handlers:
bus.subscribe(
  "users.#",
  (user) => {
    console.log("Admin user:", user);
  },
  {
    filter: (data) => data.role === "admin",
  }
);
Priority Handling
Control handler execution order:
// Higher priority handlers execute first
bus.subscribe("events.important", handler1, { priority: 1 });
bus.subscribe("events.important", handler2, { priority: 10 }); // Runs first
bus.subscribe("events.important", handler3, { priority: 5 });
Dead Letter Queue
Capture and handle failed events:
// Listen for dead letters
bus.onDeadLetter((deadLetter) => {
  console.error(`Event failed: ${deadLetter.topic}`, deadLetter.error);
  // Retry or log the failure
});
// Get dead letter queue
const deadLetters = bus.getDeadLetterQueue();
// Clear the queue
bus.clearDeadLetterQueue();
Waiting for Events
Wait for specific events with timeout:
// Wait for an event (with timeout)
const envelope = await bus.waitFor("users.verified", 5000);
console.log("User verified:", envelope.data);
// One-time subscription
bus.once("startup.complete", () => {
  console.log("Application started");
});
Event Envelope
Access full event metadata:
bus.subscribe("events.test", (data, envelope) => {
  console.log("Event ID:", envelope.metadata.id);
  console.log("Timestamp:", envelope.metadata.timestamp);
  console.log("Correlation ID:", envelope.metadata.correlationId);
  console.log("Source:", envelope.metadata.source);
});
// Publish with metadata
await bus.publish("events.test", data, {
  correlationId: "request-123",
  source: "user-service",
});
Configuration
const bus = createEventBus({
  maxDeadLetterSize: 1000, // Maximum dead letters to keep
  generateId: () => crypto.randomUUID(), // Custom ID generator
});
Statistics
Monitor event bus activity:
const stats = bus.getStatistics();
console.log("Published:", stats.publishedCount);
console.log("Subscriptions:", stats.subscriptionCount);
console.log("Dead Letters:", stats.deadLetterCount);
// Aggregator statistics
const aggStats = aggregator.getStatistics("rule-id");
console.log("Events Processed:", aggStats.eventsProcessed);
console.log("Batches Emitted:", aggStats.batchesEmitted);
Namespaced Channels
Create isolated channel namespaces:
import { createNamespacedChannels } from "@aimf/event-mesh";
const appChannels = createNamespacedChannels(bus, "myapp");
const usersChannel = appChannels.channel<UserEvent>("users");
// Topic: myapp.users
const ordersChannel = appChannels.channel<OrderEvent>("orders");
// Topic: myapp.orders
API Reference
EventBus
| Method | Description |
|--------|-------------|
| subscribe(topic, handler, options?) | Subscribe to events |
| unsubscribe(subscriptionId) | Unsubscribe by ID |
| publish(topic, data, metadata?) | Publish an event |
| once(topic, handler) | Subscribe for one event |
| waitFor(topic, timeout?) | Wait for an event |
| getSubscriptions() | Get all subscriptions |
| getTopics() | Get unique topics |
| hasSubscribers(topic) | Check for subscribers |
| getDeadLetterQueue() | Get failed events |
| clearDeadLetterQueue() | Clear dead letters |
| onDeadLetter(handler) | Handle dead letters |
| getStatistics() | Get bus statistics |
| destroy() | Clean up resources |
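To make the interplay of priority, filter, and the dead letter queue concrete, here is a minimal standalone sketch. It is not the package's implementation: MiniBus, its option names, and the choice to match topics exactly (no wildcards) and deliver synchronously are all assumptions made for illustration.

```typescript
// Hypothetical mini bus for illustration only; not the @aimf/event-mesh source.
type Handler = (data: unknown) => void;
type Filter = (data: unknown) => boolean;
interface SubscribeOptions { priority?: number; filter?: Filter }
interface Subscription { topic: string; handler: Handler; priority: number; filter: Filter }
interface DeadLetter { topic: string; data: unknown; error: unknown }

class MiniBus {
  private subs: Subscription[] = [];
  readonly deadLetters: DeadLetter[] = [];

  subscribe(topic: string, handler: Handler, options: SubscribeOptions = {}): void {
    this.subs.push({
      topic,
      handler,
      priority: options.priority ?? 0,          // assumed default priority
      filter: options.filter ?? (() => true),   // no filter means accept everything
    });
  }

  publish(topic: string, data: unknown): void {
    // Exact topic match only; higher priority runs first.
    const matching = this.subs
      .filter((s) => s.topic === topic)
      .sort((a, b) => b.priority - a.priority);
    for (const sub of matching) {
      if (!sub.filter(data)) continue;          // filtered events never reach the handler
      try {
        sub.handler(data);
      } catch (error) {
        // A throwing handler does not stop delivery; the failure is recorded instead.
        this.deadLetters.push({ topic, data, error });
      }
    }
  }
}

const order: string[] = [];
const miniBus = new MiniBus();
miniBus.subscribe("events.important", () => order.push("low"), { priority: 1 });
miniBus.subscribe("events.important", () => order.push("high"), { priority: 10 });
miniBus.subscribe("events.important", () => { throw new Error("boom"); }, { priority: 5 });
miniBus.subscribe("events.important", () => order.push("admin-only"), {
  filter: (data) => (data as { role?: string }).role === "admin",
});
miniBus.publish("events.important", { role: "guest" });
// order is ["high", "low"]; miniBus.deadLetters holds one entry for the throwing handler.
```

Whether the real bus keeps delivering to lower-priority handlers after a failure is not stated in this README, so treat that behavior as a design choice of the sketch.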
Topic Matching
| Function | Description |
|----------|-------------|
| parseTopicPattern(pattern) | Parse a topic pattern |
| matchTopic(pattern, topic) | Match topic against pattern |
| validateTopic(topic, allowPatterns?) | Validate topic format |
| isPattern(topic) | Check if topic contains wildcards |
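The wildcard semantics described under Topic Patterns can be sketched as a small recursive matcher. This is an illustrative reimplementation, not the package's matchTopic: the names topicMatches and matchSegments are hypothetical, and the sketch assumes topics are split on "." with no validation.

```typescript
// Hypothetical matcher; "*" matches exactly one segment, "#" matches zero or more.
function matchSegments(pattern: string[], topic: string[]): boolean {
  if (pattern.length === 0) return topic.length === 0;
  const [head, ...rest] = pattern;
  if (head === "#") {
    // Let "#" absorb 0, 1, 2, ... leading topic segments and try the rest.
    for (let skip = 0; skip <= topic.length; skip++) {
      if (matchSegments(rest, topic.slice(skip))) return true;
    }
    return false;
  }
  if (topic.length === 0) return false;                   // pattern needs a segment, none left
  if (head !== "*" && head !== topic[0]) return false;    // literal segment mismatch
  return matchSegments(rest, topic.slice(1));
}

function topicMatches(pattern: string, topic: string): boolean {
  return matchSegments(pattern.split("."), topic.split("."));
}

const directChild = topicMatches("users.*", "users.created");            // true
const tooDeep = topicMatches("users.*", "users.profile.updated");        // false
const zeroSegments = topicMatches("users.#", "users");                   // true
const mixed = topicMatches("*.events.#", "orders.events.created.success"); // true
```

The backtracking loop is simple rather than fast; a production matcher would more likely precompile patterns or use a trie.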
Channel Functions
| Function | Description |
|----------|-------------|
| createChannel(bus, topic) | Create a typed channel |
| createChannelRegistry(bus) | Create a channel registry |
| createTypedChannel(bus, topic, options) | Create with options |
| createNamespacedChannels(bus, namespace) | Create namespaced factory |
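One way to picture typed and namespaced channels is as a thin generic wrapper over an untyped bus. The sketch below is an assumption about the general technique, not the package's code; UntypedBus, typedChannel, namespaced, and inMemoryBus are hypothetical names.

```typescript
// Hypothetical sketch: a typed wrapper over an untyped publish/subscribe core.
interface UntypedBus {
  publish(topic: string, data: unknown): void;
  subscribe(topic: string, handler: (data: unknown) => void): void;
}

interface Channel<T> {
  readonly topic: string;
  publish(data: T): void;
  subscribe(handler: (data: T) => void): void;
}

function typedChannel<T>(bus: UntypedBus, topic: string): Channel<T> {
  return {
    topic,
    publish: (data) => bus.publish(topic, data),
    // The cast is the one unchecked step: it trusts that only T is published on this topic.
    subscribe: (handler) => bus.subscribe(topic, (data) => handler(data as T)),
  };
}

// Prefix every channel topic with a namespace, e.g. "myapp" + "users" gives "myapp.users".
function namespaced(bus: UntypedBus, namespace: string) {
  return {
    channel: <T>(name: string): Channel<T> => typedChannel<T>(bus, `${namespace}.${name}`),
  };
}

// Tiny in-memory bus so the sketch runs on its own.
function inMemoryBus(): UntypedBus {
  const handlers = new Map<string, Array<(data: unknown) => void>>();
  return {
    publish: (topic, data) => (handlers.get(topic) ?? []).forEach((h) => h(data)),
    subscribe: (topic, handler) => {
      handlers.set(topic, [...(handlers.get(topic) ?? []), handler]);
    },
  };
}

interface UserEvent { id: number; name: string }
const app = namespaced(inMemoryBus(), "myapp");
const users = app.channel<UserEvent>("users");
const seenNames: string[] = [];
users.subscribe((user) => seenNames.push(user.name)); // user is typed as UserEvent
users.publish({ id: 1, name: "Alice" });
```

The type safety here is compile-time only: nothing stops another publisher from sending a different shape on the same topic through the untyped bus.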
Request/Reply
| Function | Description |
|----------|-------------|
| createRequestClient(bus, options?) | Create request client |
| createRequestResponder(bus, topic) | Create responder |
| createRequestReply(bus, topic) | Create paired client/responder |
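Request/reply over pub/sub is commonly built on correlation IDs and a reply topic. The sketch below shows that general technique with a synchronous in-memory bus and a callback instead of a Promise; it is not how this package is known to work internally, and TinyBus, makeClient, makeResponder, and the "replies.client-1" topic are hypothetical.

```typescript
// Hypothetical sketch of request/reply over pub/sub using correlation IDs.
interface Envelope { correlationId: string; replyTo?: string; payload: unknown }
type Listener = (message: Envelope) => void;

class TinyBus {
  private listeners = new Map<string, Listener[]>();
  subscribe(topic: string, listener: Listener): void {
    this.listeners.set(topic, [...(this.listeners.get(topic) ?? []), listener]);
  }
  publish(topic: string, message: Envelope): void {
    (this.listeners.get(topic) ?? []).forEach((l) => l(message));
  }
}

let nextRequestId = 0;

// Client: publish the request, then route the reply back by correlation ID.
function makeClient(bus: TinyBus, replyTopic: string) {
  const pending = new Map<string, (result: unknown) => void>();
  bus.subscribe(replyTopic, (reply) => {
    const onReply = pending.get(reply.correlationId);
    if (onReply) {
      pending.delete(reply.correlationId);
      onReply(reply.payload);
    }
  });
  return {
    request(topic: string, payload: unknown, onReply: (result: unknown) => void): void {
      const correlationId = `req-${++nextRequestId}`;
      pending.set(correlationId, onReply);
      bus.publish(topic, { correlationId, replyTo: replyTopic, payload });
    },
    pendingCount: (): number => pending.size,
  };
}

// Responder: handle the request and publish the result to the replyTo topic.
function makeResponder<Req, Res>(bus: TinyBus, topic: string, handle: (req: Req) => Res): void {
  bus.subscribe(topic, (request) => {
    if (request.replyTo === undefined) return; // nothing to answer
    const payload = handle(request.payload as Req);
    bus.publish(request.replyTo, { correlationId: request.correlationId, payload });
  });
}

const rpcBus = new TinyBus();
makeResponder<{ a: number; b: number }, number>(rpcBus, "math.add", ({ a, b }) => a + b);
const rpcClient = makeClient(rpcBus, "replies.client-1");
const replies: unknown[] = [];
rpcClient.request("math.add", { a: 5, b: 3 }, (result) => replies.push(result));
// replies is [8] because this in-memory bus delivers synchronously.
```

A Promise-based client would typically wrap the callback and reject after a timeout when no reply arrives, removing the pending entry so it does not leak.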
Aggregator
| Function | Description |
|----------|-------------|
| createEventAggregator(bus) | Create aggregator |
| aggregators.collect | Collect events into array |
| aggregators.sum | Sum numeric events |
| aggregators.average | Average numeric events |
| aggregators.count | Count events |
| aggregators.first | Get first event |
| aggregators.last | Get last event |
| aggregators.min | Get minimum |
| aggregators.max | Get maximum |
| aggregators.groupBy(key) | Group by object key |
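As a rough mental model, an aggregator can be treated as a function from a batch of events to one value, and a count window as a buffer that emits when full. The sketch below follows that model; the signatures are assumptions rather than the package's actual types, and sketchAggregators and countWindow are hypothetical names.

```typescript
// Hypothetical reducers mirroring some names in the table; real signatures may differ.
const sketchAggregators = {
  collect: <T>(events: T[]): T[] => [...events],
  sum: (events: number[]): number => events.reduce((total, n) => total + n, 0),
  // Returning 0 for an empty batch is an assumption of this sketch.
  average: (events: number[]): number =>
    events.length === 0 ? 0 : events.reduce((total, n) => total + n, 0) / events.length,
  groupBy: <T>(key: keyof T) => (events: T[]): Map<T[keyof T], T[]> => {
    const groups = new Map<T[keyof T], T[]>();
    for (const event of events) {
      const bucket = groups.get(event[key]) ?? [];
      bucket.push(event);
      groups.set(event[key], bucket);
    }
    return groups;
  },
};

// Count window: buffer events and emit one aggregated value every `size` events.
function countWindow<T, R>(
  size: number,
  aggregate: (events: T[]) => R,
  emit: (result: R) => void,
): (event: T) => void {
  let buffer: T[] = [];
  return (event: T): void => {
    buffer.push(event);
    if (buffer.length >= size) {
      const batch = buffer;
      buffer = [];           // start a fresh window before emitting
      emit(aggregate(batch));
    }
  };
}

const totals: number[] = [];
const pushMetric = countWindow<number, number>(3, sketchAggregators.sum, (total) => {
  totals.push(total);
});
for (const value of [1, 2, 3, 4, 5, 6, 7]) pushMetric(value);
// totals is [6, 15]; the seventh event stays buffered until the window fills.
```

A time window would follow the same shape, flushing the buffer on a timer instead of on a count.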
License
MIT
