@qrvey/connect-library
v0.0.1-beta
Published
Unified Elasticsearch/OpenSearch client library with connection management and operations abstraction
Maintainers
Keywords
Readme
@qrvey/connect-library
Unified Elasticsearch / OpenSearch client library with connection management, retry logic, and operations abstraction.
Overview
@qrvey/connect-library provides a single, engine-agnostic API to interact with Elasticsearch 7.10 and OpenSearch 3.x clusters. It consolidates the connection and operation patterns previously scattered across ElasticsearchDAO, ElasticsearchService, and various helper modules into one well-tested package.
Key capabilities:
- Engine-agnostic client — one API surface for both ES 7.10 and OS 3.x
- Connection management — persistent connections with configurable TTL and concurrent-safe pool (ideal for Kubernetes pods and AWS credential rotation)
- AWS credential lifecycle —
credentialProvidercallback for automatic STS token refresh in long-running EKS pods - 30+ operations — bulk, document, index, mapping, cluster, scroll, snapshot, and script operations
- Retry with backoff — exponential backoff + jitter, configurable retryable status codes and error types
- Error classification — structured
ConnectErrorwith codes, retryability flag, and bulk error summaries - Three auth modes — Basic, AWS SigV4, None
- Dual module output — CJS + ESM + TypeScript declarations
Supported Engines
| Target | Engine | Server Versions | Client Library |
|--------|--------|-----------------|----------------|
| AWS Managed | OpenSearch | 3.x (tested with 3.5, 3.10) | @opensearch-project/opensearch 3.5.1 |
| AWS Managed | Elasticsearch | 7.10 | @elastic/elasticsearch 7.13.0 |
| Kubernetes | OpenSearch | 3.x | @opensearch-project/opensearch 3.5.1 |
| Kubernetes | Elasticsearch | 7.10 | @elastic/elasticsearch 7.13.0 |
Note: The OpenSearch JS client 3.x is compatible with any OpenSearch server 3.x. The client library auto-generates API bindings from the OpenSearch API spec weekly, so core operations (
bulk,search,scroll,indices.*) work across all 3.x server versions.
Installation
# yarn (monorepo)
yarn add @qrvey/connect-library
# npm
npm install @qrvey/connect-libraryQuick Start
1. Direct Client (Lambda / short-lived process)
import { ClientFactory, EngineType, AuthType } from '@qrvey/connect-library';
import { defaultProvider } from '@aws-sdk/credential-provider-node';
const factory = new ClientFactory();
const client = await factory.createClient({
host: 'https://my-cluster.es.amazonaws.com',
engineType: EngineType.OPENSEARCH,
authType: AuthType.AWS_SIGV4,
region: 'us-east-1',
credentialProvider: defaultProvider(),
});
// Use it
const isAlive = await client.ping(); // true
const results = await client.search('my-index', {
query: { match: { status: 'active' } }
});
const count = await client.count('my-index');
await client.close();2. Connection Manager (Server / Kubernetes pod)
For long-running processes (Express/Fastify servers, EKS pods), use ConnectionManager to automatically refresh connections when AWS credentials expire (default TTL: 50 min).
import { ConnectionManager, AuthType, EngineType } from '@qrvey/connect-library';
import { defaultProvider } from '@aws-sdk/credential-provider-node';
const manager = new ConnectionManager();
// First call creates the connection; subsequent calls reuse it.
// If TTL has expired, the connection is transparently refreshed.
// Concurrent calls for the same host share a single in-flight creation (no leaks).
const client = await manager.getConnection({
host: process.env.SEARCH_ENGINE_HOST,
engineType: EngineType.OPENSEARCH,
authType: AuthType.AWS_SIGV4,
region: process.env.AWS_REGION,
credentialProvider: defaultProvider(),
keepAlive: true,
connectionTTL: 3_000_000, // 50 minutes (matches AWS STS default)
maxConnections: 10,
});
await client.bulk('my-index', bulkBody);
// On shutdown
await manager.closeAll();How credential refresh works by engine:
| Engine | SigV4 Strategy | Credential Freshness |
|--------|---------------|---------------------|
| OpenSearch 3.x | AwsSigv4Signer with getCredentials callback | Per-request (always fresh) |
| Elasticsearch 7.x | aws-elasticsearch-connector with pre-resolved credentials | At client creation; refreshed when ConnectionManager TTL expires (default 50 min) |
3. Raw Client Access
When you need engine-specific APIs not covered by the abstraction:
const client = await factory.createClient(options);
const nativeClient = client.getClient(); // @elastic/elasticsearch Client or @opensearch-project/opensearch Client
// Use raw client directly
const { body } = await nativeClient.cat.indices({ format: 'json' });4. Environment Variable Fallbacks
Use resolveConnectionOptions() to build options from env vars:
import { resolveConnectionOptions } from '@qrvey/connect-library';
// Reads SEARCH_ENGINE_HOST, SEARCH_ENGINE_TYPE, SEARCH_ENGINE_AUTH_TYPE, etc.
const options = resolveConnectionOptions({ compression: true });
const client = await factory.createClient(options);Environment Variables
The library reads these environment variables as fallbacks when options are not explicitly provided.
It supports both new (SEARCH_ENGINE_*) and legacy (ELASTICSEARCH_*) variable names — new
variables take precedence, so existing deployments work without infra changes.
Connection & Engine
| Variable | Legacy Fallback | Description | Default |
|----------|----------------|-------------|---------|
| SEARCH_ENGINE_HOST | ELASTICSEARCH_HOST | Cluster URL (e.g. https://vpc-xxx.es.amazonaws.com) | — (required; throws if missing) |
| SEARCH_ENGINE_TYPE | — | Engine type: elasticsearch or opensearch | — (required; throws if missing) |
| SEARCH_ENGINE_AUTH_TYPE | (auto-detect) | Auth mode: basic, aws_sigv4, or none | auto-detected (see below) |
Authentication — Basic
| Variable | Legacy Fallback | Description | Default |
|----------|----------------|-------------|---------|
| SEARCH_ENGINE_USERNAME | ELASTICSEARCH_AUTH_USER | Basic auth username | — (required when auth = basic) |
| SEARCH_ENGINE_PASSWORD | ELASTICSEARCH_AUTH_PASSWORD | Basic auth password | — (required when auth = basic) |
Authentication — AWS SigV4
| Variable | Legacy Fallback | Description | Default |
|----------|----------------|-------------|---------|
| AWS_REGION | AWS_DEFAULT_REGION | AWS region for SigV4 signing | us-east-1 (adapter fallback) |
| AWS_ROLE_ARN | — | IAM role ARN (used by IRSA/EKS) | — |
| AWS_WEB_IDENTITY_TOKEN_FILE | — | OIDC token file path (used by IRSA/EKS) | — |
| AWS_ACCESS_KEY_ID | — | Static access key (Lambda, EC2, local dev) | — |
| AWS_SECRET_ACCESS_KEY | — | Static secret key | — |
| AWS_SESSION_TOKEN | — | STS session token | — |
Note:
AWS_*variables are consumed by the AWS SDK v3defaultProvider()chain that the consumer passes ascredentialProvider. In EKS pods with IRSA,AWS_ROLE_ARNandAWS_WEB_IDENTITY_TOKEN_FILEare injected automatically by the Kubernetes service account.
Auth Type Resolution Order
SEARCH_ENGINE_AUTH_TYPE is optional — the library auto-detects the correct mode:
1. Explicit authType in code → resolveConnectionOptions({ authType: AuthType.BASIC })
2. SEARCH_ENGINE_AUTH_TYPE env var → SEARCH_ENGINE_AUTH_TYPE=aws_sigv4
3. Auto-detect: username + password → AuthType.BASIC (legacy ELASTICSEARCH_AUTH_USER/PASSWORD)
4. Auto-detect: credentialProvider → AuthType.AWS_SIGV4
5. Default → AuthType.NONEKey insight: You can write unified code that always passes
credentialProviderand let env vars decide the auth strategy. IfELASTICSEARCH_AUTH_USER/PASSWORDare present, the library choosesBASIC(step 3). If they're absent,credentialProviderkicks in asAWS_SIGV4(step 4). This means AWS pods only need one new env var (SEARCH_ENGINE_TYPE).
Minimum New Env Vars Per Pod
When integrating with projects that already have ELASTICSEARCH_* vars set:
| Deployment | Required New Vars | Auth Resolution |
|------------|------------------|----------------|
| Kube — Basic Auth | SEARCH_ENGINE_TYPE | Auto-detected from ELASTICSEARCH_AUTH_USER + ELASTICSEARCH_AUTH_PASSWORD |
| Kube — No Auth | SEARCH_ENGINE_TYPE | Default NONE |
| AWS — SigV4 | SEARCH_ENGINE_TYPE | Auto-detected from credentialProvider passed in code |
If you prefer not to pass credentialProvider in code (e.g., in zero-config mode), set
SEARCH_ENGINE_AUTH_TYPE=aws_sigv4 explicitly as a second env var.
Deployment Recipes
Copy the recipe that matches your deployment scenario.
Best practice: Always set
SEARCH_ENGINE_AUTH_TYPEexplicitly. Auto-detection is a convenience for migration scenarios, but explicit configuration is clearer and easier to debug.
AWS + OpenSearch 3.x (Lambda)
Environment variables:
SEARCH_ENGINE_TYPE=opensearch
SEARCH_ENGINE_HOST=https://vpc-my-domain.us-east-1.es.amazonaws.com
SEARCH_ENGINE_AUTH_TYPE=aws_sigv4
AWS_REGION=us-east-1
# AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY injected by Lambda runtimeCode:
import { ClientFactory, resolveConnectionOptions } from '@qrvey/connect-library';
import { defaultProvider } from '@aws-sdk/credential-provider-node';
const factory = new ClientFactory();
const client = await factory.createClient(
resolveConnectionOptions({ credentialProvider: defaultProvider() }),
);
// Auth: AWS_SIGV4 (auto-detected from credentialProvider)AWS + Elasticsearch 7.10 (Lambda)
Environment variables:
SEARCH_ENGINE_TYPE=elasticsearch
SEARCH_ENGINE_HOST=https://vpc-my-domain.us-east-1.es.amazonaws.com
SEARCH_ENGINE_AUTH_TYPE=aws_sigv4
AWS_REGION=us-east-1Code:
import { ClientFactory, resolveConnectionOptions } from '@qrvey/connect-library';
import { defaultProvider } from '@aws-sdk/credential-provider-node';
const factory = new ClientFactory();
const client = await factory.createClient(
resolveConnectionOptions({ credentialProvider: defaultProvider() }),
);
// Auth: AWS_SIGV4 (auto-detected from credentialProvider)
// Note: credentials are pre-resolved at creation time;
// use ConnectionManager for TTL-based refresh in long-lived processes.AWS + OpenSearch 3.x (EKS pod with IRSA)
Environment variables:
SEARCH_ENGINE_TYPE=opensearch
SEARCH_ENGINE_HOST=https://vpc-my-domain.us-east-1.es.amazonaws.com
SEARCH_ENGINE_AUTH_TYPE=aws_sigv4
AWS_REGION=us-east-1
# AWS_ROLE_ARN and AWS_WEB_IDENTITY_TOKEN_FILE injected by service accountCode:
import { ConnectionManager, resolveConnectionOptions } from '@qrvey/connect-library';
import { defaultProvider } from '@aws-sdk/credential-provider-node';
const manager = new ConnectionManager();
// Per request:
const client = await manager.getConnection(
resolveConnectionOptions({ credentialProvider: defaultProvider() }),
);
// Auth: AWS_SIGV4 (auto-detected)
// Credentials refreshed per-request by OpenSearch's AwsSigv4Signer
// On shutdown:
await manager.closeAll();AWS + Elasticsearch 7.10 (EKS pod with IRSA)
Environment variables:
SEARCH_ENGINE_TYPE=elasticsearch
SEARCH_ENGINE_HOST=https://vpc-my-domain.us-east-1.es.amazonaws.com
SEARCH_ENGINE_AUTH_TYPE=aws_sigv4
AWS_REGION=us-east-1
# AWS_ROLE_ARN and AWS_WEB_IDENTITY_TOKEN_FILE injected by service accountCode:
import { ConnectionManager, resolveConnectionOptions } from '@qrvey/connect-library';
import { defaultProvider } from '@aws-sdk/credential-provider-node';
const manager = new ConnectionManager();
// Per request:
const client = await manager.getConnection(
resolveConnectionOptions({ credentialProvider: defaultProvider() }),
);
// Auth: AWS_SIGV4 (auto-detected)
// Credentials refreshed when ConnectionManager TTL expires (default 50 min)
// On shutdown:
await manager.closeAll();Kubernetes + OpenSearch 3.x (Basic Auth)
Environment variables:
SEARCH_ENGINE_TYPE=opensearch
SEARCH_ENGINE_HOST=https://opensearch-node1:9200
SEARCH_ENGINE_AUTH_TYPE=basic
SEARCH_ENGINE_USERNAME=admin
SEARCH_ENGINE_PASSWORD=adminOr with legacy variables (no changes needed):
SEARCH_ENGINE_TYPE=opensearch
SEARCH_ENGINE_AUTH_TYPE=basic # recommended even with legacy vars
ELASTICSEARCH_HOST=https://opensearch-node1:9200
ELASTICSEARCH_AUTH_USER=admin
ELASTICSEARCH_AUTH_PASSWORD=adminCode:
import { ConnectionManager, resolveConnectionOptions } from '@qrvey/connect-library';
const manager = new ConnectionManager();
const client = await manager.getConnection(resolveConnectionOptions());
// Auth: BASIC (auto-detected from username + password)Kubernetes + Elasticsearch 7.10 (Basic Auth)
Environment variables:
SEARCH_ENGINE_TYPE=elasticsearch
SEARCH_ENGINE_HOST=https://elasticsearch-node1:9200
SEARCH_ENGINE_AUTH_TYPE=basic
SEARCH_ENGINE_USERNAME=elastic
SEARCH_ENGINE_PASSWORD=changemeOr with legacy variables:
SEARCH_ENGINE_TYPE=elasticsearch
SEARCH_ENGINE_AUTH_TYPE=basic # recommended even with legacy vars
ELASTICSEARCH_HOST=https://elasticsearch-node1:9200
ELASTICSEARCH_AUTH_USER=elastic
ELASTICSEARCH_AUTH_PASSWORD=changemeCode:
import { ConnectionManager, resolveConnectionOptions } from '@qrvey/connect-library';
const manager = new ConnectionManager();
const client = await manager.getConnection(resolveConnectionOptions());
// Auth: BASIC (auto-detected from username + password)Unified Code (same codebase, env vars decide)
For projects that deploy to both AWS and Kubernetes, use a single code path:
import { ConnectionManager, resolveConnectionOptions } from '@qrvey/connect-library';
import { defaultProvider } from '@aws-sdk/credential-provider-node';
const manager = new ConnectionManager();
const client = await manager.getConnection(
resolveConnectionOptions({ credentialProvider: defaultProvider() }),
);
// Kube pods with USER/PASS → BASIC (auto-detected, credentialProvider ignored)
// AWS pods without USER/PASS → AWS_SIGV4 (auto-detected from credentialProvider)API Reference
Enums
| Enum | Values | Description |
|------|--------|-------------|
| EngineType | ELASTICSEARCH, OPENSEARCH | Target engine selection |
| AuthType | BASIC, AWS_SIGV4, NONE | Authentication mode |
| ConnectionStatus | CONNECTED, DISCONNECTED, REFRESHING | Connection state |
| ConnectErrorCode | CONNECTION_ERROR, TIMEOUT_ERROR, AUTH_ERROR, INDEX_NOT_FOUND, MAPPING_ERROR, BULK_ERROR, CLUSTER_ERROR, RESOURCE_NOT_FOUND, THROTTLE_ERROR, BAD_REQUEST, UNKNOWN_ERROR | Classified error codes |
Connection Options (IConnectionOptions)
interface IConnectionOptions {
host: string; // Cluster URL
engineType?: EngineType; // Falls back to SEARCH_ENGINE_TYPE env var
authType: AuthType; // Required
region?: string; // Required for AWS_SIGV4
credentials?: { // Pre-resolved AWS credentials (internal use)
accessKeyId: string;
secretAccessKey: string;
sessionToken?: string;
};
credentialProvider?: () => Promise<{ // AWS credential provider function
accessKeyId: string;
secretAccessKey: string;
sessionToken?: string;
}>;
username?: string; // Required for BASIC
password?: string; // Required for BASIC
requestTimeout?: number; // ms, default: 30000
maxRetries?: number; // Client-level retries, default: 3
keepAlive?: boolean; // HTTP keep-alive, default: true
compression?: boolean; // gzip, default: false
ssl?: { rejectUnauthorized: boolean };
connectionTTL?: number; // ms, default: 300000 (5 min)
maxConnections?: number; // Max sockets, default: 10
}Prefer
credentialProviderovercredentials— the provider function enables automatic credential refresh. PassdefaultProvider()from@aws-sdk/credential-provider-nodewhich handles IRSA (EKS), EC2 instance roles, ECS task roles, Lambda execution roles, and static env vars.
Services
ClientFactory
Creates engine-specific client instances. createClient is async to support AWS credential resolution.
const factory = new ClientFactory();
const client = await factory.createClient(options); // → Promise<ISearchEngineClient>ConnectionManager
Manages a pool of persistent connections keyed by host URL.
| Method | Description |
|--------|-------------|
| getConnection(options) | Returns existing or creates new connection. Auto-refreshes if TTL expired. Concurrent calls for the same host share a single in-flight creation promise (no duplicate connections). |
| refreshConnection(options) | Forces connection refresh (closes old, creates new). |
| closeConnection(host) | Closes and removes a specific connection. |
| closeAll() | Closes all connections and clears the pool. |
| isConnectionExpired(host) | Checks if TTL has elapsed. |
| getStatus(host) | Returns ConnectionStatus for a host. |
| size | Number of active connections. |
RetryHandler
Wraps any async operation with exponential backoff + jitter.
import { RetryHandler } from '@qrvey/connect-library';
const retry = new RetryHandler({
maxRetries: 5,
baseDelay: 1000,
maxDelay: 30000,
jitterPercent: 25,
retryableStatusCodes: [429, 502, 503, 504],
onRetry: (attempt, delay, error) => console.log(`Retry ${attempt} in ${delay}ms`),
});
const result = await retry.execute(() => client.search('idx', query));Default retryable errors: NoLivingConnectionsError, ConnectionError, TimeoutError, ECONNRESET, ECONNREFUSED, ETIMEDOUT, EAI_AGAIN.
ErrorHandler / ConnectError
Classifies raw ES/OS errors into structured ConnectError instances.
import { ErrorHandler, ConnectError } from '@qrvey/connect-library';
try {
await client.search('missing-index', query);
} catch (err) {
const classified = ErrorHandler.classify(err);
// classified.code → ConnectErrorCode.INDEX_NOT_FOUND
// classified.retryable → false
// classified.statusCode → 404
// classified.message → "index_not_found_exception"
}
// Bulk error analysis
const summary = ErrorHandler.fromBulkResponse(bulkResult);
// summary.retryableItems → items that can be retried (429, 503)
// summary.permanentItems → items with permanent errors (400, 404)Client Operations (ISearchEngineClient)
All operations are available on any client returned by ClientFactory or ConnectionManager.
Core
| Method | Signature | Description |
|--------|-----------|-------------|
| ping() | () → Promise<boolean> | Connectivity check |
| info() | () → Promise<any> | Cluster version and info |
| close() | () → Promise<void> | Close the client connection |
| getClient() | () → any | Access the underlying native client |
Document Operations
| Method | Signature | Description |
|--------|-----------|-------------|
| search | (index, query, options?) → Promise<any> | Search with DSL query |
| mget | (index, docs, options?) → Promise<any> | Multi-get by IDs |
| count | (index, query?) → Promise<number> | Document count |
| index | (index, id, document, options?) → Promise<any> | Index a document |
| get | (index, id, options?) → Promise<any> | Get by ID |
| delete | (index, id, options?) → Promise<any> | Delete by ID |
| update | (index, id, body, options?) → Promise<any> | Partial update |
| deleteByQuery | (index, query, options?) → Promise<any> | Delete matching documents |
| updateByQuery | (index, query, options?) → Promise<any> | Update matching documents |
Bulk Operations
| Method | Signature | Description |
|--------|-----------|-------------|
| bulk | (index, body, options?) → Promise<IBulkResponse> | Bulk index/update/delete |
| msearch | (body, options?) → Promise<any> | Multi-search |
IBulkResponse:
{ isError: boolean; errors?: any[]; itemCount: number; timeDuration: number }Index Operations
| Method | Signature | Description |
|--------|-----------|-------------|
| createIndex | (index, options?) → Promise<any> | Create with settings/mappings/aliases |
| deleteIndex | (index, options?) → Promise<any> | Delete index |
| bulkDeleteIndices | (indices) → Promise<any> | Delete multiple indices |
| indexExists | (index) → Promise<boolean> | Check existence |
| listIndices | (options?) → Promise<any[]> | List all indices |
| reindex | (source, dest, options?) → Promise<any> | Reindex data |
| refreshIndex | (index) → Promise<any> | Force refresh |
| putSettings | (index, settings) → Promise<any> | Update index settings |
Mapping Operations
| Method | Signature | Description |
|--------|-----------|-------------|
| getMapping | (index) → Promise<any> | Get index mapping |
| putMapping | (index, body) → Promise<any> | Update mapping |
Cluster Operations
| Method | Signature | Description |
|--------|-----------|-------------|
| getClusterHealth | () → Promise<IClusterHealth> | Cluster health status |
| getClusterSettings | () → Promise<any> | All settings (including defaults) |
| getNodeStats | (metrics?) → Promise<any> | Node-level stats |
| getCpuUsage | () → Promise<number> | Average CPU % across nodes |
Scroll Operations
| Method | Signature | Description |
|--------|-----------|-------------|
| openScroll | (index, query, options?) → Promise<IScrollResult> | Start scroll context |
| nextScroll | (scrollId, scrollTTL?) → Promise<IScrollResult> | Next scroll page |
| clearScroll | (scrollId) → Promise<any> | Release scroll context |
IScrollResult:
{ scrollId: string; hits: any[]; total: number }Snapshot Operations
| Method | Signature | Description |
|--------|-----------|-------------|
| getSnapshotsInProgress | () → Promise<{inProgress, count, snapshots}> | Active snapshots |
Script Operations
| Method | Signature | Description |
|--------|-----------|-------------|
| scriptsPainlessExecute | (body) → Promise<any> | Execute Painless script |
Migration Guide
From ElasticsearchDAO (dr_dataload_pipeline)
Before:
const ElasticsearchDAO = require('../dao/elasticsearchDAO');
// Per-request client, no pooling
const client = ElasticsearchDAO.initializeESClient();
const result = await ElasticsearchDAO.runQueryES(client, index, query);
const exists = await ElasticsearchDAO.indexExists(client, indexName);
await ElasticsearchDAO.bulkInsert(client, indexName, records);
await ElasticsearchDAO.refreshIndexes(client, indexName);
await ElasticsearchDAO.deleteIndex(client, indexName);
const mapping = await ElasticsearchDAO.getIndexMapping(client, indexName);
await ElasticsearchDAO.putESMapping(client, indexName, mapping);After:
import { ClientFactory, AuthType } from '@qrvey/connect-library';
import { defaultProvider } from '@aws-sdk/credential-provider-node';
const factory = new ClientFactory();
const client = await factory.createClient({
host: process.env.SEARCH_ENGINE_HOST,
authType: process.env.SEARCH_ENGINE_USERNAME ? AuthType.BASIC : AuthType.AWS_SIGV4,
username: process.env.SEARCH_ENGINE_USERNAME,
password: process.env.SEARCH_ENGINE_PASSWORD,
region: process.env.AWS_REGION,
credentialProvider: defaultProvider(),
});
const result = await client.search(index, query);
const exists = await client.indexExists(indexName);
await client.bulk(indexName, records); // Returns IBulkResponse with error tracking
await client.refreshIndex(indexName);
await client.deleteIndex(indexName);
const mapping = await client.getMapping(indexName);
await client.putMapping(indexName, mapping);Key differences:
- Methods are instance methods, not static (no need to pass
clientas first arg) bulk()returns a structuredIBulkResponse({ isError, errors, itemCount, timeDuration })- Built-in keep-alive and connection pooling (no per-request client creation)
- No external retry library needed — use
RetryHandler
From ElasticsearchService (qrvey_qollect)
Before:
const ElasticsearchService = require('./elasticsearchService');
// Singleton with manual TTL refresh
const client = await ElasticsearchService.getESClient();
const { body } = await client.search({ index, body: query });
// Raw client usage — must destructure { body } yourselfAfter:
import { ConnectionManager, AuthType } from '@qrvey/connect-library';
import { defaultProvider } from '@aws-sdk/credential-provider-node';
// Automatic TTL-based refresh
const manager = new ConnectionManager();
const client = await manager.getConnection({
host: process.env.SEARCH_ENGINE_HOST,
authType: AuthType.AWS_SIGV4,
region: process.env.AWS_REGION,
credentialProvider: defaultProvider(),
connectionTTL: 3_000_000, // 50 min
});
// Body is already extracted — no destructuring needed
const result = await client.search(index, query);Key differences:
ConnectionManagerreplaces the manual singleton + TTL pattern- Methods return the response body directly (no
{ body }destructuring) - Auth configuration is declarative, not scattered across helper functions
From helper/elasticsearch.js (qrvey_qollect)
Before:
const { getESClient, closeAllClients } = require('./helper/elasticsearch');
const client = await getESClient(host); // Per-host Map cache
const { body } = await client.search({ index, body: query });
// Cleanup
await closeAllClients();After:
import { ConnectionManager, AuthType } from '@qrvey/connect-library';
import { defaultProvider } from '@aws-sdk/credential-provider-node';
const manager = new ConnectionManager();
const client = await manager.getConnection({
host,
authType: AuthType.AWS_SIGV4,
region,
credentialProvider: defaultProvider(),
});
const result = await client.search(index, query);
await manager.closeAll();Development
Build
npm run build # Full build (CJS + ESM + types)
npm run build:clean # Clean dist/
npm run type-check # TypeScript type verificationTest
npm test # Unit tests (356 tests)
npm run test:cov # Unit tests + coverage
npm run test:integration # Integration tests — starts Docker, runs, stops
npm run test:integration:keep # Integration tests — keep containers running
npm run test:integration:no-up # Integration tests — use already-running containers
npm run test:smoke:es # Smoke test against live ES cluster (.env.smoke.es)
npm run test:smoke:os # Smoke test against live OS cluster (.env.smoke.os)Integration Test Infrastructure
Docker Compose spins up three clusters:
| Service | Engine | Port |
|---------|--------|------|
| Elasticsearch | 7.10.2 | localhost:19200 |
| OpenSearch | 2.19.1 | localhost:19201 |
| OpenSearch | 3.5.0 | localhost:19202 |
# Manual container management
docker compose -f docker-compose.integration.yml up -d
docker compose -f docker-compose.integration.yml downLint
npm run lintProject Structure
src/
├── index.ts # Barrel export (public API)
├── config/
│ ├── resolveEngineType.ts # Engine type resolution with env fallback
│ └── resolveConnectionOptions.ts # Options builder with env var defaults
├── interfaces/
│ ├── connectionOptions.interface.ts
│ ├── searchEngineClient.interface.ts
│ ├── connectionManager.interface.ts
│ ├── clientFactory.interface.ts
│ ├── bulkOperations.interface.ts
│ ├── documentOperations.interface.ts
│ ├── indexOperations.interface.ts
│ ├── mappingOperations.interface.ts
│ ├── clusterOperations.interface.ts
│ ├── scrollOperations.interface.ts
│ ├── snapshotOperations.interface.ts
│ ├── scriptOperations.interface.ts
│ ├── retryOptions.interface.ts
│ └── errorHandler.interface.ts
├── types/
│ ├── engineType.type.ts
│ ├── authType.type.ts
│ └── connectionStatus.type.ts
├── services/
│ ├── base/
│ │ └── baseSearchEngineAdapter.service.ts # Abstract base (all 30+ operations)
│ ├── elasticsearch/
│ │ └── elasticsearchAdapter.service.ts # ES 7.x client adapter
│ ├── opensearch/
│ │ └── opensearchAdapter.service.ts # OS 3.x client adapter
│ ├── clientFactory.service.ts # Engine-based factory
│ ├── connectionManager.service.ts # Connection pool + TTL
│ ├── retryHandler.service.ts # Exponential backoff + jitter
│ └── errorHandler.service.ts # Error classification + ConnectError
└── tests/
├── integration/ # 159 integration tests (3 engines)
└── smoke/ # Smoke tests for live clustersLicense
Proprietary — Qrvey Inc.
