@crossdelta/cloudevents
v0.3.2
CloudEvents toolkit for TypeScript - Zod validation, handler discovery, NATS JetStream
Type-safe event-driven microservices with NATS and Zod validation, using the CloudEvents specification.
                       NATS JetStream
                      ┌──────────────┐
┌──────────────┐      │              │      ┌──────────────┐
│   Service    │      │   Stream:    │      │   Service    │
│  (publish)   │─────▶│   ORDERS     │─────▶│  (consume)   │
└──────────────┘      │              │      └──────────────┘
                      └──────────────┘
       │                                           │
       │ publishNatsRawEvent(...)                  │ handleEvent(...)
       ▼                                           ▼
┌──────────────┐                            ┌──────────────┐
│ { orderId,   │                            │ Zod schema   │
│   total }    │                            │ validates    │
└──────────────┘                            └──────────────┘
bun add @crossdelta/cloudevents zod@4
Prerequisites: A running NATS server with JetStream enabled.
Note: Requires Zod v4 for full TypeScript support.
Quick Start
1. Create an event handler (src/handlers/order-created.event.ts):
import { handleEvent } from '@crossdelta/cloudevents'
import { z } from 'zod'
const OrderCreatedSchema = z.object({
  orderId: z.string(),
  total: z.number(),
})
// Export type for use in use-cases
export type OrderCreatedEvent = z.infer<typeof OrderCreatedSchema>
export default handleEvent(
  {
    schema: OrderCreatedSchema,
    type: 'orders.created',
  },
  async (data) => {
    console.log(`New order: ${data.orderId}, total: ${data.total}`)
  },
)
2. Start consuming:
import { consumeJetStreamEvents } from '@crossdelta/cloudevents'
await consumeJetStreamEvents({
  stream: 'ORDERS', // Auto-created if it does not exist
  subjects: ['orders.*'],
  consumer: 'my-service',
  discover: './src/handlers/**/*.event.ts',
})
3. Publish from another service:
import { publish } from '@crossdelta/cloudevents'
await publish('orders.created', { orderId: 'ord_123', total: 99.99 })
That's it. Handlers are auto-discovered, validated with Zod, and messages persist in JetStream.
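The consumer in step 2 subscribes to `orders.*`, while the publisher in step 3 emits `orders.created`. In NATS subject syntax, `*` matches exactly one dot-separated token and `>` matches one or more trailing tokens, so `orders.*` receives `orders.created` but not `orders.created.eu`. The sketch below only illustrates that rule: `subjectMatches` is a hypothetical helper, not part of this package or of the NATS client, and the real matching happens on the NATS server.

```typescript
// Illustrative only: NATS performs subject matching on the server.
// `subjectMatches` is a hypothetical helper, not an API of this package.
function subjectMatches(pattern: string, subject: string): boolean {
  const patternTokens = pattern.split('.')
  const subjectTokens = subject.split('.')

  for (let i = 0; i < patternTokens.length; i++) {
    const token = patternTokens[i]
    // `>` must be the last token and matches one or more remaining tokens.
    if (token === '>') {
      return i === patternTokens.length - 1 && subjectTokens.length > i
    }
    // The subject ran out of tokens before the pattern did.
    if (i >= subjectTokens.length) return false
    // `*` matches exactly one token; anything else must match literally.
    if (token !== '*' && token !== subjectTokens[i]) return false
  }
  // No `>` was seen, so the token counts must agree.
  return patternTokens.length === subjectTokens.length
}

console.log(subjectMatches('orders.*', 'orders.created'))    // true
console.log(subjectMatches('orders.*', 'orders.created.eu')) // false
console.log(subjectMatches('orders.>', 'orders.created.eu')) // true
```

If a service needs nested subjects such as `orders.created.eu`, a `subjects: ['orders.>']` entry would be the pattern to reach for.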
Why use this?
| Problem | Solution |
|---------|----------|
| Messages lost on restart | JetStream persists messages |
| Scattered handler registration | Auto-discovery via glob patterns |
| Runtime type errors | Zod validation with TypeScript inference |
| Poison messages crash services | DLQ quarantines invalid messages |
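The "runtime type errors" and "poison messages" rows above can be pictured as a validate-then-route step: a payload that passes the schema reaches the handler, and one that fails is set aside instead of crashing the service. The sketch below is not this package's implementation. `isOrderCreated` is a hand-written stand-in for a Zod schema and `route` is a hypothetical helper, both chosen so the example runs without dependencies.

```typescript
type OrderCreated = { orderId: string; total: number }

// Hand-written type guard standing in for a Zod schema (assumption for this sketch).
function isOrderCreated(value: unknown): value is OrderCreated {
  if (typeof value !== 'object' || value === null) return false
  const v = value as Record<string, unknown>
  return typeof v.orderId === 'string' && typeof v.total === 'number'
}

const handled: OrderCreated[] = []
const quarantined: unknown[] = []

// Hypothetical routing step: valid payloads reach the handler,
// invalid ones are kept for inspection rather than thrown away.
function route(payload: unknown): 'handled' | 'quarantined' {
  if (isOrderCreated(payload)) {
    handled.push(payload)
    return 'handled'
  }
  quarantined.push(payload)
  return 'quarantined'
}

console.log(route({ orderId: 'ord_123', total: 99.99 })) // handled
console.log(route({ orderId: 42 }))                      // quarantined
```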
Core Concepts
Event Type vs. Event Data
Important distinction:
- Event Type (orders.created): Lives in the CloudEvent envelope (ce.type). Used for routing and handler matching.
- Event Data ({ orderId, total }): The actual payload. Does not include the type.
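To make the distinction concrete, here is a sketch of a CloudEvents 1.0 envelope carrying the order payload. The attribute names (`specversion`, `id`, `source`, `type`, `data`) come from the CloudEvents specification; the `id` and `source` values and the `CloudEventEnvelope` type are illustrative assumptions, not types exported by this package.

```typescript
// Minimal shape of a CloudEvents 1.0 envelope (required attributes plus `data`).
// `CloudEventEnvelope` is an illustrative type, not exported by this package.
interface CloudEventEnvelope<T> {
  specversion: '1.0'
  id: string
  source: string
  type: string // used for routing and handler matching
  data: T      // the payload that the handler's schema validates
}

type OrderCreatedData = { orderId: string; total: number }

const event: CloudEventEnvelope<OrderCreatedData> = {
  specversion: '1.0',
  id: 'evt_001',                // hypothetical value
  source: '/services/checkout', // hypothetical value
  type: 'orders.created',
  data: { orderId: 'ord_123', total: 99.99 },
}

// The type lives on the envelope, never inside the payload.
const payloadHasType = 'type' in event.data
console.log(event.type)     // orders.created
console.log(payloadHasType) // false
```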
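import { handleEvent } from '@crossdelta/cloudevents'
import { z } from 'zod'
// The schema describes the event data only; it has no `type` field
const Schema = z.object({
  orderId: z.string(),
})
export default handleEvent(
  {
    schema: Schema,
    type: 'orders.created', // event type, matched against ce.type
  },
  async (data) => { ... }
)
Handlers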
Drop a *.event.ts file anywhere your discover glob matches and it is auto-registered:
// src/handlers/user-signup.event.ts
import { handleEvent } from '@crossdelta/cloudevents'
import { z } from 'zod'
const UserSignupSchema = z.object({
  email: z.string().email(),
  name: z.string(),
})
// Export type for use in use-cases
export type UserSignupEvent = z.infer<typeof UserSignupSchema>
export default handleEvent(
  {
    schema: UserSignupSchema,
    type: 'users.signup',
  },
  async (data) => {
    // sendWelcomeEmail is your own application function
    await sendWelcomeEmail(data.email)
  },
)
Publishing
await publish('orders.created', orderData)
Consuming
import { consumeJetStreamEvents, consumeNatsEvents } from '@crossdelta/cloudevents'
// JetStream (recommended): persistent, retries, exactly-once
await consumeJetStreamEvents({
  stream: 'ORDERS',
  subjects: ['orders.*'],
  consumer: 'billing',
  discover: './src/handlers/**/*.event.ts',
})
// Core NATS: fire-and-forget, simpler
await consumeNatsEvents({
  subjects: ['notifications.*'],
  discover: './src/handlers/**/*.event.ts',
})
Configuration
Environment Variables
NATS_URL=nats://localhost:4222
NATS_USER=myuser # optional
NATS_PASSWORD=mypass # optional
Consumer Options
await consumeJetStreamEvents({
  // Required
  stream: 'ORDERS',
  subjects: ['orders.*'],
  consumer: 'my-service',
  discover: './src/handlers/**/*.event.ts',
  // Optional
  servers: 'nats://localhost:4222',
  maxDeliver: 5, // Max delivery attempts
  ackWait: 30_000, // Ack timeout per attempt (ms)
  quarantineTopic: 'dlq', // For poison messages
})
Advanced Features
Filter events by tenant:
export default handleEvent({
  type: 'orders.created',
  schema: OrderSchema,
  tenantId: 'tenant-a', // Only process tenant-a events
}, async (data) => { ... })
Add custom filter logic:
export default handleEvent({
  type: 'orders.created',
  schema: OrderSchema,
  match: (event) => event.data.total > 100, // Only high-value orders
}, async (data) => { ... })
Deduplication is built-in. For distributed systems, provide a Redis store:
// Assumes `redis` is an already-connected client with ioredis-style commands
const redisStore = {
  has: (id) => redis.exists(`idem:${id}`),
  add: (id, ttl) => redis.set(`idem:${id}`, '1', 'PX', ttl), // PX: expiry in ms
}
await consumeJetStreamEvents({
  // ...
  idempotencyStore: redisStore,
})
Invalid messages are quarantined, not lost:
await consumeJetStreamEvents({
  // ...
  quarantineTopic: 'events.quarantine',
  errorTopic: 'events.errors',
})

import { Hono } from 'hono'
import { cloudEvents } from '@crossdelta/cloudevents'
const app = new Hono()
app.use('/events', cloudEvents({ discover: 'src/handlers/**/*.event.ts' }))
API
| Function | Purpose |
|----------|---------|
| handleEvent(options, handler) | Create a handler |
| consumeJetStreamEvents(options) | Consume with persistence |
| consumeNatsEvents(options) | Consume fire-and-forget |
| publish(type, data) | Publish event |
License
MIT
