@emmett-community/emmett-google-firestore
v0.4.0
Published
Google Firestore event store implementation for Emmett
Downloads
467
Maintainers
Readme
@emmett-community/emmett-google-firestore
Google Firestore event store implementation for Emmett, the Node.js event sourcing framework.
Features
- ✅ Event Storage & Retrieval - Store and read events from Google Firestore
- ✅ Optimistic Concurrency - Built-in version conflict detection
- ✅ Type-Safe - Full TypeScript support with comprehensive types
- ✅ Minimal Boilerplate - Simple, intuitive API
- ✅ Subcollection-based - Efficient Firestore-native structure (no size limits!)
- ✅ Global Event Ordering - Maintain total ordering across all streams
- ✅ Testing Utilities - Helper functions for easy testing
- ✅ Emmett Compatible - Works seamlessly with the Emmett ecosystem
Installation
npm install @emmett-community/emmett-google-firestore @google-cloud/firestoreQuick Start
import { Firestore } from '@google-cloud/firestore';
import { getFirestoreEventStore } from '@emmett-community/emmett-google-firestore';
// Initialize Firestore
const firestore = new Firestore({
projectId: 'your-project-id',
keyFilename: 'path/to/service-account.json',
});
// Create event store
const eventStore = getFirestoreEventStore(firestore);
// Define your events
type UserRegistered = Event<'UserRegistered', { userId: string; email: string }>;
type UserEvent = UserRegistered | /* other events */;
// Append events
await eventStore.appendToStream('User-123', [
{
type: 'UserRegistered',
data: { userId: '123', email: '[email protected]' },
},
]);
// Read events
const events = await eventStore.readStream<UserEvent>('User-123');
// Aggregate state
const state = await eventStore.aggregateStream(
'User-123',
evolve,
initialState,
);How It Works
Firestore Structure
Events are stored using a subcollection pattern for optimal performance:
/streams/ # Root collection
{streamName}/ # Stream document (metadata)
version: number
createdAt: Timestamp
updatedAt: Timestamp
/events/ # Subcollection (actual events)
0000000000: { type, data, ... } # Zero-padded version IDs
0000000001: { type, data, ... }
0000000002: { type, data, ... }
/_counters/ # System collection
global_position/
value: numberBenefits of this structure:
- ✅ No document size limits (Firestore 1MB limit doesn't apply to subcollections)
- ✅ Natural isolation per stream
- ✅ Automatic ordering (document IDs sort naturally)
- ✅ No composite indexes needed
- ✅ Efficient queries
Optimistic Concurrency
The event store uses optimistic locking to prevent conflicts:
// Append with version check
await eventStore.appendToStream(
'User-123',
events,
{ expectedStreamVersion: 5 } // Will fail if version ≠ 5
);
// Or use special version markers
import { NO_STREAM, STREAM_EXISTS, ANY } from '@emmett-community/emmett-google-firestore';
// Stream must not exist
await eventStore.appendToStream('User-123', events, {
expectedStreamVersion: NO_STREAM
});
// Stream must exist (any version)
await eventStore.appendToStream('User-123', events, {
expectedStreamVersion: STREAM_EXISTS
});
// No version check
await eventStore.appendToStream('User-123', events, {
expectedStreamVersion: ANY
});API Reference
getFirestoreEventStore(firestore, options?)
Creates a Firestore event store instance.
Parameters:
firestore: Firestore instanceoptions: Optional configurationcollections: Custom collection namesstreams: Stream collection name (default:"streams")counters: Counter collection name (default:"_counters")
Returns: FirestoreEventStore
const eventStore = getFirestoreEventStore(firestore, {
collections: {
streams: 'my_streams',
counters: 'my_counters',
},
});eventStore.appendToStream(streamName, events, options?)
Appends events to a stream.
Parameters:
streamName: Stream identifier (e.g.,"User-123")events: Array of events to appendoptions: Optional append optionsexpectedStreamVersion: Version constraint
Returns: Promise<AppendToStreamResult>
const result = await eventStore.appendToStream(
'User-123',
[{ type: 'UserRegistered', data: {...} }],
{ expectedStreamVersion: 0 }
);
console.log(result.nextExpectedStreamVersion); // 1
console.log(result.createdNewStream); // true/falseeventStore.readStream(streamName, options?)
Reads events from a stream.
Parameters:
streamName: Stream identifieroptions: Optional read optionsfrom: Start version (inclusive)to: End version (inclusive)maxCount: Maximum number of events to read
Returns: Promise<FirestoreReadEvent[]>
// Read all events
const events = await eventStore.readStream('User-123');
// Read from version 10 onwards
const events = await eventStore.readStream('User-123', { from: 10n });
// Read range
const events = await eventStore.readStream('User-123', {
from: 5n,
to: 10n
});
// Limit results
const events = await eventStore.readStream('User-123', {
maxCount: 100
});eventStore.aggregateStream(streamName, evolve, initialState, options?)
Aggregates stream events into state.
Parameters:
streamName: Stream identifierevolve: Function to apply events to stateinitialState: Function returning initial stateoptions: Optional read options (same asreadStream)
Returns: Promise<State>
const state = await eventStore.aggregateStream(
'User-123',
(state, event) => {
switch (event.type) {
case 'UserRegistered':
return { ...state, ...event.data };
default:
return state;
}
},
() => ({ status: 'empty' }),
);Testing
Running Tests
# Unit tests
npm run test:unit
# Integration tests (in-memory)
npm run test:int
# E2E tests (Firestore Emulator via Testcontainers, requires Docker)
npm run test:e2e
# All tests
npm test
# Coverage
npm run test:coverageTest files live in test/ and are selected by filename suffix:
*.unit.spec.ts(unit tests, pure logic)*.int.spec.ts(integration tests, in-memory Firestore)*.e2e.spec.ts(E2E tests, Firestore emulator via Testcontainers)
Support fixtures live under test/support (including Firebase emulator configs in test/support/firebase).
Using Firestore Emulator
For local development and manual testing:
# Install Firebase CLI
npm install -g firebase-tools
# Start emulator
firebase emulators:start --only firestoreSet environment variables:
export FIRESTORE_PROJECT_ID=test-project
export FIRESTORE_EMULATOR_HOST=localhost:8080E2E tests start the emulator automatically via Testcontainers.
Examples
Complete Shopping Cart Example
See examples/shopping-cart for a full application including:
- Event-sourced shopping cart
- Express.js API with OpenAPI spec
- Docker Compose setup
- Unit, integration, and E2E tests
cd examples/shopping-cart
docker-compose upBasic Usage Example
import { Firestore } from '@google-cloud/firestore';
import { getFirestoreEventStore } from '@emmett-community/emmett-google-firestore';
import type { Event } from '@event-driven-io/emmett';
// Define events
type AccountOpened = Event<'AccountOpened', {
accountId: string;
initialBalance: number;
}>;
type MoneyDeposited = Event<'MoneyDeposited', {
accountId: string;
amount: number;
}>;
type BankAccountEvent = AccountOpened | MoneyDeposited;
// Define state
type BankAccount = {
accountId: string;
balance: number;
status: 'open' | 'closed';
};
// Evolve function
const evolve = (state: BankAccount, event: BankAccountEvent): BankAccount => {
switch (event.type) {
case 'AccountOpened':
return {
accountId: event.data.accountId,
balance: event.data.initialBalance,
status: 'open',
};
case 'MoneyDeposited':
return {
...state,
balance: state.balance + event.data.amount,
};
default:
return state;
}
};
const initialState = (): BankAccount => ({
accountId: '',
balance: 0,
status: 'closed',
});
// Usage
const firestore = new Firestore({ projectId: 'my-project' });
const eventStore = getFirestoreEventStore(firestore);
// Open account
await eventStore.appendToStream('BankAccount-123', [
{
type: 'AccountOpened',
data: { accountId: '123', initialBalance: 100 }
},
]);
// Deposit money
await eventStore.appendToStream('BankAccount-123', [
{
type: 'MoneyDeposited',
data: { accountId: '123', amount: 50 }
},
]);
// Get current state
const account = await eventStore.aggregateStream(
'BankAccount-123',
evolve,
initialState,
);
console.log(account.balance); // 150Configuration
Custom Collection Names
const eventStore = getFirestoreEventStore(firestore, {
collections: {
streams: 'app_streams',
counters: 'app_counters',
},
});Firestore Emulator (Development)
const firestore = new Firestore({
projectId: 'demo-project',
host: 'localhost:8080',
ssl: false,
});Production Configuration
const firestore = new Firestore({
projectId: process.env.GCP_PROJECT_ID,
keyFilename: process.env.GCP_KEY_FILE,
// Optional: specify database
databaseId: '(default)',
});Architecture
Event Sourcing Pattern
This package implements the Event Sourcing pattern:
- Commands → Validate and create events
- Events → Immutable facts that happened
- State → Rebuilt by replaying events
Command → Decide → Events → Append to Firestore → Evolve → StateFirestore Transaction Flow
When appending events, the following happens atomically:
- Read current stream version
- Validate expected version
- Increment global position counter
- Append events to subcollection
- Update stream metadata
- Commit transaction
If any step fails or versions don't match, the entire transaction is rolled back.
Performance Considerations
Batch Size
Firestore transactions are limited to 500 operations. When appending many events:
// Good: Small batches
await eventStore.appendToStream('stream', events.slice(0, 100));
// Avoid: Very large batches (>400 events)Query Optimization
// Good: Use range queries
const recent = await eventStore.readStream('stream', {
from: lastKnownVersion,
});
// Good: Limit results
const events = await eventStore.readStream('stream', {
maxCount: 100
});Firestore Costs
- Reads: Each document read counts (events + metadata)
- Writes: Each event appended counts
- Storage: Charged per GB stored
Use the emulator for development to avoid costs!
Error Handling
import { ExpectedVersionConflictError } from '@emmett-community/emmett-google-firestore';
try {
await eventStore.appendToStream('stream', events, {
expectedStreamVersion: 5
});
} catch (error) {
if (error instanceof ExpectedVersionConflictError) {
console.log('Version conflict:', error.expected, 'vs', error.actual);
// Handle conflict (retry, merge, etc.)
}
}Observability
The event store supports optional logging and OpenTelemetry tracing.
Logging
Pass an optional logger compatible with Pino:
import pino from 'pino';
const eventStore = getFirestoreEventStore(firestore, {
observability: {
logger: pino({ level: 'debug' }),
},
});Logging points:
info: Initializationdebug: I/O operations (queries, transactions)warn: Version conflicts (recoverable)error: Failures before rethrowing
Note: Event payloads are never logged.
Tracing
The package uses @opentelemetry/api directly. Spans are created passively:
- If your application initializes OpenTelemetry, spans will be recorded
- If not, the tracing calls are no-ops with zero overhead
Span names:
emmett.firestore.read_streamemmett.firestore.append_to_stream
Attributes:
emmett.stream_nameemmett.event_countemmett.new_versionemmett.created_new_stream
To enable tracing, initialize OpenTelemetry in your application (this package never initializes tracing itself).
TypeScript Support
The package is written in TypeScript and includes full type definitions:
import type {
FirestoreEventStore,
FirestoreReadEvent,
AppendToStreamOptions,
ExpectedStreamVersion,
} from '@emmett-community/emmett-google-firestore';Compatibility
- Node.js: >= 18.0.0
- Emmett: ^0.39.0
- Firestore: ^7.10.0
Contributing
Contributions are welcome! Please:
- Fork the repository
- Create a feature branch
- Add tests for new functionality
- Ensure all tests pass
- Submit a pull request
Development
# Install dependencies
npm install
# Build
npm run build
# Run tests
npm test
# Lint
npm run lint
# Format
npm run formatLicense
MIT © Emmett Community
Resources
Support
- Issues: GitHub Issues
- Discussions: GitHub Discussions
- Emmett Discord: Join Discord
Acknowledgments
- Built for the Emmett framework by Oskar Dudycz
- Inspired by emmett-mongodb
- Part of the Emmett Community
