@crossdelta/cloudevents

v0.8.2

CloudEvents toolkit for TypeScript - Zod validation, handler discovery, NATS JetStream & Core

Type-safe event-driven microservices with NATS and Zod validation, using the CloudEvents specification.

                         NATS JetStream
                        ┌──────────────┐
  ┌──────────────┐      │              │      ┌──────────────┐
  │    Service   │      │   Stream:    │      │   Service    │
  │   (publish)  │─────▶│   ORDERS     │─────▶│  (consume)   │
  └──────────────┘      │              │      └──────────────┘
                        └──────────────┘
        │                                            │
        │  publish(...)                              │  handleEvent(...)
        ▼                                            ▼
  ┌──────────────┐                            ┌──────────────┐
  │ { orderId,   │                            │ Zod schema   │
  │   total }    │                            │ validates    │
  └──────────────┘                            └──────────────┘

Installation

bun add @crossdelta/cloudevents zod@4

Prerequisites: A running NATS server with JetStream enabled.

Note: Requires Zod v4 for full TypeScript support.

Quick Start

Note: Services never create streams. Streams must exist before services consume from them:

  • Development: Use NATS CLI or Platform SDK (pf dev auto-creates from contracts)
  • Production: Use infrastructure-as-code (Pulumi, Terraform) or the Stream Setup function below

1. Create an event handler (src/events/order-created.handler.ts):

import { handleEvent } from '@crossdelta/cloudevents'
import { z } from 'zod'

// Export schema for mock generation
export const OrderCreatedSchema = z.object({
  orderId: z.string(),
  total: z.number(),
})

// Export type for use in use-cases
export type OrderCreatedEvent = z.infer<typeof OrderCreatedSchema>

export default handleEvent(
  {
    schema: OrderCreatedSchema,
    type: 'order.created',
  },
  async (data) => {
    console.log(`New order: ${data.orderId}, total: ${data.total}`)
  },
)

2. Start consuming:

import { consumeJetStreams } from '@crossdelta/cloudevents'

consumeJetStreams({
  streams: ['ORDERS'],
  consumer: 'my-service',
  discover: './src/events/**/*.handler.ts',
})

3. Publish from another service:

import { publish } from '@crossdelta/cloudevents'

await publish('order.created', { orderId: 'ord_123', total: 99.99 })

That's it. Handlers are auto-discovered, validated with Zod, and messages persist in JetStream.

Stream creation is handled by your development environment or infrastructure, not by services.

Why use this?

| Problem | Solution |
|---------|----------|
| Messages lost on restart | JetStream persists messages |
| Scattered handler registration | Auto-discovery via glob patterns |
| Runtime type errors | Zod validation with TypeScript inference |
| Poison messages crash services | DLQ quarantines invalid messages |

Core Concepts

Event Type vs. Event Data

Important distinction:

  • Event Type (order.created): Lives in the CloudEvent envelope (ce.type). Used for routing and handler matching.
  • Event Data ({ orderId, total }): The actual payload. Does not include the type.
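
import { handleEvent } from '@crossdelta/cloudevents'
import { z } from 'zod'

// The schema describes the event data only; the type is not part of it
const Schema = z.object({
  orderId: z.string(),
})

export default handleEvent(
  {
    schema: Schema,
    type: 'order.created', // matched against ce.type in the envelope
  },
  async (data) => { ... }
)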
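
To make the split concrete, here is a minimal sketch of a CloudEvents 1.0 envelope in plain TypeScript. The `CloudEventEnvelope` interface and the `wrap` helper are hypothetical names used for illustration and are not part of this library; the attribute names (`specversion`, `id`, `source`, `type`, `data`) come from the CloudEvents specification.

```typescript
// Hypothetical shape of a CloudEvents 1.0 envelope (required attributes plus data).
interface CloudEventEnvelope<T> {
  specversion: '1.0'
  id: string
  source: string
  type: string // routing key, e.g. 'order.created'
  data: T // payload that the handler's schema validates
}

// Illustrative helper: wraps a payload without copying the type into it.
function wrap<T>(type: string, source: string, id: string, data: T): CloudEventEnvelope<T> {
  return { specversion: '1.0', id, source, type, data }
}

const event = wrap('order.created', '/orders-service', 'evt_1', { orderId: 'ord_123' })
```

The schema passed to `handleEvent` only ever sees `event.data`, which is why it has no `type` field.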

Handlers

Drop a *.handler.ts file anywhere — it's auto-registered:

// src/events/user-signup.handler.ts
import { handleEvent } from '@crossdelta/cloudevents'
import { z } from 'zod'

const UserSignupSchema = z.object({ 
  email: z.email(),
  name: z.string(),
})

// Export type for use in use-cases
export type UserSignupEvent = z.infer<typeof UserSignupSchema>

export default handleEvent(
  {
    schema: UserSignupSchema,
    type: 'user.signup',
  },
  async (data) => {
    // sendWelcomeEmail is your own application code, not part of this library
    await sendWelcomeEmail(data.email)
  },
)

Publishing

await publish('order.created', orderData)

Stream Setup

For infrastructure use only - Services should never call this function.

Create streams during infrastructure setup (Pulumi, deployment scripts):

import { ensureJetStreams } from '@crossdelta/cloudevents'

await ensureJetStreams({
  streams: [
    {
      stream: 'ORDERS',
      subjects: ['orders.>'],
      config: {
        maxAge: 7 * 24 * 60 * 60 * 1000, // 7 days retention
        replicas: 3,                       // For HA
      },
    },
  ],
})

Consuming

import { consumeJetStreams, consumeNatsEvents } from '@crossdelta/cloudevents'

// JetStream (recommended) — persistent, retries, exactly-once
consumeJetStreams({
  streams: ['ORDERS'],
  consumer: 'billing',
  discover: './src/events/**/*.handler.ts',
})

// Core NATS — fire-and-forget, simpler
await consumeNatsEvents({
  subject: 'notifications.*',
  consumerName: 'my-service',
  discover: './src/events/**/*.handler.ts',
})

// NATS Core Request-Reply — synchronous validation feedback
// Consumer returns result, publisher awaits it
import { request } from '@crossdelta/cloudevents'
const result = await request('call.received', payload)
// result: { accepted: true, data: { ... } } | { accepted: false, error: '...' }
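
The result comment above describes a discriminated union. The sketch below shows how a caller might narrow on `accepted`. The `RequestResult` type is transcribed from that comment and is an assumption, so the library's exported type may differ; `describeResult` is a hypothetical helper.

```typescript
// Assumed result shape, transcribed from the comment above.
type RequestResult<T> =
  | { accepted: true; data: T }
  | { accepted: false; error: string }

// Narrowing on `accepted` gives typed access to either `data` or `error`.
function describeResult<T>(result: RequestResult<T>): string {
  if (result.accepted) {
    return `accepted: ${JSON.stringify(result.data)}`
  }
  return `rejected: ${result.error}`
}

const ok = describeResult({ accepted: true, data: { callId: 'c_1' } })
const rejected = describeResult({ accepted: false, error: 'invalid payload' })
```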

Configuration

Environment Variables

NATS_URL=nats://localhost:4222
NATS_USER=myuser        # optional
NATS_PASSWORD=mypass    # optional
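
If you need to resolve the same variables in your own tooling, the sketch below shows one way to do it. `readNatsSettings` and `NatsSettings` are hypothetical names, and the fallback URL simply mirrors the example value above. How the library itself reads these variables is not documented here.

```typescript
// Hypothetical settings object for illustration only.
interface NatsSettings {
  servers: string
  user?: string
  pass?: string
}

// Resolves connection settings from an env-like record (e.g. process.env).
function readNatsSettings(env: Record<string, string | undefined>): NatsSettings {
  const settings: NatsSettings = {
    // Fallback mirrors the example value above; it is an assumption, not a documented default.
    servers: env['NATS_URL'] ?? 'nats://localhost:4222',
  }
  const user = env['NATS_USER']
  const pass = env['NATS_PASSWORD']
  // Credentials are optional, so only set them when both are present.
  if (user && pass) {
    settings.user = user
    settings.pass = pass
  }
  return settings
}

const local = readNatsSettings({})
const secured = readNatsSettings({
  NATS_URL: 'nats://nats:4222',
  NATS_USER: 'myuser',
  NATS_PASSWORD: 'mypass',
})
```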

Optional: Google Cloud Pub/Sub

To use Google Cloud Pub/Sub instead of NATS, install the optional dependency:

bun add @google-cloud/pubsub
# or
npm install @google-cloud/pubsub

Then use the publishToPubSub function:

import { publishToPubSub } from '@crossdelta/cloudevents'

await publishToPubSub('order.created', { orderId: 'ord_123', total: 99.99 })

Note: @google-cloud/pubsub is not bundled with this library to keep the package size small. Install it only if you need Google Cloud Pub/Sub support.

Stream Options

await ensureJetStreams({
  streams: [
    {
      // Required
      stream: 'ORDERS',
      subjects: ['orders.>'],

      // Optional
      config: {
        maxAge: 7 * 24 * 60 * 60 * 1000,  // Message retention (ms)
        replicas: 1,                        // Replication factor
        storage: 'file',                    // 'file' or 'memory'
        retention: 'limits',                // 'limits', 'interest', or 'workqueue'
      },
    },
  ],
  // Optional
  servers: 'nats://localhost:4222',
})

Consumer Options

consumeJetStreams({
  // Required
  streams: ['ORDERS'],
  consumer: 'my-service',
  discover: './src/events/**/*.handler.ts',

  // Optional
  servers: 'nats://localhost:4222',
  filterSubjects: ['orders.created'], // Filter specific subjects (must match the stream's subjects)
  maxDeliver: 5,                       // Retry attempts
  ackWait: 30_000,                     // Timeout per attempt (ms)
  quarantineTopic: 'dlq',              // For poison messages
})
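
As a rough guide to how `maxDeliver` and `ackWait` interact: in JetStream, a message that is not acknowledged within the ack wait is redelivered until the delivery limit is reached. The sketch below estimates the upper bound for the values shown above. `worstCaseRetryWindowMs` is a hypothetical helper, and it assumes every attempt runs into the timeout and that no backoff is configured.

```typescript
// Hypothetical helper, not part of the library: upper bound on the time a
// message can spend in delivery attempts before the delivery limit is reached.
// Assumes each attempt times out after ackWaitMs and no backoff is applied.
function worstCaseRetryWindowMs(maxDeliver: number, ackWaitMs: number): number {
  if (maxDeliver < 1 || ackWaitMs < 0) {
    throw new RangeError('maxDeliver must be >= 1 and ackWaitMs must be >= 0')
  }
  return maxDeliver * ackWaitMs
}

// 5 attempts with a 30 second timeout each: 150_000 ms, i.e. 2.5 minutes.
const windowMs = worstCaseRetryWindowMs(5, 30_000)
```

Handlers that fail fast exhaust their attempts sooner than this bound, so treat it as a ceiling rather than a typical value.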

Advanced Features

Contracts can include channel metadata to define which NATS JetStream stream they belong to. This enables infrastructure-as-code workflows where streams are materialized from contracts.

Basic Contract with Channel

import { createContract } from '@crossdelta/cloudevents'
import { z } from 'zod'

export const OrdersCreatedContract = createContract({
  type: 'order.created',
  channel: {
    stream: 'ORDERS',
    // subject defaults to 'orders.created' (auto-pluralized) if omitted
  },
  schema: z.object({
    orderId: z.string(),
    total: z.number(),
  }),
})

Channel Configuration

interface ChannelConfig {
  stream: string    // JetStream stream name (e.g., 'ORDERS')
  subject?: string  // NATS subject (optional override)
}

Subject Routing (CRITICAL):

  • Without subject: Domain is auto-pluralized: customer.created → subject customers.created
  • With subject: Uses exact subject specified (no auto-pluralization)
  • Why pluralize: Stream subjects are plural (customers.*), event types are singular (customer.created)

Examples:

// Auto-pluralized subject
createContract({
  type: 'order.created',          // Event type: singular
  channel: { stream: 'ORDERS' },  // Subject: orders.created (plural)
})

// Custom subject (no auto-pluralization)
createContract({
  type: 'order.created',
  channel: { 
    stream: 'ORDERS',
    subject: 'orders.v1.created'  // Exact subject, no auto-pluralization
  },
})
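
The routing rule can be sketched as a small pure function. This is a hypothetical re-implementation for illustration, not the library's code: `deriveSubject` is an invented name, and the pluralization here naively appends "s" to the first segment, which will not handle irregular plurals the way the library might.

```typescript
// Hypothetical sketch of the subject routing rule described above.
// Assumption: pluralization appends "s" to the domain unless it already ends in "s".
function deriveSubject(type: string, override?: string): string {
  if (override !== undefined) return override // explicit subject is used verbatim
  const segments = type.split('.')
  const domain = segments[0] ?? type
  const plural = domain.endsWith('s') ? domain : `${domain}s`
  return [plural, ...segments.slice(1)].join('.')
}

const auto = deriveSubject('customer.created')
const custom = deriveSubject('order.created', 'orders.v1.created')
```

Here `auto` follows the pluralization rule and `custom` keeps the override untouched.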

Multiple Events, Same Stream

// order.created → ORDERS stream
export const OrdersCreatedContract = createContract({
  type: 'order.created',
  channel: { stream: 'ORDERS' },
  schema: OrderCreatedSchema,
})

// order.updated → ORDERS stream (same stream!)
export const OrdersUpdatedContract = createContract({
  type: 'order.updated',
  channel: { stream: 'ORDERS' },
  schema: OrderUpdatedSchema,
})

Result: Both events share the ORDERS stream with subjects orders.created and orders.updated.

Infrastructure Materialization

Channel metadata is conceptual - it defines routing, not infrastructure policy.

Contracts define:

  • Event type (order.created)
  • Stream routing (ORDERS)
  • Subject mapping (orders.created)

Infrastructure defines:

  • Retention (7 days, 14 days, etc.)
  • Storage (disk, memory)
  • Replicas (1, 3, etc.)
  • Limits (max message size, etc.)

Example Infrastructure (Pulumi):

import { collectStreamDefinitions } from './helpers/streams'
import { ensureJetStreams } from '@crossdelta/cloudevents'

// Collect from contracts
const streams = collectStreamDefinitions()
// [{ stream: 'ORDERS', subjects: ['orders.created', 'orders.updated'] }]

// Materialize with policies
await ensureJetStreams({
  streams: streams.map(({ stream, subjects }) => ({
    stream,
    subjects,
    config: {
      maxAge: 14 * 24 * 60 * 60 * 1000, // 14 days
      replicas: 3,                       // High availability
      storage: 'file',                   // Persistent
    },
  })),
})
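
The `collectStreamDefinitions` helper above lives in your own infrastructure code. One possible shape for the grouping step is sketched below. `groupSubjectsByStream`, `RoutedContract`, and `StreamDefinition` are hypothetical names, and the sketch assumes each contract already exposes a resolved subject.

```typescript
// Assumed minimal view of a contract: only the routing fields matter here.
interface RoutedContract {
  type: string
  channel?: { stream: string; subject: string }
}

interface StreamDefinition {
  stream: string
  subjects: string[]
}

// Groups the subjects of all contracts by their target stream.
function groupSubjectsByStream(contracts: RoutedContract[]): StreamDefinition[] {
  const byStream = new Map<string, Set<string>>()
  for (const { channel } of contracts) {
    if (!channel) continue // contracts without a channel define no stream
    const subjects = byStream.get(channel.stream) ?? new Set<string>()
    subjects.add(channel.subject)
    byStream.set(channel.stream, subjects)
  }
  return Array.from(byStream, ([stream, subjects]) => ({
    stream,
    subjects: Array.from(subjects),
  }))
}

const definitions = groupSubjectsByStream([
  { type: 'order.created', channel: { stream: 'ORDERS', subject: 'orders.created' } },
  { type: 'order.updated', channel: { stream: 'ORDERS', subject: 'orders.updated' } },
  { type: 'legacy.event' },
])
```

Using a `Set` per stream keeps subjects unique when several contracts share one subject.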

Development vs. Production

| Environment | Streams | How | Retention |
|-------------|---------|-----|-----------|
| Development | Ephemeral | Auto-created by development tooling (e.g. pf dev) | None (memory) |
| Production | Persistent | Materialized via infrastructure | Explicit (7d, 14d, etc.) |

Key Principle: Contracts define what and where, infrastructure defines how and how long.

Backward Compatibility

Channel metadata is optional - contracts without channel work as before:

// Legacy contract (still works)
export const LegacyContract = createContract({
  type: 'legacy.event',
  schema: LegacySchema,
  // No channel - handler still works
})

When multiple services consume the same event, use createContract to create shared event contracts:

// packages/contracts/src/events/orders/created.ts
import { createContract } from '@crossdelta/cloudevents'
import { z } from 'zod'

export const OrderCreatedSchema = z.object({
  orderId: z.string(),
  total: z.number(),
})

export type OrderCreatedData = z.infer<typeof OrderCreatedSchema>

export const OrdersCreatedContract = createContract({
  type: 'order.created',
  schema: OrderCreatedSchema,
})

Then import and use in handlers:

import { handleEvent } from '@crossdelta/cloudevents'
import { OrdersCreatedContract } from '@my-org/contracts'

export default handleEvent(
  OrdersCreatedContract,
  async (data) => {
    // data is typed as OrderCreatedData
    console.log(data.orderId)
  },
)

Benefits:

  • Single source of truth for event schemas
  • No schema duplication
  • Type safety across services

Filter events by tenant:

export default handleEvent({
  type: 'order.created',
  schema: OrderSchema,
  tenantId: 'tenant-a',  // Only process tenant-a events
}, async (data) => { ... })

Use type: '*' to create a handler that processes all events, regardless of type. Useful for audit logging, metrics collection, or event replay.

import { handleEvent } from '@crossdelta/cloudevents'
import { z } from 'zod'

export default handleEvent(
  {
    type: '*',
    schema: z.record(z.string(), z.unknown()),
  },
  async (data, context) => {
    await persistAuditEvent({
      eventType: context.eventType,
      source: context.source,
      data,
    })
  },
)

Routing behavior:

  • Specific handlers (type: 'order.created') always take priority
  • The wildcard handler runs only when no specific handler matches
  • Only one wildcard handler per consumer is recommended
  • Works with both JetStream and NATS Core consumers
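
The priority rule can be illustrated with a tiny lookup. This is a sketch of the described behavior under the assumption of a registry keyed by event type; `resolveHandler` and the `Map` registry are hypothetical and do not reflect the library's internals.

```typescript
type Handler = (data: unknown) => string

// Hypothetical registry lookup: '*' marks the wildcard handler.
function resolveHandler(handlers: Map<string, Handler>, eventType: string): Handler | undefined {
  // A specific handler wins; the wildcard is only a fallback.
  return handlers.get(eventType) ?? handlers.get('*')
}

const handlers = new Map<string, Handler>([
  ['order.created', () => 'specific'],
  ['*', () => 'wildcard'],
])

const forOrder = resolveHandler(handlers, 'order.created')
const forOther = resolveHandler(handlers, 'user.signup')
```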

Add custom filter logic:

export default handleEvent({
  type: 'order.created',
  schema: OrderSchema,
  match: (event) => event.data.total > 100,  // Only high-value orders
}, async (data) => { ... })
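
Conceptually, a `match` predicate is a second gate after the type check. The sketch below models that with hypothetical `IncomingEvent`, `HandlerFilter`, and `accepts` names. The evaluation order is an assumption, and what the library does with events that fail the predicate is not covered here.

```typescript
interface IncomingEvent<T> {
  type: string
  data: T
}

interface HandlerFilter<T> {
  type: string
  match?: (event: IncomingEvent<T>) => boolean
}

// Assumed evaluation order: event type first, then the optional predicate.
function accepts<T>(filter: HandlerFilter<T>, event: IncomingEvent<T>): boolean {
  if (filter.type !== '*' && filter.type !== event.type) return false
  return filter.match ? filter.match(event) : true
}

const highValue: HandlerFilter<{ total: number }> = {
  type: 'order.created',
  match: (event) => event.data.total > 100,
}

const big = accepts(highValue, { type: 'order.created', data: { total: 250 } })
const small = accepts(highValue, { type: 'order.created', data: { total: 40 } })
```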

Deduplication is built-in. For distributed systems, provide a Redis store:

const redisStore = {
  has: (id) => redis.exists(`idem:${id}`),
  add: (id, ttl) => redis.set(`idem:${id}`, '1', 'PX', ttl),
}

await consumeJetStreams({
  streams: ['ORDERS'],
  consumer: 'my-service',
  discover: './src/events/**/*.handler.ts',
  idempotencyStore: redisStore,
})
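
For local experiments, a store with the same `has`/`add` shape can be kept in memory. The class below is a hypothetical sketch. It assumes the TTL is in milliseconds (as the `'PX'` flag in the Redis example suggests) and uses synchronous methods for brevity, whereas the Redis calls above are asynchronous.

```typescript
// Store shape taken from the Redis example above: has(id) and add(id, ttl).
class InMemoryIdempotencyStore {
  private readonly expiries = new Map<string, number>()
  private readonly now: () => number

  // The clock is injectable so expiry can be exercised without waiting.
  constructor(now: () => number = Date.now) {
    this.now = now
  }

  has(id: string): boolean {
    const expiresAt = this.expiries.get(id)
    if (expiresAt === undefined) return false
    if (expiresAt <= this.now()) {
      this.expiries.delete(id) // lazily evict expired ids
      return false
    }
    return true
  }

  add(id: string, ttlMs: number): void {
    this.expiries.set(id, this.now() + ttlMs)
  }
}

let fakeTime = 0
const store = new InMemoryIdempotencyStore(() => fakeTime)
store.add('evt_1', 1_000)
const seenBeforeExpiry = store.has('evt_1')
fakeTime = 1_000
const seenAfterExpiry = store.has('evt_1')
```

An in-memory store only deduplicates within one process, which is why a shared store such as Redis is suggested for distributed systems.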

Invalid messages are quarantined, not lost:

await consumeJetStreams({
  streams: ['ORDERS'],
  consumer: 'my-service',
  discover: './src/events/**/*.handler.ts',
  quarantineTopic: 'events.quarantine',  // For malformed messages
  errorTopic: 'events.errors',            // For handler errors
})

Messages that fail validation go to quarantineTopic, messages that crash handlers go to errorTopic.
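
The distinction between the two topics can be sketched as a two-stage pipeline. `processMessage`, `Outcome`, and `validateOrder` are hypothetical names for illustration. Real handlers are asynchronous and the library's internal flow may differ; the sketch only shows which failure maps to which topic.

```typescript
type Outcome = 'acked' | 'quarantined' | 'errored'

// Hypothetical pipeline for one message: validate first, then run the handler.
function processMessage<T>(
  raw: unknown,
  validate: (raw: unknown) => T, // throws on invalid input, like schema.parse
  handler: (data: T) => void,
): Outcome {
  let data: T
  try {
    data = validate(raw)
  } catch {
    return 'quarantined' // validation failure -> quarantineTopic
  }
  try {
    handler(data)
  } catch {
    return 'errored' // handler crash -> errorTopic
  }
  return 'acked'
}

// Hand-written validator standing in for a Zod schema.
const validateOrder = (raw: unknown): { orderId: string } => {
  const candidate = raw as { orderId?: unknown } | null
  if (typeof candidate === 'object' && candidate !== null && typeof candidate.orderId === 'string') {
    return { orderId: candidate.orderId }
  }
  throw new Error('invalid order')
}

const malformed = processMessage({ total: 1 }, validateOrder, () => {})
const crashed = processMessage({ orderId: 'ord_1' }, validateOrder, () => {
  throw new Error('boom')
})
const handled = processMessage({ orderId: 'ord_1' }, validateOrder, () => {})
```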

Handlers can also be mounted as Hono middleware:

import { Hono } from 'hono'
import { cloudEvents } from '@crossdelta/cloudevents'

const app = new Hono()
app.use('/events', cloudEvents({ discover: 'src/events/**/*.handler.ts' }))

API

| Function | Purpose |
|----------|---------|
| handleEvent(options, handler) | Create a handler (supports type: '*' wildcard) |
| createContract(options) | Create shared event contract |
| ensureJetStreams(options) | Create/update JetStream streams |
| consumeJetStreams(options) | Consume from multiple streams |
| consumeNatsEvents(options) | Consume fire-and-forget |
| publish(type, data) | Publish event (fire-and-forget) |
| request(type, data, opts?) | Publish event and await reply (Request-Reply) |
| isNatsConnected() | Check if NATS publisher connection is live (for health checks) |
| isConsumerConnected() | Check if any NATS consumer connection is live (for readiness probes) |
| configureNatsPublisher(config) | Set connection options for the publisher singleton |
| closeConnection() | Drain and close the NATS connection (for CLI tools) |

Distributed Tracing

When @crossdelta/telemetry is installed (optional peer dependency), trace context is propagated automatically across NATS:

  • Publish: The current W3C traceparent is injected as a CloudEvent extension field.
  • Consume: The traceparent extension is extracted and restored in AsyncLocalStorage before the handler runs.

This enables end-to-end distributed traces in Grafana:

Storefront → API Gateway → NATS → Orders Service → NATS → Audit Service
                └── all spans share the same traceId ──┘

No configuration required. If @crossdelta/telemetry is not installed, CloudEvents work normally without trace propagation.
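
For readers unfamiliar with the header being propagated, the sketch below formats and parses a W3C `traceparent` value (`version-traceid-parentid-flags`) and carries it as an extension attribute on a plain object. `formatTraceparent`, `parseTraceparent`, and `TraceContext` are hypothetical names and this is not how `@crossdelta/telemetry` is implemented. The parser accepts version `00` only and does not reject all-zero ids.

```typescript
interface TraceContext {
  traceId: string // 32 lowercase hex characters
  parentId: string // 16 lowercase hex characters
  sampled: boolean
}

// W3C Trace Context layout for version 00: version-traceid-parentid-flags.
const TRACEPARENT = /^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$/

function formatTraceparent(ctx: TraceContext): string {
  return `00-${ctx.traceId}-${ctx.parentId}-${ctx.sampled ? '01' : '00'}`
}

function parseTraceparent(header: string): TraceContext | undefined {
  const match = TRACEPARENT.exec(header)
  if (!match) return undefined
  const traceId = match[1]
  const parentId = match[2]
  const flags = match[3]
  if (traceId === undefined || parentId === undefined || flags === undefined) return undefined
  // Bit 0 of the flags byte is the "sampled" flag.
  return { traceId, parentId, sampled: (parseInt(flags, 16) & 1) === 1 }
}

// Publish side: attach the header as an extension attribute on the event.
const outgoing = {
  type: 'order.created',
  traceparent: formatTraceparent({
    traceId: '4bf92f3577b34da6a3ce929d0e0e4736',
    parentId: '00f067aa0ba902b7',
    sampled: true,
  }),
}

// Consume side: read the extension back before running the handler.
const restored = parseTraceparent(outgoing.traceparent)
```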

License

MIT © crossdelta