@nxtedition/shared
v5.1.13
Cross-thread primitives for Node.js worker threads
Install
npm install @nxtedition/shared
API
SharedArrayBuffer registry
A process-wide, thread-safe registry for SharedArrayBuffers. Keys are strong references; backing stores are stored as weak references. If all JS references to a SharedArrayBuffer are garbage collected, the factory is called again on the next getOrCreate.
import { getOrCreate } from '@nxtedition/shared'
// Pass a byte size — a SharedArrayBuffer is created automatically
const sab = getOrCreate('my-buffer', 1024)
// Or pass a factory function for custom creation
const sab2 = getOrCreate('my-other-buffer', (key) => new SharedArrayBuffer(2048))
A practical use case is application-wide stats counters. Instead of plumbing parentPort.postMessage calls to propagate metrics from workers to the main thread, every thread can atomically update a shared counter directly:
// stats.js — import from any thread
import { getOrCreate } from '@nxtedition/shared'
const counters = new Int32Array(getOrCreate('app:stats', 4 * 4))
export const REQUESTS = 0
export const ERRORS = 1
export const BYTES_IN = 2
export const BYTES_OUT = 3
export function inc(index, delta = 1) {
  Atomics.add(counters, index, delta)
}
export function snapshot() {
  return {
    requests: Atomics.load(counters, REQUESTS),
    errors: Atomics.load(counters, ERRORS),
    bytesIn: Atomics.load(counters, BYTES_IN),
    bytesOut: Atomics.load(counters, BYTES_OUT),
  }
}
Any worker calls inc(REQUESTS) on the hot path; the main thread calls snapshot() to read all counters without any message-passing overhead.
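One caveat with the counters above: an Int32Array slot wraps around after 2^31 - 1, which a byte counter can reach fairly quickly. The following is a minimal sketch of a 64-bit variant, assuming a plain SharedArrayBuffer as a stand-in for the registry call; `wideCounters`, `WIDE_BYTES_IN`, and `incWide` are hypothetical names, not part of the package.

```javascript
// Int32 counters wrap: adding 1 to the maximum value yields the minimum value.
const narrow = new Int32Array(new SharedArrayBuffer(4))
Atomics.store(narrow, 0, 2147483647)
Atomics.add(narrow, 0, 1)
const wrapped = Atomics.load(narrow, 0) // -2147483648

// 64-bit variant. In an application this buffer would presumably come from
// getOrCreate('app:stats', 4 * 8); a plain SharedArrayBuffer stands in here.
const wideCounters = new BigInt64Array(new SharedArrayBuffer(4 * 8))
const WIDE_BYTES_IN = 2

// Hypothetical helper: Atomics on a BigInt64Array requires BigInt operands.
function incWide(index, delta = 1) {
  Atomics.add(wideCounters, index, BigInt(delta))
}

incWide(WIDE_BYTES_IN, 3_000_000_000)
const bytesIn = Atomics.load(wideCounters, WIDE_BYTES_IN) // 3000000000n
```

The trade-off is that readers receive BigInt values, so a snapshot function would need to convert them (for example with Number()) before mixing them with ordinary numbers.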
getOrCreate(key, sizeOrCallbackFn)
Returns an existing SharedArrayBuffer for key, or creates and registers one. Thread-safe (mutex-protected).
- key: string -- Registry key.
- sizeOrCallbackFn: number | (key: string) => SharedArrayBuffer -- Either a positive integer byte size (creates a new SharedArrayBuffer(size) automatically), or a factory function called (under lock) when the key has no live entry.
Throws if called recursively from within a callback.
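The semantics described above can be modelled in a few lines. This is a single-threaded sketch only, assuming a Map of WeakRef entries in place of the native, mutex-protected registry; `getOrCreateSketch`, `registry`, and `creating` are hypothetical names and do not reflect how the package is implemented.

```javascript
// Hypothetical single-threaded model of the registry semantics.
// key (held strongly) -> WeakRef to the SharedArrayBuffer (held weakly)
const registry = new Map()
let creating = false

function getOrCreateSketch(key, sizeOrCallbackFn) {
  // Mirrors the documented rule: no recursive calls from within a callback.
  if (creating) {
    throw new Error('getOrCreate called recursively from a callback')
  }
  // Reuse the entry only if the buffer has not been garbage collected.
  const existing = registry.get(key)?.deref()
  if (existing !== undefined) {
    return existing
  }
  creating = true
  try {
    const sab =
      typeof sizeOrCallbackFn === 'function'
        ? sizeOrCallbackFn(key)
        : new SharedArrayBuffer(sizeOrCallbackFn)
    registry.set(key, new WeakRef(sab))
    return sab
  } finally {
    creating = false
  }
}

const first = getOrCreateSketch('my-buffer', 16)
const second = getOrCreateSketch('my-buffer', 32) // size ignored, entry is live
```

Because `first` is still referenced, the second call returns the same buffer and its size argument has no effect.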
Cross-thread lock
A simple mutex built on SharedArrayBuffer and Atomics. Inspired by the Web Locks API but works across worker threads without message passing.
import { withLock } from '@nxtedition/shared'
const result = await withLock('my-resource', async () => {
  // exclusive access across all threads
  return doWork()
})
withLock(key, fn, opaque?, opts?)
Acquires a cross-thread lock identified by key, executes fn, and releases the lock. If the lock is held by another thread, waits asynchronously (via Atomics.waitAsync) until it becomes available. The lock is always released when fn returns or throws.
- key: string -- Lock name (scoped via the registry).
- fn: (opaque?) => T | Promise<T> -- Function to execute under the lock.
- opaque (optional) -- Passed through to fn to avoid closures on hot paths.
- opts.signal: AbortSignal (optional) -- Aborts the lock acquisition. If the signal fires while waiting, the promise rejects and the lock is not acquired.
Returns Promise<T> with the return value of fn.
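To illustrate the general idea of a mutex built on SharedArrayBuffer and Atomics, here is a minimal sketch. It is a blocking variant that uses Atomics.wait rather than the Atomics.waitAsync approach described above, and it assumes a one-slot Int32Array as the lock word; `tryLock`, `unlock`, and `withLockSync` are hypothetical helpers, not the package's API.

```javascript
const UNLOCKED = 0
const LOCKED = 1

// Atomically flip the lock word from UNLOCKED to LOCKED; true on success.
function tryLock(state) {
  return Atomics.compareExchange(state, 0, UNLOCKED, LOCKED) === UNLOCKED
}

// Release the lock and wake one waiter, if any.
function unlock(state) {
  Atomics.store(state, 0, UNLOCKED)
  Atomics.notify(state, 0, 1)
}

// Blocking counterpart of the pattern: acquire, run fn, always release.
function withLockSync(state, fn, opaque) {
  while (!tryLock(state)) {
    // Sleeps only while the word still reads LOCKED; returns when notified
    // or immediately if the value has already changed.
    Atomics.wait(state, 0, LOCKED)
  }
  try {
    return fn(opaque)
  } finally {
    unlock(state)
  }
}

// The lock word lives in shared memory, so any thread holding a view on the
// same SharedArrayBuffer contends for the same lock.
const lockState = new Int32Array(new SharedArrayBuffer(4))
const lockedResult = withLockSync(lockState, (n) => n + 1, 41) // 42
```

The try/finally mirrors the documented guarantee that the lock is released whether fn returns or throws. A blocking wait stalls the calling thread's event loop, which is presumably why the package waits asynchronously instead.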
Ring buffer
A high-performance, lock-free ring buffer for inter-thread communication using SharedArrayBuffer.
A single SharedArrayBuffer is mapped into both threads. The writer appends messages by advancing a write pointer; the reader consumes them by advancing a read pointer. No copies, no ownership transfers, no cloning overhead. Reads are zero-copy: the reader callback receives a DataView directly into the shared buffer. Writes are batched -- the write pointer is only published after a high-water mark is reached or the current event loop tick ends.
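The read-pointer and write-pointer scheme can be sketched in plain JavaScript. This is a simplified single-producer, single-consumer model under stated assumptions: a hypothetical layout of two Int32 pointer slots followed by the data area, a one-byte length prefix, and a copying read. It does not reproduce the package's zero-copy reads, batching, or native memory mapping, and `createRing`, `ringWrite`, and `ringRead` are illustrative names.

```javascript
const HEADER_BYTES = 8 // two Int32 slots: read position, write position
const READ = 0
const WRITE = 1

function createRing(capacity) {
  const sab = new SharedArrayBuffer(HEADER_BYTES + capacity)
  return {
    ptrs: new Int32Array(sab, 0, 2),
    data: new Uint8Array(sab, HEADER_BYTES, capacity),
  }
}

// Appends one message; returns false when there is not enough free space.
function ringWrite(ring, bytes) {
  const { ptrs, data } = ring
  if (bytes.length > 255) throw new RangeError('message too large for 1-byte prefix')
  const read = Atomics.load(ptrs, READ)
  const write = Atomics.load(ptrs, WRITE)
  const used = (write - read + data.length) % data.length
  // One byte stays free so that read === write always means "empty".
  const free = data.length - used - 1
  if (bytes.length + 1 > free) return false
  let pos = write
  data[pos] = bytes.length
  pos = (pos + 1) % data.length
  for (const b of bytes) {
    data[pos] = b
    pos = (pos + 1) % data.length
  }
  // Publishing the new write position makes the message visible to the reader.
  Atomics.store(ptrs, WRITE, pos)
  return true
}

// Removes one message and returns a copy of it, or null when the ring is empty.
function ringRead(ring) {
  const { ptrs, data } = ring
  const read = Atomics.load(ptrs, READ)
  const write = Atomics.load(ptrs, WRITE)
  if (read === write) return null
  let pos = read
  const len = data[pos]
  pos = (pos + 1) % data.length
  const out = new Uint8Array(len)
  for (let i = 0; i < len; i++) {
    out[i] = data[pos]
    pos = (pos + 1) % data.length
  }
  // Advancing the read position hands the space back to the writer.
  Atomics.store(ptrs, READ, pos)
  return out
}

const ring = createRing(8)
ringWrite(ring, [1, 2, 3])
const firstMessage = ringRead(ring) // Uint8Array [1, 2, 3]
```

Each side only ever stores to its own pointer, which is what allows a single writer and a single reader to cooperate without a lock. The per-byte modulo in this sketch is the cost that a double-mapped buffer avoids.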
import { Reader, Writer } from '@nxtedition/shared'
const w = new Writer(1024 * 1024) // 1 MB ring buffer
const payload = Buffer.from('hello world')
w.writeSync(payload.length, (data) => {
  payload.copy(data.buffer, data.byteOffset)
  return data.byteOffset + payload.length
})
// Pass w.handle to the other thread via workerData
const r = new Reader(w.handle)
r.readSome((data) => {
  const msg = data.buffer.subarray(data.byteOffset, data.byteOffset + data.byteLength).toString()
  console.log(msg) // 'hello world'
})
new Reader(handleOrSize)
Creates a reader for the ring buffer.
- handleOrSize -- SharedHandle from writer.handle, or a positive integer to allocate a new ring buffer.
reader.handle
The underlying SharedHandle. Pass to another thread via workerData.
reader.stats
Returns { readCount, readBytes }.
reader.readSome(next, opaque?)
Reads a batch of messages. Calls next(data, opaque) for each message. data has buffer, view, byteOffset, byteLength. Return false to stop early. Returns the number of messages processed.
new Writer(handleOrSize, options?)
Creates a writer for the ring buffer.
Options:
- yield?: () => void -- Called when the writer must wait for the reader.
- logger?: { warn(obj, msg): void } -- Logger for yield warnings.
writer.handle
The underlying SharedHandle.
writer.maxMessageSize
Maximum payload size for a single write.
writer.stats
Returns { yieldCount, yieldTime, writeCount, writeBytes }.
writer.writeSync(len, fn, opaque?)
Synchronously writes a message. Blocks until buffer space is available. fn(data, opaque) must return the end position (data.byteOffset + bytesWritten).
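A write callback that follows this contract might look like the sketch below. It assumes, based on the examples in this README, that `data.buffer` behaves like a Node.js Buffer and that `data.byteOffset` marks where the message starts; `encodeMessage` and `decodeMessage` are hypothetical helpers, and a plain Buffer stands in for the ring buffer so the snippet runs on its own.

```javascript
// Write callback: encodes { id, text } as a 4-byte little-endian id followed
// by UTF-8 text, and returns the end position as the contract requires.
// The message arrives through the opaque argument, so no closure is needed.
function encodeMessage(data, msg) {
  let pos = data.byteOffset
  pos = data.buffer.writeUInt32LE(msg.id, pos) // returns offset + 4
  pos += data.buffer.write(msg.text, pos, 'utf8') // returns bytes written
  return pos
}

// Read callback counterpart: decodes the same layout.
function decodeMessage(data) {
  const id = data.buffer.readUInt32LE(data.byteOffset)
  const text = data.buffer.toString(
    'utf8',
    data.byteOffset + 4,
    data.byteOffset + data.byteLength
  )
  return { id, text }
}

// Stand-in for the region the writer would hand to the callback.
const scratch = Buffer.alloc(64)
const message = { id: 7, text: 'hello' }
const messageLength = 4 + Buffer.byteLength(message.text) // 9 bytes
const endPosition = encodeMessage(
  { buffer: scratch, byteOffset: 16, byteLength: messageLength },
  message
)
const decoded = decodeMessage({
  buffer: scratch,
  byteOffset: 16,
  byteLength: endPosition - 16,
})
```

With a real writer, the equivalent call would presumably be writer.writeSync(messageLength, encodeMessage, message), passing the message as opaque.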
writer.tryWrite(len, fn, opaque?)
Non-blocking write attempt. Returns false if the buffer is full.
writer.cork(callback?)
Batches writes. Published when uncork is called or the callback returns.
writer.uncork()
Releases the cork and publishes pending writes.
writer.flushSync()
Immediately publishes pending writes regardless of cork state.
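The relationship between cork, uncork, and flushSync can be pictured with a small model. This sketch assumes a nestable cork counter and publishes immediately when uncorked, which simplifies away the high-water-mark and end-of-tick batching described earlier; `PublisherSketch` and its members are hypothetical and not the package's implementation.

```javascript
// Hypothetical model: `local` is the writer's private position, and the
// shared slot is what a reader on another thread would observe.
class PublisherSketch {
  constructor() {
    this.shared = new Int32Array(new SharedArrayBuffer(4))
    this.local = 0
    this.corked = 0
  }

  // Record that n more bytes were written; publish unless corked.
  advance(n) {
    this.local += n
    if (this.corked === 0) this.flushSync()
  }

  // Hold back publication; with a callback, uncork when it returns or throws.
  cork(callback) {
    this.corked++
    if (callback) {
      try {
        callback()
      } finally {
        this.uncork()
      }
    }
  }

  // Publish once the outermost cork is released.
  uncork() {
    if (this.corked > 0 && --this.corked === 0) this.flushSync()
  }

  // Publish immediately, regardless of cork state.
  flushSync() {
    Atomics.store(this.shared, 0, this.local)
  }

  get published() {
    return Atomics.load(this.shared, 0)
  }
}

const publisher = new PublisherSketch()
publisher.cork()
publisher.advance(10)
publisher.advance(5)
const whileCorked = publisher.published // 0, nothing visible yet
publisher.uncork()
const afterUncork = publisher.published // 15, both writes published together
```

Batching in this way means the reader sees one pointer update for many messages, which reduces cross-thread traffic on the shared position.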
Native binding
The ring buffer relies on a native C++ addon for double-mapped virtual memory (contiguous reads/writes across the ring boundary) and huge page support on Linux. The SharedArrayBuffer registry uses V8's BackingStore API to hold weak references to backing stores across threads.
License
MIT