@exellix/narrix-engine
v2.0.0
Orchestrates Narrix Scoping (CNI v1 → Stories/Signals) and optional Narrative Outcome classification into one runtime engine for Node.js/TypeScript.
It supports single-record, batch-enrichment, and file workflows, and it can run multi-pass pipelines (e.g. scoping, then discovery) with deterministic, namespaced attachments.
Under the hood it composes:
- @exellix/narrix-scoper (runtime: CNI + story/signal pipeline via createNarrativeEngine/evaluate)
- @exellix/narrix-detector (contracts: CNI v1 types, story contracts, canonical shapes)
✅ Auth: use the token already present in your repo’s .npmrc / user .npmrc.
Do not paste tokens into code or docs. This package expects npm auth to already work.
What this package does
Given:
- a configuration — either a config file path (JSON “metadata packs”: signals catalog, narrator mappings, rule packs) or an in-memory pack (no filesystem)
- at runtime: one raw input object (or many)
It produces:
- CNI v1 (canonical input)
- Stories (a.k.a. “Narratives / Narrative Instances”)
- Signals
- Optional Narrative Outcomes (classification labels applied to stories)
In addition, narrix-engine can run multiple passes over the same record (e.g. scoping and discovery) and attach outputs under separate namespaces like:
{
"_narrix": {
"meta": { "...": "..." },
"scoping": { "stories": [], "signals": [], "facts": [] },
"discovery": { "stories": [], "signals": [], "facts": [] }
}
}
Relationship to upstream packages
@exellix/narrix-engine integrates two upstream packages:
- @exellix/narrix-scoper — Runtime pipeline execution (createNarrativeEngine, evaluate). Engine calls scoper for CNI build, rule evaluation, signals, and narratives.
- @exellix/narrix-detector — Contract types and canonical shapes (CNI v1, CniNarrativeV1, EvidencePointerV1, etc.). Engine uses detector for type contracts; runtime API comes from scoper.
Note: Engine does not re-implement detector/scoper logic. It orchestrates multi-pass pipelines, routing, and enrichment attachment.
Terminology
- Narrative Type — a dimension/question (identified by narrativeTypeId)
- Story (Narrative Instance) — scoped result for (subject + narrativeTypeId). Engine attachments use stories (not narratives).
- Signal — auditable alert-like fact emitted by rules
- Narrative Outcome — optional classification bucket for a Story (e.g., benign, risk, needs-review). Outcome classification is engine-owned (not detector-owned).
- Pass — one configured run of the detector/scoper with a specific set of rule packs and a target attachment path (e.g. scoping, discovery)
- Processor — routing + mapping + per-pass rule packs for a specific record type (e.g. vulnInstance, assetEgress)
Install
npm i @exellix/narrix-engine
This package depends on:
- @exellix/narrix-scoper (runtime pipeline)
- @exellix/narrix-detector (contract types)
(They should install automatically as dependencies.)
Modes
narrix-engine runs in 3 modes:
- single: process one object and return the result
- enrich: process a JSON payload that contains many records and return the same JSON enriched with outputs
- file: load JSON from disk → enrich → write back to disk
Config can be supplied as:
- Config file (configPath) — paths in the config are resolved relative to the config file
- In-memory pack (pack) — no filesystem; use for npm-packaged or module-resolved assets (see Create engine from a pack)
All modes can run either:
- Legacy single-pass config (v1): select mapping by subjectType and run one set of rule packs
- Pipeline config (v2): route records to processors and run multiple passes (recommended)
Quick start (recommended: pipeline config v2)
1) Create a config folder
Example layout:
./narrix-config/
narrix-engine.config.json
signals.catalog.v1.json
mappings/
subnet.narrator.v1.json
asset.narrator.v1.json
rules/
subnet.scoping.rules.v1.json
subnet.discovery.rules.v1.json
asset.scoping.rules.v1.json
asset.discovery.rules.v1.json
outcomes/
subnet.outcomes.v1.json (optional)
2) Example narrix-engine.config.json (v2)
{
"schema": "narrix.engine.config.v2",
"engineId": "demo-engine",
"version": "1.0.0",
"signalsCatalogPath": "./signals.catalog.v1.json",
"mappingIndex": {
"subnet": "./mappings/subnet.narrator.v1.json",
"asset": "./mappings/asset.narrator.v1.json"
},
"processors": [
{
"id": "subnet",
"entityKind": "subnet",
"mappingKey": "subnet",
"route": {
"datasetIds": ["demo.subnets"],
"matchers": [{ "when": { "exists": { "path": "cidr" } } }]
},
"passes": {
"scoping": { "rulePackPaths": ["./rules/subnet.scoping.rules.v1.json"] },
"discovery": { "rulePackPaths": ["./rules/subnet.discovery.rules.v1.json"] }
},
"entityIdentity": {
"entityIdPath": "id",
"entityKey": {
"format": "firstNonEmpty",
"primary": [{ "path": "id" }],
"fallback": [{ "path": "cidr" }]
}
}
}
],
"pipeline": {
"passes": [
{
"passId": "scoping",
"attachPath": "scoping",
"include": { "stories": true, "signals": true, "facts": true, "cni": false, "baggage": false }
},
{
"passId": "discovery",
"attachPath": "discovery",
"include": { "stories": true, "signals": true, "facts": true, "cni": false, "baggage": false },
"policies": {
"requireAssumptionMarking": true,
"requireEvidenceForAssumptions": true
}
}
],
"metaAttachPath": "meta"
},
"enrichment": {
"attachToField": "_narrix",
"includeCni": false,
"includeBaggage": false,
"includeFacts": true,
"includeSignals": true,
"includeStories": true
},
"runtime": {
"trace": false,
"onProcessorNotMatched": "attachError",
"onMissingMapping": "attachError",
"deterministicSort": true
},
"outcomes": {
"enabled": false,
"classifierPackPaths": []
}
}
Strict minimal (recommended for new configs): Use only datasetIds, matchers (exists, eq, startsWith), and entityKey.firstNonEmpty. No graphTypeEquals, no template entityKey, no engine-level joinCandidates. See schemas/narrix.engine.config.v2.schema.json and the config skeleton in the repo. Join candidates are emitted as discovery facts (e.g. FACT_JOIN_ASSET_IP, FACT_JOIN_CVE_IDS).
What this config enables
- Record routing via datasetId (preferred), optional graphTypeEquals, or matchers (exists, eq, contains, startsWith)
- Two passes: scoping and discovery with separate rule packs
- Namespaced enrichment:
  - _narrix.scoping.{ stories, signals, facts }
  - _narrix.discovery.{ stories, signals, facts }
  - _narrix.meta.{ entity, run, source }
- Entity identity: deterministic entityKey and optional entityId attached to meta
- Assumption enforcement: discovery pass requires assumptions to be marked and evidenced
Processor routing (R1)
The engine routes records to processors using this priority order:
- Explicit processorId (if provided) — direct selection
- datasetId (preferred) — matches processors[].route.datasetIds[]
- graphTypeEquals — convenience sugar for eq("graphized.meta.graphType", "...")
- Matchers — evaluated in order; all must match
Routing predicates
Matchers support these predicates (evaluated on the record):
- exists — path exists and is not null/undefined
  { "when": { "exists": { "path": "cidr" } } }
- eq — path value equals constant
  { "when": { "eq": { "path": "type", "value": "vulnerability" } } }
- contains — path value (string) contains substring
  { "when": { "contains": { "path": "name", "substr": "critical" } } }
- startsWith — path value (string) starts with prefix
  { "when": { "startsWith": { "path": "id", "prefix": "vuln-" } } }
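The four predicates can be read as small functions over a dot-separated path. The following is a minimal sketch under that reading, not the engine's implementation: the names `Predicate`, `Matcher`, `getPath`, `evalPredicate`, and `matchesAll` are hypothetical, and dot-path resolution is an assumption based on paths such as `graphized.meta.graphType`.

```typescript
// Hypothetical sketch: these types and helpers are NOT the engine's API.
type Predicate =
  | { exists: { path: string } }
  | { eq: { path: string; value: unknown } }
  | { contains: { path: string; substr: string } }
  | { startsWith: { path: string; prefix: string } };

type Matcher = { when: Predicate };

// Resolve a dot-separated path such as "graphized.meta.graphType" (assumed syntax).
function getPath(record: unknown, path: string): unknown {
  let current: unknown = record;
  for (const key of path.split(".")) {
    if (current === null || typeof current !== "object") return undefined;
    current = (current as Record<string, unknown>)[key];
  }
  return current;
}

function evalPredicate(record: unknown, p: Predicate): boolean {
  if ("exists" in p) {
    // exists: the path resolves to something other than null/undefined.
    const found = getPath(record, p.exists.path);
    return found !== null && found !== undefined;
  }
  if ("eq" in p) return getPath(record, p.eq.path) === p.eq.value;
  if ("contains" in p) {
    const text = getPath(record, p.contains.path);
    return typeof text === "string" && text.includes(p.contains.substr);
  }
  const text = getPath(record, p.startsWith.path);
  return typeof text === "string" && text.startsWith(p.startsWith.prefix);
}

// All matchers of a processor must pass for the processor to match.
function matchesAll(record: unknown, matchers: Matcher[]): boolean {
  return matchers.every((m) => evalPredicate(record, m.when));
}
```

In this sketch, `contains` and `startsWith` return false for non-string values rather than coercing them, which is one possible reading of "path value (string)".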
Routing examples
{
"processors": [
{
"id": "vulnInstance",
"entityKind": "vulnerability",
"mappingKey": "vulnerability",
"route": {
"datasetIds": ["neo.vulnerabilities"],
"graphTypeEquals": ["network.vuln.instance"],
"matchers": [
{ "when": { "exists": { "path": "graphized.id" } } },
{ "when": { "eq": { "path": "severity", "value": "critical" } } }
]
},
"passes": { /* ... */ }
}
]
}
Routing behavior:
- If datasetId: "neo.vulnerabilities" is provided → matches vulnInstance
- If graphized.meta.graphType === "network.vuln.instance" → matches vulnInstance
- If the record has graphized.id AND severity === "critical" → matches vulnInstance
- First matching processor wins (order matters if multiple match)
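The priority order can be sketched as a single selection function. This is an illustration only: `selectProcessor`, `RouteInput`, and the function-valued `matchers` are hypothetical simplifications (the real config uses JSON predicates), and falling through to the next rule when a `datasetId` matches nothing is an assumption.

```typescript
// Hypothetical sketch of the documented priority order; not the engine's code.
type RecordLike = Record<string, unknown>;

interface ProcessorRoute {
  datasetIds?: string[];
  graphTypeEquals?: string[];
  // Simplification: matchers are plain functions here instead of JSON predicates.
  matchers?: Array<(record: RecordLike) => boolean>;
}

interface Processor {
  id: string;
  route: ProcessorRoute;
}

interface RouteInput {
  record: RecordLike;
  processorId?: string;
  datasetId?: string;
}

function graphTypeOf(record: RecordLike): unknown {
  const graphized = record["graphized"] as { meta?: { graphType?: unknown } } | undefined;
  return graphized?.meta?.graphType;
}

function selectProcessor(processors: Processor[], input: RouteInput): Processor | undefined {
  const { record, processorId, datasetId } = input;
  // 1. An explicit processorId is a direct selection.
  if (processorId !== undefined) return processors.find((p) => p.id === processorId);
  // 2. datasetId against route.datasetIds.
  if (datasetId !== undefined) {
    const byDataset = processors.find((p) => p.route.datasetIds?.includes(datasetId));
    if (byDataset) return byDataset;
  }
  // 3. graphTypeEquals against graphized.meta.graphType.
  const graphType = graphTypeOf(record);
  if (typeof graphType === "string") {
    const byGraphType = processors.find((p) => p.route.graphTypeEquals?.includes(graphType));
    if (byGraphType) return byGraphType;
  }
  // 4. Matchers: all must pass; the first processor in config order wins.
  return processors.find((p) => {
    const matchers = p.route.matchers ?? [];
    return matchers.length > 0 && matchers.every((m) => m(record));
  });
}
```

Returning `undefined` leaves the caller to apply whatever `onProcessorNotMatched` policy is configured.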
Entity identity (R2)
For every record processed, the engine computes and attaches:
- _narrix.meta.entity.entityKind — from processor.entityKind
- _narrix.meta.entity.entityId — optional, from entityIdentity.entityIdPath
- _narrix.meta.entity.entityKey — required, computed deterministically
Entity key computation
The entityKey is derived from entityIdentity.entityKey:
Format: firstNonEmpty (recommended)
- Tries primary paths in order, returns first non-empty value
- Falls back to fallback paths if all primary are empty
- Defaults to "unknown" if all fail
{
"entityIdentity": {
"entityIdPath": "id",
"entityKey": {
"format": "firstNonEmpty",
"primary": [
{ "path": "graphized.id", "label": "graphId" },
{ "path": "id", "label": "recordId" }
],
"fallback": [
{ "path": "cidr", "label": "cidr" }
]
}
}
}
Format: joinWithPipes
- Joins all non-empty values from primary (or fallback) with |
- Example: "subnet-123|10.0.1.0/24"
Note: Template format with {path} placeholders is not yet implemented. Use firstNonEmpty and store richer join keys in _narrix.discovery.facts (see Join candidates).
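Both formats can be sketched in a few lines. This is a hedged illustration, not the engine's code: `computeEntityKey`, `EntityKeySpec`, and `readPath` are hypothetical names, "non-empty" is assumed to mean not null/undefined and not a blank string, and the `"unknown"` default for joinWithPipes is an assumption carried over from firstNonEmpty.

```typescript
// Hypothetical sketch; the engine's real implementation may differ.
type PathRef = { path: string; label?: string };

interface EntityKeySpec {
  format: "firstNonEmpty" | "joinWithPipes";
  primary: PathRef[];
  fallback?: PathRef[];
}

// Resolve a dot-separated path (assumed syntax) against a record.
function readPath(record: unknown, path: string): unknown {
  let current: unknown = record;
  for (const key of path.split(".")) {
    if (current === null || typeof current !== "object") return undefined;
    current = (current as Record<string, unknown>)[key];
  }
  return current;
}

// Assumption: a value is "non-empty" if it is not null/undefined and not blank.
function nonEmptyValues(record: unknown, refs: PathRef[]): string[] {
  const values: string[] = [];
  for (const ref of refs) {
    const raw = readPath(record, ref.path);
    if (raw === null || raw === undefined) continue;
    const text = String(raw).trim();
    if (text !== "") values.push(text);
  }
  return values;
}

function computeEntityKey(record: unknown, spec: EntityKeySpec): string {
  // Use primary values when any exist; otherwise fall back.
  const primary = nonEmptyValues(record, spec.primary);
  const values = primary.length > 0 ? primary : nonEmptyValues(record, spec.fallback ?? []);
  const [first] = values;
  if (first === undefined) return "unknown";
  return spec.format === "firstNonEmpty" ? first : values.join("|");
}
```

Because the key depends only on the record and the spec, the same input always yields the same key, which is what makes it usable as a join key downstream.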
Programmatic API
Create engine
You can create an engine in two ways: from a config file path (file-based) or from an in-memory pack (no filesystem, fully deterministic). Provide exactly one of configPath or pack.
Option 1: From a config file
import { createNarrixEngine } from "@exellix/narrix-engine";
const engine = await createNarrixEngine({
configPath: "./narrix-config/narrix-engine.config.json",
/**
* Optional: feature registry for mappings that call feature functions.
* If your mappings are path-only, omit this.
*/
featureRegistry: {
async execute(name, ctx) {
if (name === "net.subnet.isPublic") {
const isPublic = Boolean((ctx.input as any)?.isPublic);
return { passed: true, baggage: { value: isPublic } };
}
return { passed: false, baggage: { error: "unknown function" } };
}
}
});
Paths in the config file are resolved relative to the config file’s directory.
Option 2: From an in-memory pack (no FS)
Use a pack when you want module-based resolution (e.g. assets from an npm package) with no file paths and no filesystem access. Fully deterministic.
import { createNarrixEngine } from "@exellix/narrix-engine";
import type { NarrixPack } from "@exellix/narrix-engine";
const pack: NarrixPack = {
id: "my-pack",
version: "1.0.0",
signalsCatalog: { schema: "signals.catalog.v1", signals: { /* ... */ } },
mappings: {},
rulePacks: {
// processorId -> passId -> array of resolved rule pack objects
one: {
scoping: [scopingRulePack],
discovery: [discoveryRulePack],
},
},
processors: [
{
id: "one",
entityKind: "test",
mappingKey: "minimal",
route: { datasetIds: ["test.dataset"], matchers: [{ exists: { path: "id" } }] },
passes: { scoping: { rulePackPaths: [] }, discovery: { rulePackPaths: [] } },
entityIdentity: { entityIdPath: "id", entityKey: { firstNonEmpty: [{ path: "id" }] } },
},
],
pipeline: {
passes: [
{ passId: "scoping", attachPath: "scoping", include: { facts: true, signals: true, stories: true } },
{ passId: "discovery", attachPath: "discovery", include: { facts: true, signals: true, stories: true } },
],
},
docs: {}, // optional
};
const engine = await createNarrixEngine({ pack });
const result = await engine.runOne({ cni, datasetId: "test.dataset" });
NarrixPack shape:
| Field | Description |
|-------|-------------|
| id, version | Pack identifier and version |
| signalsCatalog | Resolved signals catalog object (no path) |
| mappings | Record<string, unknown> for compatibility |
| rulePacks | Record<processorId, Record<passId, unknown[]>> — resolved rule pack arrays per processor/pass |
| processors | Same shape as config v2 processors |
| pipeline | Same shape as config v2 pipeline |
| docs | Optional metadata |
If you pass both configPath and pack, or neither, the engine throws CONFIG_INVALID.
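The exactly-one rule amounts to an XOR check on the two options. A minimal sketch, assuming the error exposes a `code` property; `resolveConfigSource`, `EngineSource`, and `ConfigInvalidError` are hypothetical names and not exports of this package.

```typescript
// Hypothetical sketch: option and error shapes are assumptions.
interface EngineSource {
  configPath?: string;
  pack?: object;
}

class ConfigInvalidError extends Error {
  readonly code = "CONFIG_INVALID";
}

// Returns which source to load from, or throws when zero or two are given.
function resolveConfigSource(options: EngineSource): "file" | "pack" {
  const hasPath = options.configPath !== undefined;
  const hasPack = options.pack !== undefined;
  if (hasPath === hasPack) {
    throw new ConfigInvalidError("Provide exactly one of configPath or pack");
  }
  return hasPath ? "file" : "pack";
}
```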
Mode 1: single record
V2 pipeline mode: runPipelineOne
Processor is selected by routing (datasetId, graphType, or matchers):
const result = await engine.runPipelineOne({
record: { id: "subnet-123", cidr: "10.0.1.0/24", isPublic: true },
datasetId: "demo.subnets", // preferred routing method
processorId: "subnet", // optional: explicit override
sourceMeta: { kind: "json", collection: "subnets.json" }
});
console.log(result.record._narrix.scoping.stories);
console.log(result.record._narrix.discovery.stories);
console.log(result.record._narrix.meta.entity); // { entityKind, entityId?, entityKey }
Return shape:
type RunPipelineResult = {
record: any; // the enriched record with _narrix attachments
meta: {
processorId: string;
entityKind: string;
entityId?: string;
entityKey: string;
runId: string;
producedAt: number;
};
};
V1 legacy mode: runOne
const result = await engine.runOne({
input: { id: "subnet-123", cidr: "10.0.1.0/24" },
subjectType: "subnet" // selects mappingIndex["subnet"]
});
console.log(result.scoper.stories);
console.log(result.scoper.signals);
Mode 2: enrich a JSON payload
Supported input payload shapes
A) records: []
{
"schema": "demo.batch.v1",
"records": [
{ "id": "1", "cidr": "10.0.0.0/24" },
{ "id": "2", "cidr": "10.0.1.0/24" }
]
}
B) recordsById: { ... }
{
"schema": "demo.batch.v1",
"recordsById": {
"1": { "id": "1", "cidr": "10.0.0.0/24" },
"2": { "id": "2", "cidr": "10.0.1.0/24" }
}
}
Enrich in code (V2 pipeline mode)
const enriched = await engine.enrichJson({
payload,
datasetId: "demo.subnets", // optional but recommended for routing
processorId: "subnet", // optional: explicit processor override
recordSelector: { // optional: custom record path
type: "recordsPath",
path: "items"
},
sourceMeta: { kind: "json", fileName: "subnets.json" }
});
// returns same JSON shape; each record now has _narrix.scoping/_narrix.discovery/_narrix.meta
Enrich in code (V1 legacy mode)
const enriched = await engine.enrichJson({
payload,
recordTypePath: "type" // required: path to subjectType in each record
});
// returns same JSON shape; each record has _narrix.{ signals, stories, outcomes, cni }
Mode 3: enrich JSON file on disk
await engine.enrichFile({
datasetId: "demo.subnets",
inputPath: "./data/subnets.json",
outputPath: "./data/subnets.enriched.json"
});
In-place overwrite:
await engine.enrichFile({
datasetId: "demo.subnets",
inputPath: "./data/subnets.json",
outputPath: "./data/subnets.json"
});
CLI (included)
Help
npx @exellix/narrix-engine --help
Single record (stdin → stdout)
V2 pipeline mode:
cat one.json \
| npx @exellix/narrix-engine run-one \
--config ./narrix-config/narrix-engine.config.json \
--datasetId demo.subnets \
> out.json
V1 legacy mode:
cat one.json \
| npx @exellix/narrix-engine run-one \
--config ./narrix-config/narrix-engine.config.json \
--subjectType subnet \
> out.json
Enrich batch JSON (stdin → stdout)
V2 pipeline mode (recommended):
cat batch.json \
| npx @exellix/narrix-engine enrich \
--config ./narrix-config/narrix-engine.config.json \
--datasetId demo.subnets \
> batch.enriched.json
With explicit processor override:
cat batch.json \
| npx @exellix/narrix-engine enrich \
--config ./narrix-config/narrix-engine.config.json \
--processorId vulnInstance \
> batch.enriched.json
V1 legacy mode:
cat batch.json \
| npx @exellix/narrix-engine enrich \
--config ./narrix-config/narrix-engine.config.json \
--recordTypePath type \
> batch.enriched.json
Enrich file (disk → disk)
V2 pipeline mode:
npx @exellix/narrix-engine enrich-file \
--config ./narrix-config/narrix-engine.config.json \
--datasetId demo.subnets \
--in ./data/batch.json \
--out ./data/batch.enriched.json
V1 legacy mode:
npx @exellix/narrix-engine enrich-file \
--config ./narrix-config/narrix-engine.config.json \
--recordTypePath type \
--in ./data/batch.json \
--out ./data/batch.enriched.json
Pipelines and passes
In pipeline config (v2), the engine runs multiple passes over the same record in order. Each pass:
- Uses its own rule packs (configured per processor and pass in processors[].passes).
- Writes results under a namespaced path so outputs do not overwrite each other.
The pipeline defines the list of passes and where each pass attaches:
- passId — identifier for the pass (e.g. scoping, discovery).
- attachPath — key under the configurable root (default _narrix). Results are written to _narrix.<attachPath>, e.g. _narrix.scoping, _narrix.discovery.
- include — which parts of the scoper result to attach (facts, signals, stories, cni, baggage).
So the engine can produce:
- _narrix.scoping.{ stories, signals, facts, … }
- _narrix.discovery.{ … }
- _narrix.meta.{ source, entity, run, … }
Only one variable is configurable: attachToField (default _narrix). The rest is fixed by the pipeline and processor config. No per-source mapping is required beyond choosing the processor (by datasetId, graphType, or matchers).
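How a pass result lands under its own namespace can be sketched as a pure function over the record. This is illustrative only: `attachPassOutput`, `PassOutput`, and `PassSpec` are hypothetical names, and the fixed key order is an assumption chosen to keep output deterministic.

```typescript
// Hypothetical sketch: types and function names are illustrative, not the engine's API.
type PassOutput = {
  stories: unknown[];
  signals: unknown[];
  facts: unknown[];
  cni?: unknown;
  baggage?: unknown;
};

type IncludeFlags = Partial<Record<keyof PassOutput, boolean>>;

interface PassSpec {
  passId: string;
  attachPath: string;
  include: IncludeFlags;
}

type AnyRecord = Record<string, unknown>;

function attachPassOutput(
  record: AnyRecord,
  attachToField: string,
  pass: PassSpec,
  output: PassOutput,
): AnyRecord {
  // Copy the existing root (e.g. _narrix) so earlier passes are preserved.
  const existingRoot = record[attachToField];
  const root: AnyRecord =
    typeof existingRoot === "object" && existingRoot !== null
      ? { ...(existingRoot as AnyRecord) }
      : {};
  // Keep only the parts enabled in `include`, in a fixed key order.
  const selected: AnyRecord = {};
  const keys: Array<keyof PassOutput> = ["facts", "signals", "stories", "cni", "baggage"];
  for (const key of keys) {
    if (pass.include[key] === true && output[key] !== undefined) selected[key] = output[key];
  }
  root[pass.attachPath] = selected;
  // Return a new record; the input record is not mutated.
  return { ...record, [attachToField]: root };
}
```

Calling it once per pass with `attachPath` values `scoping` and `discovery` yields two sibling keys under the root, so neither pass overwrites the other.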
Attachment contract
By default, each record receives one root attachment (configurable via enrichment.attachToField, default _narrix):
V2 pipeline mode attachment structure
{
"_narrix": {
"meta": {
"processorId": "subnet",
"entityKind": "subnet",
"entityId": "subnet-123",
"entityKey": "subnet-123",
"runId": "run-1730000000000-abc123",
"producedAt": 1730000000000,
"source": {
"kind": "json",
"datasetId": "demo.subnets",
"fileName": "subnets.json"
}
},
"scoping": {
"facts": [],
"signals": [],
"stories": []
},
"discovery": {
"facts": [],
"signals": [],
"stories": []
}
}
}
Meta fields:
- processorId — processor that handled this record
- entityKind — entity type (from processor.entityKind)
- entityId — optional canonical ID (from entityIdentity.entityIdPath)
- entityKey — deterministic join key (computed from entityIdentity.entityKey)
- runId — unique run identifier
- producedAt — timestamp (milliseconds since epoch)
- source — source metadata (if provided)
V1 legacy mode attachment structure
{
"_narrix": {
"signals": [],
"stories": [],
"outcomes": [],
"cni": { /* ... */ },
"baggage": { /* ... */ },
"meta": { /* ... */ }
}
}
Assumptions policy
Discovery (and similar) passes often emit assumptions (narratives with confidence < 1). The engine can enforce that these are correctly marked and evidenced.
Conventions
- Any story/narrative with confidence < 1 should be treated as an assumption.
- It must be explicitly marked, e.g. meta.assumption === true or meta.inferred === true (pick one convention in your packs).
- If evidence is required, each such narrative must have evidence.length > 0 (references to supporting input paths or facts).
Enforcement toggles (per pass)
In pipeline.passes[].policies:
- requireAssumptionMarking — if true, enforces bidirectional rules:
  - If meta.assumption === true → must have confidence < 1 and (if requireEvidenceForAssumptions is also true) evidence.length > 0.
  - If confidence < 1 → must have meta.assumption === true or meta.inferred === true.
  If violated, the engine throws or attaches an error (per runtime policy).
- requireEvidenceForAssumptions — if true, any narrative with meta.assumption === true must have evidence.length > 0. If violated, the engine throws or attaches an error.
This keeps packs honest and avoids silent guesses.
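The two toggles can be read as a validation pass over the stories a pass emits. The following sketch shows one way to express the bidirectional check; the `Story` shape, `findViolations`, and the violation messages are assumptions for illustration and not the engine's contract.

```typescript
// Hypothetical sketch; story shape and violation messages are assumptions.
interface Story {
  id: string;
  confidence: number;
  evidence?: unknown[];
  meta?: { assumption?: boolean; inferred?: boolean };
}

interface PassPolicies {
  requireAssumptionMarking?: boolean;
  requireEvidenceForAssumptions?: boolean;
}

function findViolations(stories: Story[], policies: PassPolicies): string[] {
  const violations: string[] = [];
  for (const story of stories) {
    const marked = story.meta?.assumption === true;
    const inferred = story.meta?.inferred === true;
    const hasEvidence = (story.evidence?.length ?? 0) > 0;

    if (policies.requireAssumptionMarking) {
      // Direction 1: a marked assumption must have confidence < 1.
      if (marked && story.confidence >= 1) {
        violations.push(`${story.id}: marked as assumption but confidence is not < 1`);
      }
      // Direction 2: confidence < 1 must be marked as assumption or inferred.
      if (story.confidence < 1 && !marked && !inferred) {
        violations.push(`${story.id}: confidence < 1 but not marked`);
      }
    }
    // Marked assumptions need at least one evidence entry when required.
    if (policies.requireEvidenceForAssumptions && marked && !hasEvidence) {
      violations.push(`${story.id}: assumption without evidence`);
    }
  }
  return violations;
}
```

A caller could then throw on a non-empty result or attach it to the record, depending on the configured runtime policy.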
Join candidates (R6)
For cross-record joins (e.g. linking vulnerabilities to assets, CVEs to plugins), the engine supports two approaches:
Option B (recommended for now): Discovery rules emit facts like FACT_JOIN_ASSET_IP, FACT_JOIN_CVE_IDS, FACT_JOIN_PLUGIN_ID. Downstream phases read these from _narrix.discovery.facts. This requires no engine changes and keeps join logic in rule packs.
Option A (future): Engine-level extraction that copies specific facts/stories/meta keys into _narrix.discovery.joinCandidates for organized downstream consumption.
For Phase 1, use Option B. Option A can be added later if needed.
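With Option B, a downstream phase builds its own join index from the attached facts. The sketch below assumes a fact shape of `{ id, value }` and a top-level record `id`; neither is confirmed by this README, and `indexByJoinFact` is a hypothetical helper.

```typescript
// Hypothetical sketch: the fact shape { id, value } is an assumption.
interface JoinFact {
  id: string;
  value: unknown;
}

interface EnrichedRecord {
  id: string;
  _narrix?: { discovery?: { facts?: JoinFact[] } };
}

// Maps each join value (e.g. an IP) to the ids of the records that emitted it.
function indexByJoinFact(records: EnrichedRecord[], factId: string): Map<string, string[]> {
  const index = new Map<string, string[]>();
  for (const record of records) {
    for (const fact of record._narrix?.discovery?.facts ?? []) {
      if (fact.id !== factId) continue;
      // A fact may carry one value or a list of values (assumed).
      const values: unknown[] = Array.isArray(fact.value) ? fact.value : [fact.value];
      for (const value of values) {
        const key = String(value);
        const bucket = index.get(key) ?? [];
        bucket.push(record.id);
        index.set(key, bucket);
      }
    }
  }
  return index;
}
```

Records that share a bucket are join candidates, for example a vulnerability and an asset that both emitted FACT_JOIN_ASSET_IP with the same address.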
Gateways (sources and targets)
Source and target gateways are pluggable. The engine ships with default JSON adapters; you can add others (e.g. MongoDB) in one place without changing processors, packs, or pipeline logic.
Interfaces
- Source gateway — read(spec) returns an async iterable of { datasetId, record, sourceMeta }.
- Target gateway — write(spec, items) consumes an async iterable of the same shape and persists (e.g. to a file or database).
Default gateways (shipped)
- JSON source — reads array JSON files; supports payload shapes: [], { records: [] }, { recordsById: {} }.
- JSON target — writes enriched output; preserves shape (array, records, or recordsById) and supports in-place overwrite in file mode.
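Shape preservation can be illustrated with a small mapper that enriches each record and returns the payload in the form it arrived in. This is a sketch under assumptions, not the shipped JSON gateway: `mapRecords` and `isPlainObject` are hypothetical helpers.

```typescript
// Hypothetical sketch of shape-preserving enrichment; not the shipped gateway.
type Rec = Record<string, unknown>;

function isPlainObject(value: unknown): value is Record<string, unknown> {
  return typeof value === "object" && value !== null && !Array.isArray(value);
}

function mapRecords(payload: unknown, enrich: (record: Rec) => Rec): unknown {
  // Shape 1: a bare array of records.
  if (Array.isArray(payload)) return payload.map((r: Rec) => enrich(r));
  if (!isPlainObject(payload)) throw new Error("Unsupported payload shape");

  // Shape 2: { records: [] }; other top-level fields (e.g. schema) are kept.
  const records = payload["records"];
  if (Array.isArray(records)) {
    return { ...payload, records: records.map((r: Rec) => enrich(r)) };
  }

  // Shape 3: { recordsById: {} }; keys are preserved.
  const byId = payload["recordsById"];
  if (isPlainObject(byId)) {
    const out: Record<string, Rec> = {};
    for (const id of Object.keys(byId)) out[id] = enrich(byId[id] as Rec);
    return { ...payload, recordsById: out };
  }
  throw new Error("Unsupported payload shape");
}
```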
Where to extend
To add a new source (e.g. MongoDB):
- Implement SourceGateway in gateways/source/<kind>.ts (e.g. gateways/source/mongodb.ts).
- Register it in the gateway registry (e.g. registerSourceGateway(MongoSourceGateway)).
To add a new target (e.g. MongoDB writeback):
- Implement TargetGateway in gateways/target/<kind>.ts (e.g. gateways/target/mongodb.ts).
- Register it in the gateway registry.
Guarantee: Processors, rule packs, and pipeline definitions are unchanged. Only the gateway layer is extended.
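To make the extension point concrete, here is a sketch of a source and target pair backed by memory instead of MongoDB. The interface shapes follow the read/write descriptions above, but the exact type signatures, the `kind` field, and all class and function names are assumptions; the package's real SourceGateway and TargetGateway types may differ.

```typescript
// Hypothetical sketch: interface and class names are assumptions, not the engine's exports.
type Item = {
  datasetId: string;
  record: Record<string, unknown>;
  sourceMeta?: Record<string, unknown>;
};

interface SourceGatewaySketch<Spec> {
  readonly kind: string;
  read(spec: Spec): AsyncIterable<Item>;
}

interface TargetGatewaySketch<Spec> {
  readonly kind: string;
  write(spec: Spec, items: AsyncIterable<Item>): Promise<void>;
}

type MemorySourceSpec = { datasetId: string; records: Array<Record<string, unknown>> };
type MemoryTargetSpec = { sink: Item[] };

class MemorySourceGateway implements SourceGatewaySketch<MemorySourceSpec> {
  readonly kind = "memory";
  // Yields one item per in-memory record.
  async *read(spec: MemorySourceSpec): AsyncGenerator<Item> {
    for (const record of spec.records) {
      yield { datasetId: spec.datasetId, record, sourceMeta: { kind: this.kind } };
    }
  }
}

class MemoryTargetGateway implements TargetGatewaySketch<MemoryTargetSpec> {
  readonly kind = "memory";
  // Persists by pushing into the caller-provided array.
  async write(spec: MemoryTargetSpec, items: AsyncIterable<Item>): Promise<void> {
    for await (const item of items) spec.sink.push(item);
  }
}

// Stand-in for the engine: tags each record so the flow is visible.
async function* enrichAll(items: AsyncIterable<Item>): AsyncGenerator<Item> {
  for await (const item of items) {
    yield { ...item, record: { ...item.record, _narrix: { meta: { source: item.sourceMeta } } } };
  }
}

async function runThroughGateways(records: Array<Record<string, unknown>>): Promise<Item[]> {
  const sink: Item[] = [];
  const source = new MemorySourceGateway();
  const target = new MemoryTargetGateway();
  await target.write({ sink }, enrichAll(source.read({ datasetId: "demo.subnets", records })));
  return sink;
}
```

Swapping the memory classes for database-backed ones changes only the read and write bodies; the enrichment step in the middle stays the same, which is the point of the guarantee above.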
Legacy config (v1) — still supported
If you use:
- schema: "narrix.engine.config.v1"
- rulePackPaths + mappingIndex
- runOne({ subjectType }) or recordTypePath for batch
the engine runs in single-pass mode and attaches:
{
"_narrix": {
"signals": [...],
"stories": [...],
"outcomes": [...],
"cni": { ... }
}
}
Pipeline mode (v2) is recommended for scoping + discovery workflows.
Error handling
Configurable via runtime:
- onMissingMapping — "throw" (default) or "attachError" when mapping not found
- onProcessorNotMatched — "throw" (default), "attachError", or "skip" when no processor matches
- Assumption-policy violations — always throw (cannot be suppressed)
Recommended for batch processing: Use "attachError" so pipelines can continue:
{
"runtime": {
"onMissingMapping": "attachError",
"onProcessorNotMatched": "attachError"
}
}
When errors are attached, they appear at runtime.errorAttachPath (default: _narrix.meta.error). The error shape includes code, processorId, mappingKey, passId, and message. In batch mode (enrichJson), errors are attached per record; one failing record does not abort the whole batch.
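The three onProcessorNotMatched policies can be sketched as a batch loop around a per-record function. This is an illustration under assumptions: the error code `PROCESSOR_NOT_MATCHED`, the helper names, and the reading of "skip" as "pass the record through unchanged" are all hypothetical.

```typescript
// Hypothetical sketch: error code and "skip" semantics are assumptions.
type Rec = Record<string, unknown>;
type NotMatchedPolicy = "throw" | "attachError" | "skip";

const NOT_MATCHED = "PROCESSOR_NOT_MATCHED"; // hypothetical code

class ProcessorNotMatchedError extends Error {
  readonly code = NOT_MATCHED;
}

// Writes the error under _narrix.meta.error without dropping existing attachments.
function withError(record: Rec, error: { code: string; message: string }): Rec {
  const narrix = (record["_narrix"] as Rec | undefined) ?? {};
  const meta = (narrix["meta"] as Rec | undefined) ?? {};
  return { ...record, _narrix: { ...narrix, meta: { ...meta, error } } };
}

function enrichBatch(records: Rec[], enrichOne: (r: Rec) => Rec, policy: NotMatchedPolicy): Rec[] {
  const out: Rec[] = [];
  for (const record of records) {
    try {
      out.push(enrichOne(record));
    } catch (err) {
      const code = (err as { code?: unknown }).code;
      // Unrelated errors, and the "throw" policy, abort the batch.
      if (code !== NOT_MATCHED || policy === "throw") throw err;
      if (policy === "attachError") {
        const message = err instanceof Error ? err.message : String(err);
        out.push(withError(record, { code: NOT_MATCHED, message }));
      } else {
        out.push(record); // "skip": assumed to leave the record unenriched
      }
    }
  }
  return out;
}
```

With "attachError", the output keeps the same length and order as the input, so callers can find failed records by checking for the error field.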
Design principles
- Deterministic, auditable pipeline — same input produces same output
- Config from file or pack — file-based config (paths resolved from config location) or in-memory pack (no FS, e.g. for npm packages)
- JSON-pack-driven behavior — mappings, rules, signals catalog define behavior
- Clear boundaries:
- mappings/features normalize → CNI
- rules emit signals + stories
- classifier (optional) applies outcomes
- Multi-pass pipelines with namespaced attachments (_narrix.scoping, _narrix.discovery)
- Source/target extensibility via gateways (single place to change)
- Generic and reusable — not tied to specific domains (network security, etc.)
Hard requirements (Phase 1)
This engine implements the following hard requirements:
- R1 — Processor routing: datasetId (preferred), graphTypeEquals, matchers (exists, eq, contains, startsWith)
- R2 — Entity identity: deterministic entityKey computation (firstNonEmpty, joinWithPipes), attached to _narrix.meta.entity.*
- R3 — No complex selectors in config; extraction logic in narrator mappings + features (scoper layer)
- R4 — Multiple pipeline passes with per-pass rule packs and namespaced attachments
- R5 — Bidirectional assumption enforcement: meta.assumption === true ↔ confidence < 1 + evidence
- R6 — Join candidates: packs-level (Option B) via facts; engine-level (Option A) future enhancement
License
Proprietary (internal).
