message-nexus
v1.2.0
A unified, type-safe cross-context message communication library supporting multiple transport protocols.
Installation
npm install message-nexus
# or
pnpm add message-nexus
Features
- Unified Interface: Supports Mitt (in-process), PostMessage (iframe/window), BroadcastChannel (cross-tab), and WebSocket (network communication)
- JSON-RPC 2.0 Compliance: Strict adherence to the JSON-RPC 2.0 specification for standardized communication
- Envelope Pattern: Extensible message envelope containing routing information (from, to) and metadata
- Type Safety: Full TypeScript support with generic type inference
- Request-Response Pattern: Promise-style asynchronous communication with built-in timeout protection
- Auto Reconnect: WebSocket automatic reconnection mechanism with exponential backoff support
- Message Queue: Offline message caching, automatically sent after connection recovery
- Retry Mechanism: Automatic retry on request failure, configurable retry counts and delays
- Message Validation: Runtime message format validation to prevent illegal messages
- Monitoring Metrics: Built-in message statistics and performance monitoring
- Message Interceptors: Hook into the outgoing request and incoming response pipelines to modify payloads, inject metadata, or implement global logging
- Cross-Context Errors: Preserves error name and stack traces across communication bridges for seamless debugging
- Structured Logging: Supports adjustable log levels (DEBUG, INFO, WARN, ERROR) and custom log handlers or simple loggers (like console) for easy debugging and production monitoring
- Resource Management: All drivers support the destroy() method to properly clean up resources
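The JSON-RPC 2.0 and envelope features listed above can be pictured with a small type sketch. The jsonrpc, method, params, and id members come from the JSON-RPC 2.0 specification. Everything else here is an assumption made for illustration: the Envelope shape, the payload field, and the makeRequest, makeNotification, and isNotification helpers are hypothetical names, not the library's documented wire format.

```typescript
// JSON-RPC 2.0 members (from the specification).
interface RpcRequest {
  jsonrpc: '2.0'
  method: string
  params?: unknown
  id: string // requests carry an id so the response can be matched
}

interface RpcNotification {
  jsonrpc: '2.0'
  method: string
  params?: unknown // notifications have no id and expect no response
}

// Hypothetical envelope: routing fields named after the README (from, to, metadata).
interface Envelope<P> {
  from: string
  to?: string
  metadata?: Record<string, unknown>
  payload: P
}

let counter = 0

function makeRequest(
  from: string,
  method: string,
  params?: unknown,
  to?: string,
): Envelope<RpcRequest> {
  counter += 1
  return { from, to, payload: { jsonrpc: '2.0', method, params, id: `${from}-${counter}` } }
}

function makeNotification(from: string, method: string, params?: unknown): Envelope<RpcNotification> {
  return { from, payload: { jsonrpc: '2.0', method, params } }
}

// A payload is a notification exactly when it carries no id.
function isNotification(payload: RpcRequest | RpcNotification): payload is RpcNotification {
  return !('id' in payload)
}
```

Keeping routing data outside the JSON-RPC payload, as this sketch does, is one way to leave the payload spec-compliant while still letting a transport filter by sender or target.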
Quick Start
1. In-Process Communication (Mitt)
import MessageNexus, { MittDriver, createEmitter } from 'message-nexus'
// Shared emitter used by both sides
const emitter = createEmitter()
// Receiver: register handlers before the sender issues any request
const receiverDriver = new MittDriver(emitter)
const receiverNexus = new MessageNexus(receiverDriver)
// Listen for commands
const unsubscribe = receiverNexus.handle('GET_DATA', (params, context) => {
  return { name: 'test', value: 42 }
})
// Listen for notifications
const unsubscribeNotify = receiverNexus.onNotification('UPDATE_STATUS', (params, context) => {
  console.log('Notification received:', params)
})
// Sender
const driver = new MittDriver(emitter)
const nexus = new MessageNexus(driver)
// Send request
const response = await nexus.invoke('GET_DATA', { id: 123 })
console.log(response)
// Send one-way notification
nexus.notify('UPDATE_STATUS', { status: 'active' })
2. iframe/Window Communication (PostMessage)
import MessageNexus, { PostMessageDriver } from 'message-nexus'
// Sender (runs inside the iframe and targets the parent window)
const driver = new PostMessageDriver(window.parent, 'https://example.com')
const nexus = new MessageNexus(driver)
const response = await nexus.invoke('PING')
console.log('Pong:', response)
// Receiver (runs in the parent page; iframe is a reference to the embedded iframe element)
const iframeDriver = new PostMessageDriver(iframe.contentWindow, 'https://example.com')
const iframeNexus = new MessageNexus(iframeDriver)
iframeNexus.handle('PING', (params, context) => {
  return { time: Date.now() }
})
3. Cross-Tab Communication (BroadcastChannel)
import MessageNexus, { BroadcastDriver } from 'message-nexus'
// Create BroadcastDriver, specifying the channel name
const driver = new BroadcastDriver({ channel: 'my-app-channel' })
const nexus = new MessageNexus(driver)
// Listen for commands
nexus.handle('SYNC_STATE', (params, context) => {
  console.log('Received:', params)
  return { result: 'success' }
})
// Send request (will be broadcast to all tabs on the same channel)
const response = await nexus.invoke({
  method: 'SYNC_STATE',
  params: { state: '...' },
})
// Receiver
const receiverDriver = new BroadcastDriver({ channel: 'my-app-channel' })
const receiverNexus = new MessageNexus(receiverDriver)
receiverNexus.handle('SYNC_STATE', (params, context) => {
  console.log('Received:', params)
  return { result: 'success' }
})
4. WebSocket Communication
import MessageNexus, { WebSocketDriver } from 'message-nexus'
// Automatic reconnection configuration
const driver = new WebSocketDriver({
  url: 'wss://api.example.com/ws',
  reconnect: {
    maxRetries: 5, // Maximum retry count
    retryInterval: 3000, // Retry interval (milliseconds)
  },
})
const nexus = new MessageNexus(driver)
// Send request
const response = await nexus.invoke({
  method: 'GET_USER',
  params: { userId: 123 },
  timeout: 5000,
  retryCount: 3, // Retry 3 times on failure
  retryDelay: 1000, // Retry delay (milliseconds)
})
// Receiver
const receiverDriver = new WebSocketDriver({
  url: 'wss://api.example.com/ws',
  reconnect: {
    maxRetries: 5, // Maximum retry count
    retryInterval: 3000, // Retry interval (milliseconds)
  },
})
const receiverNexus = new MessageNexus(receiverDriver)
// Handle the same method the sender invokes
receiverNexus.handle('GET_USER', (params, context) => {
  console.log('Received:', params)
  return { result: 'success' }
})
API Documentation
MessageNexus
Constructor
new MessageNexus<InvokeMap, NotificationMap>(
  driver: BaseDriver,
  options?: MessageNexusOptions
)
Generics:
- InvokeMap: A record mapping method names to { params: any; result: any }. Defaults to DefaultRegistry.
- NotificationMap: A record mapping notification method names to their parameter types. Defaults to Record<string, any>.
Options:
| Parameter | Type | Default Value | Description |
| ------------- | ---------------------------- | -------------- | ------------------------------------------ |
| instanceId | string | auto-generated | Instance ID, used for message routing |
| timeout | number | 10000 | Request timeout (milliseconds) |
| logger        | LoggerInterface \| SimpleLogger | new Logger() | Logger instance or simple logger (e.g. console) |
| loggerEnabled | boolean | false | Whether to enable logging |
| logLevel | LogLevel | LogLevel.INFO | Minimum log level to report |
LogLevel: DEBUG, INFO, WARN, ERROR
SimpleLogger Interface:
interface SimpleLogger {
  debug(message: string, metadata?: Record<string, unknown>): void
  info(message: string, metadata?: Record<string, unknown>): void
  warn(message: string, metadata?: Record<string, unknown>): void
  error(message: string, metadata?: Record<string, unknown>): void
}
Methods
invoke()
Send request and wait for response.
nexus.invoke<K extends keyof InvokeMap>(
  methodOrOptions: K | InvokeOptions<K, InvokeMap[K]['params']>
): Promise<InvokeMap[K]['result']>
Options:
| Parameter  | Type                    | Required | Description                  |
| ---------- | ----------------------- | -------- | ---------------------------- |
| method     | K                       | Yes      | Message method               |
| params     | InvokeMap[K]['params']  | No       | Request data                 |
| to         | string                  | No       | Target instance ID           |
| metadata   | Record<string, unknown> | No       | Metadata                     |
| timeout    | number                  | No       | Timeout (overrides global)   |
| retryCount | number                  | No       | Number of retries on failure |
| retryDelay | number                  | No       | Retry delay (milliseconds)   |
Example:
// Simple request
const result = await nexus.invoke('FETCH_DATA')
// Full configuration
const detailed = await nexus.invoke({
  method: 'FETCH_DATA',
  params: { id: 123 },
  to: 'target-instance',
  timeout: 5000,
  retryCount: 3,
  retryDelay: 1000,
})
notify()
Send a one-way notification (Fire-and-Forget). Does not wait for a response and does not generate an ID. Complies with JSON-RPC 2.0 Notification specification.
nexus.notify<K extends keyof NotificationMap>(
  methodOrOptions: K | NotificationOptions<K, NotificationMap[K]>
): void
Options:
| Parameter | Type                    | Required | Description         |
| --------- | ----------------------- | -------- | ------------------- |
| method    | K                       | Yes      | Notification method |
| params    | NotificationMap[K]      | No       | Notification data   |
| to        | string                  | No       | Target instance ID  |
| metadata  | Record<string, unknown> | No       | Metadata            |
Example:
// Simple notification
nexus.notify('HEARTBEAT')
// Full configuration
nexus.notify({
  method: 'UPDATE_STATE',
  params: { state: 'ready' },
  to: 'target-instance',
})
handle()
Register a request handler for a specific method. The return value (or resolved value of a returned Promise) is automatically sent back as the response.
nexus.handle<K extends keyof InvokeMap>(
  method: K,
  handler: InvokeHandler<InvokeMap[K]['params'], InvokeMap[K]['result']>
): () => void
Parameters:
- method: The method name to handle.
- handler: A function that receives (params, context) and returns a result or a Promise.
InvokeContext:
| Property | Type | Description |
| ----------- | ------------------------- | ----------------------------------------------- |
| messageId | string | Unique identifier for the request (JSON-RPC ID) |
| from | string | Instance ID of the sender |
| to | string | Instance ID of the receiver (your instance ID) |
| metadata | Record<string, unknown> | Custom metadata sent with the envelope |
Example:
const unsubscribe = nexus.handle('ECHO', (params, context) => {
  console.log(`Received ECHO from ${context.from}`)
  return { echoed: params }
})
// Unsubscribe
unsubscribe()
onNotification()
Register a handler for a specific notification method (one-way messages).
nexus.onNotification<K extends keyof NotificationMap>(
  method: K,
  handler: NotificationHandler<NotificationMap[K]>
): () => void
Example:
const unsubscribe = nexus.onNotification('HEARTBEAT', (params, context) => {
  console.log(`Heartbeat from ${context.from}`)
})
// Unsubscribe
unsubscribe()
onError()
Register a global error handler for background errors (e.g., driver failures, invalid incoming messages). For request-specific errors, use try/catch with invoke().
nexus.onError(handler: (error: Error | NexusError, context?: Record<string, unknown>) => void): () => void
Example:
nexus.onError((error, context) => {
  if (error instanceof NexusError) {
    console.error(`Bridge error [${error.code}]: ${error.message}`, error.data)
  } else {
    console.error('System error:', error.message)
  }
  // Send to error tracking service
  Sentry.captureException(error, { extra: context })
})
useRequestInterceptor()
Register a hook to intercept and potentially modify outgoing messages before they are sent to the driver. Interceptors can be synchronous or asynchronous.
nexus.useRequestInterceptor(
  (message: Message) => Message | Promise<Message>
): () => void
Example:
const unsubscribe = nexus.useRequestInterceptor(async (message) => {
  // Inject an authentication token into the metadata
  message.metadata = { ...message.metadata, token: await getAuthToken() }
  return message
})
useResponseInterceptor()
Register a hook to intercept and modify incoming messages before they are processed by handlers or resolve pending invokes.
nexus.useResponseInterceptor(
  (message: Message) => Message | Promise<Message>
): () => void
Errors
MessageNexus provides a structured error system based on the JSON-RPC 2.0 specification.
NexusError
A custom error class that includes a numeric code and optional data. MessageNexus automatically preserves and serializes the name and stack properties of errors thrown in handlers, enabling seamless cross-context debugging.
class NexusError<D = any> extends Error {
  code: number // JSON-RPC or Nexus-specific error code
  data?: D // Optional additional error information
  name: string // Preserved error name from original context
  stack?: string // Preserved stack trace from original context
}
NexusErrorCode
Common error codes exported by the library:
| Code | Name | Description |
| --- | --- | --- |
| -32700 | ParseError | Invalid JSON received by the server |
| -32600 | InvalidRequest | The JSON sent is not a valid Request object |
| -32601 | MethodNotFound | The method does not exist / is not registered |
| -32602 | InvalidParams | Invalid method parameter(s) |
| -32603 | InternalError | Internal JSON-RPC error (e.g., handler threw an exception) |
| -32001 | Timeout | Request timed out |
| -32002 | SendFailed | Failed to send message via driver |
| -32003 | InvalidResponse | Received a response that doesn't match the request |
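To make the idea of preserving name and stack across a bridge concrete, here is a minimal sketch of one possible round trip. The code, message, and data members follow the JSON-RPC 2.0 error object, and -32603 is the InternalError code from the table above. Placing name and stack inside data is an assumption, and CodedError, toRpcError, and fromRpcError are hypothetical names rather than library exports.

```typescript
// JSON-RPC 2.0 error object: code, message, and optional data (from the specification).
interface RpcError {
  code: number
  message: string
  data?: { name?: string; stack?: string } // assumed location for name and stack
}

const INTERNAL_ERROR = -32603 // InternalError in the table above

// Hypothetical stand-in for an error class that carries a numeric code.
class CodedError extends Error {
  code: number
  constructor(message: string, code: number) {
    super(message)
    // Keep instanceof working even when compiled to older targets.
    Object.setPrototypeOf(this, CodedError.prototype)
    this.code = code
  }
}

// Sending side: flatten a thrown value into a JSON-serializable error object.
function toRpcError(err: unknown): RpcError {
  if (err instanceof Error) {
    const code = err instanceof CodedError ? err.code : INTERNAL_ERROR
    return { code, message: err.message, data: { name: err.name, stack: err.stack } }
  }
  return { code: INTERNAL_ERROR, message: String(err) }
}

// Receiving side: rebuild an Error and restore the original name and stack.
function fromRpcError(payload: RpcError): CodedError {
  const err = new CodedError(payload.message, payload.code)
  if (payload.data?.name) err.name = payload.data.name
  if (payload.data?.stack) err.stack = payload.data.stack
  return err
}
```

With this shape, a TypeError thrown in a handler would arrive at the caller with name still set to TypeError and the remote stack attached, which is the behavior the NexusError description promises.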
getMetrics()
Get monitoring metrics.
nexus.getMetrics(): Metrics
Return Value:
{
  messagesSent: number // Messages sent
  messagesReceived: number // Messages received
  messagesFailed: number // Messages failed
  pendingMessages: number // Pending messages
  queuedMessages: number // Queued messages
  totalLatency: number // Total latency (milliseconds)
  averageLatency: number // Average latency (milliseconds)
}
Example:
const metrics = nexus.getMetrics()
console.log(`Avg latency: ${metrics.averageLatency}ms`)
console.log(
  `Success rate: ${((metrics.messagesReceived / metrics.messagesSent) * 100).toFixed(2)}%`,
)
onMetrics()
Register metrics change callback.
nexus.onMetrics(callback: MetricsCallback): () => void
Example:
const unsubscribe = nexus.onMetrics((metrics) => {
  // Send to monitoring system
  metricsService.report(metrics)
})
flushQueue()
Flush the message queue, sending all cached messages.
nexus.flushQueue()
destroy()
Destroy the instance and clean up resources.
nexus.destroy()
Note: The destroy() method automatically calls the driver's destroy() method to clean up resources such as event listeners. Call it when the owning component is unmounted to avoid memory leaks.
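Because handle(), onNotification(), onError(), and onMetrics() each return an unsubscribe function of type () => void, teardown can be gathered in one place. The sketch below is one way to do that. Disposables is a hypothetical helper, not part of message-nexus, and the usage comment assumes a nexus instance created as in the Quick Start.

```typescript
type Unsubscribe = () => void

// Collects cleanup callbacks and runs each of them once when dispose() is called.
class Disposables {
  private callbacks: Unsubscribe[] = []

  add(callback: Unsubscribe): void {
    this.callbacks.push(callback)
  }

  dispose(): void {
    // Swap the list out first so a second dispose() call is a no-op.
    const pending = this.callbacks
    this.callbacks = []
    // Run in registration order: handlers first, instance teardown last.
    for (const callback of pending) callback()
  }
}

// Usage sketch (assumes an existing `nexus` instance):
//   const cleanup = new Disposables()
//   cleanup.add(nexus.handle('ECHO', (params) => ({ echoed: params })))
//   cleanup.add(() => nexus.destroy())
//   // on unmount:
//   cleanup.dispose()
```

Registering nexus.destroy() last means individual handlers are unsubscribed before the instance and its driver are torn down.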
WebSocketDriver
Constructor
new WebSocketDriver(options: WebSocketDriverOptions)
Options:
| Parameter | Type                        | Default Value | Description                        |
| --------- | --------------------------- | ------------- | ---------------------------------- |
| url       | string                      | Required      | WebSocket URL                      |
| reconnect | boolean \| ReconnectOptions | true          | Whether to automatically reconnect |
| logger    | Logger                      | new Logger()  | Logger instance                    |
ReconnectOptions:
| Parameter     | Type   | Default Value | Description                   |
| ------------- | ------ | ------------- | ----------------------------- |
| maxRetries    | number | Infinity      | Maximum retry count           |
| retryInterval | number | 5000          | Retry interval (milliseconds) |
Example:
const driver = new WebSocketDriver({
  url: 'wss://api.example.com/ws',
  reconnect: {
    maxRetries: 10,
    retryInterval: 3000,
  },
})
Methods
close()
Close connection and stop reconnection.
driver.close()
PostMessageDriver
Constructor
new PostMessageDriver(targetWindow: Window, targetOrigin: string)
Parameters:
| Parameter    | Type   | Required | Description                                                      |
| ------------ | ------ | -------- | ---------------------------------------------------------------- |
| targetWindow | Window | Yes      | Target window object                                             |
| targetOrigin | string | Yes      | Target origin address (security requirement, '*' cannot be used) |
Example:
const driver = new PostMessageDriver(window.parent, 'https://app.example.com')
MittDriver
Constructor
new MittDriver(emitter: Emitter<Record<string, Message>>)
Example:
import { createEmitter, MittDriver } from 'message-nexus'
// Use the factory function to create an independent emitter instance
const emitter = createEmitter()
const driver = new MittDriver(emitter)
Note: It is recommended to use the createEmitter() factory function to create an independent emitter instance.
BroadcastDriver
Constructor
new BroadcastDriver(options: BroadcastDriverOptions)
BroadcastDriverOptions:
| Parameter | Type   | Default Value | Description            |
| --------- | ------ | ------------- | ---------------------- |
| channel   | string | Required      | Broadcast channel name |
Example:
import MessageNexus, { BroadcastDriver } from 'message-nexus'
const driver = new BroadcastDriver({ channel: 'my-app-channel' })
const nexus = new MessageNexus(driver)
// Listen for messages from other tabs
nexus.handle('SOME_METHOD', (params, context) => {
  console.log('Received from another tab:', params)
  return { received: true }
})
// Clean up resources
nexus.destroy()
Features:
- Multiple tabs under the same origin can communicate via the same channel name
- Automatically adds a protocol identifier to filter non-MessageNexus messages
- Supports dynamic channel switching
Advanced Usage / Techniques
Asynchronous Handlers
Handlers registered via handle() can be async functions or return a Promise. MessageNexus will wait for the Promise to resolve before sending the response back to the caller.
nexus.handle('FETCH_REMOTE', async (params) => {
  const data = await fetch(`https://api.example.com/items/${params.id}`)
  return await data.json()
})
Suspending Responses (Manual Reply Simulation)
In some cases, you may need to wait for a user action (like clicking a button in the UI) before replying to a request. You can achieve this by returning a Promise and storing its resolve function.
const pendingResolvers = new Map<string, (value: any) => void>()
nexus.handle('user.confirm', (params, context) => {
  return new Promise((resolve) => {
    // Store the resolve function indexed by messageId
    const id = context.messageId!
    pendingResolvers.set(id, resolve)
    // Trigger some UI to show a confirmation dialog
    showDialog(params.message)
  })
})
// Later, when the user clicks "Confirm"
function onUserConfirm(id: string) {
  const resolve = pendingResolvers.get(id)
  if (resolve) {
    resolve({ confirmed: true })
    pendingResolvers.delete(id)
  }
}
Design Highlights
1. Type Safety
MessageNexus uses TypeScript generics and Schema mapping to provide full type inference across method names, parameters, and results:
// 1. Define your protocol schemas
interface MyInvokeMap {
  'getUser': { params: { id: number }; result: { name: string; age: number } };
  'calculate': { params: { a: number; b: number }; result: number };
}
interface MyNotificationMap {
  'onLog': { message: string; level: 'info' | 'warn' | 'error' };
}
// 2. Initialize with your schemas
const nexus = new MessageNexus<MyInvokeMap, MyNotificationMap>(driver)
// 3. Enjoy full type inference and autocompletion
const response = await nexus.invoke('getUser', { id: 123 })
// response Type: { name: string; age: number }
nexus.notify('onLog', { message: 'Ready', level: 'info' })
// 4. Type-safe handlers
nexus.handle('calculate', (params) => {
  // params Type: { a: number; b: number }
  return params.a + params.b // result Type must be number
})
2. Memory Safety
- Auto Cleanup: Regularly cleans up expired message records
- Manual Cleanup: Internal request records are deleted immediately after the response is received or the request times out
- Auto-Reply: Handlers automatically send responses when the return value is resolved, ensuring no orphaned requests
- Resource Release: The destroy() method cleans up all timers and event listeners
- Queue Limits: The message queue has a maximum size limit to prevent infinite growth
- Driver Lifecycle: Each driver implements the destroy() method to correctly release resources
- Emitter Isolation: Use createEmitter() to create independent instances, avoiding memory leaks caused by shared singletons
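The queue-limit point above can be illustrated with a small bounded FIFO. This is a sketch under stated assumptions: BoundedQueue is a hypothetical class, and dropping the oldest entry when the queue is full is just one possible overflow policy. The library's actual limit and policy are not documented here.

```typescript
// FIFO queue with a hard capacity; when full, the oldest entry is dropped.
class BoundedQueue<T> {
  private items: T[] = []
  private readonly maxSize: number

  constructor(maxSize: number) {
    if (maxSize < 1) throw new RangeError('maxSize must be at least 1')
    this.maxSize = maxSize
  }

  // Adds an item; returns the dropped item if the queue was full, otherwise undefined.
  enqueue(item: T): T | undefined {
    const dropped = this.items.length >= this.maxSize ? this.items.shift() : undefined
    this.items.push(item)
    return dropped
  }

  // Removes and returns every queued item in insertion order (e.g. after reconnecting).
  drain(): T[] {
    const all = this.items
    this.items = []
    return all
  }

  get size(): number {
    return this.items.length
  }
}
```

Returning the dropped item from enqueue() lets the caller count or log discarded messages instead of losing them silently.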
3. Error Recovery & Handling
- Auto Reconnect: WebSocket automatic reconnection mechanism with exponential backoff strategy
- Request Retry: Automatic retry on request failure, configurable retry counts and delays
- Message Queue: Offline message caching, automatically sent after connection recovery
- Structured Error Handling: Dedicated NexusError class and standard codes (JSON-RPC 2.0 compatible) for precise diagnostics and fault recovery
- Unified Error Callback: Global onError listener for non-request background errors
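As a rough picture of the retry and backoff ideas in this list, the sketch below retries an async task with a doubling delay. The README documents retryCount and retryDelay for requests and maxRetries and retryInterval for reconnection, but does not say how the backoff is computed, so the doubling formula and the cap are assumptions. backoffDelay, withRetry, and sleep are hypothetical helpers.

```typescript
// Delay before retry number `attempt` (0-based): baseMs * 2^attempt, capped at maxDelayMs.
function backoffDelay(attempt: number, baseMs: number, maxDelayMs: number): number {
  return Math.min(baseMs * Math.pow(2, attempt), maxDelayMs)
}

const sleep = (ms: number): Promise<void> =>
  new Promise<void>((resolve) => setTimeout(resolve, ms))

// Runs `task`; on failure retries up to `retryCount` more times, waiting between attempts.
// `wait` is injectable so the delay can be replaced in tests.
async function withRetry<T>(
  task: () => Promise<T>,
  retryCount: number,
  baseMs: number,
  maxDelayMs: number,
  wait: (ms: number) => Promise<void> = sleep,
): Promise<T> {
  let lastError: unknown
  for (let attempt = 0; attempt <= retryCount; attempt++) {
    try {
      return await task()
    } catch (error) {
      lastError = error
      // Only wait if another attempt will follow.
      if (attempt < retryCount) await wait(backoffDelay(attempt, baseMs, maxDelayMs))
    }
  }
  throw lastError
}
```

Capping the delay keeps long outages from pushing the wait time to impractical values while still spacing out early retries.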
4. Security Hardening
- PostMessage: Using '*' as targetOrigin is prohibited; the origin address must be explicitly specified
- BroadcastChannel: Uses the protocol identifier __messageBridge to distinguish MessageNexus messages from user-defined messages
- Message Validation: Runtime validation of message format to prevent crashes from illegal messages
- Source Filtering: Automatically filters non-target messages
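The checks in this list can be sketched as two small guards. The __messageBridge identifier and the ban on '*' come from the list above. How the identifier is stored on a message (here, a boolean property set to true) is an assumption, and assertExplicitOrigin and shouldAccept are hypothetical helpers, not library functions.

```typescript
const PROTOCOL_KEY = '__messageBridge' // identifier named in the list above

// Reject the wildcard up front so every outgoing message names its target origin.
function assertExplicitOrigin(targetOrigin: string): string {
  if (targetOrigin === '*') {
    throw new Error("targetOrigin '*' is not allowed; pass an explicit origin")
  }
  return targetOrigin
}

// Accept an incoming event only if it comes from the expected origin
// and its data carries the protocol marker (assumed to be `true`).
function shouldAccept(eventOrigin: string, data: unknown, expectedOrigin: string): boolean {
  if (eventOrigin !== expectedOrigin) return false
  if (typeof data !== 'object' || data === null) return false
  return (data as Record<string, unknown>)[PROTOCOL_KEY] === true
}
```

In a browser, a guard like shouldAccept would typically be called from a message event listener with event.origin and event.data, before any further parsing of the message.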
5. Observability
Built-in monitoring metrics for easy production environment monitoring:
const metrics = nexus.getMetrics()
console.log(`Messages: ${metrics.messagesSent} sent, ${metrics.messagesReceived} received`)
console.log(
  `Success rate: ${((metrics.messagesReceived / metrics.messagesSent) * 100).toFixed(2)}%`,
)
console.log(`Avg latency: ${metrics.averageLatency}ms`)
console.log(`Pending: ${metrics.pendingMessages}, Queued: ${metrics.queuedMessages}`)
License
MIT
