nx-mongo
Version: 4.0.0
🚀 Env-Ready Component (ERC 2.0)
This component supports zero-config initialization via environment variables using nx-config2.
A lightweight, feature-rich MongoDB helper library for Node.js and TypeScript. Provides a simple, intuitive API for common MongoDB operations with built-in retry logic, pagination, transactions, config-driven ref mapping, and signature-based deduplication.
Features
- ✅ Simple API - Easy-to-use methods for common MongoDB operations
- ✅ TypeScript Support - Full TypeScript support with type safety
- ✅ Connection Retry - Automatic retry with exponential backoff
- ✅ Automatic Cleanup - Connections automatically close on app exit (SIGINT, SIGTERM, etc.)
- ✅ Pagination - Built-in pagination support with metadata
- ✅ Transactions - Full transaction support for multi-operation consistency
- ✅ Aggregation - Complete aggregation pipeline support
- ✅ Index Management - Create, drop, and list indexes
- ✅ Count Operations - Accurate and estimated document counting
- ✅ Session Support - Transaction sessions for complex operations
- ✅ Config-driven Ref Mapping - Map application-level refs to MongoDB collections
- ✅ Signature-based Deduplication - Automatic duplicate prevention using document signatures
- ✅ Append/Replace Modes - Flexible write modes for data pipelines
Installation
npm install nx-mongo

ERC 2.0 Setup

1. Copy .env.example to .env:

   cp node_modules/nx-mongo/.env.example .env

2. Fill in the required values in .env:

   MONGO_CONNECTION_STRING=mongodb://localhost:27017/

3. Use with zero config:

   const helper = new SimpleMongoHelper();
   await helper.initialize();
Quick Start
Zero-Config Mode (ERC 2.0)
import { SimpleMongoHelper } from 'nx-mongo';
// Auto-discovers configuration from environment variables
// Set MONGO_CONNECTION_STRING or MONGODB_URI in your .env file
const helper = new SimpleMongoHelper();
// Initialize connection
await helper.initialize();

Environment Variables:
- MONGO_CONNECTION_STRING or MONGODB_URI (required) - MongoDB connection string
- MONGO_MAX_RETRIES (optional, default: 3) - Maximum retry attempts
- MONGO_RETRY_DELAY (optional, default: 1000) - Initial retry delay in milliseconds
- MONGO_EXPONENTIAL_BACKOFF (optional, default: true) - Use exponential backoff
See .env.example for the complete list of required and optional variables with descriptions.
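The way the retry variables combine can be sketched as a delay schedule. This is an illustration of the documented defaults, not nx-mongo's actual implementation:

```typescript
// Sketch: how MONGO_RETRY_DELAY and MONGO_EXPONENTIAL_BACKOFF likely
// interact. With exponential backoff, the delay doubles on each attempt.
function backoffDelays(maxRetries: number, retryDelay: number, exponential: boolean): number[] {
  const delays: number[] = [];
  for (let attempt = 0; attempt < maxRetries; attempt++) {
    delays.push(exponential ? retryDelay * 2 ** attempt : retryDelay);
  }
  return delays;
}

// With the defaults (3 retries, 1000 ms, exponential): 1000, 2000, 4000 ms
```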
Advanced Mode (Programmatic Configuration)
import { SimpleMongoHelper } from 'nx-mongo';
// Explicit configuration (bypasses auto-discovery)
const helper = new SimpleMongoHelper('mongodb://localhost:27017/', {
maxRetries: 5,
retryDelay: 2000
});
// Or use config object
const helper = new SimpleMongoHelper({
connectionString: 'mongodb://localhost:27017/',
retryOptions: {
maxRetries: 5,
retryDelay: 2000
}
});
// Initialize connection
await helper.initialize();

Legacy Mode (Backward Compatible)
import { SimpleMongoHelper } from 'nx-mongo';
// Connection string: database name is ignored/stripped automatically
// Use base connection string: mongodb://localhost:27017/
const helper = new SimpleMongoHelper('mongodb://localhost:27017/');
// Initialize connection
await helper.initialize();
// Insert a document (defaults to 'admin' database)
await helper.insert('users', {
name: 'John Doe',
email: '[email protected]',
age: 30
});
// Insert into a specific database
await helper.insert('users', {
name: 'Jane Doe',
email: '[email protected]',
age: 28
}, {}, 'mydb'); // Specify database name
// Find documents from 'admin' database (default)
const users = await helper.loadCollection('users');
// Find documents from specific database
const mydbUsers = await helper.loadCollection('users', {}, undefined, 'mydb');
// Find one document
const user = await helper.findOne('users', { email: '[email protected]' });
// Find from specific database
const mydbUser = await helper.findOne('users', { email: '[email protected]' }, undefined, 'mydb');
// Update document
await helper.update(
'users',
{ email: '[email protected]' },
{ $set: { age: 31 } }
);
// Update in specific database
await helper.update(
'users',
{ email: '[email protected]' },
{ $set: { age: 29 } },
undefined,
'mydb'
);
// Delete document
await helper.delete('users', { email: '[email protected]' });
// Disconnect
await helper.disconnect();

Note: The connection string database name (if present) is automatically stripped. All operations default to the 'admin' database unless you specify a different database name as the last parameter.
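The stripping behavior described in the note can be sketched as follows (a hypothetical helper mirroring the documented behavior, not the package's actual code):

```typescript
// Sketch: drop any database path segment from a MongoDB connection string,
// keeping scheme, credentials, host, port, and query options intact.
function stripDatabaseName(connectionString: string): string {
  const url = new URL(connectionString);
  url.pathname = "/"; // '/mydb' -> '/'; query options (if any) survive
  return url.toString();
}
```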
API Reference
Constructor
// Zero-Config Mode (ERC 2.0)
new SimpleMongoHelper()
// Advanced Mode (Config Object)
new SimpleMongoHelper(config: SimpleMongoHelperConfig)
// Legacy Mode (Backward Compatible)
new SimpleMongoHelper(connectionString: string, retryOptions?: RetryOptions, config?: HelperConfig)

Parameters:
Zero-Config Mode:
- No parameters - auto-discovers from environment variables
Advanced Mode (Config Object):
- config.connectionString (optional) - MongoDB connection string (defaults to the MONGO_CONNECTION_STRING or MONGODB_URI env var)
- config.retryOptions (optional) - Retry configuration
  - maxRetries?: number - Maximum retry attempts (default: 3, or MONGO_MAX_RETRIES env var)
  - retryDelay?: number - Initial retry delay in ms (default: 1000, or MONGO_RETRY_DELAY env var)
  - exponentialBackoff?: boolean - Use exponential backoff (default: true, or MONGO_EXPONENTIAL_BACKOFF env var)
- config.config (optional) - HelperConfig for ref-based operations
Legacy Mode:
- connectionString - MongoDB base connection string (database name is automatically stripped if present)
  - Example: 'mongodb://localhost:27017/' or 'mongodb://localhost:27017/admin' (both work the same)
- retryOptions (optional) - Retry configuration
- config (optional) - HelperConfig for ref-based operations
Examples:
// Zero-Config Mode (ERC 2.0)
const helper = new SimpleMongoHelper(); // Uses MONGO_CONNECTION_STRING from env
// Advanced Mode
const helper = new SimpleMongoHelper({
connectionString: 'mongodb://localhost:27017/',
retryOptions: { maxRetries: 5, retryDelay: 2000 }
});
// Legacy Mode (still supported)
const helper = new SimpleMongoHelper(
'mongodb://localhost:27017/',
{ maxRetries: 5, retryDelay: 2000 }
);

Important: The database name in the connection string is automatically stripped. All operations default to the 'admin' database unless you specify a different database name per operation.
Connection Methods
testConnection(): Promise<{ success: boolean; error?: { type: string; message: string; details?: string } }>
Tests the MongoDB connection and returns detailed error information if it fails. This method does not establish a persistent connection - use initialize() for that.
Returns:
- success: boolean - Whether the connection test succeeded
- error?: object - Error details if the connection failed
  - type - Error type: 'missing_credentials' | 'invalid_connection_string' | 'connection_failed' | 'authentication_failed' | 'config_error' | 'unknown'
  - message - Human-readable error message
  - details - Detailed error information and troubleshooting tips
Example:
const result = await helper.testConnection();
if (!result.success) {
console.error('Connection test failed!');
console.error('Error Type:', result.error?.type);
console.error('Error Message:', result.error?.message);
console.error('Error Details:', result.error?.details);
// Handle error based on type
switch (result.error?.type) {
case 'connection_failed':
console.error('Cannot connect to MongoDB server. Check if server is running.');
// On Windows, try using 127.0.0.1 instead of localhost
break;
case 'authentication_failed':
console.error('Invalid credentials. Check username and password.');
break;
case 'invalid_connection_string':
console.error('Connection string format is invalid.');
break;
default:
console.error('Unknown error occurred.');
}
} else {
console.log('Connection test passed!');
await helper.initialize();
}

Error Types:
- missing_credentials - Username or password missing in the connection string
- invalid_connection_string - Connection string format is invalid
- connection_failed - Cannot reach the MongoDB server (timeout, DNS, network, etc.)
- authentication_failed - Invalid credentials or insufficient permissions
- config_error - Configuration issues
- unknown - Unexpected error
Troubleshooting Tips:
- Windows users: If localhost fails, try 127.0.0.1 instead (e.g., mongodb://127.0.0.1:27017/) to avoid IPv6 resolution issues
- Connection timeout: Verify MongoDB is running and accessible on the specified host and port
- Connection refused: Check if MongoDB is listening on the correct port (default: 27017)
- Authentication failed: Verify the username and password in the connection string
initialize(): Promise<void>
Establishes MongoDB connection with automatic retry logic. Must be called before using other methods.
await helper.initialize();

disconnect(): Promise<void>
Closes the MongoDB connection and cleans up resources. Note: Connections are automatically closed when your application exits (handles SIGINT, SIGTERM, and beforeExit events), so manual disconnection is optional but recommended for explicit cleanup.
await helper.disconnect();

Automatic Cleanup:
- Connections are automatically closed when the Node.js process receives SIGINT (Ctrl+C) or SIGTERM signals
- All SimpleMongoHelper instances are cleaned up in parallel with a 5-second timeout
- Multiple instances are handled gracefully through a global registry
- Manual disconnect() is still recommended for explicit cleanup in your code
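The global-registry pattern described above can be sketched in miniature. The names here (register, closeAll) are illustrative, not nx-mongo's internals:

```typescript
// Sketch: a global registry of helpers, drained in parallel on shutdown
// signals, with a timeout so a hung connection cannot block exit forever.
type Closable = { disconnect(): Promise<void> };

const registry = new Set<Closable>();

function register(helper: Closable): void {
  registry.add(helper);
}

async function closeAll(timeoutMs = 5000): Promise<void> {
  // Close every registered helper in parallel, but give up after timeoutMs.
  const all = Promise.all([...registry].map((h) => h.disconnect().catch(() => {})));
  const timeout = new Promise<void>((resolve) => { setTimeout(resolve, timeoutMs); });
  await Promise.race([all, timeout]);
}

for (const signal of ["SIGINT", "SIGTERM"] as const) {
  process.once(signal, () => { void closeAll().finally(() => process.exit(0)); });
}
```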
Query Methods
loadCollection<T>(collectionName: string, query?: Filter<T>, options?: PaginationOptions, database?: string): Promise<WithId<T>[] | PaginatedResult<T>>
Loads documents from a collection with optional query filter and pagination.
Parameters:
- collectionName - Name of the collection
- query (optional) - MongoDB query filter
- options (optional) - Pagination and sorting options
  - page?: number - Page number (1-indexed)
  - limit?: number - Documents per page
  - sort?: Sort - Sort specification
- database (optional) - Database name (defaults to 'admin')
Returns:
- Without pagination: WithId<T>[]
- With pagination: PaginatedResult<T> with metadata
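The pagination metadata can be derived from the total count plus page and limit. A sketch (field names follow the PaginatedResult fields used in this README; the derivation itself is an assumption):

```typescript
// Sketch: computing PaginatedResult-style metadata from total/page/limit.
function paginationMeta(total: number, page: number, limit: number) {
  const totalPages = Math.ceil(total / limit);
  return {
    total,                      // total matching documents
    page,                       // current page (1-indexed)
    totalPages,                 // ceil(total / limit)
    hasNext: page < totalPages, // another page exists after this one
    hasPrev: page > 1,          // a page exists before this one
  };
}
```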
Examples:
// Load all documents from 'admin' database (default)
const allUsers = await helper.loadCollection('users');
// Load from specific database
const mydbUsers = await helper.loadCollection('users', {}, undefined, 'mydb');
// Load with query
const activeUsers = await helper.loadCollection('users', { active: true });
// Load with pagination
const result = await helper.loadCollection('users', {}, {
page: 1,
limit: 10,
sort: { createdAt: -1 }
});
// result.data - array of documents
// result.total - total count
// result.page - current page
// result.totalPages - total pages
// result.hasNext - has next page
// result.hasPrev - has previous page
// Load with pagination from specific database
const mydbResult = await helper.loadCollection('users', {}, {
page: 1,
limit: 10,
sort: { createdAt: -1 }
}, 'mydb');

findOne<T>(collectionName: string, query: Filter<T>, options?: { sort?: Sort; projection?: Document }, database?: string): Promise<WithId<T> | null>
Finds a single document in a collection.
Parameters:
- collectionName - Name of the collection
- query - MongoDB query filter
- options (optional) - Find options
  - sort?: Sort - Sort specification
  - projection?: Document - Field projection
- database (optional) - Database name (defaults to 'admin')
Example:
const user = await helper.findOne('users', { email: '[email protected]' });
const latestUser = await helper.findOne('users', {}, { sort: { createdAt: -1 } });

Insert Methods
insert<T>(collectionName: string, data: T | T[], options?: { session?: ClientSession }, database?: string): Promise<any>
Inserts one or more documents into a collection.
Parameters:
- collectionName - Name of the collection
- data - Single document or array of documents
- options (optional) - Insert options
  - session?: ClientSession - Transaction session
- database (optional) - Database name (defaults to 'admin')
Examples:
// Insert single document
await helper.insert('users', {
name: 'John Doe',
email: '[email protected]'
});
// Insert multiple documents
await helper.insert('users', [
{ name: 'John', email: '[email protected]' },
{ name: 'Jane', email: '[email protected]' }
]);
// Insert within transaction
const session = helper.startSession();
await session.withTransaction(async () => {
await helper.insert('users', { name: 'John' }, { session });
});

Update Methods
update<T>(collectionName: string, filter: Filter<T>, updateData: UpdateFilter<T>, options?: { upsert?: boolean; multi?: boolean; session?: ClientSession }, database?: string): Promise<any>
Updates documents in a collection.
Parameters:
- collectionName - Name of the collection
- filter - MongoDB query filter
- updateData - Update operations
- options (optional) - Update options
  - upsert?: boolean - Create if not exists
  - multi?: boolean - Update multiple documents (default: false)
  - session?: ClientSession - Transaction session
- database (optional) - Database name (defaults to 'admin')
Examples:
// Update single document
await helper.update(
'users',
{ email: '[email protected]' },
{ $set: { age: 31 } }
);
// Update multiple documents
await helper.update(
'users',
{ role: 'user' },
{ $set: { lastLogin: new Date() } },
{ multi: true }
);
// Upsert (create if not exists)
await helper.update(
'users',
{ email: '[email protected]' },
{ $set: { name: 'John Doe', email: '[email protected]' } },
{ upsert: true }
);

Delete Methods
delete<T>(collectionName: string, filter: Filter<T>, options?: { multi?: boolean }, database?: string): Promise<any>
Deletes documents from a collection.
Parameters:
- collectionName - Name of the collection
- filter - MongoDB query filter
- options (optional) - Delete options
  - multi?: boolean - Delete multiple documents (default: false)
- database (optional) - Database name (defaults to 'admin')
Examples:
// Delete single document
await helper.delete('users', { email: '[email protected]' });
// Delete multiple documents
await helper.delete('users', { role: 'guest' }, { multi: true });

Collection Merge Methods
mergeCollections(options: MergeCollectionsOptions): Promise<MergeCollectionsResult>
Merges two collections into a new target collection using various strategies (index-based, key-based, or composite-key). Useful for combining original records with assessment results or joining related data.
Parameters:
- sourceCollection1 - Name of the first source collection (e.g., original records)
- sourceCollection2 - Name of the second source collection (e.g., assessment results)
- targetCollection - Name of the target collection for merged results
- strategy - Merge strategy: 'index' | 'key' | 'composite'
- key - (For the 'key' strategy) Field name to match on (supports dot notation)
- compositeKeys - (For the 'composite' strategy) Array of field names for composite-key matching
- joinType - (For the 'key' and 'composite' strategies) SQL-style join type: 'inner' | 'left' | 'right' | 'outer' (optional, overrides the onUnmatched flags)
- fieldPrefix1 - Prefix for fields from collection 1 (default: 'record')
- fieldPrefix2 - Prefix for fields from collection 2 (default: 'assessment')
- includeIndex - Include the original index in the merged document (default: true for the index strategy)
- onUnmatched1 - (Deprecated: use joinType instead) What to do with unmatched records from collection 1: 'include' | 'skip' (default: 'include')
- onUnmatched2 - (Deprecated: use joinType instead) What to do with unmatched records from collection 2: 'include' | 'skip' (default: 'include')
- session - Optional transaction session
- database - Optional database name (defaults to 'admin')
Returns:
interface MergeCollectionsResult {
merged: number; // Total merged documents
unmatched1: number; // Unmatched documents from collection 1
unmatched2: number; // Unmatched documents from collection 2
errors: Array<{ index: number; error: Error; doc?: any }>;
}

Strategies:
Index-based (strategy: 'index'): Merges by array position. Assumes both collections are in the same order.

const result = await helper.mergeCollections({
  sourceCollection1: 'original_records',
  sourceCollection2: 'assessments',
  targetCollection: 'merged_results',
  strategy: 'index',
  fieldPrefix1: 'record',
  fieldPrefix2: 'assessment',
  includeIndex: true
});
// Result: { recordIndex: 0, record: {...}, assessment: {...} }

Key-based (strategy: 'key'): Merges by matching a single unique field. Supports SQL-style join types.

// INNER JOIN - Only matched records
const result = await helper.mergeCollections({
  sourceCollection1: 'applications',
  sourceCollection2: 'assessments',
  targetCollection: 'merged',
  strategy: 'key',
  key: 'id',
  joinType: 'inner' // Only records with matching assessments
});

// LEFT JOIN - All records, with assessments where available
const result = await helper.mergeCollections({
  sourceCollection1: 'applications',
  sourceCollection2: 'assessments',
  targetCollection: 'merged',
  strategy: 'key',
  key: 'id',
  joinType: 'left' // All apps, null assessment if no match
});

// RIGHT JOIN - All assessments, with records where available
const result = await helper.mergeCollections({
  sourceCollection1: 'applications',
  sourceCollection2: 'assessments',
  targetCollection: 'merged',
  strategy: 'key',
  key: 'id',
  joinType: 'right' // All assessments, null record if no match
});

// FULL OUTER JOIN - Everything from both sides
const result = await helper.mergeCollections({
  sourceCollection1: 'applications',
  sourceCollection2: 'assessments',
  targetCollection: 'merged',
  strategy: 'key',
  key: 'id',
  joinType: 'outer' // All apps and all assessments
});

Composite-key (strategy: 'composite'): Merges by matching multiple fields (e.g., name + ports + zones). Also supports join types.

const result = await helper.mergeCollections({
  sourceCollection1: 'original_records',
  sourceCollection2: 'assessments',
  targetCollection: 'merged',
  strategy: 'composite',
  compositeKeys: ['name', 'ports[]', 'zones[]'], // Arrays are sorted for matching
  joinType: 'left', // All records, assessments where match
  fieldPrefix1: 'record',
  fieldPrefix2: 'assessment'
});
SQL-Style Join Types:
'inner'- INNER JOIN: Returns only records that have matches in both collections'left'- LEFT JOIN: Returns all records from collection 1, with matching records from collection 2 (null if no match)'right'- RIGHT JOIN: Returns all records from collection 2, with matching records from collection 1 (null if no match)'outer'- FULL OUTER JOIN: Returns all records from both collections, matching where possible
Multiple Matches: When a key appears multiple times in collection 2, the merge creates multiple rows (one per match), just like SQL joins. For example, if "app1" has 2 assessments, you'll get 2 merged rows.
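The join semantics, including the one-row-per-match rule, can be illustrated with an in-memory sketch. This is illustrative only; mergeCollections operates on MongoDB collections, not arrays:

```typescript
// Sketch: SQL-style join over two arrays, mirroring the documented
// behavior of mergeCollections' joinType option.
type Doc = Record<string, any>;

function joinByKey(
  left: Doc[], right: Doc[], key: string,
  joinType: "inner" | "left" | "right" | "outer",
): Array<{ record: Doc | null; assessment: Doc | null }> {
  const rows: Array<{ record: Doc | null; assessment: Doc | null }> = [];
  const matchedRight = new Set<Doc>();
  for (const l of left) {
    const matches = right.filter((r) => r[key] === l[key]);
    matches.forEach((m) => matchedRight.add(m));
    if (matches.length === 0) {
      // unmatched left side only survives left/outer joins
      if (joinType === "left" || joinType === "outer") rows.push({ record: l, assessment: null });
    } else {
      // one row per match, as in SQL
      for (const m of matches) rows.push({ record: l, assessment: m });
    }
  }
  if (joinType === "right" || joinType === "outer") {
    for (const r of right) if (!matchedRight.has(r)) rows.push({ record: null, assessment: r });
  }
  return rows;
}
```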
Examples:
// Index-based merge (fast but requires same order)
const result1 = await helper.mergeCollections({
sourceCollection1: 'records',
sourceCollection2: 'assessments',
targetCollection: 'merged',
strategy: 'index'
});
console.log(`Merged ${result1.merged} documents, ${result1.unmatched1} unmatched from collection 1`);
// INNER JOIN - Only complete records (both sides matched)
const result2 = await helper.mergeCollections({
sourceCollection1: 'apps',
sourceCollection2: 'assessments',
targetCollection: 'merged',
strategy: 'key',
key: 'appId',
joinType: 'inner' // Only apps that have assessments
});
// LEFT JOIN - All apps, assessments where available
const result3 = await helper.mergeCollections({
sourceCollection1: 'apps',
sourceCollection2: 'assessments',
targetCollection: 'merged',
strategy: 'key',
key: 'appId',
joinType: 'left' // All apps, null assessment if no match
});
// RIGHT JOIN - All assessments, apps where available
const result4 = await helper.mergeCollections({
sourceCollection1: 'apps',
sourceCollection2: 'assessments',
targetCollection: 'merged',
strategy: 'key',
key: 'appId',
joinType: 'right' // All assessments, null app if no match
});
// FULL OUTER JOIN - Everything from both sides
const result5 = await helper.mergeCollections({
sourceCollection1: 'apps',
sourceCollection2: 'assessments',
targetCollection: 'merged',
strategy: 'key',
key: 'appId',
joinType: 'outer' // All apps and all assessments
});
// Composite-key merge with LEFT JOIN
const result6 = await helper.mergeCollections({
sourceCollection1: 'original',
sourceCollection2: 'assessments',
targetCollection: 'merged',
strategy: 'composite',
compositeKeys: ['name', 'ports[]', 'zones[]'],
joinType: 'left',
fieldPrefix1: 'record',
fieldPrefix2: 'assessment',
includeIndex: true
});
// Handling multiple matches (one app, multiple assessments)
// If "app1" has 2 assessments, you'll get 2 merged rows:
// - { record: {id: 1, name: "app1"}, assessment: {appId: 1, risk: "high"} }
// - { record: {id: 1, name: "app1"}, assessment: {appId: 1, risk: "medium"} }

Notes:
- Index-based merging is fast but fragile if collections are reordered
- Key-based merging is safer and recommended when you have unique identifiers
- Composite-key merging handles cases where no single unique field exists
- SQL-style join types (inner, left, right, outer) provide explicit control over unmatched records
- Multiple matches create multiple rows (SQL-style) - if a key has duplicates, you get one row per match
- Array fields in composite keys are automatically sorted for consistent matching
- Supports dot notation for nested fields (e.g., 'meta.id', 'ports[]')
- Transaction support is available via the session option
- Legacy onUnmatched1/onUnmatched2 flags still work but are deprecated in favor of joinType
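How composite keys might behave with the array-sorting and dot-notation rules above can be sketched as a hypothetical key-builder (not the library's internal function):

```typescript
// Sketch: build a composite match key from dot-path fields; fields marked
// with '[]' are arrays and get sorted so element order doesn't matter.
function compositeKey(doc: Record<string, any>, keys: string[]): string {
  const parts = keys.map((k) => {
    const isArray = k.endsWith("[]");
    const path = (isArray ? k.slice(0, -2) : k).split(".");
    let value: any = doc;
    for (const segment of path) value = value?.[segment]; // walk dot notation
    if (isArray && Array.isArray(value)) value = [...value].sort();
    return JSON.stringify(value ?? null);
  });
  return parts.join("|");
}
```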
Count Methods
countDocuments<T>(collectionName: string, query?: Filter<T>, database?: string): Promise<number>
Counts documents matching a query (accurate count).
Parameters:
- collectionName - Name of the collection
- query (optional) - MongoDB query filter
- database (optional) - Database name (defaults to 'admin')
Example:
const userCount = await helper.countDocuments('users');
const activeUserCount = await helper.countDocuments('users', { active: true });

estimatedDocumentCount(collectionName: string): Promise<number>
Gets estimated document count (faster but less accurate).
Example:
const estimatedCount = await helper.estimatedDocumentCount('users');

Aggregation Methods
aggregate<T>(collectionName: string, pipeline: Document[], database?: string): Promise<T[]>
Runs an aggregation pipeline on a collection.
Parameters:
- collectionName - Name of the collection
- pipeline - Array of aggregation pipeline stages
- database (optional) - Database name (defaults to 'admin')
Example:
const result = await helper.aggregate('orders', [
{ $match: { status: 'completed' } },
{ $group: {
_id: '$customerId',
total: { $sum: '$amount' },
count: { $sum: 1 }
}},
{ $sort: { total: -1 } }
]);

Index Methods
createIndex(collectionName: string, indexSpec: IndexSpecification, options?: CreateIndexesOptions, database?: string): Promise<string>
Creates an index on a collection.
Parameters:
- collectionName - Name of the collection
- indexSpec - Index specification
- options (optional) - Index creation options
- database (optional) - Database name (defaults to 'admin')
Example:
// Simple index
await helper.createIndex('users', { email: 1 });
// Unique index
await helper.createIndex('users', { email: 1 }, { unique: true });
// Compound index
await helper.createIndex('users', { email: 1, createdAt: -1 });

dropIndex(collectionName: string, indexName: string, database?: string): Promise<any>
Drops an index from a collection.
Parameters:
- collectionName - Name of the collection
- indexName - Name of the index to drop
- database (optional) - Database name (defaults to 'admin')
Example:
await helper.dropIndex('users', 'email_1');

listIndexes(collectionName: string, database?: string): Promise<Document[]>
Lists all indexes on a collection.
Parameters:
- collectionName - Name of the collection
- database (optional) - Database name (defaults to 'admin')
Example:
const indexes = await helper.listIndexes('users');
indexes.forEach(idx => console.log(idx.name));

Transaction Methods
startSession(): ClientSession
Starts a new client session for transactions.
Example:
const session = helper.startSession();

withTransaction<T>(callback: (session: ClientSession) => Promise<T>): Promise<T>
Executes a function within a transaction.
Example:
await helper.withTransaction(async (session) => {
await helper.insert('users', { name: 'John' }, { session });
await helper.update('accounts', { userId: '123' }, { $inc: { balance: 100 } }, { session });
return 'Transaction completed';
});

Note: Transactions require a MongoDB replica set or sharded cluster.
Config-driven Ref Mapping and Signature-based Deduplication
Overview
The helper supports config-driven collection mapping and signature-based deduplication. All logic (queries, keys, hashing, append/replace) is generic and built into the helper - applications only pass refs and documents.
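Signature-based deduplication likely works by hashing the values selected by the configured key dot-paths into a `_sig` field, which the unique index then enforces. The sketch below is an assumption about that mechanism, not nx-mongo's actual hashing code:

```typescript
// Sketch: derive a document signature from configured key dot-paths.
// Array fields (marked '[]') are sorted so element order doesn't change
// the signature; the hash would populate a field like '_sig'.
import { createHash } from "node:crypto";

function signature(doc: Record<string, any>, keys: string[]): string {
  const material = keys
    .map((k) => {
      const path = k.replace(/\[\]$/, "").split(".");
      let v: any = doc;
      for (const seg of path) v = v?.[seg]; // walk dot notation
      if (Array.isArray(v)) v = [...v].sort();
      return JSON.stringify(v ?? null);
    })
    .join("|");
  return createHash("sha256").update(material).digest("hex");
}
```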
Configuration Schema
interface HelperConfig {
inputs: Array<{
ref: string; // Application-level reference name
collection: string; // MongoDB collection name
query?: Filter<any>; // Optional MongoDB query filter
}>;
outputs: Array<{
ref: string; // Application-level reference name
collection: string; // MongoDB collection name
keys?: string[]; // Optional: dot-paths for signature generation
mode?: "append" | "replace"; // Optional: write mode (default from global)
}>;
output?: {
mode?: "append" | "replace"; // Global default mode (default: "append")
};
progress?: {
collection?: string; // Progress collection name (default: "progress_states")
uniqueIndexKeys?: string[]; // Unique index keys (default: ["provider","key"])
provider?: string; // Default provider namespace for this helper instance
};
databases?: Array<{
ref: string; // Reference identifier
type: string; // Type identifier
database: string; // Database name to use
}>;
}

Example Configuration:
const config = {
inputs: [
{ ref: "topology", collection: "topology-definition-neo-data", query: {} },
{ ref: "vulnerabilities", collection: "vulnerabilities-data", query: { severity: { "$in": ["high","critical"] } } },
{ ref: "entities", collection: "entities-data" },
{ ref: "crownJewels", collection: "entities-data", query: { type: "crown_jewel" } }
],
outputs: [
{ ref: "paths", collection: "paths-neo-data", keys: ["segments[]","edges[].from","edges[].to","target_role"], mode: "append" },
{ ref: "prioritizedPaths", collection: "prioritized_paths-neo-data", keys: ["segments[]","outside","contains_crown_jewel"], mode: "replace" },
{ ref: "assetPaths", collection: "asset_paths-neo-data", keys: ["asset_ip","segments[]"], mode: "append" }
],
output: { mode: "append" }
};

Constructor with Config

new SimpleMongoHelper(connectionString: string, retryOptions?: RetryOptions, config?: HelperConfig)

Example:
const helper = new SimpleMongoHelper(
'mongodb://localhost:27017/my-db',
{ maxRetries: 5 },
config
);

Config Methods
useConfig(config: HelperConfig): this
Sets or updates the configuration for ref-based operations.
helper.useConfig(config);

Database Selection via Ref/Type Map
The helper supports config-driven database selection using ref and type parameters. This allows you to map logical identifiers to database names without hardcoding them in your application code.
Configuration:
const config = {
// ... inputs, outputs, etc.
databases: [
{ ref: "app1", type: "production", database: "app1_prod" },
{ ref: "app1", type: "staging", database: "app1_staging" },
{ ref: "app2", type: "production", database: "app2_prod" },
{ ref: "app2", type: "staging", database: "app2_staging" },
]
};

Usage in CRUD Operations:
All CRUD operations now support optional ref and type parameters for automatic database resolution:
// Priority 1: Direct database parameter (highest priority)
await helper.insert('users', { name: 'John' }, {}, 'mydb');
// Priority 2: Using ref + type (exact match)
await helper.insert('users', { name: 'John' }, {}, undefined, 'app1', 'production');
// Resolves to 'app1_prod' database
// Priority 3: Using ref alone (must have exactly one match)
await helper.insert('users', { name: 'John' }, {}, undefined, 'app1');
// Throws error if multiple matches found
// Priority 4: Using type alone (must have exactly one match)
await helper.insert('users', { name: 'John' }, {}, undefined, undefined, 'production');
// Throws error if multiple matches found

Database Resolution Priority:
1. Direct database parameter - If provided, it is used immediately (highest priority)
2. ref + type - If both are provided, finds the exact match in the databases map
3. ref alone - If only ref is provided, finds entries matching ref (must be exactly one)
4. type alone - If only type is provided, finds entries matching type (must be exactly one)
5. Default - If none are provided, defaults to the 'admin' database
Error Handling:
- If no match found: throws error with descriptive message
- If multiple matches found: throws error suggesting to use additional parameter to narrow down
Example:
const config = {
databases: [
{ ref: "tenant1", type: "prod", database: "tenant1_prod" },
{ ref: "tenant1", type: "dev", database: "tenant1_dev" },
{ ref: "tenant2", type: "prod", database: "tenant2_prod" },
]
};
const helper = new SimpleMongoHelper('mongodb://localhost:27017/', undefined, config);
await helper.initialize();
// Use ref + type for exact match
await helper.insert('users', { name: 'John' }, {}, undefined, 'tenant1', 'prod');
// Uses 'tenant1_prod' database
// Use ref alone (only works if exactly one match)
// This would throw error because tenant1 has 2 matches (prod and dev)
// await helper.insert('users', { name: 'John' }, {}, undefined, 'tenant1');
// Use type alone (only works if exactly one match)
// This would throw error because 'prod' has 2 matches (tenant1 and tenant2)
// await helper.insert('users', { name: 'John' }, {}, undefined, undefined, 'prod');

Ref-based Operations
loadByRef<T>(ref: string, options?: PaginationOptions & { session?: ClientSession; database?: string; ref?: string; type?: string }): Promise<WithId<T>[] | PaginatedResult<T>>
Loads data from a collection using a ref name from the configuration.
Parameters:
- ref - Application-level reference name (must exist in config.inputs)
- options (optional) - Pagination and session options
  - page?: number - Page number (1-indexed)
  - limit?: number - Documents per page
  - sort?: Sort - Sort specification
  - session?: ClientSession - Transaction session
  - database?: string - Database name (defaults to 'admin')
  - ref?: string - Optional ref for database resolution
  - type?: string - Optional type for database resolution
Example:
// Load using ref (applies query automatically)
const topology = await helper.loadByRef('topology');
const vulns = await helper.loadByRef('vulnerabilities');
// With pagination
const result = await helper.loadByRef('topology', {
page: 1,
limit: 10,
sort: { createdAt: -1 }
});

writeByRef(ref: string, documents: any[], options?: { session?: ClientSession; ensureIndex?: boolean; database?: string; ref?: string; type?: string }): Promise<WriteByRefResult>
Writes documents to a collection using a ref name from the configuration. Supports signature-based deduplication and append/replace modes.
Parameters:
- ref - Application-level reference name (must exist in config.outputs)
- documents - Array of documents to write
- options (optional) - Write options
  - session?: ClientSession - Transaction session
  - ensureIndex?: boolean - Whether to ensure the signature index exists (default: true)
  - database?: string - Database name (defaults to 'admin')
  - ref?: string - Optional ref for database resolution
  - type?: string - Optional type for database resolution
Returns:
interface WriteByRefResult {
inserted: number;
updated: number;
errors: Array<{ index: number; error: Error; doc?: any }>;
indexCreated: boolean;
}

Example:
// Write using ref (automatic deduplication, uses keys from config)
const result = await helper.writeByRef('paths', pathDocuments);
console.log(`Inserted: ${result.inserted}, Updated: ${result.updated}`);
console.log(`Index created: ${result.indexCreated}`);
// Replace mode (clears collection first)
await helper.writeByRef('prioritizedPaths', prioritizedDocs);

writeStage(ref: string, documents: any[], options?: WriteStageOptions): Promise<WriteStageResult>
Writes documents to a collection and optionally marks a stage as complete atomically. See the Progress Tracking section for details and examples.
Parameters:
- ref - Application-level reference name (must exist in config.outputs)
- documents - Array of documents to write
- options (optional) - Write and completion options
  - session?: ClientSession - Transaction session
  - ensureIndex?: boolean - Whether to ensure the signature index exists (default: true)
  - database?: string - Database name (defaults to 'admin')
  - complete?: object - Stage completion information (optional)
Example:
// Write and mark stage complete in one call
await helper.writeStage('tier1', documents, {
complete: {
key: 'tier1',
process: 'processA',
name: 'System Inventory',
provider: 'nessus',
metadata: { itemCount: documents.length }
}
});

Signature Index Management
ensureSignatureIndex(collectionName: string, options?: { fieldName?: string; unique?: boolean }): Promise<EnsureSignatureIndexResult>
Ensures a unique index exists on the signature field for signature-based deduplication.
Parameters:
- collectionName - Name of the collection
- options (optional) - Index configuration
  - fieldName?: string - Field name for the signature (default: "_sig")
  - unique?: boolean - Whether the index should be unique (default: true)
Returns:
interface EnsureSignatureIndexResult {
created: boolean;
indexName: string;
}

Example:
const result = await helper.ensureSignatureIndex('paths-neo-data');
console.log(`Index created: ${result.created}, Name: ${result.indexName}`);

Progress Tracking
Overview
The helper provides built-in support for tracking provider-defined pipeline stages. This enables applications to:
- Track completion status of different stages (e.g., "tier1", "tier2", "enrichment")
- Skip already-completed stages on resumption
- Atomically write documents and mark stages complete
- Support multi-provider databases with provider namespaces
Configuration
Progress tracking is configured via the progress option in HelperConfig:
const config = {
// ... inputs and outputs
progress: {
collection: "progress_states", // Optional: default "progress_states"
uniqueIndexKeys: ["process", "provider", "key"], // Optional: default ["process","provider","key"]
provider: "nessus" // Optional: default provider for this instance
}
};

Progress API
The progress API is available via helper.progress:
isCompleted(key: string, options?: { process?: string; provider?: string; session?: ClientSession }): Promise<boolean>
Checks if a stage is completed. Stages are scoped by process, so the same key can exist in different processes.
Example:
// Check stage in a specific process
if (await helper.progress.isCompleted('tier1', { process: 'processA', provider: 'nessus' })) {
console.log('Stage "tier1" in processA already completed, skipping...');
}
// Same key, different process
if (await helper.progress.isCompleted('tier1', { process: 'processB', provider: 'nessus' })) {
console.log('Stage "tier1" in processB already completed, skipping...');
}

start(identity: StageIdentity, options?: { session?: ClientSession }): Promise<void>
Marks a stage as started. Idempotent - safe to call multiple times. Stages are scoped by process.
Example:
await helper.progress.start({
key: 'tier1',
process: 'processA',
name: 'System Inventory',
provider: 'nessus'
});

complete(identity: StageIdentity & { metadata?: StageMetadata }, options?: { session?: ClientSession }): Promise<void>
Marks a stage as completed with optional metadata. Idempotent - safe to call multiple times. Stages are scoped by process.
Example:
await helper.progress.complete({
key: 'tier1',
process: 'processA',
name: 'System Inventory',
provider: 'nessus',
metadata: {
itemCount: 150,
durationMs: 5000
}
});

getCompleted(options?: { process?: string; provider?: string; session?: ClientSession }): Promise<Array<{ key: string; name?: string; completedAt?: Date }>>
Gets a list of all completed stages, optionally filtered by process and/or provider.
Example:
// Get all completed stages for a specific process
const completed = await helper.progress.getCompleted({ process: 'processA', provider: 'nessus' });
// → [{ key: 'tier1', name: 'System Inventory', completedAt: Date }, ...]
// Get all completed stages across all processes for a provider
const allCompleted = await helper.progress.getCompleted({ provider: 'nessus' });

getProgress(options?: { process?: string; provider?: string; session?: ClientSession }): Promise<StageRecord[]>
Gets all stage records (both completed and in-progress), optionally filtered by process and/or provider.
Example:
// Get all stages for a specific process
const allStages = await helper.progress.getProgress({ process: 'processA', provider: 'nessus' });
// Get all stages for a provider across all processes
const allProviderStages = await helper.progress.getProgress({ provider: 'nessus' });

reset(key: string, options?: { process?: string; provider?: string; session?: ClientSession }): Promise<void>
Resets a stage to not-started state (clears completion status). Stages are scoped by process.
Example:
await helper.progress.reset('tier1', { process: 'processA', provider: 'nessus' });

Stage-Aware Writes
writeStage(ref: string, documents: any[], options?: WriteStageOptions): Promise<WriteStageResult>
Writes documents to a collection and optionally marks a stage as complete in a single call. If a session is provided, both operations are atomic within the transaction.
Parameters:
- ref - Application-level reference name (must exist in config.outputs)
- documents - Array of documents to write
- options (optional) - Write and completion options
  - ensureIndex?: boolean - Whether to ensure the signature index exists (default: true)
  - session?: ClientSession - Transaction session (makes write and complete atomic)
  - complete?: { key: string; name?: string; provider?: string; metadata?: StageMetadata } - Stage completion info
Returns:
interface WriteStageResult extends WriteByRefResult {
completed?: boolean; // true if stage was marked complete
}

Examples:
// Skip completed stages, then save-and-complete in one call
const processName = 'processA';
if (!force && (await helper.progress.isCompleted('tier1', { process: processName, provider: 'nessus' }))) {
console.log('Skipping stage "tier1" in processA');
} else {
const docs = [
{ type: 'server_status', ...status },
...scanners.map(s => ({ type: 'scanner', ...s }))
];
await helper.writeStage('tier1', docs, {
complete: {
key: 'tier1',
process: processName,
name: 'System Inventory',
provider: 'nessus',
metadata: { itemCount: docs.length }
}
});
}
// Transactional multi-write with explicit completion
const session = helper.startSession();
try {
await session.withTransaction(async () => {
await helper.writeByRef('tier2_scans', scans, { session });
await helper.writeByRef('tier2_hosts', hosts, { session });
await helper.progress.complete({
key: 'tier2',
process: 'processA',
name: 'Scan Inventory',
provider: 'nessus',
metadata: { itemCount: hosts.length }
}, { session });
});
} finally {
await session.endSession();
}

Usage Patterns
Resumption Pattern
const processName = 'processA';
const stages = ['tier1', 'tier2', 'tier3'];
for (const stageKey of stages) {
if (await helper.progress.isCompleted(stageKey, { process: processName, provider: 'nessus' })) {
console.log(`Skipping completed stage: ${stageKey} in ${processName}`);
continue;
}
await helper.progress.start({ key: stageKey, process: processName, provider: 'nessus' });
try {
const docs = await processStage(stageKey);
await helper.writeStage(`ref_${stageKey}`, docs, {
complete: { key: stageKey, process: processName, provider: 'nessus' }
});
} catch (error) {
console.error(`Stage ${stageKey} in ${processName} failed:`, error);
// Stage remains incomplete, can be retried
}
}
// Different process can have same stage keys independently
const processB = 'processB';
if (!await helper.progress.isCompleted('tier1', { process: processB, provider: 'nessus' })) {
// Process B's tier1 is independent from Process A's tier1
await helper.progress.start({ key: 'tier1', process: processB, provider: 'nessus' });
}

Utility Functions
getByDotPath(value: any, path: string): any[]
Extracts values from an object using dot-notation paths with array wildcard support.
Parameters:
- value - The object to extract values from
- path - Dot-notation path (e.g., "meta.id", "edges[].from", "segments[]")
Returns: Array of extracted values (flattened and deduplicated for arrays)
Examples:
import { getByDotPath } from 'nx-mongo';
// Simple path
getByDotPath({ meta: { id: "123" } }, "meta.id"); // ["123"]
// Array wildcard
getByDotPath({ segments: [1, 2, 3] }, "segments[]"); // [1, 2, 3]
// Nested array access
getByDotPath({ edges: [{ from: "A" }, { from: "B" }] }, "edges[].from"); // ["A", "B"]

computeSignature(doc: any, keys: string[], options?: { algorithm?: "sha256" | "sha1" | "md5" }): string
Computes a deterministic signature for a document based on specified keys.
Parameters:
- doc - The document to compute a signature for
- keys - Array of dot-notation paths to extract values from
- options (optional) - Configuration
  - algorithm?: "sha256" | "sha1" | "md5" - Hash algorithm (default: "sha256")
Returns: Hex string signature
Example:
import { computeSignature } from 'nx-mongo';
const sig = computeSignature(
{ segments: [1, 2], role: "admin" },
["segments[]", "role"]
);

Signature Algorithm
The signature generation follows these steps:
1. Extract values for each key using getByDotPath
2. Normalize values:
   - Strings: as-is
   - Numbers: String(value)
   - Booleans: "true" or "false"
   - Dates: value.toISOString() (UTC)
   - Null/Undefined: "null"
   - Objects: JSON.stringify(value, Object.keys(value).sort()) (sorted keys)
   - Arrays: flatten recursively, normalize each element, deduplicate, sort lexicographically
3. Create a canonical map: { key1: [normalized values], key2: [normalized values], ... } with keys sorted alphabetically
4. Stringify: JSON.stringify(canonicalMap)
5. Hash: SHA-256 (or the configured algorithm)
6. Return: hex string
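The canonicalization steps above can be sketched in simplified form. This is an illustrative re-implementation (handling only flat objects and one level of array wildcard), not the library's actual source; the helper names are hypothetical, and real code should use the exported computeSignature:

```typescript
import { createHash } from 'crypto';

// Normalize a single value per the rules above (sketch; no recursive flatten).
function normalize(v: unknown): string {
  if (v === null || v === undefined) return 'null';
  if (v instanceof Date) return v.toISOString();                      // UTC ISO string
  if (typeof v === 'object') {
    return JSON.stringify(v, Object.keys(v as object).sort());       // sorted keys
  }
  return String(v);                                                  // strings/numbers/booleans
}

// Hypothetical helper demonstrating the canonical-map-then-hash flow.
function sketchSignature(doc: Record<string, unknown>, keys: string[]): string {
  const canonical: Record<string, string[]> = {};
  for (const key of [...keys].sort()) {                              // sort keys alphabetically
    const raw = key.endsWith('[]')
      ? ((doc[key.slice(0, -2)] as unknown[]) ?? [])                 // "path[]" wildcard
      : [doc[key]];
    // normalize, deduplicate, and sort each value list lexicographically
    canonical[key] = [...new Set(raw.map(normalize))].sort();
  }
  return createHash('sha256').update(JSON.stringify(canonical)).digest('hex');
}

// Array order does not affect the signature:
const a = sketchSignature({ segments: [1, 2], role: 'admin' }, ['segments[]', 'role']);
const b = sketchSignature({ segments: [2, 1], role: 'admin' }, ['segments[]', 'role']);
console.log(a === b); // true
```

Because the canonical map is deterministic, equal documents (up to array order and duplicates) always hash to the same hex string, which is what makes the unique `_sig` index usable for deduplication.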
Usage Examples
Basic Usage with Config
import { SimpleMongoHelper } from 'nx-mongo';
const config = {
inputs: [
{ ref: "topology", collection: "topology-definition", query: {} },
{ ref: "vulnerabilities", collection: "vulnerabilities", query: { severity: "high" } }
],
outputs: [
{ ref: "paths", collection: "paths", keys: ["segments[]", "target_role"], mode: "append" },
{ ref: "prioritizedPaths", collection: "prioritized_paths", keys: ["segments[]"], mode: "replace" }
],
output: { mode: "append" }
};
const helper = new SimpleMongoHelper('mongodb://localhost:27017/mydb', undefined, config);
await helper.initialize();
// Load using ref (applies query automatically)
const topology = await helper.loadByRef('topology');
const vulns = await helper.loadByRef('vulnerabilities');
// Write using ref (automatic deduplication, uses keys from config)
const result = await helper.writeByRef('paths', pathDocuments);
console.log(`Inserted: ${result.inserted}, Updated: ${result.updated}`);
// Replace mode (clears collection first)
await helper.writeByRef('prioritizedPaths', prioritizedDocs);

With Transactions
const session = helper.startSession();
try {
await session.withTransaction(async () => {
await helper.writeByRef('paths', docs, { session });
await helper.writeByRef('prioritizedPaths', prioDocs, { session });
});
} finally {
await session.endSession();
}

Standalone Utilities
import { getByDotPath, computeSignature } from 'nx-mongo';
// Extract values
const values = getByDotPath(doc, "edges[].from"); // ["A", "B", "C"]
// Compute signature
const sig = computeSignature(doc, ["segments[]", "target_role"]);

TypeScript Interfaces
PaginationOptions
interface PaginationOptions {
page?: number;
limit?: number;
sort?: Sort;
}

PaginatedResult
interface PaginatedResult<T> {
data: WithId<T>[];
total: number;
page: number;
limit: number;
totalPages: number;
hasNext: boolean;
hasPrev: boolean;
}

RetryOptions
interface RetryOptions {
maxRetries?: number;
retryDelay?: number;
exponentialBackoff?: boolean;
}

HelperConfig
interface HelperConfig {
inputs: InputConfig[];
outputs: OutputConfig[];
output?: {
mode?: 'append' | 'replace';
};
progress?: {
collection?: string;
uniqueIndexKeys?: string[];
provider?: string;
};
}
interface InputConfig {
ref: string;
collection: string;
query?: Filter<any>;
}
interface OutputConfig {
ref: string;
collection: string;
keys?: string[];
mode?: 'append' | 'replace';
}

WriteByRefResult
interface WriteByRefResult {
inserted: number;
updated: number;
errors: Array<{ index: number; error: Error; doc?: any }>;
indexCreated: boolean;
}

EnsureSignatureIndexResult
interface EnsureSignatureIndexResult {
created: boolean;
indexName: string;
}

Progress Tracking Interfaces
interface StageIdentity {
key: string;
process?: string;
provider?: string;
name?: string;
}
interface StageMetadata {
itemCount?: number;
errorCount?: number;
durationMs?: number;
[key: string]: any;
}
interface StageRecord extends StageIdentity {
completed: boolean;
startedAt?: Date;
completedAt?: Date;
metadata?: StageMetadata;
}
interface WriteStageOptions {
ensureIndex?: boolean;
session?: ClientSession;
complete?: {
key: string;
process?: string;
name?: string;
provider?: string;
metadata?: StageMetadata;
};
}
interface WriteStageResult extends WriteByRefResult {
completed?: boolean;
}

Merge Collections Interfaces
interface MergeCollectionsOptions {
sourceCollection1: string;
sourceCollection2: string;
targetCollection: string;
strategy: 'index' | 'key' | 'composite';
key?: string;
compositeKeys?: string[];
joinType?: 'inner' | 'left' | 'right' | 'outer'; // SQL-style join type
fieldPrefix1?: string;
fieldPrefix2?: string;
includeIndex?: boolean;
onUnmatched1?: 'include' | 'skip'; // Deprecated: use joinType instead
onUnmatched2?: 'include' | 'skip'; // Deprecated: use joinType instead
session?: ClientSession;
}
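A usage sketch of these options follows. This is illustrative only, not taken from the library docs: the collection names, the scanId join key, and the buildScanHostReport wrapper are all hypothetical, and local structural types stand in for the full interfaces so the sketch is self-contained:

```typescript
// Minimal structural types mirroring the interfaces in this section.
interface MergeOpts {
  sourceCollection1: string;
  sourceCollection2: string;
  targetCollection: string;
  strategy: 'index' | 'key' | 'composite';
  key?: string;
  joinType?: 'inner' | 'left' | 'right' | 'outer';
  fieldPrefix1?: string;
  fieldPrefix2?: string;
}
interface MergeResult { merged: number; unmatched1: number; unmatched2: number }

// Hypothetical wrapper: left-join hosts onto scans by a shared scanId key.
async function buildScanHostReport(
  helper: { mergeCollections(opts: MergeOpts): Promise<MergeResult> }
): Promise<MergeResult> {
  return helper.mergeCollections({
    sourceCollection1: 'tier2_scans',
    sourceCollection2: 'tier2_hosts',
    targetCollection: 'scan_host_report',
    strategy: 'key',
    key: 'scanId',          // join key (dot notation supported per the docs)
    joinType: 'left',       // keep scans that have no matching hosts
    fieldPrefix1: 'scan_',  // disambiguate overlapping field names
    fieldPrefix2: 'host_',
  });
}
```

Preferring joinType over the deprecated onUnmatched1/onUnmatched2 flags keeps the merge semantics aligned with familiar SQL join behavior.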
interface MergeCollectionsResult {
merged: number;
unmatched1: number;
unmatched2: number;
errors: Array<{ index: number; error: Error; doc?: any }>;
}

Error Handling
All methods throw errors with descriptive messages. Always wrap operations in try-catch blocks:
try {
await helper.initialize();
const users = await helper.loadCollection('users');
} catch (error) {
console.error('Operation failed:', error.message);
}

Best Practices
Always initialize before use:
await helper.initialize();

Use transactions for multi-operation consistency:
await helper.withTransaction(async (session) => { /* multiple operations */ });

Use pagination for large datasets:
const result = await helper.loadCollection('users', {}, { page: 1, limit: 50 });

Create indexes for frequently queried fields:
await helper.createIndex('users', { email: 1 }, { unique: true });

Disconnect when done (optional but recommended):
await helper.disconnect();

Note: Connections automatically close on app exit (SIGINT/SIGTERM), but explicit disconnection is recommended for better control and immediate cleanup.
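These practices can be combined into one lifecycle. The sketch below is illustrative (the 'users' collection, the runJob wrapper, and the counting step are hypothetical); it is written against a minimal structural type so it works with any helper exposing the documented methods:

```typescript
// Minimal structural view of the helper methods used here.
type Page = { data: unknown[]; hasNext: boolean };
interface HelperLike {
  initialize(): Promise<void>;
  loadCollection(name: string, filter: object, opts: { page: number; limit: number }): Promise<Page>;
  disconnect(): Promise<void>;
}

// Hypothetical job: initialize, page through a collection, always disconnect.
async function runJob(helper: HelperLike): Promise<number> {
  await helper.initialize();                    // always initialize first
  try {
    let processed = 0;
    for (let page = 1; ; page++) {
      // paginate instead of loading the whole collection at once
      const batch = await helper.loadCollection('users', {}, { page, limit: 50 });
      processed += batch.data.length;
      if (!batch.hasNext) break;
    }
    return processed;
  } finally {
    await helper.disconnect();                  // explicit cleanup; exit hooks are only a safety net
  }
}
```

Wrapping the work in try/finally guarantees the connection is released even when a batch fails mid-run.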
ERC 2.0 Compliance
✅ Auto-discovers configuration from environment variables
✅ Type-safe with automatic coercion and validation
✅ All dependency requirements documented
✅ Transitive requirements automatically merged
Dependencies:
- ✅ nx-config2 (Configuration engine)
- ℹ️ mongodb (non-ERC) - requirements manually documented
- ℹ️ micro-logs (non-ERC) - no environment variables required
Verification:
npx nx-config2 erc-verify

License
ISC
Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
Changelog
3.8.1
- Fixed testConnection() error reporting bug: improved error extraction from MongoDB error objects to prevent [object Object] display
- Enhanced error handling to extract MongoDB-specific error properties (code, codeName, errmsg)
- Added Windows-specific troubleshooting hints for localhost connection issues (suggests using 127.0.0.1 instead of localhost)
- Improved error messages with error codes and types for better debugging
- Updated documentation with better error handling examples
3.8.0
- Database selection via ref/type map: added config-driven database selection using ref and type parameters
- Added databases array to HelperConfig for mapping ref/type combinations to database names
- All CRUD operations now support optional ref and type parameters for automatic database resolution
- Database resolution priority: direct database parameter > ref + type > ref alone > type alone
- Throws descriptive errors when no match or multiple matches are found in the database map
- Updated Progress API to support database resolution via ref/type
- Updated loadByRef, writeByRef, writeStage, and mergeCollections to support database map resolution
3.7.0
- Separated database from connection string: Database name is now specified per operation, not in connection string
- Multi-database support: all operations accept an optional database parameter (defaults to 'admin')
- Connection string database name is automatically stripped if present (e.g., mongodb://localhost:27017/admin becomes mongodb://localhost:27017/)
- Updated all methods (insert, update, delete, loadCollection, findOne, countDocuments, aggregate, createIndex, dropIndex, listIndexes, writeByRef, loadByRef, mergeCollections, writeStage, and the progress API) to support per-operation database selection
- Breaking change: no backward compatibility - all code must be updated to use the new database parameter
3.6.0
- Automatic connection cleanup: Connections now automatically close on app exit (SIGINT, SIGTERM, beforeExit)
- Multi-instance support: a global registry handles multiple SimpleMongoHelper instances gracefully
- Timeout protection: a 5-second timeout prevents hanging during automatic cleanup
- Connections are properly managed and cleaned up even if users forget to call disconnect()
3.5.0
- Enhanced mergeCollections() with SQL-style join types (inner, left, right, outer)
- Multiple match handling: now creates multiple rows when keys have duplicates (SQL-style behavior)
- Improved key-based and composite-key merging to handle one-to-many and many-to-many relationships
- Added explicit join type control for better clarity and SQL compatibility
- Legacy onUnmatched1/onUnmatched2 flags deprecated in favor of the joinType parameter
3.4.0
- Added mergeCollections() method for merging two collections into a new target collection
- Supports three merge strategies: index-based, key-based, and composite-key merging
- Index-based merging for same-order collections
- Key-based merging using unique identifiers (supports dot notation)
- Composite-key merging using multiple fields (e.g., name + ports + zones)
- Configurable field prefixes and unmatched record handling
- Transaction support for atomic merge operations
3.3.0
- Added testConnection() method for detailed connection testing and error diagnostics
- Package renamed from nx-mongodb-helper to nx-mongo (shorter, cleaner name)
- Connection test provides detailed error messages for missing credentials, invalid connection strings, authentication failures, and network issues
3.2.0
- Added process-scoped stages support - stages can now be scoped by process identifier
- Updated default unique index to include the process field: ['process', 'provider', 'key']
- All Progress API methods now accept a process parameter for process-scoped stage tracking
- Updated writeStage() to support process-scoped completion
- Stages with the same key can exist independently in different processes
3.1.0
- Added built-in progress tracking API (helper.progress) for provider-defined pipeline stages
- Added writeStage() method that combines document writing with stage completion
- Added progress tracking configuration to HelperConfig (collection, uniqueIndexKeys, provider)
- Progress API supports idempotent operations, transactions, and provider namespaces
- All progress operations support optional transaction sessions for atomicity
3.0.0
- Added config-driven ref mapping (HelperConfig, InputConfig, OutputConfig)
- Added signature-based deduplication with automatic index management
- Added loadByRef() method for loading data by ref name
- Added writeByRef() method with signature computation, bulk upsert, and append/replace modes
- Added ensureSignatureIndex() method for signature index management
- Added useConfig() method for runtime config updates
- Added getByDotPath() utility function for dot-notation path extraction with array wildcards
- Added computeSignature() utility function for deterministic document signatures
- Enhanced constructor to accept an optional config parameter
- All new methods support transaction sessions
2.0.1
- Package renamed from nx-mongodb-helper to nx-mongo
- Added version number to README header
2.0.0
- Added delete operations
- Added findOne operation
- Added count operations (countDocuments, estimatedDocumentCount)
- Added pagination support
- Added aggregation pipeline support
- Added transaction support
- Added connection retry logic with exponential backoff
- Added index management (createIndex, dropIndex, listIndexes)
- Enhanced insert and update methods with session support
1.0.0
- Initial release with basic CRUD operations
