@jr200-labs/xstate-nats
v0.9.17
XState machine for NATS
A state machine library that integrates XState v5 with the NATS messaging system, providing a type-safe way to manage NATS connections, subscriptions, and Key-Value operations.
Features
- State Machine Management: Built on XState for predictable state transitions and side effects
- NATS Integration: Full support for NATS Core, JetStream, and Key-Value operations
- Authentication Support: Multiple auth types (decentralised, userpass, token)
- Connection Management: Automatic connection handling with retry logic and error recovery
- Subject Management: Subscribe, publish, and request-reply operations with state tracking
- Key-Value Store: KV bucket and key management with real-time subscriptions
Installation
pnpm add @jr200-labs/xstate-nats
Quick Start
Basic Setup
import { natsMachine } from '@jr200-labs/xstate-nats'
import { useActor } from '@xstate/react'

function MyComponent() {
  const [state, send] = useActor(natsMachine)

  const connect = () => {
    send({
      type: 'CONFIGURE',
      config: {
        opts: {
          servers: ['nats://localhost:4222'],
        },
        auth: {
          type: 'userpass',
          user: 'myuser',
          pass: 'mypass',
        },
        maxRetries: 3,
      },
    })
    send({ type: 'CONNECT' })
  }

  return (
    <div>
      <p>Status: {state.value}</p>
      <button onClick={connect}>Connect</button>
    </div>
  )
}
Subject Operations
// Subscribe to a subject
send({
  type: 'SUBJECT.SUBSCRIBE',
  config: {
    subject: 'user.events',
    callback: (data) => {
      console.log('Received:', data)
    },
  },
})

// Publish to a subject
send({
  type: 'SUBJECT.PUBLISH',
  subject: 'user.events',
  payload: { userId: 123, action: 'login' },
})

// Request-reply pattern
send({
  type: 'SUBJECT.REQUEST',
  subject: 'user.get',
  payload: { userId: 123 },
  callback: (reply) => {
    console.log('Reply:', reply)
  },
})
Key-Value Operations
// Create a KV bucket
send({
  type: 'KV.BUCKET_CREATE',
  bucket: 'user-sessions',
  onResult: (result) => {
    if (result.ok) {
      console.log('Bucket created successfully')
    }
  },
})

// Put a value
send({
  type: 'KV.PUT',
  bucket: 'user-sessions',
  key: 'user-123',
  value: { sessionId: 'abc123', expiresAt: Date.now() },
  onResult: (result) => {
    if (result.ok) {
      console.log('Value stored successfully')
    }
  },
})

// Get a value
send({
  type: 'KV.GET',
  bucket: 'user-sessions',
  key: 'user-123',
  onResult: (result) => {
    if ('error' in result) {
      console.error('Error:', result.error)
    } else {
      console.log('Value:', result)
    }
  },
})

// Subscribe to KV changes
send({
  type: 'KV.SUBSCRIBE',
  config: {
    bucket: 'user-sessions',
    key: 'user-123',
    callback: (entry) => {
      console.log('KV Update:', entry)
    },
  },
})
State Machine States
The NATS machine operates in the following states:
- not_configured: Initial state, waiting for configuration
- configured: Configuration received, ready to connect
- connecting: Attempting to establish NATS connection
- initialise_managers: Setting up subject and KV managers
- connected: Fully connected and operational
- closing: Gracefully disconnecting
- closed: Connection closed, can reconnect
- error: Error state, can reset and retry
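As a minimal sketch of how the state names above might be used in application code, the union type and both helpers below are local, hypothetical definitions. They are not exports of @jr200-labs/xstate-nats; only the state names come from the list.

```typescript
// State names copied from the list above. The type is a local assumption,
// not something exported by the library.
type NatsMachineState =
  | 'not_configured'
  | 'configured'
  | 'connecting'
  | 'initialise_managers'
  | 'connected'
  | 'closing'
  | 'closed'
  | 'error'

// Hypothetical helper: `connected` is the only state described as operational.
function isOperational(state: NatsMachineState): boolean {
  return state === 'connected'
}

// Hypothetical helper: map each state to a short label for a status badge.
// The exhaustive switch makes the compiler flag any state that is missing.
function statusLabel(state: NatsMachineState): string {
  switch (state) {
    case 'not_configured':
      return 'Waiting for configuration'
    case 'configured':
      return 'Ready to connect'
    case 'connecting':
    case 'initialise_managers':
      return 'Connecting'
    case 'connected':
      return 'Connected'
    case 'closing':
      return 'Disconnecting'
    case 'closed':
      return 'Closed'
    case 'error':
      return 'Error'
  }
}
```

A component could then gate its publish button on `isOperational(state.value as NatsMachineState)`, assuming `state.value` is one of the flat state names listed.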
API Reference
Main Exports
- natsMachine: The main XState machine for NATS operations
- KvSubscriptionKey: Type for KV subscription keys
- parseNatsResult: Utility for parsing NATS operation results
- AuthConfig: Type for authentication configuration
Authentication
// Decentralised auth
auth: {
  type: 'decentralised',
  sentinelB64: 'base64-encoded-sentinel',
  user: 'username',
  pass: 'password'
}

// User/password auth
auth: {
  type: 'userpass',
  user: 'username',
  pass: 'password'
}

// Token auth
auth: {
  type: 'token',
  token: 'your-token'
}
Events
Connection Events
- CONFIGURE: Set connection configuration
- CONNECT: Establish connection
- DISCONNECT: Close connection
- RESET: Reset to initial state
Subject Events
- SUBJECT.SUBSCRIBE: Subscribe to a subject
- SUBJECT.UNSUBSCRIBE: Unsubscribe from a subject
- SUBJECT.PUBLISH: Publish to a subject
- SUBJECT.REQUEST: Send request-reply
- SUBJECT.UNSUBSCRIBE_ALL: Clear all subscriptions
KV Events
- KV.BUCKET_CREATE: Create a KV bucket
- KV.BUCKET_DELETE: Delete a KV bucket
- KV.BUCKET_LIST: List KV buckets
- KV.PUT: Store a value
- KV.GET: Retrieve a value
- KV.DELETE: Delete a value
- KV.SUBSCRIBE: Subscribe to KV changes
- KV.UNSUBSCRIBE: Unsubscribe from KV changes
- KV.UNSUBSCRIBE_ALL: Unsubscribe from all KV changes
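The KV events report their outcome through an onResult callback, as in the Key-Value Operations examples. Where async/await reads better, the callback can be wrapped in a Promise. The sketch below does this for KV.GET. The `KvGetEvent` and `Send` types and the `kvGet` helper are hypothetical and local to this example; only the event fields and the `'error' in result` check are taken from the examples in this README.

```typescript
// Event shape copied from the KV.GET example. The type names are assumptions.
type KvGetEvent = {
  type: 'KV.GET'
  bucket: string
  key: string
  onResult: (result: unknown) => void
}

// Anything that can accept the event, e.g. the `send` returned by useActor.
type Send = (event: KvGetEvent) => void

// Hypothetical helper: resolve with the value, reject when the result
// carries an `error` property (the same check the README example uses).
function kvGet(send: Send, bucket: string, key: string): Promise<unknown> {
  return new Promise((resolve, reject) => {
    send({
      type: 'KV.GET',
      bucket,
      key,
      onResult: (result) => {
        if (typeof result === 'object' && result !== null && 'error' in result) {
          reject((result as { error: unknown }).error)
        } else {
          resolve(result)
        }
      },
    })
  })
}
```

Usage would look like `const session = await kvGet(send, 'user-sessions', 'user-123')`, assuming the machine's `send` accepts this event shape. The Promise never settles if the machine drops the event (for example, when not connected), so callers may want to add their own timeout.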
OpenTelemetry
This library emits OpenTelemetry spans for NATS operations and propagates W3C trace context across the wire so a single trace can span publisher, subscriber, and request/reply replier.
Emitted spans
| Span name | Emitted by | Attributes |
| ----------------------- | -------------------- | ---------------------------------------- |
| xstate.nats.subscribe | SUBJECT.SUBSCRIBE | subject |
| xstate.nats.message | per received message | subject, payload.bytes |
| xstate.nats.publish | SUBJECT.PUBLISH | subject, payload.bytes |
| xstate.nats.request | SUBJECT.REQUEST | subject, payload.bytes, timeout.ms |
| xstate.nats.reconnect | NATS status loop | reconnect.type |
| xstate.nats.lifecycle | root machine | xstate.state, xstate.event |
| xstate.nats.kv.watch | KV.SUBSCRIBE | bucket, key |
| xstate.nats.kv.entry | per KV watch entry | bucket, key, operation |
All error paths record exceptions on the active span, set span status to
ERROR, and emit a named event (xstate.nats.error / xstate.nats.kv.error)
with a truncated stack.
Lifecycle diagnostics
Root machine lifecycle diagnostics are opt-in and separate from raw NATS protocol frame logging. Enable them on the connection config:
send({
  type: 'CONFIGURE',
  config: {
    opts: { servers: ['wss://example-nats'] },
    diagnostics: { lifecycle: true },
    maxRetries: 3,
  },
})
When enabled, the root machine emits sanitized xstate.nats.lifecycle spans and
console.debug('xstate-nats lifecycle', attributes) breadcrumbs for configure,
connect, close, manager initialization, and NATS status events. Diagnostics
include state, event type, server URLs, debug/verbose flags, retry count, auth
type, and manager readiness flags. They do not include credentials, JWTs, nkeys,
signatures, passwords, tokens, or raw protocol frames.
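To illustrate what "sanitized" means here, the sketch below builds diagnostic attributes from a connection config by copying only an allow-list of non-secret fields. It is not the library's implementation. The `ConnectionConfig` and `AuthConfig` types are local approximations of the shapes shown in this README, and every attribute key other than xstate.state and xstate.event is a hypothetical name chosen for the example.

```typescript
// Local approximation of the three auth shapes from the Authentication section.
type AuthConfig =
  | { type: 'decentralised'; sentinelB64: string; user: string; pass: string }
  | { type: 'userpass'; user: string; pass: string }
  | { type: 'token'; token: string }

// Local approximation of the CONFIGURE payload used in the examples.
type ConnectionConfig = {
  opts: { servers: string[] }
  auth?: AuthConfig
  maxRetries?: number
}

// Hypothetical sanitizer: copy an allow-list of fields rather than deleting
// known secrets, so a newly added credential field cannot leak by default.
function toLifecycleAttributes(
  config: ConnectionConfig,
  state: string,
  eventType: string,
): Record<string, string | number> {
  const attributes: Record<string, string | number> = {
    'xstate.state': state,
    'xstate.event': eventType,
    // Assumed key names below; the README does not list them.
    'servers': config.opts.servers.join(','),
    'auth.type': config.auth ? config.auth.type : 'none',
  }
  if (config.maxRetries !== undefined) {
    attributes['retry.max'] = config.maxRetries
  }
  return attributes
}
```

The allow-list approach is a design choice for this sketch: only the auth type is recorded, never the user, password, token, or sentinel.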
Enabling tracing
@opentelemetry/api is a peer dependency: the consumer controls the installed
version and registers the SDK. If no provider is registered, all telemetry calls
become no-ops. Minimal setup:
import { trace, propagation, context } from '@opentelemetry/api'
import { AsyncLocalStorageContextManager } from '@opentelemetry/context-async-hooks'
import { W3CTraceContextPropagator } from '@opentelemetry/core'
import { BasicTracerProvider, SimpleSpanProcessor } from '@opentelemetry/sdk-trace-base'

const provider = new BasicTracerProvider({
  spanProcessors: [
    /* e.g. new SimpleSpanProcessor(yourExporter) */
  ],
})
trace.setGlobalTracerProvider(provider)
propagation.setGlobalPropagator(new W3CTraceContextPropagator())

const ctxMgr = new AsyncLocalStorageContextManager()
ctxMgr.enable()
context.setGlobalContextManager(ctxMgr)
Context propagation
Publish and request operations inject traceparent into the outgoing NATS
headers; received messages extract the traceparent and parent their
xstate.nats.message span on it. Downstream services that propagate the header
appear as children of the originating publisher/requester span.
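For readers unfamiliar with the header, the sketch below parses a W3C traceparent value to show what travels in the NATS headers. It is for illustration only: the library relies on the registered OpenTelemetry propagator, and `parseTraceparent` and `TraceParent` are hypothetical names, not part of this package. The parser accepts only the four-field layout defined for version 00.

```typescript
// Fields of a version-00 traceparent header:
//   version "-" trace-id (32 hex) "-" parent-id (16 hex) "-" flags (2 hex)
type TraceParent = {
  version: string
  traceId: string
  parentId: string
  sampled: boolean
}

const TRACEPARENT_RE = /^([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$/

// Hypothetical helper: returns null for anything that is not a valid header.
function parseTraceparent(header: string): TraceParent | null {
  const match = TRACEPARENT_RE.exec(header.trim())
  if (!match) return null
  const [, version, traceId, parentId, flags] = match
  // Version "ff" is forbidden, and all-zero ids are invalid per the spec.
  if (version === 'ff') return null
  if (/^0+$/.test(traceId) || /^0+$/.test(parentId)) return null
  // Bit 0 of the flags byte is the "sampled" flag.
  return { version, traceId, parentId, sampled: (parseInt(flags, 16) & 1) === 1 }
}

// Example value taken from the W3C Trace Context specification.
const parsed = parseTraceparent('00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01')
console.log(parsed?.traceId) // prints "4bf92f3577b34da6a3ce929d0e0e4736"
```

A subscriber's xstate.nats.message span shares the publisher's trace id and uses the header's parent id as its parent, which is what links the two spans into one trace.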
Examples
Check out the React example for a complete working implementation.
Development
Prerequisites
- Node.js 22+
- pnpm
Setup
make install
Commands
make check # Run prettier + lint (tsc --noEmit)
make test # Run tests
make build # Build the library
make bump PART=patch # Bump version (major|minor|patch)
make release # Tag, push, and create GitHub release
make publish # Build, test, and publish to npm
License
MIT
