@quanticjs/events-kafka
v8.2.0
Kafka transport for @quanticjs/events-core — KafkaEventPublisher, KafkaEventConsumer with ADR-008 compliance
Kafka transport for @quanticjs/events-core — KafkaEventPublisher, KafkaEventConsumer with retry/DLQ machinery, inbox dedup, consumer health, and Prometheus metrics.
Metrics
All metrics are registered against the prom-client default global registry (promClient.register), the same registry @quanticjs/metrics' MetricsController serves at GET /metrics. There is no DI coupling between the packages:
- @quanticjs/metrics absent: events-kafka still records into the default registry (if the app has prom-client installed), but nothing serves it, so the app simply has no /metrics endpoint.
- prom-client absent: all metrics are silent no-ops.
| Name | Type | Labels |
|---|---|---|
| quanticjs_events_published_total | Counter | topic |
| quanticjs_events_consumed_total | Counter | topic, group, status |
| quanticjs_events_processing_duration_seconds | Histogram | topic, group |
| quanticjs_events_dlq_total | Counter | topic, error_category |
| quanticjs_events_consumer_lag | Gauge | topic, group, partition |
| quanticjs_events_last_processed_timestamp_seconds | Gauge | topic, group |
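A minimal sketch of the optional-dependency behavior described above, for one metric from the table. The `load` callback, the structural `PromClientLike` type, the factory name, and the help text are assumptions for illustration. Only `Counter`, `inc`, and `register.getSingleMetric` are real prom-client names, and the package's actual wiring may differ.

```typescript
// Structural stand-ins for the small slice of prom-client used here (assumed shape, not the full API).
interface CounterLike {
  inc(labels: Record<string, string>, value?: number): void;
}

interface PromClientLike {
  Counter: new (cfg: { name: string; help: string; labelNames: string[] }) => CounterLike;
  register: { getSingleMetric(name: string): unknown };
}

// Used when prom-client is not installed: every call is a silent no-op.
const noopCounter: CounterLike = { inc: () => {} };

const PUBLISHED = 'quanticjs_events_published_total';

// Hypothetical factory: `load` returns the prom-client module, or undefined if it is absent.
function publishedCounter(load: () => PromClientLike | undefined): CounterLike {
  const prom = load();
  if (!prom) return noopCounter;
  // prom-client throws when a name is registered twice on one registry, so reuse an existing metric.
  const existing = prom.register.getSingleMetric(PUBLISHED) as CounterLike | undefined;
  // Constructing a Counter registers it on prom-client's default registry.
  return existing ?? new prom.Counter({ name: PUBLISHED, help: 'Events published', labelNames: ['topic'] });
}
```

Reusing the existing registration matters because the default registry is process-global: a second module instance creating the same metric name would otherwise fail at startup.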
Consumer lag
quanticjs_events_consumer_lag is updated by a periodic admin/watermark probe (lagProbeIntervalMs, default 30000 ms; 0 disables it) and reports max(0, high watermark − position) for each assigned partition. Probe failures are logged at debug level and never affect consumption; the gauge keeps its last value.
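The per-partition formula can be sketched as a pure function. The `PartitionOffsets` shape and the helper names are hypothetical; we assume offsets arrive as decimal strings and use BigInt so large 64-bit offsets stay exact before converting to a gauge value.

```typescript
// Hypothetical input: one entry per assigned partition, offsets as decimal strings.
interface PartitionOffsets {
  partition: number;
  highWatermark: string; // offset just past the last fully replicated record
  position: string;      // offset of the next record this consumer will fetch
}

// lag = max(0, high watermark - position), computed exactly with BigInt.
function partitionLag(p: PartitionOffsets): bigint {
  const lag = BigInt(p.highWatermark) - BigInt(p.position);
  return lag > 0n ? lag : 0n;
}

// Gauge values keyed by partition; Number() is exact while lag stays below 2^53.
function lagByPartition(parts: PartitionOffsets[]): Map<number, number> {
  const out = new Map<number, number>();
  for (const p of parts) out.set(p.partition, Number(partitionLag(p)));
  return out;
}
```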
Detecting a stuck consumer
A true oldest-unprocessed-age gauge cannot be computed cheaply (it would require fetching the next unconsumed message for every partition on every probe). Instead, use the last-processed timestamp as a proxy and alert when both conditions hold:
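```promql
(time() - quanticjs_events_last_processed_timestamp_seconds > 300)
  and on (topic, group)
(quanticjs_events_consumer_lag > 0)
```

The on (topic, group) modifier is required because the lag gauge carries an extra partition label; with default matching on all labels, and would find no matching series. Lag with no recent processing means the consumer is stuck, not idle.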
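The same rule can be mirrored in code, for example to unit-test alert thresholds. The `GroupSample` shape and `isStuck` name are hypothetical; a sample collects all partition lag values of one topic/group pair, since the timestamp gauge has no partition label.

```typescript
// Hypothetical snapshot of one topic/group pair at evaluation time.
interface GroupSample {
  lastProcessedSeconds: number; // quanticjs_events_last_processed_timestamp_seconds
  partitionLags: number[];      // quanticjs_events_consumer_lag, one value per partition
}

// Stuck = nothing processed for staleAfterSeconds AND at least one partition still has lag.
// Stale with zero lag everywhere is just an idle consumer.
function isStuck(s: GroupSample, nowSeconds: number, staleAfterSeconds = 300): boolean {
  const stale = nowSeconds - s.lastProcessedSeconds > staleAfterSeconds;
  const behind = s.partitionLags.some((lag) => lag > 0);
  return stale && behind;
}
```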
Two prom-client copies (version skew)
prom-client is a peerDependency of both @quanticjs/metrics and @quanticjs/events-kafka, so npm normally hoists a single copy. If version skew ever produces two copies in node_modules, each copy has its own default registry, and the Kafka metrics silently disappear from /metrics. Check with:
```sh
npm ls prom-client   # must show exactly one resolved version
```
Consumer health & boot behavior
Each consumer tracks a status (connecting → running; crashed on run-loop failure; disconnected on shutdown) and registers itself with KafkaConsumerStatusRegistry, which @quanticjs/health's HealthRegistry auto-detects (via the KAFKA_CONSUMER_STATUS token from @quanticjs/core) as a kafka_consumers readiness check. A crashed run-loop turns readiness red within one health-cache TTL (default 5000 ms). There is no in-process auto-restart in v7, so recovery depends on Kubernetes restarting the container. Kubernetes does that only when a liveness probe fails; a failing readiness probe just removes the pod from Service endpoints, so the restart happens only if this check also backs the liveness probe.
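A minimal model of the status lifecycle and a cached readiness check, showing why a crash can take up to one TTL to surface. `ConsumerStatusBoard` and `CachedCheck` are hypothetical stand-ins, not the KafkaConsumerStatusRegistry or HealthRegistry APIs; treating only `crashed` as unhealthy is an assumption drawn from the description above.

```typescript
type ConsumerStatus = 'connecting' | 'running' | 'crashed' | 'disconnected';

interface ConsumerState {
  status: ConsumerStatus;
  reason?: string; // e.g. 'connect_failed'
}

// Hypothetical stand-in for a status registry: one entry per consumer.
class ConsumerStatusBoard {
  private readonly states = new Map<string, ConsumerState>();

  set(name: string, status: ConsumerStatus, reason?: string): void {
    this.states.set(name, { status, reason });
  }

  // Assumption: only 'crashed' fails the kafka_consumers check.
  check(): { healthy: boolean; crashed: string[] } {
    const crashed = [...this.states]
      .filter(([, s]) => s.status === 'crashed')
      .map(([name]) => name);
    return { healthy: crashed.length === 0, crashed };
  }
}

// Hypothetical health cache: re-runs the check at most once per ttlMs.
class CachedCheck<T> {
  private cached?: { value: T; at: number };
  private readonly fn: () => T;
  private readonly ttlMs: number;
  private readonly now: () => number;

  constructor(fn: () => T, ttlMs = 5000, now: () => number = Date.now) {
    this.fn = fn;
    this.ttlMs = ttlMs;
    this.now = now;
  }

  get(): T {
    const t = this.now();
    if (this.cached === undefined || t - this.cached.at >= this.ttlMs) {
      this.cached = { value: this.fn(), at: t };
    }
    return this.cached.value;
  }
}
```

In this model, a crash that happens just after a refresh stays hidden until the cached result expires, which is the "within one health-cache TTL" bound.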
Boot connect options on KafkaEventsModuleOptions:
```ts
connectRetries?: number;                   // default 5
connectRetryBaseMs?: number;               // default 1000; exponential ×2, cap 30000, full jitter
connectFailurePolicy?: 'fail' | 'degrade'; // default 'fail'
lagProbeIntervalMs?: number;               // default 30000; 0 disables the lag probe
```

With 'fail' (default), boot throws once retries are exhausted. With 'degrade', the app boots, the kafka_consumers readiness check reports the consumer as crashed (connect_failed), and connect retries continue in the background; once a connect succeeds, the consumer starts and readiness recovers.
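The boot connect behavior can be sketched as follows. `backoffDelayMs`, `connectWithRetry`, `boot`, and `BootOptions` are hypothetical names; we assume connectRetries counts retries after the first attempt and that background retries in 'degrade' mode reuse the same capped, fully jittered backoff. Full jitter means each delay is drawn uniformly from [0, min(cap, base × 2^n)) rather than adding noise to a fixed delay.

```typescript
type ConnectFailurePolicy = 'fail' | 'degrade';
type Status = 'connecting' | 'running' | 'crashed';

interface BootOptions {
  connectRetries?: number;
  connectRetryBaseMs?: number;
  connectFailurePolicy?: ConnectFailurePolicy;
}

const CAP_MS = 30_000;

// Full jitter: uniform in [0, min(cap, base * 2^retry)). Exponent clamped so it never overflows.
function backoffDelayMs(retry: number, baseMs: number, random: () => number = Math.random): number {
  const ceiling = Math.min(CAP_MS, baseMs * 2 ** Math.min(retry, 30));
  return Math.floor(random() * ceiling);
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// One initial attempt plus up to `retries` retries; rethrows the last error.
async function connectWithRetry(
  connect: () => Promise<void>,
  retries: number,
  baseMs: number,
  random: () => number,
): Promise<void> {
  for (let attempt = 0; ; attempt++) {
    try {
      await connect();
      return;
    } catch (err) {
      if (attempt >= retries) throw err;
      await sleep(backoffDelayMs(attempt, baseMs, random));
    }
  }
}

async function boot(
  connect: () => Promise<void>,
  opts: BootOptions,
  onStatus: (s: Status, reason?: string) => void,
  random: () => number = Math.random,
): Promise<void> {
  const retries = opts.connectRetries ?? 5;
  const baseMs = opts.connectRetryBaseMs ?? 1000;
  onStatus('connecting');
  try {
    await connectWithRetry(connect, retries, baseMs, random);
    onStatus('running');
  } catch (err) {
    if ((opts.connectFailurePolicy ?? 'fail') === 'fail') throw err; // boot fails
    onStatus('crashed', 'connect_failed'); // boot continues with readiness red
    // Keep retrying in the background (not awaited) until a connect succeeds.
    void connectWithRetry(connect, Number.POSITIVE_INFINITY, baseMs, random).then(() => onStatus('running'));
  }
}
```

Under these assumptions the default ceilings for successive retries are 1000, 2000, 4000, 8000, and 16000 ms; the 30000 ms cap only applies from the sixth retry onward, for example during long background retries in 'degrade' mode.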
