raft-logic
v0.3.13
Node.js wrapper around a WASM build of tikv/raft-rs (via wasm-bindgen).
Node.js library that wraps a WebAssembly build of raft-rs (TiKV's Rust port of etcd's Raft implementation), exposing a small, promise-based ES module API for building Raft nodes in JavaScript.
- ESM-only (Node 18+)
- Contains a minimal in-memory transport and storage for testing/examples
- WASM artifacts are bundled in the published package; no external download needed
- Deterministic control helpers for tests: transferLeadership, stepDown, runUntilStableLeader
- Event-driven signals and waiters: onRoleChange, onCommitAdvanced, onQuorumActive; waitForLeaderStable(), waitForHeartbeatRound(), waitForFirstStableLeader()
Installation
- npm install raft-logic
Requirements
- Node.js >= 18
- ESM environment (either use .mjs files or set "type": "module" in your package.json)
Quick start (single node)
import { RaftNode, InMemoryTransport, InMemoryStorage } from 'raft-logic';

async function run() {
  const transport = new InMemoryTransport();
  const storage = new InMemoryStorage();
  const applied = [];
  const node = new RaftNode({
    id: '1',
    peers: ['1'], // single-node cluster
    electionTick: 10,
    heartbeatTick: 1,
    transport,
    storage,
    apply: async (entry) => {
      if (entry.data) {
        const data = Buffer.from(entry.data, 'base64').toString('utf8');
        applied.push(data);
        console.log('[apply] committed entry:', data);
      }
    },
    tickIntervalMs: 50,
  });
  await node.start();
  // Wait until a leader exists (for single node this should be quick)
  await node.waitForLeader(5000);
  // Ergonomic client request: auto-forward (if follower), and optionally wait for local apply
  await node.clientRequest('hello-from-node', { waitFor: 'apply', timeout: 2000 });
  console.log('Applied entries:', applied);
  await node.stop();
}

run().catch((e) => {
  console.error(e);
  process.exit(1);
});

Threaded quick start (worker thread)
import { ThreadedRaftNode, InMemoryTransport } from 'raft-logic';

async function run() {
  const transport = new InMemoryTransport();
  const applied = [];
  const node = new ThreadedRaftNode({
    id: '1',
    peers: ['1'],
    electionTick: 10,
    heartbeatTick: 1,
    transport,
    apply: async (entry) => {
      if (entry.data) {
        const data = Buffer.from(entry.data, 'base64').toString('utf8');
        applied.push(data);
        console.log('[apply/main-thread] committed entry:', data);
      }
    },
    tickIntervalMs: 50,
    preVote: true,
  });
  await node.start();
  await node.waitForLeader(5000);
  // Propose via clientRequest and wait for local apply
  const res = await node.clientRequest('hello-from-threaded', { waitFor: 'apply', timeout: 2000 });
  console.log('Proposed at index', res.index, 'term', res.term);
  await node.stop();
}

run().catch((e) => { console.error(e); process.exit(1); });

API overview (high level)
class RaftNode(options)
- options:
  - id: string (stringified u64)
  - peers: string[] (initial voter set, stringified u64)
  - electionTick: number
  - heartbeatTick: number
  - transport: { send(fromId: string, msgs: object[]): Promise, register?: (node: RaftNode) => void }
  - storage?: { initialState?(id: string): Promise, persistReady?(id: string, ready: any): Promise }
  - apply(entry): function called for each committed entry
  - tickIntervalMs?: number (default ~100ms)
  - preVote?: boolean (default false)
  - checkQuorum?: boolean (default false)
  - onStateUpdate?(snapshot): optional callback to observe internal state cache updates
  - onDrain?(): optional callback when the Ready queue is drained
  - metrics?: { onPropose?(): void, onRejected?(reason: string, meta?: object): void, onRoleChange?(prev: string, next: string): void }
- methods:
  - start(): Promise
  - stop(opts?): Promise
    - opts: { drainApply?: boolean, drainTicks?: boolean } to quiesce before stopping
  - status(): Promise<{ role, term, lead, commitIndex, lastApplied, lastLogIndex, raft_state } | null>
  - propose(data: Uint8Array | ArrayBuffer | string): Promise
    - Throws NotLeaderError on follower/candidate; safe to call on single-node or when leader hint points to self
  - schedulePropose(data, opts?): Promise<{ index, term }>
    - Defers propose out of apply() context; safe alternative to propose() in re-entrant scenarios
  - clientRequest(data, opts?): Promise<{ index, term }>
    - opts: { autoForward?: boolean=true, waitFor?: 'none'|'commit'|'apply'='none', timeout?: number, abortSignal?: AbortSignal }
    - Auto-forwards to current leader if called on a follower (when transport supports it). Optionally waits for commit or local apply.
  - waitForLeader(timeoutMs?): Promise. Resolves with leaderId when a stable leader exists
  - waitForLeaderStable(options?): Promise. Event-driven waiter; options { requireQuorum=true, minTermResidencyTicks=0, signal?: AbortSignal }
  - waitForHeartbeatRound(n=1, options?): Promise. Resolves after n majority heartbeat rounds in current term
  - waitForCommit(index, { localApply?: boolean, timeout?: number, abortSignal?: AbortSignal }): Promise
    - Waits for index to be committed cluster-wide and (optionally) applied locally
  - readIndex(opts?): Promise. Leader-gated lease read; returns safe commit index
  - barrier(): Promise
    - Leader: resolves when all prior proposals are committed
    - Follower: resolves when all prior proposals are committed and applied locally
  - manualTick(): Promise, advanceTicks(n): Promise
    - Deterministic testing helpers to manually drive ticks
  - step(msg: object): Promise
  - campaign(): Promise
  - transferLeadership(targetId, timeoutMs?): Promise. Deterministic leader handoff to a target peer (test helper)
  - stepDown(timeoutMs?): Promise. Relinquish leadership and trigger re-election/transfer
  - runUntilStableLeader(timeoutMs?): Promise. Wait for a stable leader and a short stability window
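The readIndex() and waitForCommit() methods listed above can be combined into a read that reflects every write committed before it started. The following is a minimal sketch, not the library's own helper: `linearizableRead` and `readFn` are hypothetical names, and it assumes readIndex() resolves with a numeric index (as in the metrics example in this README) and that waitForCommit() resolves promptly when the index is already applied.

```javascript
// Sketch: linearizable read built from readIndex() and waitForCommit().
// `readFn` is a hypothetical application callback that reads local state.
async function linearizableRead(node, readFn, { timeout = 2000 } = {}) {
  // 1. Ask for an index that is safe to read at (leader-gated per the README).
  const safeIndex = await node.readIndex({ timeout });
  // 2. Wait until this node has applied everything up to that index.
  await node.waitForCommit(safeIndex, { localApply: true, timeout });
  // 3. Local state now reflects at least safeIndex, so the read is safe.
  return readFn(safeIndex);
}
```

A caller would pass the node plus a function that queries its own state machine, for example `linearizableRead(node, () => kv.get('key'))`, where `kv` is whatever structure the apply callback maintains.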
class ThreadedRaftNode(options)
- Same high-level behavior as RaftNode but runs the raft core (WASM + storage + drain loop) inside a worker thread.
- Additional methods and signals:
  - onStateChange(cb), onBecameLeader(cb), onBecameFollower(cb): subscribe to state updates
  - transferLeadership(targetId, timeoutMs?): Promise. Test helper via MsgTransferLeader
  - stepDown(timeoutMs?): Promise. Test helper via MsgTimeoutNow
  - runUntilStableLeader(timeoutMs?): Promise. Waits for a leader and a stability window (no role/lead changes)
  - readIndex(opts?): Promise. Leader-gated lease read; returns safe commit index
  - stop(opts?): Promise. Accepts { drainApply?: boolean, drainTicks?: boolean }
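One common use of the onBecameLeader/onBecameFollower signals is to run background work only on the current leader. The sketch below is illustrative: `attachLeaderOnlyJob` and `job` are hypothetical names, and it assumes the callbacks need no arguments.

```javascript
// Sketch: run a periodic job only while this node believes it is leader.
// `job` is a hypothetical application function.
function attachLeaderOnlyJob(node, job, intervalMs = 1000) {
  let timer = null;
  const stop = () => {
    if (timer !== null) {
      clearInterval(timer);
      timer = null;
    }
  };
  node.onBecameLeader(() => {
    stop(); // guard against duplicate leader signals
    timer = setInterval(job, intervalMs);
  });
  node.onBecameFollower(stop);
  return { stop, isRunning: () => timer !== null };
}
```

Because a node can lose leadership between two signals, the job should still submit its writes through clientRequest() so that a stale leader's proposals are rejected rather than applied.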
Adapters
- InMemoryTransport: routes messages within the same process for testing.
  - sendClientRequest(leaderId, data, opts): in-memory convenience used by clientRequest auto-forwarding
- Utilities:
  - leaderForwarder(transport, getLeaderId): returns async function to route clientRequest to the current leader
  - waitForFirstStableLeader(nodes, options?): Promise. Promise.any over nodes’ waitForLeaderStable to pick the first stable leader
  - waitForReadyLeader(node, options?): Promise. Composite helper that waits for a leader, drains backlog, and (optionally) enforces a lease-based read barrier before treating the node as ready for client operations.
- InMemoryStorage: demonstrates the persist-before-advance contract (not durable)
- SqliteStorage: production-oriented durable storage
  - getDb(): returns the underlying better-sqlite3 Database handle for co-located app tables
  - onOpen(db): optional callback invoked when the DB is opened to initialize co-located tables
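For readers who want to plug in their own transport, the documented shape is { send(fromId, msgs), register?(node) }. Here is a rough sketch of an in-process variant under two assumptions that this README does not confirm: each node exposes an `id` property, and each message carries its destination in `msg.to`. `LoopbackTransport` and its `dropped` counter are hypothetical names.

```javascript
// Sketch of a custom transport matching the documented shape:
//   { send(fromId, msgs): Promise, register?(node): void }
// Assumed (not confirmed by the README): node.id exists and msg.to names the target.
class LoopbackTransport {
  constructor() {
    this.nodes = new Map(); // id -> node
    this.dropped = 0; // messages that could not be delivered
  }

  register(node) {
    this.nodes.set(String(node.id), node);
  }

  async send(fromId, msgs) {
    for (const msg of msgs) {
      const target = this.nodes.get(String(msg.to));
      if (!target) {
        this.dropped++; // unknown peer: drop, Raft tolerates message loss
        continue;
      }
      try {
        await target.step(msg); // hand the message to the receiving node
      } catch {
        this.dropped++; // a failing receiver must not break the sender
      }
    }
  }
}
```

A real network transport would serialize each message and deliver it over a socket, but the routing decision (look up the target, tolerate failures) stays the same.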
Typed errors
- NotLeaderError: { code: 'NotLeader', term: number, leaderId?: string, forwardHint?: { id?: string } }
- TimeoutError: { code: 'Timeout' }
- RejectedError: { code: 'Rejected', reason?: 'Reentrancy' | 'NotLeader' | ... }
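Since each typed error carries a `code` field, callers can branch on it without importing the error classes. The sketch below shows one possible retry policy. `requestWithRetry`, the attempt count, and the backoff are illustrative choices, not library behavior.

```javascript
// Sketch: retry clientRequest() on transient errors, keyed on the documented `code`.
async function requestWithRetry(node, data, { attempts = 3, backoffMs = 50 } = {}) {
  for (let i = 0; i < attempts; i++) {
    try {
      return await node.clientRequest(data, { waitFor: 'commit', timeout: 2000 });
    } catch (err) {
      // 'Timeout' and 'NotLeader' often mean leadership is changing.
      const retryable = err.code === 'Timeout' || err.code === 'NotLeader';
      if (!retryable || i === attempts - 1) throw err; // 'Rejected' and unknown errors surface immediately
      await new Promise((resolve) => setTimeout(resolve, backoffMs * (i + 1)));
    }
  }
  throw new Error('attempts must be >= 1');
}
```

A request that timed out may still have been committed, so retried payloads should be idempotent (for example, by carrying a client-generated request id that the apply callback deduplicates).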
Status/observability
- status(): returns a richer snapshot: { role: 'leader'|'follower'|'candidate', term, lead, commitIndex, lastApplied, lastLogIndex, raft_state }
- Event-style callbacks on ThreadedRaftNode: onStateChange, onBecameLeader, onBecameFollower
- Barrier and waiter APIs for deterministic sequencing in tests and app logic
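The status() snapshot can be reduced to a few health numbers for logging or dashboards. This is a small sketch: the input field names come from the status() shape above, while `summarizeStatus`, `applyLag`, and `uncommitted` are names made up for the example. It coerces indices with Number() on the assumption that they fit in a JavaScript number.

```javascript
// Sketch: derive simple health numbers from a status() snapshot.
function summarizeStatus(status) {
  if (status === null) return { available: false }; // status() may resolve with null
  const commit = Number(status.commitIndex);
  return {
    available: true,
    isLeader: status.role === 'leader',
    applyLag: commit - Number(status.lastApplied), // committed but not yet applied locally
    uncommitted: Number(status.lastLogIndex) - commit, // appended but not yet committed
  };
}
```

A steadily growing applyLag suggests the apply callback is slower than the commit rate. A growing uncommitted count on a leader suggests it is struggling to reach a quorum.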
Examples: metrics and readIndex
import { RaftNode, InMemoryTransport, SimpleMetrics } from 'raft-logic';

const transport = new InMemoryTransport();
const metrics = new SimpleMetrics();
const node = new RaftNode({
  id: '1',
  peers: ['1'],
  electionTick: 10,
  heartbeatTick: 1,
  transport,
  apply: () => {},
  metrics, // enable metrics hooks
});
await node.start();
await node.waitForLeader(5000);
// Metrics counters increment on proposals and rejections
await node.clientRequest('ex', { waitFor: 'commit', timeout: 2000 });
console.log('metrics snapshot', metrics.snapshot());
// Linearizable read (lease-based): returns a safe commit index
const safeIndex = await node.readIndex({ timeout: 2000 });
// perform your application read knowing state ≥ safeIndex
await node.stop({ drainApply: true, drainTicks: true });

Deterministic testing helpers
- Set tickIntervalMs in RaftNode to 0 (or use ThreadedRaftNode helpers) and drive ticks via manualTick()/advanceTicks(n) to remove wall clock coupling.
- transferLeadership(targetId), stepDown() allow deterministic failover in tests.
- runUntilStableLeader(timeout) blocks until a stable leader is observed (and holds through a short stability window).
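When ticks are driven manually, a bounded "tick until a condition holds" loop keeps tests from hanging. The following sketch uses only manualTick() and status(). `tickUntil` and `exactlyOneLeader` are hypothetical helper names, and the nodes are assumed to have been created with tickIntervalMs: 0.

```javascript
// Sketch: drive manual ticks across a cluster until a predicate over the
// nodes' status() snapshots holds, or fail after maxRounds.
async function tickUntil(nodes, predicate, maxRounds = 500) {
  for (let round = 1; round <= maxRounds; round++) {
    await Promise.all(nodes.map((n) => n.manualTick()));
    const statuses = await Promise.all(nodes.map((n) => n.status()));
    if (predicate(statuses)) return round; // number of rounds it took
  }
  throw new Error(`condition not met after ${maxRounds} tick rounds`);
}

// Example predicate: exactly one node reports the leader role.
const exactlyOneLeader = (statuses) =>
  statuses.filter((s) => s && s.role === 'leader').length === 1;
```

In a test this would read `await tickUntil([n1, n2, n3], exactlyOneLeader)`, which fails with a clear error instead of timing out if no leader emerges.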
Example: deterministic leadership transfer and step down
import { ThreadedRaftNode, InMemoryTransport } from 'raft-logic';

const transport = new InMemoryTransport();
const peers = ['1','2','3'];
const common = {
  peers,
  electionTick: 10,
  heartbeatTick: 1,
  preVote: true,
  checkQuorum: false,
  transport,
  apply: async () => {},
  tickIntervalMs: 0 // manual ticking for determinism
};
const n1 = new ThreadedRaftNode({ id: '1', ...common });
const n2 = new ThreadedRaftNode({ id: '2', ...common });
const n3 = new ThreadedRaftNode({ id: '3', ...common });
await Promise.all([n1.start(), n2.start(), n3.start()]);

// helper to drive ticks across the cluster
async function driveTicks(rounds = 100) {
  for (let i = 0; i < rounds; i++) {
    await Promise.all([n1.manualTick(), n2.manualTick(), n3.manualTick()]);
  }
}

// elect a leader deterministically
await driveTicks(200);
const leaderId = await n1.runUntilStableLeader(5000);
console.log('Leader elected:', leaderId);

// transfer leadership to node 2
if (leaderId !== '2') {
  const leaderNode = leaderId === '1' ? n1 : (leaderId === '2' ? n2 : n3);
  await leaderNode.transferLeadership('2', 5000);
  await driveTicks(200);
}

// ask node 2 to step down and observe a new leader
await n2.stepDown(5000);
await driveTicks(200);
const nextLeader = await n1.runUntilStableLeader(5000);
console.log('New leader:', nextLeader);
await Promise.all([n1.stop(), n2.stop(), n3.stop()]);

Durable storage: SqliteStorage
- Upholds persist-before-advance:
  - snapshot (compaction: delete entries <= snapshot.index)
  - entries (tail replacement: delete entries >= firstNew, then insert)
  - hardState (term, vote, commit)
- Durable PRAGMAs on open: WAL + synchronous=FULL by default (configurable).
- getDb(): gives access to the same database handle for co-located app tables.
- onOpen(db): optional hook to set up your co-located schema.
- Optional applyWithDb(entry, db): if provided, called during apply with the same handle used by raft-logic. This enables efficient, consistent side-effects (best-effort; user manages app-level transactions).
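Because applyWithDb is described as best-effort, side effects are easier to reason about when they are idempotent. The sketch below writes each entry into the `outbox` table used in the usage example of this section, keyed by log index so that a replay after restart is a no-op. It assumes entry.data is base64 (as in the quick start) and that the entry exposes its log index as `entry.index`. Where the callback is registered is not shown, since this README does not specify it.

```javascript
// Sketch: idempotent side effect inside applyWithDb(entry, db).
// `db` is expected to be a better-sqlite3 Database handle.
function applyWithDb(entry, db) {
  if (!entry.data) return false; // entries without a payload have nothing to record
  const payload = Buffer.from(entry.data, 'base64').toString('utf8');
  // INSERT OR IGNORE keyed on the log index: re-applying the same entry changes nothing.
  const info = db
    .prepare('INSERT OR IGNORE INTO outbox(id, payload) VALUES (?, ?)')
    .run(entry.index, payload);
  return info.changes === 1; // true only the first time this index is recorded
}
```

If several statements must succeed or fail together, they can be wrapped in `db.transaction(...)` from better-sqlite3, in line with the note that app-level transactions are the user's responsibility.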
Usage (durable)
import { RaftNode, InMemoryTransport, SqliteStorage } from 'raft-logic';

const transport = new InMemoryTransport();
const storage = new SqliteStorage({
  file: './data/node-1.sqlite',
  onOpen(db) {
    db.exec('CREATE TABLE IF NOT EXISTS outbox(id INTEGER PRIMARY KEY, payload TEXT)');
  }
});
storage.open();
const node = new RaftNode({
  id: '1',
  peers: ['1','2','3'],
  electionTick: 10,
  heartbeatTick: 1,
  transport,
  storage,
  apply: async (entry) => { /* your state machine */ },
});
await node.start();
await node.waitForLeader(5000);
const { index, term } = await node.clientRequest('do-something', { waitFor: 'commit', timeout: 2000 });
console.log('Committed at index', index, 'term', term);
await node.stop();
storage.close();

Notes on WASM
- The package bundles the wasm-bindgen output under ./wasm and the loader automatically initializes it in Node.js.
- You normally do not need to call loadWasm() directly; RaftNode.start() will initialize it if needed.
- Low-level helpers:
  - loadWasm(customUrl?): Promise
  - wasmReady(): Exports
  - ready(customUrl?): Promise
Examples
- Single node (threaded):
  - npm run example:single-threaded
- Three nodes (in-memory):
  - npm run example:three
- Three nodes + SQLite:
  - npm run example:three-sqlite
- Restart + recovery demo:
  - npm run example:three-sqlite-restart
Implementation notes
- Schema avoids reserved SQLite keywords:
  - hard_state.commit_index (not "commit")
  - snapshot.snap_index (not "index")
- Entries persist the entry type as etype; the host JSON boundary uses entryType.
Changelog (recent)
- New event signals: onRoleChange, onCommitAdvanced, onQuorumActive
- New waiters: waitForLeaderStable(options), waitForHeartbeatRound(n), and waitForFirstStableLeader(nodes, options)
- loader.ready(): Promise ensures WASM initialized without polling
- Added clientRequest(entry, opts) with auto-forwarding, waitFor=commit/apply, timeout/abort.
- propose() rejects with NotLeaderError on followers (with leaderId, term, forwardHint).
- Leadership signals and barriers:
  - onBecameLeader/onBecameFollower/onStateChange
  - waitForLeader(), barrier()
- Waiters: waitForCommit(index, { localApply, timeout, abortSignal })
- Deterministic testing: manualTick(), advanceTicks(), transferLeadership(), stepDown(), runUntilStableLeader()
- SqliteStorage: getDb(), onOpen(db), optional applyWithDb(entry, db)
- Richer status(): role, term, lead, commitIndex, lastApplied, lastLogIndex
- Typed errors: NotLeaderError, TimeoutError, RejectedError
- Metrics hooks: metrics.onPropose(), metrics.onRejected(reason, meta), metrics.onRoleChange(prev, next)
- readIndex(): leader-gated lease-based linearizable read; returns commit index
- stop(opts): { drainApply, drainTicks } to quiesce before freeing
- Utilities: leaderForwarder(transport, getLeaderId)
WASM Lifecycle Control and Diagnostics
New APIs have been added to give developers explicit control and visibility over the WebAssembly runtime lifecycle.
These are especially useful in long-lived or test-driven environments where multiple Raft clusters are created and destroyed in one process.
Prevent premature freeing
import { disableAutoFree } from "raft-logic/loader.mjs";
disableAutoFree(); // Keeps the WASM runtime alive for the entire process
Or equivalently:
import { retainWasm } from "raft-logic/loader.mjs";
retainWasm(true);
Global singleton loader
Ensures all RaftNodes share the same WASM instance:
import { getWasmInstance } from "raft-logic/loader.mjs";
const wasm = await getWasmInstance();
Diagnostics
Inspect the current WASM runtime state:
import { getWasmStatus } from "raft-logic/loader.mjs";
console.log(getWasmStatus()); // { refCount: 0, freed: false, autoFreeDisabled: true }
Controlled shutdown
When auto-free is disabled, the runtime will not be freed automatically:
import { controlledShutdownWasm } from "raft-logic/loader.mjs";
await controlledShutdownWasm(); // Skips freeing if disableAutoFree() was called
These APIs make raft-logic more robust for frameworks, test suites, and long-lived processes.
Diagnostic Tests and Optional Logging
A new diagnostic test has been added to help verify multi-cluster Raft behavior and isolate issues such as worker lifecycle or election stalls.
Running the diagnostic test
npm test --silent -- test/multi-instance-diagnostics.test.mjs
This test creates two independent Raft clusters and drives them manually using manualTick().
It verifies that both clusters elect leaders independently and remain isolated.
Optional logging
The diagnostic test supports an enableLogs option to toggle detailed Raft logs:
const clusterA = await makeCluster('A', { enableLogs: true });
const clusterB = await makeCluster('B', { enableLogs: false });
When enableLogs is true, detailed [apply ...] and Raft debug logs are printed.
When false, the test runs silently except for high-level diagnostic messages.
License
- MIT OR Apache-2.0
