npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2026 – Pkg Stats / Ryan Hefner

@emmett-community/emmett-google-pubsub

v0.3.0

Published

Google Cloud Pub/Sub message bus for Emmett - Event Sourcing development made simple

Downloads

376

Readme

@emmett-community/emmett-google-pubsub

Google Cloud Pub/Sub message bus implementation for Emmett, the Node.js event sourcing framework.

npm version License: MIT

Features

  • Distributed Message Bus - Scale command/event handling across multiple instances
  • Type-Safe - Full TypeScript support with comprehensive types
  • Automatic Topic Management - Auto-creates topics and subscriptions
  • Message Scheduling - Schedule commands/events for future execution
  • Error Handling - Built-in retry logic and dead letter queue support
  • Emulator Support - Local development with PubSub emulator
  • Emmett Compatible - Drop-in replacement for in-memory message bus
  • Producer-Only Mode - Use without starting consumers

Installation

npm install @emmett-community/emmett-google-pubsub @google-cloud/pubsub

Peer Dependencies

  • @event-driven-io/emmett ^0.39.0

Quick Start

import { PubSub } from '@google-cloud/pubsub';
import { getPubSubMessageBus } from '@emmett-community/emmett-google-pubsub';

// Initialize PubSub client
const pubsub = new PubSub({ projectId: 'your-project-id' });

// Create message bus
const messageBus = getPubSubMessageBus({ pubsub });

// Register command handler
messageBus.handle(async (command) => {
  console.log('Processing:', command.type, command.data);
}, 'AddProductItem');

// Subscribe to events
messageBus.subscribe(async (event) => {
  console.log('Received:', event.type, event.data);
}, 'ProductItemAdded');

// Start listening
await messageBus.start();

// Send commands and publish events
await messageBus.send({
  type: 'AddProductItem',
  data: { productId: '123', quantity: 2 },
});

await messageBus.publish({
  type: 'ProductItemAdded',
  data: { productId: '123', quantity: 2 },
});

How It Works

Topic/Subscription Strategy

The message bus uses a topic-per-type strategy:

Commands (1-to-1):
  Topic: {prefix}-cmd-{CommandType}
  Subscription: {prefix}-cmd-{CommandType}-{instanceId}
  → Only ONE handler processes each command

Events (1-to-many):
  Topic: {prefix}-evt-{EventType}
  Subscription: {prefix}-evt-{EventType}-{subscriberId}
  → ALL subscribers receive each event

Example topic names:

emmett-cmd-AddProductItem
emmett-cmd-AddProductItem-instance-abc123

emmett-evt-ProductItemAdded
emmett-evt-ProductItemAdded-subscriber-xyz789

Message Lifecycle

1. REGISTRATION     2. STARTUP          3. RUNTIME           4. SHUTDOWN
   handle()            start()             send/publish         close()
   subscribe()         → Create topics     → Route messages     → Stop listeners
                       → Create subs       → Execute handlers   → Cleanup
                       → Attach listeners  → Ack/Nack

Producer-Only Mode

You can use the message bus to only produce messages without consuming:

const messageBus = getPubSubMessageBus({ pubsub });

// No handlers, no start() needed
await messageBus.send({ type: 'MyCommand', data: {} });
await messageBus.publish({ type: 'MyEvent', data: {} });

API Reference

getPubSubMessageBus(config)

Creates a message bus instance.

const messageBus = getPubSubMessageBus({
  pubsub,                          // Required: PubSub client
  topicPrefix: 'myapp',            // Topic name prefix (default: "emmett")
  instanceId: 'worker-1',          // Instance ID (default: auto-generated)
  useEmulator: true,               // Emulator mode (default: false)
  autoCreateResources: true,       // Auto-create topics/subs (default: true)
  cleanupOnClose: false,           // Delete subs on close (default: false)
  closePubSubClient: true,         // Close PubSub on close (default: true)
  subscriptionOptions: {           // Subscription config
    ackDeadlineSeconds: 60,
    retryPolicy: {
      minimumBackoff: { seconds: 10 },
      maximumBackoff: { seconds: 600 },
    },
    deadLetterPolicy: {
      deadLetterTopic: 'projects/.../topics/dead-letters',
      maxDeliveryAttempts: 5,
    },
  },
});

