@bernierllc/csv-import-service
v0.10.1
Published
Generic CSV resolution/merge orchestrator: parse → fuzzy-map → validate → resolve → detectConflicts → buildMergePlan → applyMergePlan
Service tier · Generic CSV resolution/merge orchestrator
⚠️ Node.js Only
This package is Node.js-only ("browser": false). It orchestrates
@bernierllc/entity-resolver, @bernierllc/merge-planner, and the full CSV
stack — all of which have Node.js dependencies.
For browser/frontend applications, use the backend API exposed by
@bernierllc/csv-import-suite and call it via @bernierllc/csv-import-suite-client.
Overview
csv-import-service is the stateless generic resolution/merge engine for
the BernierLLC CSV uploader stack. It wires together the full pipeline:
CSV input
→ parse (csv-parser)
→ fuzzy auto-map (csv-mapper + string-similarity)
→ validate (csv-validator)
→ resolve (entity-resolver + host CandidateProvider)
→ detectConflicts (merge-planner)
→ RunResult ← UI review happens here
→ buildMergePlan (operator decisions compiled into MergePlan)
→ applyMergePlan (Persister + retry + progress + AuditSink)
→ ImportReport
The package is fully generic over <Row, Existing> and contains zero
domain concepts — all domain shape enters through UploaderSchema,
CandidateProvider<Row, Existing>, and Persister<Existing>.
Installation
npm install @bernierllc/csv-import-service
Quick Start
import { CsvImportService } from '@bernierllc/csv-import-service';
import type { Persister, CandidateProvider } from '@bernierllc/csv-import-service';
// 1. Instantiate (optional NeverHub init)
const svc = new CsvImportService<MyRow, MyRecord>();
await svc.initialize(); // auto-detects NeverHub; safe to skip
// 2. Run the pipeline
const result = await svc.run({
  csv: req.file.buffer,
  targetFields: ['email', 'fullName', 'phone'],
  uploaderSchema: {
    fields: [
      { targetField: 'email', identity: true, mergeable: false },
      { targetField: 'fullName', identity: false, mergeable: true },
      { targetField: 'phone', identity: false, mergeable: true },
    ],
  },
  matchConfig: {
    rules: [
      { field: 'email', comparator: 'compareEmail', weight: 3 },
      { field: 'fullName', comparator: 'compareName', weight: 1 },
    ],
    autoAcceptAt: 0.9,
    reviewAt: 0.5,
  },
  candidateProvider: myDb, // implements CandidateProvider
});
// result.rows — mapped rows
// result.resolutions — per-row MatchResult (tier: auto/review/none)
// result.validationErrors — Map<rowIndex, ValidationError[]>
// 3. Send result to UI for operator review of 'review'-tier matches
// and conflict resolution. Build RecordResolution[] from UI choices.
// 4. Compile the plan
const plan = svc.buildMergePlan(resolutions, uploaderSchema);
// 5. Apply
const report = await svc.apply(plan, myPersister, {
  batchSize: 50,
  onProgress: ({ applied, total, fraction }) =>
    console.log(`${Math.round(fraction * 100)}%`),
  auditSink: myAuditLog,
  retry: { maxRetries: 3, initialDelayMs: 200, maxDelayMs: 5000, backoffFactor: 2 },
});
console.log(`created=${report.created} merged=${report.merged} failed=${report.failed}`);
API
class CsvImportService<Row, Existing>
initialize(): Promise<void>
Auto-detect NeverHub and register this package. All core functionality works
without NeverHub — calling initialize() is optional.
run(options: RunOptions<Row, Existing>): Promise<RunResult<Row, Existing>>
Execute the full parse → map → validate → resolve pipeline.
RunOptions:
| Field | Type | Required | Description |
|---|---|---|---|
| csv | string \| Buffer | Yes | Raw CSV content |
| targetFields | string[] | Yes | Field names for fuzzy auto-mapping |
| uploaderSchema | UploaderSchema | Yes | Identity/mergeable schema |
| matchConfig | MatchConfig<Row> | Yes | Entity-resolver configuration |
| candidateProvider | CandidateProvider<Row, Existing> | Yes | Host DB lookup |
| parserOptions | CSVParserOptions | — | csv-parser options |
| columnMapping | ColumnMapping | — | Explicit mapping (bypasses autoMap) |
| autoMapThreshold | number | — | Fuzzy confidence threshold (default: 0.5) |
| validationSchema | ValidationSchema | — | csv-validator schema |
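To make the role of autoMapThreshold concrete, here is a minimal sketch of threshold-based fuzzy header mapping. It uses a Sørensen-Dice score over character bigrams as a stand-in; the scoring that csv-mapper and string-similarity actually apply may differ, and the function names `dice` and `autoMapSketch` are hypothetical, not part of this package.

```typescript
// Split a header into lowercase alphanumeric character bigrams.
// Assumption: ASCII-only normalization, which is enough for this illustration.
function bigrams(s: string): string[] {
  const t = s.toLowerCase().replace(/[^a-z0-9]/g, "");
  const out: string[] = [];
  for (let i = 0; i < t.length - 1; i++) out.push(t.slice(i, i + 2));
  return out;
}

// Sørensen-Dice similarity in the range 0..1 (a stand-in scorer).
function dice(a: string, b: string): number {
  const x = bigrams(a);
  const y = bigrams(b);
  if (x.length === 0 || y.length === 0) return 0;
  const pool = [...y];
  let hits = 0;
  for (const g of x) {
    const i = pool.indexOf(g);
    if (i >= 0) {
      hits++;
      pool.splice(i, 1); // each bigram in b can be matched only once
    }
  }
  return (2 * hits) / (x.length + y.length);
}

// Map each CSV header to its best-scoring target field, keeping only
// matches at or above the threshold (0.5 mirrors the documented default).
function autoMapSketch(
  headers: string[],
  targetFields: string[],
  threshold = 0.5,
): Record<string, string> {
  const mapping: Record<string, string> = {};
  for (const h of headers) {
    let best = "";
    let bestScore = 0;
    for (const f of targetFields) {
      const s = dice(h, f);
      if (s > bestScore) {
        best = f;
        bestScore = s;
      }
    }
    if (best !== "" && bestScore >= threshold) mapping[h] = best;
  }
  return mapping;
}

const mapped = autoMapSketch(["E-mail", "Full Name", "Notes"], ["email", "fullName", "phone"]);
```

In this sketch a header with no candidate above the threshold is simply left unmapped, which is the situation where passing an explicit columnMapping would be the fallback. The sketch does not prevent two headers from mapping to the same field.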
RunResult:
| Field | Description |
|---|---|
| headers | Parsed headers |
| rows | Mapped rows |
| resolutions | Per-row MatchResult<Existing> (tier: auto/review/none) |
| validationErrors | Map<rowIndex, ValidationError[]> |
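Since each resolution carries a tier, a host will typically split rows into those that can proceed automatically and those that need operator review. The sketch below shows one way to do that. `MatchResultLike` is a hypothetical stand-in that only models the documented `tier` field, and `tierFor` assumes inclusive thresholds for autoAcceptAt and reviewAt, which this README does not confirm.

```typescript
type Tier = "auto" | "review" | "none";

// Hypothetical stand-in: only the `tier` field is taken from the README.
interface MatchResultLike {
  tier: Tier;
}

// Assumption: a score at or above a threshold qualifies for that tier.
function tierFor(score: number, autoAcceptAt: number, reviewAt: number): Tier {
  if (score >= autoAcceptAt) return "auto";
  if (score >= reviewAt) return "review";
  return "none";
}

// Group row indices by tier so the UI can show only the 'review' rows.
function partitionByTier(resolutions: MatchResultLike[]): Record<Tier, number[]> {
  const out: Record<Tier, number[]> = { auto: [], review: [], none: [] };
  resolutions.forEach((r, rowIndex) => out[r.tier].push(rowIndex));
  return out;
}

const byTier = partitionByTier([
  { tier: "auto" },
  { tier: "review" },
  { tier: "none" },
  { tier: "review" },
]);
```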
buildMergePlan(resolutions, schema): MergePlan<Existing>
Compile operator RecordResolution[] decisions into a declarative MergePlan.
apply(plan, persister, opts?): Promise<ImportReport>
Apply the plan via the host Persister. Supports batching, retry, progress
events, and AuditSink emissions.
detectConflicts(row, existing, schema): FieldConflict[]
Convenience wrapper around @bernierllc/merge-planner's detectConflicts.
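As an illustration of what field-level conflict detection could look like, the sketch below compares only mergeable fields and reports a conflict when both sides hold different non-blank values. This is an assumed rule, not the documented behavior of merge-planner, and the `FieldConflict` shape shown here is hypothetical. The schema shape follows the Quick Start.

```typescript
interface SchemaField {
  targetField: string;
  identity: boolean;
  mergeable: boolean;
}
interface UploaderSchema {
  fields: SchemaField[];
}

// Hypothetical shape: the README does not list FieldConflict's fields.
interface FieldConflict {
  field: string;
  incoming: unknown;
  existing: unknown;
}

const isBlank = (v: unknown): boolean => v === undefined || v === null || v === "";

// Assumed rule: a conflict exists when a mergeable field has two
// different non-blank values. Blank values on either side are not conflicts.
function detectConflictsSketch(
  row: Record<string, unknown>,
  existing: Record<string, unknown>,
  schema: UploaderSchema,
): FieldConflict[] {
  const conflicts: FieldConflict[] = [];
  for (const f of schema.fields) {
    if (!f.mergeable) continue;
    const incoming = row[f.targetField];
    const current = existing[f.targetField];
    if (!isBlank(incoming) && !isBlank(current) && incoming !== current) {
      conflicts.push({ field: f.targetField, incoming, existing: current });
    }
  }
  return conflicts;
}

const demoSchema: UploaderSchema = {
  fields: [
    { targetField: "email", identity: true, mergeable: false },
    { targetField: "fullName", identity: false, mergeable: true },
    { targetField: "phone", identity: false, mergeable: true },
  ],
};
const demoConflicts = detectConflictsSketch(
  { email: "a@x.io", fullName: "Ann Lee", phone: "555" },
  { email: "a@x.io", fullName: "Ann L.", phone: "" },
  demoSchema,
);
```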
function applyMergePlan<Existing>(plan, persister, opts?): Promise<ImportReport>
Standalone functional export. Identical to CsvImportService.apply() but
usable without instantiating the service class.
interface Persister<Existing>
Host-implemented persistence contract:
interface Persister<Existing> {
  create(_rows: Record<string, unknown>[]): Promise<Existing[]>;
  merge(_patches: MergePatch<Existing>[]): Promise<Existing[]>;
}
interface AuditSink
Receives a structured AuditEvent for every create/merge/skip decision and
apply outcome. Errors thrown by the sink are silently suppressed.
interface AuditSink {
  emit(_event: AuditEvent): void | Promise<void>;
}
AuditAction values: 'create' | 'merge' | 'skip' | 'apply_ok' | 'apply_err'
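For local testing, the Persister and AuditSink contracts above can be satisfied with small in-memory implementations. The following is a sketch under stated assumptions: the interfaces are redeclared locally so the block stands alone, the `MergePatch` shape (`id` plus `changes`) and the `AuditEvent` fields beyond `action` are hypothetical, and `Contact` is an example record type.

```typescript
// Hypothetical shapes: the README does not define these fields.
interface MergePatch<Existing> {
  id: string;
  changes: Partial<Existing>;
}
type AuditAction = "create" | "merge" | "skip" | "apply_ok" | "apply_err";
interface AuditEvent {
  action: AuditAction;
  rowIndex?: number;
}

// Local copies of the documented contracts.
interface Persister<Existing> {
  create(rows: Record<string, unknown>[]): Promise<Existing[]>;
  merge(patches: MergePatch<Existing>[]): Promise<Existing[]>;
}
interface AuditSink {
  emit(event: AuditEvent): void | Promise<void>;
}

// Example record type for the demo.
interface Contact {
  id: string;
  email: string;
  fullName: string;
}

class InMemoryPersister implements Persister<Contact> {
  readonly store = new Map<string, Contact>();
  private nextId = 1;

  // Insert every row as a new record with a generated id.
  async create(rows: Record<string, unknown>[]): Promise<Contact[]> {
    return rows.map((row) => {
      const rec: Contact = {
        id: String(this.nextId++),
        email: String(row["email"] ?? ""),
        fullName: String(row["fullName"] ?? ""),
      };
      this.store.set(rec.id, rec);
      return rec;
    });
  }

  // Apply each patch on top of the stored record; the id never changes.
  async merge(patches: MergePatch<Contact>[]): Promise<Contact[]> {
    return patches.map((p) => {
      const current = this.store.get(p.id);
      if (!current) throw new Error(`unknown record ${p.id}`);
      const next: Contact = { ...current, ...p.changes, id: current.id };
      this.store.set(next.id, next);
      return next;
    });
  }
}

// Collects events in memory so a test can assert on them afterwards.
class ArrayAuditSink implements AuditSink {
  readonly events: AuditEvent[] = [];
  emit(event: AuditEvent): void {
    this.events.push(event);
  }
}
```

A production Persister would wrap each batch in a database transaction instead of a Map; the in-memory version is only meant for exercising apply() in unit tests.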
ApplyMergePlanOptions
| Field | Type | Default | Description |
|---|---|---|---|
| batchSize | number | 100 | Items per Persister call |
| onProgress | (_p: Progress) => void | — | Called after each batch |
| auditSink | AuditSink | — | Receives structured audit events |
| retry | { maxRetries, initialDelayMs, maxDelayMs, backoffFactor } | 3/200/5000/2 | Retry config |
| isRetryable | (_err: unknown) => boolean | () => true | Classify transient vs permanent errors |
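The retry fields describe an exponential backoff schedule. The sketch below shows how such a schedule is conventionally computed from those four values, together with an example isRetryable classifier. Both are assumptions for illustration: the package may compute delays differently (for example with jitter), and treating the Node.js error codes ECONNRESET, ETIMEDOUT, and EAI_AGAIN as transient is a policy choice for the host, not something this README prescribes.

```typescript
interface RetryConfig {
  maxRetries: number;
  initialDelayMs: number;
  maxDelayMs: number;
  backoffFactor: number;
}

// Conventional exponential backoff: initial * factor^attempt, capped at max.
function backoffDelays(cfg: RetryConfig): number[] {
  const delays: number[] = [];
  for (let attempt = 0; attempt < cfg.maxRetries; attempt++) {
    const raw = cfg.initialDelayMs * Math.pow(cfg.backoffFactor, attempt);
    delays.push(Math.min(raw, cfg.maxDelayMs));
  }
  return delays;
}

// Example policy: only network-level failures are considered transient.
const TRANSIENT_CODES = new Set(["ECONNRESET", "ETIMEDOUT", "EAI_AGAIN"]);

function isRetryable(err: unknown): boolean {
  if (typeof err !== "object" || err === null) return false;
  const code = (err as { code?: unknown }).code;
  return typeof code === "string" && TRANSIENT_CODES.has(code);
}

// The documented defaults (3/200/5000/2).
const defaultDelays = backoffDelays({
  maxRetries: 3,
  initialDelayMs: 200,
  maxDelayMs: 5000,
  backoffFactor: 2,
});
```

Because the documented default for isRetryable is `() => true`, every Persister error is retried unless the host supplies a stricter classifier like the one above; constraint violations and other permanent errors would otherwise consume the full retry budget.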
Error Handling
This package throws CsvImportServiceError with typed error codes and Error.cause
chains for full debuggability via @bernierllc/logger's formatErrorChain.
| Error Class | Code | Description | Retryable |
|---|---|---|---|
| CsvImportServiceError | PARSE_FAILED | csv-parser rejected the input | No |
| CsvImportServiceError | MAPPING_FAILED | AutoMapper or row mapping failed | No |
| CsvImportServiceError | VALIDATION_FAILED | Validator threw unexpectedly | No |
| CsvImportServiceError | RESOLUTION_FAILED | entity-resolver / CandidateProvider failed | Depends |
| CsvImportServiceError | PROVIDER_FAILURE | CandidateProvider threw | Depends |
| CsvImportServiceError | APPLY_FAILED | Persister failed permanently after retries | No |
| CsvImportServiceError | INVALID_CONFIG | Missing required configuration | No |
| CsvImportServiceError | INVALID_INPUT | Null/invalid arguments | No |
| CsvImportServiceError | OPERATION_FAILED | Unexpected orchestration error | No |
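For the two codes marked "Depends", the decision usually rests on the underlying cause. The sketch below walks the cause chain and asks a host-supplied predicate about the root cause. It works structurally on any object with `code` and `cause` properties, so it does not depend on the real CsvImportServiceError class; `causeChain`, `worthRetrying`, and the `isTransient` predicate are hypothetical helpers.

```typescript
// Codes the error table marks as "Depends".
const MAYBE_RETRYABLE = new Set(["RESOLUTION_FAILED", "PROVIDER_FAILURE"]);

// Collect err, err.cause, err.cause.cause, ... guarding against cycles.
function causeChain(err: unknown): unknown[] {
  const chain: unknown[] = [];
  const seen = new Set<unknown>();
  let cur: unknown = err;
  while (cur !== undefined && cur !== null && !seen.has(cur)) {
    chain.push(cur);
    seen.add(cur);
    cur = typeof cur === "object" ? (cur as { cause?: unknown }).cause : undefined;
  }
  return chain;
}

// Retry only when the code is in the "Depends" set and the host's
// predicate classifies the root cause as transient.
function worthRetrying(err: unknown, isTransient: (root: unknown) => boolean): boolean {
  const code =
    typeof err === "object" && err !== null ? (err as { code?: unknown }).code : undefined;
  if (typeof code !== "string" || !MAYBE_RETRYABLE.has(code)) return false;
  const chain = causeChain(err);
  return isTransient(chain[chain.length - 1]);
}

// Example predicate: a timeout at the root is treated as transient.
const timeoutIsTransient = (root: unknown): boolean =>
  typeof root === "object" && root !== null && (root as { code?: unknown }).code === "ETIMEDOUT";
```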
All errors use Error.cause to chain underlying errors. Use
@bernierllc/logger's formatErrorChain to view the full chain.
import { CsvImportServiceError } from '@bernierllc/csv-import-service';
import { formatErrorChain } from '@bernierllc/logger';
try {
  await svc.run(opts);
} catch (err) {
  if (err instanceof CsvImportServiceError) {
    console.error(`[${err.code}] ${err.message}`);
    console.error(formatErrorChain(err)); // Full chain including cause
    if (err.context) {
      console.error('Context:', err.context);
    }
  }
}
NeverHub Integration
CsvImportService follows the CLAUDE.md service-tier NeverHub pattern:
const svc = new CsvImportService();
await svc.initialize(); // auto-detect + register; no-op if NeverHub absent
// Core functionality works regardless:
const result = await svc.run(...);
NeverHub is never required — run(), buildMergePlan(), and apply() all
work without it. Call initialize() once on startup if you want NeverHub
service discovery.
Design
- Stateless: no internal job/progress state between calls.
- Generic: fully parameterized over <Row, Existing>, with zero domain concepts.
- Composable: each step delegates to a focused core package.
- Idempotent-friendly: applyMergePlan processes items in deterministic rowIndex order; re-runs with pre-filtered plans skip already-applied rows.
- Partial failure: ImportReport.outcomes records per-item status so callers can detect and retry failed items without re-processing successful ones.
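The idempotent-friendly and partial-failure points combine into a simple retry loop: after a run, drop the items that already succeeded and re-apply the rest. The sketch below shows only the filtering step. The `PlanItem` and `Outcome` shapes, including the `status` values, are hypothetical, since this README does not spell out the fields of MergePlan or ImportReport.outcomes.

```typescript
// Hypothetical shapes for illustration only.
interface PlanItem {
  rowIndex: number;
}
interface Outcome {
  rowIndex: number;
  status: "ok" | "failed";
}

// Keep only items that have not succeeded yet, in ascending rowIndex order
// to match the deterministic processing order described above.
function remainingItems<T extends PlanItem>(items: T[], outcomes: Outcome[]): T[] {
  const done = new Set(
    outcomes.filter((o) => o.status === "ok").map((o) => o.rowIndex),
  );
  return items
    .filter((item) => !done.has(item.rowIndex)) // filter() copies, so sort() leaves the input untouched
    .sort((a, b) => a.rowIndex - b.rowIndex);
}

const retryItems = remainingItems(
  [{ rowIndex: 2 }, { rowIndex: 0 }, { rowIndex: 1 }],
  [
    { rowIndex: 0, status: "ok" },
    { rowIndex: 1, status: "failed" },
  ],
);
```

Items with no recorded outcome (row 2 here) are kept, so a run that was interrupted partway through can be resumed with the same filter.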
License
Bernier LLC — see LICENSE
