@tarileo/events
v0.1.2
Event system for the Tari framework - type-safe events, SSE streaming, and React hooks
Event system for the Tari framework. Provides type-safe event definitions, SSE (Server-Sent Events) streaming, and React hooks for real-time subscriptions.
Why This Exists
@tarileo/events was extracted from Elena's monolithic packages/shared into a standalone, framework-agnostic package. The goal was to decouple the event system from Elena's domain so it can be reused across any Tari-based application.
Key design decisions:
- Type-safe event descriptors -- defineEvent() returns a typed descriptor that the publisher and subscriber both consume, eliminating string-based mismatches between bus.publish() and useSSESubscribe().
- Zod validation (optional) -- Schemas are validated at create() time on the server, catching malformed payloads before they ever hit the wire.
- SSE over WebSockets -- SSE is simpler (plain HTTP, no upgrade handshake, works through proxies), auto-reconnects via fetchEventSource, and integrates naturally with Elysia's ReadableStream responses.
- Channel-based routing -- Events are scoped as tenant, actor, or global. The channel string (e.g. tenant:rest_abc) is derived from the scope + request context, so a restaurant staff member only receives events for their own restaurant.
- In-memory EventBus -- Uses Node.js EventEmitter internally. Delivery is in-process, with no network hop and no Redis dependency, which suits single-instance deployments. The EventBus API is abstract enough that a Redis-backed implementation can be swapped in later.
- Separate entry points -- @tarileo/events/server contains Node.js-only code (EventEmitter, ReadableStream) and is never bundled into browser builds. @tarileo/events/react contains React hooks. The root @tarileo/events exports only browser-safe code: types, defineEvent(), and the pre-defined event descriptors.
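To make the "plain HTTP" point about SSE concrete, here is a minimal sketch of how one event could be framed as text on the wire and parsed back. The WireEvent shape and both helper names are hypothetical and used only for illustration; the package's actual serialization (its SerializedEvent type) may differ.

```typescript
// Hypothetical wire shape, for illustration only.
interface WireEvent {
  type: string;
  data: unknown;
}

// Encode one event as a Server-Sent Events frame:
// an "event:" field, a "data:" field, and a blank line as terminator.
function encodeSSEFrame(event: WireEvent): string {
  return `event: ${event.type}\ndata: ${JSON.stringify(event.data)}\n\n`;
}

// Decode a single frame back into an event (simplified client-side parsing).
function decodeSSEFrame(frame: string): WireEvent {
  let type = "message"; // SSE default when no "event:" field is present
  const dataLines: string[] = [];
  for (const line of frame.split("\n")) {
    if (line.startsWith("event:")) {
      type = line.slice("event:".length).trim();
    } else if (line.startsWith("data:")) {
      dataLines.push(line.slice("data:".length).trimStart());
    }
  }
  return { type, data: JSON.parse(dataLines.join("\n")) };
}
```

Because a frame is just newline-delimited text over a long-lived HTTP response, it passes through ordinary proxies without an upgrade handshake.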
Architecture
┌──────────────────────────────────────────────────────────────────┐
│ SERVER │
│ │
│ defineEvent("order.created", schema, { scope: "tenant" }) │
│ │ │
│ ▼ │
│ EventBus.publish(event, ctx, data) │
│ │ │
│ │ resolveChannel(scope, ctx) → "tenant:rest_abc" │
│ ▼ │
│ Node EventEmitter ──► streamMultiple(channels) ── async iter │
│ │ │
│ createSSEStream() ◄── wraps async iter in ReadableStream │
│ │ │
│ │ GET /api/v1/events/stream?channels=tenant:rest_abc │
│ ▼ │
│ ─── SSE response (text/event-stream) ───────────────────────► │
│ │
├──────────────────────────────────────────────────────────────────┤
│ CLIENT │
│ │
│ SSEProvider ◄── wraps SSEManager │
│ │ │
│ │ fetchEventSource() with Bearer token │
│ │ auto-reconnect (exponential backoff) │
│ │ heartbeat monitoring (90s timeout) │
│ │ visibility-change reconnection │
│ ▼ │
│ useSSE() → { isConnected, error, reconnectAttempts, ... } │
│ │ │
│ ▼ │
│ useSSESubscribe(MyEvent, callback) │
│ │ │
│ │ Matches on event.type === descriptor.type │
│ │ Calls callback with typed data │
│ ▼ │
│ Component re-renders with fresh data │
└──────────────────────────────────────────────────────────────────┘

Package structure
src/
├── shared/ # Browser-safe: shared by server & client
│ ├── base-event.ts # BaseEvent class (legacy), SerializedEvent type
│ ├── define-event.ts # defineEvent() factory + EventDescriptor type
│ ├── channels.ts # EventChannels static helpers
│ └── events/ # Pre-defined event descriptors
│ ├── ai.events.ts # SubAgent, Plan, Task lifecycle events
│ ├── agent.events.ts # AgentExecution lifecycle events
│ ├── session.events.ts # Session CRUD events
│ └── streaming.events.ts # Message, text delta, tool call events
├── server/ # Node.js only (EventEmitter, ReadableStream)
│ ├── event-bus.ts # EventBus: publish + stream/channel routing
│ ├── sse-stream.ts # createSSEStream(): ReadableStream wrapper
│ └── types.ts # Event, EventContext
├── react/ # React 19 hooks
│ ├── provider.tsx # SSEProvider + useSSE()
│ ├── use-sse-subscribe.ts # useSSESubscribe(): typed per-event hook
│ └── types.ts # SSEContextValue
├── core/ # Framework-agnostic client runtime
│ ├── sse-manager.ts # SSEManager: fetchEventSource, reconnect, heartbeat
│ ├── event-emitter.ts # Generic typed EventEmitter
│ └── types.ts # SSEStatus, SSEManagerConfig
└── index.ts # Root barrel: re-exports shared types

Installation
bun add @tarileo/events

The package has three entry points:
| Entry | Environment | Contents |
|---|---|---|
| @tarileo/events | Browser & Server | Types, defineEvent(), pre-defined event descriptors |
| @tarileo/events/server | Server only | EventBus, createSSEStream() |
| @tarileo/events/react | Browser only | SSEProvider, useSSE(), useSSESubscribe() |
Usage
1. Define an event
import { defineEvent } from "@tarileo/events";
import { z } from "zod";

// With Zod validation (recommended)
const OrderCreatedData = z.object({
  orderId: z.string(),
  total: z.number(),
});

export const OrderCreatedEvent = defineEvent(
  "order.created",
  OrderCreatedData,
  { scope: "tenant" }
);

// Without schema (interface only)
export const PingEvent = defineEvent<{ at: string }>(
  "system.ping",
  { scope: "global" }
);

2. Emit an event from the server
import { EventBus } from "@tarileo/events/server";
import { OrderCreatedEvent } from "./events";

// Tell the bus how to derive tenant and actor IDs from the request context.
const bus = new EventBus({
  getTenantId: (ctx) => ctx.restaurantId as string,
  getActorId: (ctx) => ctx.userId as string,
});

// Inside a request handler:
bus.publish(OrderCreatedEvent, ctx, {
  orderId: "ord_123",
  total: 45.99,
});

3. Expose an SSE endpoint
import { Elysia } from "elysia";
import { createSSEStream } from "@tarileo/events/server";

// `bus` is the EventBus instance created in step 2.
new Elysia()
  // The path matches the URL passed to SSEProvider in step 4.
  .get("/api/v1/events/stream", ({ query, request }) => {
    // Channels arrive as a comma-separated query parameter.
    const channels = query.channels?.split(",") ?? [];
    return new Response(
      createSSEStream({
        channels,
        eventBus: bus,
        signal: request.signal,
      }),
      {
        headers: {
          "Content-Type": "text/event-stream",
          "Cache-Control": "no-cache",
          Connection: "keep-alive",
        },
      }
    );
  });

4. Subscribe in React
First, wrap your app with SSEProvider:
import { SSEProvider } from "@tarileo/events/react";

function App() {
  return (
    <SSEProvider url="/api/v1/events/stream">
      <YourComponents />
    </SSEProvider>
  );
}

Then subscribe to specific events in any component:
import { useSSE, useSSESubscribe } from "@tarileo/events/react";
import { OrderCreatedEvent } from "./events";

function OrderListener() {
  const { isConnected, error } = useSSE();

  useSSESubscribe(OrderCreatedEvent, (data) => {
    // data is fully typed: { orderId: string; total: number }
    console.log("New order:", data.orderId, data.total);
  });

  if (!isConnected) return <div>Connecting...</div>;
  return null;
}

5. Use pre-defined Tari events
The package ships with ready-to-use event descriptors for AI agent workflows:
import {
  PlanCreatedEvent,
  TaskStartedEvent,
  TaskCompletedEvent,
  TextStreamDeltaEvent,
} from "@tarileo/events";
import { useSSESubscribe } from "@tarileo/events/react";

function AgentMonitor({ sessionId }: { sessionId: string }) {
  useSSESubscribe(PlanCreatedEvent, (data) => {
    // data: { sessionId, planId, description?, taskCount, tasks }
    console.log(`Plan ${data.planId} created with ${data.taskCount} tasks`);
  });

  useSSESubscribe(TaskStartedEvent, (data) => {
    // data: { sessionId, planId, taskId, subagentName, task }
  });

  useSSESubscribe(TextStreamDeltaEvent, (data) => {
    // data: { sessionId, delta, accumulatedText }
  });

  return null;
}

Connection Lifecycle
The SSEManager (used internally by SSEProvider) handles:
| Feature | Behavior |
|---|---|
| Authentication | Reads bearer_token from localStorage, reconnects on token change |
| Reconnection | Exponential backoff: 1s → 1.5s → 2.25s → ... → max 10s |
| Heartbeat | Expects system.heartbeat event; disconnects after 90s of silence |
| Visibility | Reconnects when the browser tab becomes visible again |
| Connection timeout | Aborts if no onopen within 5s |
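The reconnection and heartbeat rows can be read as two small calculations. The sketch below assumes a base delay of 1s, a growth factor of 1.5, and a 10s cap, inferred from the sequence in the table; the function and constant names are hypothetical and are not taken from SSEManager itself.

```typescript
// Values inferred from the table above; the names are illustrative only.
const BASE_DELAY_MS = 1000;
const BACKOFF_FACTOR = 1.5;
const MAX_DELAY_MS = 10000;
const HEARTBEAT_TIMEOUT_MS = 90000;

// Delay before reconnect attempt `attempt` (0-based): 1s, 1.5s, 2.25s, ... capped at 10s.
function reconnectDelayMs(attempt: number): number {
  return Math.min(BASE_DELAY_MS * BACKOFF_FACTOR ** attempt, MAX_DELAY_MS);
}

// True once more than 90s have passed since the last heartbeat was seen.
function isHeartbeatStale(lastHeartbeatAtMs: number, nowMs: number): boolean {
  return nowMs - lastHeartbeatAtMs > HEARTBEAT_TIMEOUT_MS;
}
```

Under these assumptions the cap is reached on the seventh attempt, since 1000 × 1.5^6 is roughly 11.4s.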
Access connection state via useSSE():
const {
  isConnected,       // boolean
  isConnecting,      // boolean
  isReconnecting,    // boolean
  reconnectAttempts, // number
  lastConnectedAt,   // Date | null
  lastHeartbeatAt,   // Date | null
  error,             // Error | null
  reconnect,         // () => void, forces a reconnect
} = useSSE();

Channel Routing
Events are scoped to control which clients receive them:
| Scope | Channel format | Example |
|---|---|---|
| "tenant" | tenant:<tenantId> | tenant:rest_abc |
| "actor" | actor:<actorId> | actor:user_xyz |
| "global" | global | global |
The server resolves the channel from the event's scope and the request context. The client connects to the channels it cares about (typically the current tenant's channel).
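The mapping in the table can be sketched as a small pure function. The version below is an illustration under assumptions, not the package's resolveChannel implementation: the ChannelContext shape and the allowedChannels helper are hypothetical. The helper shows one way a server might restrict client-requested channels to those the authenticated caller may read, since the stream endpoint receives channel names from the query string.

```typescript
type Scope = "tenant" | "actor" | "global";

// Hypothetical context shape; the real EventContext may carry other fields.
interface ChannelContext {
  tenantId?: string;
  actorId?: string;
}

// Derive the channel string from an event scope and the request context.
function resolveChannel(scope: Scope, ctx: ChannelContext): string {
  switch (scope) {
    case "tenant":
      if (!ctx.tenantId) throw new Error("tenant scope requires a tenantId");
      return `tenant:${ctx.tenantId}`;
    case "actor":
      if (!ctx.actorId) throw new Error("actor scope requires an actorId");
      return `actor:${ctx.actorId}`;
    case "global":
      return "global";
  }
}

// Keep only the requested channels that the caller's own context resolves to.
function allowedChannels(requested: string[], ctx: ChannelContext): string[] {
  const permitted = new Set<string>(["global"]);
  if (ctx.tenantId) permitted.add(`tenant:${ctx.tenantId}`);
  if (ctx.actorId) permitted.add(`actor:${ctx.actorId}`);
  return requested.filter((channel) => permitted.has(channel));
}
```

With a filter like this, a request for tenant:rest_abc from a user whose context belongs to another tenant would simply be dropped from the subscription list.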
License
MIT