Methods

| Method | Description | |--------|-------------| | send(command) | Send a command (1-to-1) | | publish(event) | Publish an event (1-to-many) | | handle(handler, ...types) | Register command handler | | subscribe(handler, ...types) | Subscribe to events | | schedule(message, options) | Schedule for future delivery | | dequeue() | Get scheduled messages (emulator only) | | start() | Start listening for messages | | close() | Graceful shutdown | | isStarted() | Check if running |

See docs/API.md for complete API documentation.

Configuration

Basic Configuration

const messageBus = getPubSubMessageBus({
  pubsub: new PubSub({ projectId: 'my-project' }),
  topicPrefix: 'orders',
});

Emulator Configuration

// Set environment variable
process.env.PUBSUB_EMULATOR_HOST = 'localhost:8085';

const pubsub = new PubSub({ projectId: 'demo-project' });
const messageBus = getPubSubMessageBus({
  pubsub,
  useEmulator: true,  // Enables in-memory scheduling
});

Production Configuration

const pubsub = new PubSub({
  projectId: process.env.GCP_PROJECT_ID,
  // Uses Application Default Credentials or Workload Identity
});

const messageBus = getPubSubMessageBus({
  pubsub,
  topicPrefix: 'prod-myapp',
  subscriptionOptions: {
    ackDeadlineSeconds: 120,
    retryPolicy: {
      minimumBackoff: { seconds: 5 },
      maximumBackoff: { seconds: 300 },
    },
  },
});

Testing

Testing Utilities

import { PubSub } from '@google-cloud/pubsub';
import { getPubSubMessageBus } from '@emmett-community/emmett-google-pubsub';

describe('My Tests', () => {
  let pubsub: PubSub;
  let messageBus: ReturnType<typeof getPubSubMessageBus>;

  beforeAll(() => {
    pubsub = new PubSub({ projectId: 'test-project' });
  });

  beforeEach(() => {
    messageBus = getPubSubMessageBus({
      pubsub,
      useEmulator: true,
      topicPrefix: `test-${Date.now()}`,
      cleanupOnClose: true,
      closePubSubClient: false,
    });
  });

  afterEach(async () => {
    await messageBus.close();
  });

  afterAll(async () => {
    await pubsub.close();
  });

  it('should handle commands', async () => {
    const received: unknown[] = [];

    messageBus.handle(async (cmd) => {
      received.push(cmd);
    }, 'TestCommand');

    await messageBus.start();

    await messageBus.send({
      type: 'TestCommand',
      data: { value: 42 },
    });

    // Wait for async delivery
    await new Promise((r) => setTimeout(r, 500));

    expect(received).toHaveLength(1);
  });
});

Running Tests

# Unit tests
npm run test:unit

# Integration tests (in-memory)
npm run test:int

# E2E tests (PubSub emulator via Testcontainers, requires Docker)
npm run test:e2e

# All tests
npm test

E2E tests start the emulator automatically via Testcontainers.

Examples

Complete Shopping Cart Example

See examples/shopping-cart for a full application including:

  • Event-sourced shopping cart with Firestore
  • Express.js API with OpenAPI spec
  • Docker Compose setup with all emulators
  • Unit, integration, and E2E tests
cd examples/shopping-cart
docker-compose up

# API: http://localhost:3000
# Firebase UI: http://localhost:4000
# PubSub UI: http://localhost:4001

Multiple Event Subscribers

// Analytics service
messageBus.subscribe(async (event) => {
  await analytics.track(event);
}, 'OrderCreated');

// Notification service
messageBus.subscribe(async (event) => {
  await email.sendConfirmation(event.data.customerId);
}, 'OrderCreated');

