@emmett-community/emmett-google-pubsub
v0.3.0
Google Cloud Pub/Sub message bus for Emmett - Event Sourcing development made simple
Google Cloud Pub/Sub message bus implementation for Emmett, the Node.js event sourcing framework.
Features
- ✅ Distributed Message Bus - Scale command/event handling across multiple instances
- ✅ Type-Safe - Full TypeScript support with comprehensive types
- ✅ Automatic Topic Management - Auto-creates topics and subscriptions
- ✅ Message Scheduling - Schedule commands/events for future execution
- ✅ Error Handling - Built-in retry logic and dead letter queue support
- ✅ Emulator Support - Local development with PubSub emulator
- ✅ Emmett Compatible - Drop-in replacement for in-memory message bus
- ✅ Producer-Only Mode - Use without starting consumers
Installation
npm install @emmett-community/emmett-google-pubsub @google-cloud/pubsub
Peer Dependencies
- @event-driven-io/emmett ^0.39.0
Quick Start
import { PubSub } from '@google-cloud/pubsub';
import { getPubSubMessageBus } from '@emmett-community/emmett-google-pubsub';
// Initialize PubSub client
const pubsub = new PubSub({ projectId: 'your-project-id' });
// Create message bus
const messageBus = getPubSubMessageBus({ pubsub });
// Register command handler
messageBus.handle(async (command) => {
  console.log('Processing:', command.type, command.data);
}, 'AddProductItem');
// Subscribe to events
messageBus.subscribe(async (event) => {
  console.log('Received:', event.type, event.data);
}, 'ProductItemAdded');
// Start listening
await messageBus.start();
// Send commands and publish events
await messageBus.send({
  type: 'AddProductItem',
  data: { productId: '123', quantity: 2 },
});
await messageBus.publish({
  type: 'ProductItemAdded',
  data: { productId: '123', quantity: 2 },
});
How It Works
Topic/Subscription Strategy
The message bus uses a topic-per-type strategy:
Commands (1-to-1):
  Topic:        {prefix}-cmd-{CommandType}
  Subscription: {prefix}-cmd-{CommandType}-{instanceId}
  → Only ONE handler processes each command
Events (1-to-many):
  Topic:        {prefix}-evt-{EventType}
  Subscription: {prefix}-evt-{EventType}-{subscriberId}
  → ALL subscribers receive each event
Example topic and subscription names:
emmett-cmd-AddProductItem
emmett-cmd-AddProductItem-instance-abc123
emmett-evt-ProductItemAdded
emmett-evt-ProductItemAdded-subscriber-xyz789
Message Lifecycle
1. REGISTRATION    2. STARTUP            3. RUNTIME            4. SHUTDOWN
   handle()           start()               send/publish          close()
   subscribe()      → Create topics       → Route messages      → Stop listeners
                    → Create subs         → Execute handlers    → Cleanup
                    → Attach listeners    → Ack/Nack
Producer-Only Mode
You can use the message bus to only produce messages without consuming:
const messageBus = getPubSubMessageBus({ pubsub });
// No handlers, no start() needed
await messageBus.send({ type: 'MyCommand', data: {} });
await messageBus.publish({ type: 'MyEvent', data: {} });
API Reference
getPubSubMessageBus(config)
Creates a message bus instance.
const messageBus = getPubSubMessageBus({
  pubsub,                      // Required: PubSub client
  topicPrefix: 'myapp',        // Topic name prefix (default: "emmett")
  instanceId: 'worker-1',      // Instance ID (default: auto-generated)
  useEmulator: true,           // Emulator mode (default: false)
  autoCreateResources: true,   // Auto-create topics/subs (default: true)
  cleanupOnClose: false,       // Delete subs on close (default: false)
  closePubSubClient: true,     // Close PubSub on close (default: true)
  subscriptionOptions: {       // Subscription config
    ackDeadlineSeconds: 60,
    retryPolicy: {
      minimumBackoff: { seconds: 10 },
      maximumBackoff: { seconds: 600 },
    },
    deadLetterPolicy: {
      deadLetterTopic: 'projects/.../topics/dead-letters',
      maxDeliveryAttempts: 5,
    },
  },
});
Methods
| Method | Description |
|--------|-------------|
| send(command) | Send a command (1-to-1) |
| publish(event) | Publish an event (1-to-many) |
| handle(handler, ...types) | Register command handler |
| subscribe(handler, ...types) | Subscribe to events |
| schedule(message, options) | Schedule for future delivery |
| dequeue() | Get scheduled messages (emulator only) |
| start() | Start listening for messages |
| close() | Graceful shutdown |
| isStarted() | Check if running |
See docs/API.md for complete API documentation.
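The topicPrefix and instanceId options feed the naming scheme described under Topic/Subscription Strategy. As a rough illustration of how those names compose, here is a minimal sketch. The helpers topicName and subscriptionName are hypothetical and are not exported by the package; only the name patterns come from this README.

```typescript
// Hypothetical helpers for illustration only; not part of the package API.
type MessageKind = 'command' | 'event';

const kindSegment: Record<MessageKind, string> = {
  command: 'cmd',
  event: 'evt',
};

// Topic: {prefix}-cmd-{CommandType} or {prefix}-evt-{EventType}
function topicName(prefix: string, kind: MessageKind, type: string): string {
  return `${prefix}-${kindSegment[kind]}-${type}`;
}

// Subscription: the topic name plus an instance or subscriber suffix
function subscriptionName(
  prefix: string,
  kind: MessageKind,
  type: string,
  suffix: string,
): string {
  return `${topicName(prefix, kind, type)}-${suffix}`;
}

console.log(topicName('emmett', 'command', 'AddProductItem'));
// emmett-cmd-AddProductItem
console.log(
  subscriptionName('emmett', 'event', 'ProductItemAdded', 'subscriber-xyz789'),
);
// emmett-evt-ProductItemAdded-subscriber-xyz789
```

Knowing the pattern is mostly useful when you pre-create resources yourself (for example with autoCreateResources set to false) or when you search for a subscription in the Cloud console.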
Configuration
Basic Configuration
const messageBus = getPubSubMessageBus({
  pubsub: new PubSub({ projectId: 'my-project' }),
  topicPrefix: 'orders',
});
Emulator Configuration
// Set environment variable
process.env.PUBSUB_EMULATOR_HOST = 'localhost:8085';
const pubsub = new PubSub({ projectId: 'demo-project' });
const messageBus = getPubSubMessageBus({
  pubsub,
  useEmulator: true, // Enables in-memory scheduling
});
Production Configuration
const pubsub = new PubSub({
  projectId: process.env.GCP_PROJECT_ID,
  // Uses Application Default Credentials or Workload Identity
});
const messageBus = getPubSubMessageBus({
  pubsub,
  topicPrefix: 'prod-myapp',
  subscriptionOptions: {
    ackDeadlineSeconds: 120,
    retryPolicy: {
      minimumBackoff: { seconds: 5 },
      maximumBackoff: { seconds: 300 },
    },
  },
});
Testing
Testing Utilities
import { PubSub } from '@google-cloud/pubsub';
import { getPubSubMessageBus } from '@emmett-community/emmett-google-pubsub';
describe('My Tests', () => {
  let pubsub: PubSub;
  let messageBus: ReturnType<typeof getPubSubMessageBus>;
  beforeAll(() => {
    pubsub = new PubSub({ projectId: 'test-project' });
  });
  beforeEach(() => {
    messageBus = getPubSubMessageBus({
      pubsub,
      useEmulator: true,
      topicPrefix: `test-${Date.now()}`,
      cleanupOnClose: true,
      closePubSubClient: false,
    });
  });
  afterEach(async () => {
    await messageBus.close();
  });
  afterAll(async () => {
    await pubsub.close();
  });
  it('should handle commands', async () => {
    const received: unknown[] = [];
    messageBus.handle(async (cmd) => {
      received.push(cmd);
    }, 'TestCommand');
    await messageBus.start();
    await messageBus.send({
      type: 'TestCommand',
      data: { value: 42 },
    });
    // Wait for async delivery
    await new Promise((r) => setTimeout(r, 500));
    expect(received).toHaveLength(1);
  });
});
Running Tests
# Unit tests
npm run test:unit
# Integration tests (in-memory)
npm run test:int
# E2E tests (PubSub emulator via Testcontainers, requires Docker)
npm run test:e2e
# All tests
npm test
E2E tests start the emulator automatically via Testcontainers.
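The test under Testing Utilities waits a fixed 500 ms for delivery, which can be slow on a fast machine and flaky on a loaded one. One possible alternative is to poll until the expectation holds. The waitFor helper below is a hypothetical sketch, not something the package provides, and its default timeout and interval are arbitrary choices.

```typescript
// Hypothetical test helper; not part of the package.
// Polls `condition` until it returns true or the timeout elapses.
async function waitFor(
  condition: () => boolean,
  timeoutMs = 5000,
  intervalMs = 25,
): Promise<void> {
  const deadline = Date.now() + timeoutMs;
  while (!condition()) {
    if (Date.now() > deadline) {
      throw new Error(`Condition not met within ${timeoutMs} ms`);
    }
    await new Promise<void>((resolve) => setTimeout(resolve, intervalMs));
  }
}

// Usage inside a test, in place of the fixed sleep:
//   await waitFor(() => received.length === 1);
//   expect(received).toHaveLength(1);
```

The test then finishes as soon as the message arrives and fails with a clear error if it never does.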
Examples
Complete Shopping Cart Example
See examples/shopping-cart for a full application including:
- Event-sourced shopping cart with Firestore
- Express.js API with OpenAPI spec
- Docker Compose setup with all emulators
- Unit, integration, and E2E tests
cd examples/shopping-cart
docker-compose up
# API: http://localhost:3000
# Firebase UI: http://localhost:4000
# PubSub UI: http://localhost:4001
Multiple Event Subscribers
// Analytics service
messageBus.subscribe(async (event) => {
  await analytics.track(event);
}, 'OrderCreated');
// Notification service
messageBus.subscribe(async (event) => {
  await email.sendConfirmation(event.data.customerId);
}, 'OrderCreated');
// Inventory service
messageBus.subscribe(async (event) => {
  await inventory.reserve(event.data.items);
}, 'OrderCreated');
// All three receive every OrderCreated event
Scheduled Messages
// Schedule for future
messageBus.schedule(
  { type: 'SendReminder', data: { userId: '123' } },
  { afterInMs: 24 * 60 * 60 * 1000 } // 24 hours
);
// Schedule for specific time
messageBus.schedule(
  { type: 'SendReminder', data: { userId: '123' } },
  { at: new Date('2024-12-25T10:00:00Z') }
);
See docs/EXAMPLES.md for more examples.
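The two scheduling option shapes used above, a relative afterInMs and an absolute at, both describe one delivery time. The sketch below shows one way to reduce them to an absolute Date. The ScheduleOptions type mirrors the shapes from the examples, while resolveDeliveryTime is a hypothetical helper and says nothing about how the package implements scheduling internally.

```typescript
// Mirrors the two option shapes used in the scheduling examples.
type ScheduleOptions = { afterInMs: number } | { at: Date };

// Hypothetical helper: resolves either option shape to an absolute delivery time.
function resolveDeliveryTime(
  options: ScheduleOptions,
  now: Date = new Date(),
): Date {
  if ('at' in options) {
    return options.at; // absolute time: use it as given
  }
  return new Date(now.getTime() + options.afterInMs); // relative: offset from now
}

const now = new Date('2024-12-24T10:00:00Z');
console.log(
  resolveDeliveryTime({ afterInMs: 24 * 60 * 60 * 1000 }, now).toISOString(),
);
// 2024-12-25T10:00:00.000Z
console.log(
  resolveDeliveryTime({ at: new Date('2024-12-25T10:00:00Z') }, now).toISOString(),
);
// 2024-12-25T10:00:00.000Z
```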
Architecture
Message Format
Messages are wrapped in an envelope for transport:
interface PubSubMessageEnvelope {
  type: string;        // Message type name
  kind: 'command' | 'event';
  data: unknown;       // Serialized data
  metadata?: unknown;  // Optional metadata
  timestamp: string;   // ISO 8601
  messageId: string;   // UUID for idempotency
}
Date Serialization
JavaScript Date objects are preserved through serialization:
// Original
{ createdAt: new Date('2024-01-15T10:00:00Z') }
// Serialized
{ createdAt: { __type: 'Date', value: '2024-01-15T10:00:00.000Z' } }
// Deserialized (restored as Date object)
{ createdAt: new Date('2024-01-15T10:00:00.000Z') }
Error Handling
| Scenario | Behavior |
|----------|----------|
| Handler succeeds | Message acknowledged |
| Transient error | Message nack'd, retried with backoff |
| Permanent error | Message ack'd, logged |
| No handler | Message nack'd for retry |
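The table can be read as a small decision function. The sketch below follows it row by row. It is an illustration under assumptions, not the package's code: the settle function and the permanent flag used to mark unrecoverable errors are both hypothetical, and this README does not say how the package itself tells permanent errors from transient ones.

```typescript
type Settlement = 'ack' | 'nack';
type Message = { type: string; data: unknown };
type Handler = (message: Message) => Promise<void>;

// Hypothetical convention: a handler marks an unrecoverable failure
// by throwing an error that carries `permanent: true`.
function isPermanent(error: unknown): boolean {
  return (
    typeof error === 'object' &&
    error !== null &&
    (error as { permanent?: unknown }).permanent === true
  );
}

async function settle(
  message: Message,
  handler: Handler | undefined,
): Promise<Settlement> {
  if (!handler) {
    return 'nack'; // no handler: leave the message for redelivery
  }
  try {
    await handler(message);
    return 'ack'; // handler succeeded
  } catch (error) {
    if (isPermanent(error)) {
      console.error('Dropping message after permanent error:', error);
      return 'ack'; // retrying would not help, so acknowledge and log
    }
    return 'nack'; // transient: redelivered according to the retry policy
  }
}
```

Acknowledging permanent failures keeps a malformed message from cycling through retries; pairing this with a deadLetterPolicy preserves messages that keep failing for other reasons.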
See docs/ARCHITECTURE.md for design decisions.
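To make the Date Serialization format concrete, here is a minimal round-trip sketch using only JSON.stringify and JSON.parse. The tagged shape ({ __type: 'Date', value }) comes from this README; the serialize and deserialize functions are hypothetical, and the package's own serializer may work differently.

```typescript
// Tagged representation shown in the Date Serialization section.
interface TaggedDate {
  __type: 'Date';
  value: string;
}

function isTaggedDate(value: unknown): value is TaggedDate {
  return (
    typeof value === 'object' &&
    value !== null &&
    (value as Record<string, unknown>).__type === 'Date' &&
    typeof (value as Record<string, unknown>).value === 'string'
  );
}

// JSON.stringify calls Date.prototype.toJSON before the replacer runs, so the
// replacer reads the original property from the holder object (`this`).
function serialize(payload: unknown): string {
  return JSON.stringify(
    payload,
    function (this: Record<string, unknown>, key: string, value: unknown) {
      const original = this[key];
      return original instanceof Date
        ? { __type: 'Date', value: original.toISOString() }
        : value;
    },
  );
}

// The reviver turns every tagged object back into a Date instance.
function deserialize(json: string): unknown {
  return JSON.parse(json, (_key, value) =>
    isTaggedDate(value) ? new Date(value.value) : value,
  );
}

const wire = serialize({ createdAt: new Date('2024-01-15T10:00:00Z') });
console.log(wire);
// {"createdAt":{"__type":"Date","value":"2024-01-15T10:00:00.000Z"}}
const restored = deserialize(wire) as { createdAt: Date };
console.log(restored.createdAt instanceof Date);
// true
```

Plain JSON would hand the consumer an ISO string instead; the tag is what lets a handler receive a real Date without knowing the payload schema.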
Observability
The package supports optional observability through structured logging and OpenTelemetry tracing.
Logging
Logging is opt-in and completely silent by default. To enable logging, provide a logger that implements the canonical (context, message) contract:
const messageBus = getPubSubMessageBus({
  pubsub,
  observability: {
    logger: {
      debug: (context, message) => console.debug(message, context),
      info: (context, message) => console.info(message, context),
      warn: (context, message) => console.warn(message, context),
      error: (context, message) => console.error(message, context),
    },
  },
});
Logger Contract:
The logger MUST implement the canonical (context, message) contract:
- context: Structured data as Record<string, unknown> (first parameter)
- message: Human-readable log message (second parameter, optional)
Pino is natively compatible. For Winston, use an adapter.
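As a sketch of what such an adapter could look like, the code below wraps any message-first logger in the (context, message) contract. The interface names and adaptMessageFirstLogger are hypothetical; MessageFirstLogger is a minimal shape that Winston-style loggers are assumed to satisfy, so check it against your logger's typings before relying on it.

```typescript
type LogContext = Record<string, unknown>;

// The (context, message) contract described above.
interface ContextFirstLogger {
  debug(context: LogContext, message?: string): void;
  info(context: LogContext, message?: string): void;
  warn(context: LogContext, message?: string): void;
  error(context: LogContext, message?: string): void;
}

// Assumed minimal shape of a message-first logger: (message, meta).
interface MessageFirstLogger {
  debug(message: string, meta?: LogContext): void;
  info(message: string, meta?: LogContext): void;
  warn(message: string, meta?: LogContext): void;
  error(message: string, meta?: LogContext): void;
}

// Swaps the argument order and substitutes an empty message when none is given.
function adaptMessageFirstLogger(target: MessageFirstLogger): ContextFirstLogger {
  return {
    debug: (context, message = '') => target.debug(message, context),
    info: (context, message = '') => target.info(message, context),
    warn: (context, message = '') => target.warn(message, context),
    error: (context, message = '') => target.error(message, context),
  };
}

// `console` happens to fit the message-first shape, so it serves as a demo target.
const logger = adaptMessageFirstLogger(console);
logger.info({ topic: 'emmett-evt-ProductItemAdded' }, 'Subscription attached');
```

The adapted object can then be passed as observability.logger in the getPubSubMessageBus configuration.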
Log Levels:
- info - Lifecycle events (start, stop)
- debug - External I/O operations (publish, subscribe)
- warn - Recoverable failures (timeouts, retries)
- error - Failures (with Error objects in { err: error } format)
Tracing (OpenTelemetry)
The package creates OpenTelemetry spans for key operations. Tracing is passive: @opentelemetry/api is a no-op until your application registers an SDK.
To enable tracing, configure OpenTelemetry in your application:
import { NodeSDK } from '@opentelemetry/sdk-node';
const sdk = new NodeSDK({ /* config */ });
sdk.start();
// Spans from emmett-google-pubsub are now captured
const messageBus = getPubSubMessageBus({ pubsub });
Notes:
- The package never initializes OpenTelemetry
- No tracing flags needed - spans are always created (no-op if SDK not initialized)
- Message types and payloads are never included in spans or logs
Compatibility
- Node.js: >= 18.0.0
- Emmett: ^0.39.0
- @google-cloud/pubsub: ^4.8.0
Contributing
Contributions are welcome! Please:
- Fork the repository
- Create a feature branch
- Add tests for new functionality
- Ensure all tests pass
- Submit a pull request
Development
# Install dependencies
npm install
# Build
npm run build
# Run tests
npm test
# Run unit tests only
npm run test:unit
# Run integration tests (in-memory)
npm run test:int
# Run E2E tests (requires Docker)
npm run test:e2e
# Lint
npm run lint
# Format
npm run format
License
MIT
Related Packages
- @event-driven-io/emmett - Core Emmett framework
- @emmett-community/emmett-google-firestore - Firestore event store
- @emmett-community/emmett-google-realtime-db - Realtime Database inline projections
- @event-driven-io/emmett-mongodb - MongoDB event store
Support
Made with ❤️ by the Emmett Community
