@crossdelta/cloudevents
v0.8.2
CloudEvents toolkit for TypeScript - Zod validation, handler discovery, NATS JetStream & Core
Type-safe event-driven microservices with NATS and Zod validation, using the CloudEvents specification.
                       NATS JetStream
                      ┌──────────────┐
┌──────────────┐      │              │      ┌──────────────┐
│   Service    │      │   Stream:    │      │   Service    │
│  (publish)   │─────▶│   ORDERS     │─────▶│  (consume)   │
└──────────────┘      │              │      └──────────────┘
       │              └──────────────┘             │
       │ publish(...)                              │ handleEvent(...)
       ▼                                           ▼
┌──────────────┐                            ┌──────────────┐
│ { orderId,   │                            │  Zod schema  │
│   total }    │                            │  validates   │
└──────────────┘                            └──────────────┘

bun add @crossdelta/cloudevents zod@4

Prerequisites: A running NATS server with JetStream enabled.
Note: Requires Zod v4 for full TypeScript support.
Quick Start
Note: Services never create streams. Streams must exist before services consume from them:
- Development: Use NATS CLI or Platform SDK (pf dev auto-creates from contracts)
- Production: Use infrastructure-as-code (Pulumi, Terraform) or the Stream Setup function below
1. Create an event handler (src/events/order-created.handler.ts):
import { handleEvent } from '@crossdelta/cloudevents'
import { z } from 'zod'

// Export schema for mock generation
export const OrderCreatedSchema = z.object({
  orderId: z.string(),
  total: z.number(),
})

// Export type for use in use-cases
export type OrderCreatedEvent = z.infer<typeof OrderCreatedSchema>

export default handleEvent(
  {
    schema: OrderCreatedSchema,
    type: 'order.created',
  },
  async (data) => {
    console.log(`New order: ${data.orderId}, total: ${data.total}`)
  },
)

2. Start consuming:
import { consumeJetStreams } from '@crossdelta/cloudevents'

consumeJetStreams({
  streams: ['ORDERS'],
  consumer: 'my-service',
  discover: './src/events/**/*.handler.ts',
})

3. Publish from another service:
import { publish } from '@crossdelta/cloudevents'

await publish('order.created', { orderId: 'ord_123', total: 99.99 })

That's it. Handlers are auto-discovered, validated with Zod, and messages persist in JetStream.
Stream creation is handled by your development environment or infrastructure — not by services:
- Development: NATS CLI, @crossdelta/platform-sdk, or manual setup
- Production: See Stream Setup below for ensureJetStreams() usage in infrastructure code
Why use this?
| Problem | Solution |
|---------|----------|
| Messages lost on restart | JetStream persists messages |
| Scattered handler registration | Auto-discovery via glob patterns |
| Runtime type errors | Zod validation with TypeScript inference |
| Poison messages crash services | DLQ quarantines invalid messages |
Core Concepts
Event Type vs. Event Data
Important distinction:
- Event Type (order.created): Lives in the CloudEvent envelope (ce.type). Used for routing and handler matching.
- Event Data ({ orderId, total }): The actual payload. Does not include the type.
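To make the distinction concrete, here is a minimal sketch of a CloudEvents 1.0 envelope carrying an order payload. The `wrap` helper, the `/orders-service` source, and the ID format are hypothetical and only for illustration; the library builds the envelope for you when you call publish.

```typescript
// Minimal CloudEvents 1.0 envelope: required attributes plus `data`.
interface CloudEvent<T> {
  specversion: '1.0'
  id: string
  source: string
  type: string // used for routing, e.g. 'order.created'
  data: T // payload only; carries no type field
}

interface OrderCreatedData {
  orderId: string
  total: number
}

// Hypothetical helper for illustration; not part of the library's API.
function wrap<T>(type: string, source: string, data: T): CloudEvent<T> {
  return {
    specversion: '1.0',
    id: `evt_${Date.now()}`,
    source,
    type,
    data,
  }
}

const event = wrap<OrderCreatedData>('order.created', '/orders-service', {
  orderId: 'ord_123',
  total: 99.99,
})

console.log(event.type) // the type lives on the envelope
console.log(event.data) // the payload holds only orderId and total
```

A handler's Zod schema validates `event.data`, while `event.type` decides which handler receives it.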
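// The schema describes the data payload only; it has no type field
const Schema = z.object({
  orderId: z.string(),
})

export default handleEvent(
  {
    schema: Schema,
    type: 'order.created', // Matched against ce.type in the envelope
  },
  async (data) => { ... }
)

Handlers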
Drop a *.handler.ts file anywhere — it's auto-registered:
// src/events/user-signup.handler.ts
import { handleEvent } from '@crossdelta/cloudevents'
import { z } from 'zod'

const UserSignupSchema = z.object({
  email: z.email(),
  name: z.string(),
})

// Export type for use in use-cases
export type UserSignupEvent = z.infer<typeof UserSignupSchema>

export default handleEvent(
  {
    schema: UserSignupSchema,
    type: 'user.signup',
  },
  async (data) => {
    // sendWelcomeEmail is your own application code
    await sendWelcomeEmail(data.email)
  },
)

Publishing
await publish('order.created', orderData)

Stream Setup
For infrastructure use only: services should never call this function.
Create streams during infrastructure setup (Pulumi, deployment scripts):
import { ensureJetStreams } from '@crossdelta/cloudevents'

await ensureJetStreams({
  streams: [
    {
      stream: 'ORDERS',
      subjects: ['orders.>'],
      config: {
        maxAge: 7 * 24 * 60 * 60 * 1000, // 7 days retention (ms)
        replicas: 3, // For HA
      },
    },
  ],
})

Consuming
import { consumeJetStreams, consumeNatsEvents, request } from '@crossdelta/cloudevents'

// JetStream (recommended): persistent, retries, exactly-once
consumeJetStreams({
  streams: ['ORDERS'],
  consumer: 'billing',
  discover: './src/events/**/*.handler.ts',
})

// Core NATS: fire-and-forget, simpler
await consumeNatsEvents({
  subject: 'notifications.*',
  consumerName: 'my-service',
  discover: './src/events/**/*.handler.ts',
})

// NATS Core Request-Reply: synchronous validation feedback
// Consumer returns result, publisher awaits it
const result = await request('call.received', payload)
// result: { accepted: true, data: { ... } } | { accepted: false, error: '...' }

Configuration
Environment Variables
NATS_URL=nats://localhost:4222
NATS_USER=myuser # optional
NATS_PASSWORD=mypass # optional

Optional: Google Cloud Pub/Sub
To use Google Cloud Pub/Sub instead of NATS, install the optional dependency:
bun add @google-cloud/pubsub
# or
npm install @google-cloud/pubsub

Then use the publishToPubSub function:
import { publishToPubSub } from '@crossdelta/cloudevents'

await publishToPubSub('order.created', { orderId: 'ord_123', total: 99.99 })

Note: @google-cloud/pubsub is not bundled with this library to keep the package size small. Install it only if you need Google Cloud Pub/Sub support.
Stream Options
await ensureJetStreams({
  streams: [
    {
      // Required
      stream: 'ORDERS',
      subjects: ['orders.>'],
      // Optional
      config: {
        maxAge: 7 * 24 * 60 * 60 * 1000, // Message retention (ms)
        replicas: 1, // Replication factor
        storage: 'file', // 'file' or 'memory'
        retention: 'limits', // 'limits', 'interest', or 'workqueue'
      },
    },
  ],
  // Optional
  servers: 'nats://localhost:4222',
})

Consumer Options
consumeJetStreams({
  // Required
  streams: ['ORDERS'],
  consumer: 'my-service',
  discover: './src/events/**/*.handler.ts',
  // Optional
  servers: 'nats://localhost:4222',
  filterSubjects: ['orders.created'], // Filter specific subjects (must match the stream's subjects, e.g. orders.>)
  maxDeliver: 5, // Retry attempts
  ackWait: 30_000, // Timeout per attempt (ms)
  quarantineTopic: 'dlq', // For poison messages
})

Advanced Features
Contracts can include channel metadata to define which NATS JetStream stream they belong to. This enables infrastructure-as-code workflows where streams are materialized from contracts.
Basic Contract with Channel
import { createContract } from '@crossdelta/cloudevents'
import { z } from 'zod'

export const OrdersCreatedContract = createContract({
  type: 'order.created',
  channel: {
    stream: 'ORDERS',
    // subject defaults to the auto-pluralized 'orders.created' if omitted
  },
  schema: z.object({
    orderId: z.string(),
    total: z.number(),
  }),
})

Channel Configuration
interface ChannelConfig {
  stream: string // JetStream stream name (e.g., 'ORDERS')
  subject?: string // NATS subject (optional override)
}

Subject Routing (CRITICAL):
- Without subject: Domain is auto-pluralized: customer.created → subject customers.created
- With subject: Uses exact subject specified (no auto-pluralization)
- Why pluralize: Stream subjects are plural (customers.*), event types are singular (customer.created)
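The routing rule above can be sketched as a small function. This is an illustration under a stated assumption, not the library's implementation: `resolveSubject` is a hypothetical name, and pluralization here naively appends "s" to the first segment of the event type, which may differ from what the library does for irregular nouns.

```typescript
interface ChannelConfig {
  stream: string
  subject?: string
}

// Hypothetical sketch: an explicit subject wins; otherwise pluralize the domain.
// Assumption: pluralization just appends "s" to the first segment.
function resolveSubject(type: string, channel: ChannelConfig): string {
  if (channel.subject !== undefined) return channel.subject
  const [domain, ...rest] = type.split('.')
  return [`${domain}s`, ...rest].join('.')
}

// Without subject: customer.created becomes customers.created
console.log(resolveSubject('customer.created', { stream: 'CUSTOMERS' }))

// With subject: the exact value is used, no pluralization
console.log(resolveSubject('order.created', { stream: 'ORDERS', subject: 'orders.v1.created' }))
```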
Examples:
// Auto-pluralized subject
createContract({
  type: 'order.created', // Event type: singular
  channel: { stream: 'ORDERS' }, // Subject: orders.created (plural)
})

// Custom subject (no auto-pluralization)
createContract({
  type: 'order.created',
  channel: {
    stream: 'ORDERS',
    subject: 'orders.v1.created', // Exact subject, no auto-pluralization
  },
})

Multiple Events, Same Stream
// order.created → ORDERS stream
export const OrdersCreatedContract = createContract({
  type: 'order.created',
  channel: { stream: 'ORDERS' },
  schema: OrderCreatedSchema,
})

// order.updated → ORDERS stream (same stream!)
export const OrdersUpdatedContract = createContract({
  type: 'order.updated',
  channel: { stream: 'ORDERS' },
  schema: OrderUpdatedSchema,
})

Result: Both events share the ORDERS stream, with the auto-pluralized subjects orders.created and orders.updated.
Infrastructure Materialization
Channel metadata is conceptual - it defines routing, not infrastructure policy.
Contracts define:
- Event type (order.created)
- Stream routing (ORDERS)
- Subject mapping (orders.created)
Infrastructure defines:
- Retention (7 days, 14 days, etc.)
- Storage (disk, memory)
- Replicas (1, 3, etc.)
- Limits (max message size, etc.)
Example Infrastructure (Pulumi):
import { collectStreamDefinitions } from './helpers/streams'
import { ensureJetStreams } from '@crossdelta/cloudevents'

// Collect from contracts
const streams = collectStreamDefinitions()
// [{ stream: 'ORDERS', subjects: ['orders.created', 'orders.updated'] }]

// Materialize with policies
await ensureJetStreams({
  streams: streams.map(({ stream, subjects }) => ({
    stream,
    subjects,
    config: {
      maxAge: 14 * 24 * 60 * 60 * 1000, // 14 days
      replicas: 3, // High availability
      storage: 'file', // Persistent
    },
  })),
})

Development vs. Production
| Environment | Streams | How | Retention |
|-------------|---------|-----|-----------|
| Development | Ephemeral | Auto-created by services | None (memory) |
| Production | Persistent | Materialized via infrastructure | Explicit (7d, 14d, etc.) |
Key Principle: Contracts define what and where, infrastructure defines how and how long.
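The Pulumi example imports a local `collectStreamDefinitions` helper that is not shown. One plausible shape for it is sketched here, assuming contracts expose `type` and an optional `channel`. Everything in this block is hypothetical: the `ContractLike` type, the explicit `contracts` parameter (the original helper takes no arguments), and the naive "append s" pluralization used when no subject is given.

```typescript
interface ContractLike {
  type: string
  channel?: { stream: string; subject?: string }
}

interface StreamDefinition {
  stream: string
  subjects: string[]
}

// Assumption: default subjects pluralize the domain by appending "s".
function pluralizedSubject(type: string): string {
  const [domain, ...rest] = type.split('.')
  return [`${domain}s`, ...rest].join('.')
}

// Hypothetical sketch: group contract subjects by stream name.
function collectStreamDefinitions(contracts: ContractLike[]): StreamDefinition[] {
  const byStream = new Map<string, Set<string>>()
  for (const contract of contracts) {
    if (!contract.channel) continue // contracts without a channel define no stream
    const { stream, subject } = contract.channel
    const subjects = byStream.get(stream) ?? new Set<string>()
    subjects.add(subject ?? pluralizedSubject(contract.type))
    byStream.set(stream, subjects)
  }
  return Array.from(byStream.entries()).map(([stream, subjects]) => ({
    stream,
    subjects: Array.from(subjects),
  }))
}

const definitions = collectStreamDefinitions([
  { type: 'order.created', channel: { stream: 'ORDERS' } },
  { type: 'order.updated', channel: { stream: 'ORDERS' } },
  { type: 'legacy.event' }, // skipped: no channel metadata
])

console.log(JSON.stringify(definitions))
```

The result contains only routing information (stream and subjects); retention, storage, and replicas are still supplied by the infrastructure code that calls ensureJetStreams.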
Backward Compatibility
Channel metadata is optional - contracts without channel work as before:
// Legacy contract (still works)
export const LegacyContract = createContract({
  type: 'legacy.event',
  schema: LegacySchema,
  // No channel - handler still works
})

When multiple services consume the same event, use createContract to create shared event contracts:
// packages/contracts/src/events/orders/created.ts
import { createContract } from '@crossdelta/cloudevents'
import { z } from 'zod'

export const OrderCreatedSchema = z.object({
  orderId: z.string(),
  total: z.number(),
})

export type OrderCreatedData = z.infer<typeof OrderCreatedSchema>

export const OrdersCreatedContract = createContract({
  type: 'order.created',
  schema: OrderCreatedSchema,
})

Then import and use in handlers:
import { handleEvent } from '@crossdelta/cloudevents'
import { OrdersCreatedContract } from '@my-org/contracts'

export default handleEvent(
  OrdersCreatedContract,
  async (data) => {
    // data is typed as OrderCreatedData
    console.log(data.orderId)
  },
)

Benefits:
- Single source of truth for event schemas
- No schema duplication
- Type safety across services
Filter events by tenant:
export default handleEvent({
  type: 'order.created',
  schema: OrderSchema,
  tenantId: 'tenant-a', // Only process tenant-a events
}, async (data) => { ... })

Use type: '*' to create a handler that processes all events, regardless of type. This is useful for audit logging, metrics collection, or event replay.
import { handleEvent } from '@crossdelta/cloudevents'
import { z } from 'zod'

export default handleEvent(
  {
    type: '*',
    schema: z.record(z.string(), z.unknown()),
  },
  async (data, context) => {
    await persistAuditEvent({
      eventType: context.eventType,
      source: context.source,
      data,
    })
  },
)

Routing behavior:
- Specific handlers (type: 'order.created') always take priority
- The wildcard handler runs only when no specific handler matches
- Only one wildcard handler per consumer is recommended
- Works with both JetStream and NATS Core consumers
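The priority rule in this list can be pictured with a short sketch. It models only the matching logic described above; the `Handler` shape and `resolveHandlers` function are hypothetical and are not the library's internals.

```typescript
interface Handler {
  type: string // an exact event type, or '*' for the wildcard
  name: string
}

// Sketch: specific handlers win; the wildcard is a fallback only.
function resolveHandlers(handlers: Handler[], eventType: string): Handler[] {
  const specific = handlers.filter((h) => h.type === eventType)
  if (specific.length > 0) return specific
  return handlers.filter((h) => h.type === '*')
}

const registered: Handler[] = [
  { type: 'order.created', name: 'billing' },
  { type: '*', name: 'audit' },
]

// order.created has a specific handler, so the wildcard is skipped
console.log(resolveHandlers(registered, 'order.created').map((h) => h.name))

// user.signup has no specific handler, so the wildcard handles it
console.log(resolveHandlers(registered, 'user.signup').map((h) => h.name))
```

One consequence worth noting: under this rule a wildcard audit handler does not see events that already have a specific handler in the same consumer.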
Add custom filter logic:
export default handleEvent({
  type: 'order.created',
  schema: OrderSchema,
  match: (event) => event.data.total > 100, // Only high-value orders
}, async (data) => { ... })

Deduplication is built in. For distributed systems, provide a Redis store:
// Assumes `redis` is an already-connected Redis client (the call style matches ioredis)
const redisStore = {
  has: (id) => redis.exists(`idem:${id}`),
  add: (id, ttl) => redis.set(`idem:${id}`, '1', 'PX', ttl),
}

await consumeJetStreams({
  streams: ['ORDERS'],
  consumer: 'my-service',
  discover: './src/events/**/*.handler.ts',
  idempotencyStore: redisStore,
})

Invalid messages are quarantined, not lost:
await consumeJetStreams({
  streams: ['ORDERS'],
  consumer: 'my-service',
  discover: './src/events/**/*.handler.ts',
  quarantineTopic: 'events.quarantine', // For malformed messages
  errorTopic: 'events.errors', // For handler errors
})

Messages that fail validation go to quarantineTopic; messages that crash handlers go to errorTopic.
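The split between the two topics can be sketched as a decision function. This is a simplified illustration, not the library's code: `routeMessage`, `Outcome`, and the hand-written `isOrder` check (standing in for a Zod schema) are hypothetical, and the sketch ignores retries (maxDeliver, ackWait), which would normally happen before a message is given up on.

```typescript
type Outcome =
  | { action: 'ack' }
  | { action: 'publish'; topic: string; reason: string }

interface Topics {
  quarantineTopic: string
  errorTopic: string
}

interface Order {
  orderId: string
  total: number
}

// Hand-written validator standing in for a Zod schema.
function isOrder(data: unknown): data is Order {
  if (typeof data !== 'object' || data === null) return false
  const record = data as Record<string, unknown>
  return typeof record['orderId'] === 'string' && typeof record['total'] === 'number'
}

// Sketch: invalid payloads are quarantined, handler failures go to the error topic.
function routeMessage(data: unknown, handler: (order: Order) => void, topics: Topics): Outcome {
  if (!isOrder(data)) {
    return { action: 'publish', topic: topics.quarantineTopic, reason: 'validation failed' }
  }
  try {
    handler(data)
    return { action: 'ack' }
  } catch (err) {
    const reason = err instanceof Error ? err.message : String(err)
    return { action: 'publish', topic: topics.errorTopic, reason }
  }
}

const topics: Topics = { quarantineTopic: 'events.quarantine', errorTopic: 'events.errors' }

// Malformed payload (orderId is not a string, total is missing)
console.log(routeMessage({ orderId: 42 }, () => {}, topics))
```

Keeping the two cases apart means a malformed message never reaches the handler, while a valid message that trips a bug in the handler is recorded separately for debugging.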
import { Hono } from 'hono'
import { cloudEvents } from '@crossdelta/cloudevents'

const app = new Hono()
app.use('/events', cloudEvents({ discover: 'src/events/**/*.handler.ts' }))

API
| Function | Purpose |
|----------|---------|
| handleEvent(options, handler) | Create a handler (supports type: '*' wildcard) |
| createContract(options) | Create shared event contract |
| ensureJetStreams(options) | Create/update JetStream streams |
| consumeJetStreams(options) | Consume from multiple streams |
| consumeNatsEvents(options) | Consume fire-and-forget |
| publish(type, data) | Publish event (fire-and-forget) |
| request(type, data, opts?) | Publish event and await reply (Request-Reply) |
| isNatsConnected() | Check if NATS publisher connection is live (for health checks) |
| isConsumerConnected() | Check if any NATS consumer connection is live (for readiness probes) |
| configureNatsPublisher(config) | Set connection options for the publisher singleton |
| closeConnection() | Drain and close the NATS connection (for CLI tools) |
Distributed Tracing
When @crossdelta/telemetry is installed (optional peer dependency), trace context is propagated automatically across NATS:
- Publish: The current W3C traceparent is injected as a CloudEvent extension field.
- Consume: The traceparent extension is extracted and restored in AsyncLocalStorage before the handler runs.
This enables end-to-end distributed traces in Grafana:
Storefront → API Gateway → NATS → Orders Service → NATS → Audit Service
└── all spans share the same traceId ──┘

No configuration required. If @crossdelta/telemetry is not installed, CloudEvents work normally without trace propagation.
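For readers unfamiliar with the header being propagated, the following sketch parses a W3C traceparent value as it might appear on an incoming event. The `parseTraceparent` function and the `incoming` object are hypothetical illustrations; the actual extraction is done by the library together with @crossdelta/telemetry.

```typescript
interface TraceContext {
  version: string
  traceId: string
  parentId: string
  flags: string
}

// W3C Trace Context format: version-traceId-parentId-flags, lowercase hex.
const TRACEPARENT = /^([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$/

function parseTraceparent(value: string): TraceContext | null {
  const match = TRACEPARENT.exec(value)
  if (!match) return null
  const [version, traceId, parentId, flags] = match.slice(1) as [string, string, string, string]
  // All-zero trace IDs and parent IDs are invalid per the specification.
  if (/^0+$/.test(traceId) || /^0+$/.test(parentId)) return null
  return { version, traceId, parentId, flags }
}

// Hypothetical incoming event carrying the traceparent extension attribute.
const incoming = {
  type: 'order.created',
  traceparent: '00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01',
}

const traceContext = parseTraceparent(incoming.traceparent)
console.log(traceContext?.traceId) // shared by every span in the trace
```

Because the trace ID travels inside the event, the consumer's spans join the publisher's trace even though the two services never talk to each other directly.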
License
MIT © crossdelta