// Inventory service
messageBus.subscribe(async (event) => {
  await inventory.reserve(event.data.items);
}, 'OrderCreated');

// All three receive every OrderCreated event

Scheduled Messages

// Schedule for future
messageBus.schedule(
  { type: 'SendReminder', data: { userId: '123' } },
  { afterInMs: 24 * 60 * 60 * 1000 }  // 24 hours
);

// Schedule for specific time
messageBus.schedule(
  { type: 'SendReminder', data: { userId: '123' } },
  { at: new Date('2024-12-25T10:00:00Z') }
);

See docs/EXAMPLES.md for more examples.

Architecture

Message Format

Messages are wrapped in an envelope for transport:

interface PubSubMessageEnvelope {
  type: string;           // Message type name
  kind: 'command' | 'event';
  data: unknown;          // Serialized data
  metadata?: unknown;     // Optional metadata
  timestamp: string;      // ISO 8601
  messageId: string;      // UUID for idempotency
}

Date Serialization

JavaScript Date objects are preserved through serialization:

// Original
{ createdAt: new Date('2024-01-15T10:00:00Z') }

// Serialized
{ createdAt: { __type: 'Date', value: '2024-01-15T10:00:00.000Z' } }

// Deserialized (restored as Date object)
{ createdAt: Date('2024-01-15T10:00:00Z') }

Error Handling

| Scenario | Behavior | |----------|----------| | Handler succeeds | Message acknowledged | | Transient error | Message nack'd, retried with backoff | | Permanent error | Message ack'd, logged | | No handler | Message nack'd for retry |

See docs/ARCHITECTURE.md for design decisions.

Observability

The package supports optional observability through structured logging and OpenTelemetry tracing.

Logging

Logging is opt-in and completely silent by default. To enable logging, provide a logger that implements the canonical (context, message) contract:

const messageBus = getPubSubMessageBus({
  pubsub,
  observability: {
    logger: {
      debug: (context, message) => console.debug(message, context),
      info: (context, message) => console.info(message, context),
      warn: (context, message) => console.warn(message, context),
      error: (context, message) => console.error(message, context),
    },
  },
});

Logger Contract:

The logger MUST implement the canonical (context, message) contract:

  • context: Structured data as Record<string, unknown> (first parameter)
  • message: Human-readable log message (second parameter, optional)

Pino is natively compatible. For Winston, use an adapter.

Log Levels:

  • info - Lifecycle events (start, stop)
  • debug - External I/O operations (publish, subscribe)
  • warn - Recoverable failures (timeouts, retries)
  • error - Failures (with Error objects in { err: error } format)

Tracing (OpenTelemetry)

The package creates OpenTelemetry spans for key operations. Tracing is passive - the @opentelemetry/api is a no-op by default.

To enable tracing, configure OpenTelemetry in your application:

import { NodeSDK } from '@opentelemetry/sdk-node';
const sdk = new NodeSDK({ /* config */ });
sdk.start();

// Spans from emmett-google-pubsub are now captured
const messageBus = getPubSubMessageBus({ pubsub });

Notes:

  • The package never initializes OpenTelemetry
  • No tracing flags needed - spans are always created (no-op if SDK not initialized)
  • Message types and payloads are never included in spans or logs

Compatibility

  • Node.js: >= 18.0.0
  • Emmett: ^0.39.0
  • @google-cloud/pubsub: ^4.8.0

Contributing

Contributions are welcome! Please:

  1. Fork the repository
  2. Create a feature branch
  3. Add tests for new functionality
  4. Ensure all tests pass
  5. Submit a pull request

Development

# Install dependencies
npm install

# Build
npm run build

# Run tests
npm test

# Run unit tests only
npm run test:unit

# Run integration tests (in-memory)
npm run test:int

# Run E2E tests (requires Docker)
npm run test:e2e

# Lint
npm run lint

# Format
npm run format

License

MIT

Related Packages

Support


Made with ❤️ by the Emmett Community