npm package discovery and stats viewer.

Discover Tips

  • General search

    [free text search, go nuts!]

  • Package details

    pkg:[package-name]

  • User packages

    @[username]

Sponsor

Optimize Toolset

I’ve always been into building performant and accessible sites, but lately I’ve been taking it extremely seriously. So much so that I’ve been building a tool to help me optimize and monitor the sites that I build to make sure that I’m making an attempt to offer the best experience to those who visit them. If you’re into performant, accessible and SEO friendly sites, you might like it too! You can check it out at Optimize Toolset.

About

Hi, 👋, I’m Ryan Hefner  and I built this site for me, and you! The goal of this site was to provide an easy way for me to check the stats on my npm packages, both for prioritizing issues and updates, and to give me a little kick in the pants to keep up on stuff.

As I was building it, I realized that I was actually using the tool to build the tool, and figured I might as well put this out there and hopefully others will find it to be a fast and useful way to search and browse npm packages as I have.

If you’re interested in other things I’m working on, follow me on Twitter or check out the open source projects I’ve been publishing on GitHub.

I am also working on a Twitter bot for this site to tweet the most popular, newest, random packages from npm. Please follow that account now and it will start sending out packages soon–ish.

Open Software & Tools

This site wouldn’t be possible without the immense generosity and tireless efforts from the people who make contributions to the world and share their work via open source initiatives. Thank you 🙏

© 2025 – Pkg Stats / Ryan Hefner

bandeng-nats

v1.1.0

Published

A powerful NATS client wrapper for real-time messaging with advanced routing capabilities, built-in message headers support, and comprehensive middleware system.

Readme

Bandeng NATS

English | 中文

English

A powerful NATS client wrapper for real-time messaging with advanced routing capabilities, built-in message headers support, and comprehensive middleware system.

Features

  • 🚀 Real-time Messaging: Full support for publish/subscribe and request/response patterns
  • 🛣️ Advanced Routing: Express/Koa-style route registration with global middleware support
  • 🔄 Auto Reconnection: Configurable reconnection strategy with unlimited attempts support
  • 🔁 Retry Mechanism: Linear/exponential backoff retry strategies with DoS protection (publish only)
  • 📊 Message Headers: Built-in NATS headers support with automatic request tracking
  • 🎯 Dual Mode Support: Client and server modes with automatic route registration
  • 🔧 Flexible Configuration: Rich configuration options with comprehensive validation
  • 📨 ACK Mechanism: Reliable message delivery with acknowledgment support
  • ⚖️ Load Balancing: Queue groups and message distribution modes
  • 🆔 Unique ID Generation: Multi-process safe unique message ID generation
  • 🌐 PM2 Integration: Native PM2 environment detection and support
  • 🔄 Message Serialization/Deserialization: Automatic data conversion
  • 📏 Message Size: Controlled by NATS server configuration (no client-side limits)
  • 🛡️ Security Protection: Built-in DoS protection with retry rate limiting
  • 🏥 Health Checks: Comprehensive connection status and subscription monitoring
  • 🚨 Error Handling: Simplified error handling with friendly messages, no large error stacks exposed

Installation

npm install bandeng-nats

Development and Testing

For development and running tests:

# Run tests (requires test dependencies)
./run-tests.sh    # Linux/Mac
run-tests.bat     # Windows

# Or manually:
cd tests
npm install
npm test

Test suite is located in the tests/ directory and is excluded from npm package distribution.

Import Methods

The library supports two import methods:

// Method 1: Direct import (Recommended - preserves JSDoc comments)
const NatsClient = require('bandeng-nats')
const client = new NatsClient({...})

// Method 2: Destructuring import (Backward compatible)
const {nats_realtime_client} = require('bandeng-nats')
const client = new nats_realtime_client({...})

Note: Method 1 preserves all JSDoc comments and provides better IntelliSense support in IDEs.

Requirements

  • Node.js >= 12.0.0
  • NATS Server

Quick Start

1. Create Client

// Method 1: Direct import of main class (recommended, preserves full JSDoc comments)
const NatsClient = require('bandeng-nats')
const client = new NatsClient({
    servers: ['nats://localhost:4222'],
    clientType: 'client',
    requestOpts: {
        timeout: 15000,
        caller: 'my_app'
    },
    reconnect: {
        enabled: true,
        maxReconnectAttempts: 10,
        reconnectTimeWait: 2000
    }
})

// Method 2: Destructuring import (backward compatible)
const {nats_realtime_client} = require('bandeng-nats')
const client = new nats_realtime_client({
    servers: ['nats://localhost:4222'],
    clientType: 'client',
    requestOpts: {
        timeout: 15000,
        caller: 'my_app'
    },
    reconnect: {
        enabled: true,
        maxReconnectAttempts: 10,
        reconnectTimeWait: 2000
    }
})

// Listen for connection events
client.on('connected', () => {
    console.log('NATS client connected')
})

client.on('error', err => {
    console.error('NATS client error:', err)
})

2. Send Request

// Send request and wait for response
client
    .request('api.test.Add', {a: 1, b: 2})
    .then(res => {
        console.log('Response:', res)
    })
    .catch(err => {
        console.error('Error:', err)
    })

// Send request with correlation tracking
client
    .request(
        'api.test.Add',
        {a: 1, b: 2},
        {
            rid: '123456',
            rcid: 'req-123',
            pcid: 'parent-456',
            cid: 'corr-789',
            clevel: '1',
            lang: 'EN'
        }
    )
    .then(res => {
        console.log('Response with correlation:', res)
    })
    .catch(err => {
        console.error('Error:', err)
    })

// Send request with custom options (all values must be strings)
client
    .request(
        'api.test.Add',
        {a: 1, b: 2},
        {
            timeout: '10000',
            rid: 'custom-123',
            customHeader: 'customValue',
            userId: 'user123',
            sessionId: 'session456'
        }
    )
    .then(res => {
        console.log('Response with custom options:', res)
    })
    .catch(err => {
        console.error('Error:', err)
    })

3. Create Server

// Method 1: Direct import of main class (recommended)
const NatsClient = require('bandeng-nats')

// Set global prefix and middleware
NatsClient.SetGlobalPrefix('api')

NatsClient.UseGlobalMiddlewares(async (req, res, next) => {
    console.log('Global middleware executed')
    await next()
})

// Create router
const router = new NatsClient.Router()

// Register service
router.get('test.Add', (req, res, next) => {
    const {a, b} = req.body
    res.json({
        status: 'success',
        result: a + b
    })
})

// Create server instance
const server = new NatsClient({
    servers: ['nats://localhost:4222'],
    clientType: 'server',
    frameworkStyle: 'express'
})

// Method 2: Destructuring import (backward compatible)
const {nats_realtime_client, nats_router} = require('bandeng-nats')
// You can also use nats_realtime_client.SetGlobalPrefix and nats_realtime_client.UseGlobalMiddlewares

Configuration Options

Basic Configuration

| Parameter | Type | Default | Description | | ---------------- | ---------------------- | --------------------------- | ----------------------------- | | servers | string[] | ['nats://localhost:4222'] | NATS server address list | | clientType | 'client' \| 'server' | 'client' | Instance type | | frameworkStyle | 'express' \| 'koa' | 'express' | Framework style (server only) | | messageMode | 'ALL' \| 'ONCE' | 'ONCE' | Message processing mode |

Request Options (Global)

| Parameter | Type | Default | Description | | --------------------- | -------- | ------- | -------------------- | | requestOpts.timeout | number | 15000 | Request timeout (ms) | | requestOpts.caller | string | '' | Caller information |

Note: All timeout values are validated to be between 1ms and 300000ms (5 minutes).

Reconnection Configuration

| Parameter | Type | Default | Description | | -------------------------------- | --------- | ------- | ----------------------------------------------- | | reconnect.enabled | boolean | false | Enable reconnection | | reconnect.maxReconnectAttempts | number | 300 | Maximum reconnection attempts, -1 for unlimited | | reconnect.reconnectTimeWait | number | 2000 | Reconnection wait time (ms) |

Retry Configuration

Important: Retry functionality is only available for publish() method. The request() method does not support retry by design, as request-response patterns require immediate response handling.

| Parameter | Type | Default | Description | | ------------------ | --------------------------- | ---------- | --------------------------- | | retry.enabled | boolean | false | Enable retry (publish only) | | retry.maxRetries | number | 3 | Maximum retry attempts | | retry.retryDelay | number | 1000 | Retry delay time (ms) | | retry.backoff | 'linear' \| 'exponential' | 'linear' | Retry backoff strategy |

Security Note: Built-in DoS protection limits retry frequency to 100 retries per minute globally and 50 retries per subject to prevent abuse. This protection only applies to publish operations with retry enabled.

Connection Configuration

| Parameter | Type | Default | Description | | --------------------------------- | --------- | -------- | --------------------------------------------- | | connection.timeout | number | 15000 | Connection timeout (ms) | | connection.pingInterval | number | 120000 | PING interval (ms) | | connection.maxPingOut | number | 2 | Maximum PING timeout count | | connection.maxOutstandingPings | number | 1 | Maximum outstanding PING count | | connection.waitOnFirstConnect | boolean | false | Wait on first connection | | connection.ignoreClusterUpdates | boolean | false | Ignore cluster updates | | connection.inboxPrefix | string | '' | Temporary subscription prefix | | connection.noEcho | boolean | true | Disable echo messages (performance optimized) | | connection.noRandomize | boolean | false | Disable server randomization | | connection.noResolve | boolean | false | Disable DNS resolution | | connection.noUseOldRequestStyle | boolean | false | Disable old request style |

Validation: Connection timeout must be between 1ms and 300000ms (5 minutes). PING interval must be between 1000ms and 3600000ms (1 hour). maxOutstandingPings must be between 1 and 10.

Core Concepts

Message Modes

  • ONCE Mode: Uses queue groups, messages processed by only one process (load balancing mode)
  • ALL Mode: No queue groups, all processes receive messages (broadcast mode)

Echo Mechanism

NATS supports message echoing, which determines whether a client receives its own published messages:

With Echo Enabled (noEcho: false):

  • Client receives messages it publishes to subscribed subjects
  • Useful for debugging and self-monitoring
  • Higher network overhead due to duplicate message delivery

With Echo Disabled (noEcho: true) - Default:

  • Client does NOT receive its own published messages
  • Reduces network traffic and processing overhead
  • Recommended for production environments
  • Prevents potential infinite loops in message processing
  • Better performance and resource utilization
// Production ready (uses default noEcho: true)
const client = new NatsRealtimeClient({
    // noEcho defaults to true for better performance
})

// Explicitly enable echo for debugging
const debugClient = new NatsRealtimeClient({
    connection: {
        noEcho: false // Enable echo for debugging purposes
    }
})

Message Size

The client does not enforce message size limits. Message size is controlled by the NATS server configuration (default 1MB, configurable via --max_payload).

Note: If your NATS server is configured with a larger payload limit (e.g., 10MB), you can send messages up to that limit without client-side validation.

DoS Protection

Built-in protection against DoS attacks through intelligent retry rate limiting (applies only to publish operations with retry enabled):

const client = new NatsRealtimeClient({
    retry: {
        enabled: true, // Only affects publish() method
        maxRetries: 3,
        retryDelay: 1000,
        backoff: 'exponential'
    }
})

// Automatic protection kicks in for publish operations:
// - Global limit: 100 retries per minute
// - Per-subject limit: 50 retries per minute
// - Automatic cleanup of expired retry records

// When limits are exceeded:
client.on('error', error => {
    if (error.context === 'retry_protection') {
        console.log('Publish retry blocked due to rate limiting')
    }
})

Protection Features:

  • Rate Limiting: Prevents excessive retry attempts on publish operations
  • Automatic Cleanup: Time-window based record management
  • Graceful Degradation: Blocked retries are logged but don't crash the application
  • Per-Subject Limits: Individual topic protection for publish operations
  • Request Isolation: Request operations are not affected by retry rate limiting

Queue Groups

Queue groups are used to implement load balancing, ensuring messages are processed by only one subscriber:

// Method 1: Direct import of main class (recommended)
const NatsClient = require('bandeng-nats')
const server = new NatsClient({
    clientType: 'server',
    messageMode: 'ONCE' // Use queue group
    // Queue group name defaults to global prefix
})

// Method 2: Destructuring import (backward compatible)
const {nats_realtime_client} = require('bandeng-nats')
const server = new nats_realtime_client({
    clientType: 'server',
    messageMode: 'ONCE' // Use queue group
    // Queue group name defaults to global prefix
})

Message Headers

Built-in support for NATS message headers with flexible metadata passing mechanism:

Automatically Added Headers

Each request automatically includes the following headers:

  • rid: Request ID (auto-generated or user-specified)
  • caller: Source client information
  • timestamp: Request timestamp
Custom Headers

Now supports passing arbitrary custom headers, all values must be strings:

// Pass custom headers
const response = await client.request(
    'api.test.Add',
    {a: 1, b: 2},
    {
        rid: 'custom-123',
        userId: 'user123',
        sessionId: 'session456',
        customHeader: 'customValue',
        timeout: '10000'
    }
)

// All headers are available in server handlers
router.get('test.Add', (req, res, next) => {
    console.log('Request ID:', req.headers.rid)
    console.log('User ID:', req.headers.userId)
    console.log('Session ID:', req.headers.sessionId)
    console.log('Custom Header:', req.headers.customHeader)
    console.log('Timeout Setting:', req.headers.timeout)
    console.log('Source Client:', req.headers.caller)
    console.log('Request Time:', req.headers.timestamp)
    // ... handle request
})
Header Mechanism Details
  • Flexibility: Can pass arbitrary key-value pairs, no longer limited to predefined parameter lists
  • Type Requirements: All values must be strings, non-string values are automatically ignored
  • Auto-merge: Custom headers are merged with system auto-added headers
  • Complete Transfer: All header information is completely passed to the server for tracking and debugging
  • Efficient Processing: No unnecessary type conversions; the system directly uses string values and filters out non-string ones

ACK Mechanism

Support for message acknowledgment mechanism to ensure reliable message delivery:

// Publish message with ACK
await client.publish(
    'api.notify',
    {message: 'Hello'},
    {
        ack: true,
        ackTimeout: 5000,
        maxRetries: 3
    }
)

Global Middleware

Support for global middleware that applies to all routes:

// Method 1: Direct import of main class (recommended)
const NatsClient = require('bandeng-nats')

// Must be called before any route registration
NatsClient.UseGlobalMiddlewares(
    async (req, res, next) => {
        console.log('Global middleware 1')
        await next()
    },
    async (req, res, next) => {
        console.log('Global middleware 2')
        await next()
    }
)

// Method 2: Destructuring import (backward compatible)
const {nats_router} = require('bandeng-nats')
// You can also use nats_router.UseGlobalMiddlewares

API Documentation

NatsRealtimeClient

Constructor
new NatsRealtimeClient(options)
Events
  • connected: Triggered when connection is successful
  • error: Triggered when an error occurs
  • close: Triggered when connection is closed
Methods
request(subject, data, options)

Send request and wait for response.

Important: Request operations do not support retry functionality by design. Unlike publish operations which can be retried in case of failure, request-response patterns require immediate response handling and cannot be retried automatically.

Note: The options parameter is flexible and accepts any key-value pairs where all values must be strings. This allows you to pass custom metadata, headers, or configuration options that will be included in the NATS message headers. The system efficiently processes these options by directly using string values and automatically filtering out non-string ones.

const response = await client.request('api.test.Add', {a: 1, b: 2})

Parameters:

  • subject (string): Request subject
  • data (any): Request data
  • options (object): Request options
    • timeout (number): Request timeout (ms), default 15000
    • rid (string): Request ID, optional (auto-generated if not provided)
    • rcid (string): Request correlation ID, optional
    • pcid (string): Parent correlation ID, optional
    • cid (string): Correlation ID, optional
    • clevel (string): Current level, optional
    • lang (string): Language, optional ('ZH' or 'EN'), default 'ZH'
    • Other custom headers: All additional properties will be converted to strings and included as headers
publish(subject, data, options)

Publish message with optional retry support (message queue mode, no message response).

Retry Support: This method supports automatic retry with configurable backoff strategies when ack: true is enabled. Retry configuration is controlled by the global retry options in the client constructor.

// No ACK mode (no retry)
await client.publish('api.test.notify', {message: 'Hello'})

// ACK mode with retry support
await client.publish(
    'api.test.notify',
    {message: 'Hello'},
    {
        ack: true, // Enable ACK and retry
        ackTimeout: 5000, // ACK timeout before retry
        maxRetries: 3 // Override global retry limit for this publish
    }
)

Parameters:

  • subject (string): Message subject
  • data (any): Message data
  • options (object): Publish options
    • ack (boolean): Whether acknowledgment is required, default false. When enabled, triggers retry mechanism on failure.
    • ackTimeout (number): ACK timeout (ms), default 5000. Time to wait before considering ACK failed and triggering retry.
    • maxRetries (number): Maximum retry attempts for this specific publish operation, overrides global retry settings.
subscribe(subject, callback, options)

Subscribe to subject.

await client.subscribe('api.test.notify', (message, msg) => {
    console.log('Received:', message)
})

Parameters:

  • subject (string): Message subject
  • callback (function): Message handler function, receives (message, msg)
  • options (object): Subscribe options
    • ack (boolean): Whether to send ACK confirmation message, default false
unsubscribe(subject)

Unsubscribe from subject.

await client.unsubscribe('api.test.notify')
close()

Close connection.

await client.close()
healthCheck()

Perform comprehensive health check and get detailed connection status.

const health = client.healthCheck()
console.log(health)
// Returns detailed health information:
// {
//   status: 'healthy' | 'unhealthy',
//   timestamp: '2024-01-01T12:00:00.000Z',
//   connection: {
//     connected: true,
//     clientType: 'client' | 'server',
//     servers: ['nats://localhost:4222']
//   },
//   subscriptions: {
//     total: 5,      // Total subscriptions
//     routes: 3      // Route subscriptions (server mode only)
//   },
//   reason?: 'Not connected to NATS server'  // Only present when unhealthy
// }

Router

Constructor
new Router()
Methods
get(servicePath, ...handlers)

Register GET type service.

router.get('api.user.get', (req, res, next) => {
    res.json({user: req.body})
})
set(servicePath, ...handlers)

Register SET type service.

router.set('api.user.update', (req, res, next) => {
    const {id, name} = req.body
    res.json({success: true})
})
use(prefix)

Set route prefix.

// Set prefix
router.use('api')
router.get('user.get', handler) // Becomes 'api.user.get'
getRoutes()

Get all routes.

const routes = router.getRoutes()
cleanup(servicePath)

Clean up routes.

// Clean up specific route
router.cleanup('api.user.get')

// Clean up all routes
router.cleanup()

Global Functions

SetGlobalPrefix(prefix)

Set global prefix for all routes.

// Method 1: Direct import of main class (recommended)
const NatsClient = require('bandeng-nats')

// Must be called before any route registration
NatsClient.SetGlobalPrefix('api')

// Method 2: Destructuring import (backward compatible)
const {nats_router} = require('bandeng-nats')
// You can also use nats_router.SetGlobalPrefix
UseGlobalMiddlewares(...middlewares)

Register global middleware for all routes.

// Method 1: Direct import of main class (recommended)
const NatsClient = require('bandeng-nats')

// Must be called before any route registration
NatsClient.UseGlobalMiddlewares(async (req, res, next) => {
    console.log('Global middleware')
    await next()
})

// Method 2: Destructuring import (backward compatible)
const {nats_router} = require('bandeng-nats')
// You can also use nats_router.UseGlobalMiddlewares

Routing System

Route Registration

// Method 1: Direct import of main class (recommended)
const NatsClient = require('bandeng-nats')
const router = new NatsClient.Router()

// Method 2: Destructuring import (backward compatible)
const {Router} = require('bandeng-nats')
const router = new Router()

// Set route prefix
router.use('user')

// Register services
router.get('Add', (req, res, next) => {
    const {id} = req.body
    res.json({user: {id, name: 'John'}})
})

router.set('Update', (req, res, next) => {
    const {id, name} = req.body
    res.json({success: true})
})

Middleware Support

// Authentication middleware
const authMiddleware = (req, res, next) => {
    if (!req.body.token) {
        return res.status(401).json({error: 'Unauthorized'})
    }
    next()
}

// Logging middleware
const logMiddleware = (req, res, next) => {
    console.log(`${Date.now()} - ${req.servicePath}`)
    next()
}

// Register service with middleware
router.get('Add', authMiddleware, logMiddleware, (req, res, next) => {
    res.json({user: req.body})
})

Global Prefix

// Method 1: Direct import of main class (recommended)
const NatsClient = require('bandeng-nats')

// Set global prefix
NatsClient.SetGlobalPrefix('api')

// Routes will automatically add prefix: api.user.get
router.get('user.get', handler)

// Method 2: Destructuring import (backward compatible)
// You can also use require('bandeng-nats').SetGlobalPrefix

Framework Styles

Support for Express and Koa framework styles:

// Method 1: Direct import of main class (recommended)
const NatsClient = require('bandeng-nats')

// Express style
const server = new NatsClient({
    clientType: 'server',
    frameworkStyle: 'express'
})

// Koa style
const server = new NatsClient({
    clientType: 'server',
    frameworkStyle: 'koa'
})

// Method 2: Destructuring import (backward compatible)
const {nats_realtime_client} = require('bandeng-nats')

// Express style
const server = new nats_realtime_client({
    clientType: 'server',
    frameworkStyle: 'express'
})

// Koa style
const server = new nats_realtime_client({
    clientType: 'server',
    frameworkStyle: 'koa'
})

Examples

Complete Example

Server (test_server.js)
// Method 1: Direct import of main class (recommended)
const NatsClient = require('./index.js')

// Load routes
require('./routes/index.js')

const server = new NatsClient({
    servers: ['nats://localhost:4222'],
    clientType: 'server',
    frameworkStyle: 'express',
    messageMode: 'ONCE', // Use queue group for load balancing
    reconnect: {
        enabled: true,
        maxReconnectAttempts: 10,
        reconnectTimeWait: 2000
    }
})

server.on('connected', () => {
    console.log('NATS Server started')
})

// Method 2: Destructuring import (backward compatible)
const {nats_realtime_client, nats_router} = require('./index.js')
Client (test_client.js)
// Method 1: Direct import of main class (recommended)
const NatsClient = require('./index.js')

const client = new NatsClient({
    servers: ['nats://localhost:4222'],
    clientType: 'client',
    reconnect: {
        enabled: true,
        maxReconnectAttempts: 10,
        reconnectTimeWait: 2000
    }
})

// Method 2: Destructuring import (backward compatible)
const {nats_realtime_client} = require('./index.js')

const client = new nats_realtime_client({
    servers: ['nats://localhost:4222'],
    clientType: 'client',
    reconnect: {
        enabled: true,
        maxReconnectAttempts: 10,
        reconnectTimeWait: 2000
    }
})

client.on('connected', () => {
    console.log('NATS client connected')

    // Send request
    client
        .request('api.test.Add', {a: 1, b: 2})
        .then(res => {
            console.log('Response:', res)
        })
        .catch(err => {
            console.error('Error:', err)
        })

    // Send request with correlation tracking
    client
        .request(
            'api.test.Add',
            {a: 1, b: 2},
            {
                rid: '123456',
                rcid: 'req-123',
                pcid: 'parent-456',
                cid: 'corr-789',
                clevel: '1',
                lang: 'EN'
            }
        )
        .then(res => {
            console.log('Response with correlation:', res)
        })
        .catch(err => {
            console.error('Error:', err)
        })

    // Send request with custom options (all values must be strings)
    client
        .request(
            'api.test.Add',
            {a: 1, b: 2},
            {
                timeout: '10000',
                rid: 'custom-123',
                customHeader: 'customValue',
                userId: 'user123',
                sessionId: 'session456'
            }
        )
        .then(res => {
            console.log('Response with custom options:', res)
        })
        .catch(err => {
            console.error('Error:', err)
        })
})
Route Definition (routes/test.js)
const {Router} = require('../nats_router.js')

const TestRouter = new Router()

TestRouter.use('test')

TestRouter.get('Add', (req, res, next) => {
    const {a, b} = req.body
    res.json({
        status: 'success',
        result: a + b
    })
})

TestRouter.get('Sub', (req, res, next) => {
    const {a, b} = req.body
    res.json({
        status: 'success',
        result: a - b
    })
})

module.exports = TestRouter
Global Configuration (routes/index.js)
const {SetGlobalPrefix, UseGlobalMiddlewares} = require('../nats_router.js')

// Set global prefix
SetGlobalPrefix('api')

// Register global middleware
UseGlobalMiddlewares(async (req, res, next) => {
    console.log('Global middleware is working.')
    await next()
})

module.exports = class Routes {
    static test = require('./test.js')
}

Error Handling

The library provides simplified and user-friendly error handling, automatically wrapping NATS errors to avoid exposing large error stacks and nested error objects to application code.

Error Simplification

All NATS errors are automatically simplified:

  • 503 No Responders: Converted to friendly message "no responders"
  • 400 Bad Request: Uses description from API error or default message "bad request"
  • 409 Conflict: Uses description from API error or default message "conflict"
  • 408 Timeout: Friendly timeout message "request timeout"
  • 404 Not Found: Friendly not found message "not found"
  • Other errors: Simplified with message length limits (max 200 characters), default "request failed"

Features:

  • No code field exposed in error objects
  • No nested error objects (chainedError, api_error, etc.)
  • No stack traces exposed to application code
  • Only essential context preserved (context, subject, timestamp)

Connection Errors

client.on('error', err => {
    console.error('Connection error:', err)
    // Error object is simplified, no large stack traces
})

Request Errors

client
    .request('api.test.Add', {a: 1, b: 2})
    .then(res => {
        console.log('Success:', res)
    })
    .catch(err => {
        console.error('Request failed:', err.message)
        // Example output for 503 error: "no responders"
        // Example output for timeout: "request timeout"
        // Error object is simplified, no nested properties
    })

Error Object Structure

try {
    await client.request('api.nonexistent', {})
} catch (error) {
    console.log(error.message) // "no responders"
    console.log(error.context) // "request:api.nonexistent"
    console.log(error.timestamp) // "2025-01-27T07:11:50.909Z"
    console.log(error.subject) // "api.nonexistent" (if available)
    // No code, no stack, no nested errors
}

Retry Mechanism

Not effective in request mode

const client = new nats_realtime_client({
    retry: {
        enabled: true,
        maxRetries: 3,
        retryDelay: 1000,
        backoff: 'exponential'
    }
})

Configuration Validation

Automatically validates configuration parameters to ensure correctness.

Validation Rules

  • servers: Must be a non-empty array
  • messageMode: Must be 'ONCE' or 'ALL'
  • reconnect.maxReconnectAttempts: Must be a number >= -1
  • reconnect.reconnectTimeWait: Must be a positive number
  • retry.maxRetries: Must be a non-negative number
  • retry.retryDelay: Must be a positive number
  • retry.backoff: Must be 'linear' or 'exponential'
  • connection.timeout: Must be a positive integer

Error Handling

Configuration validation failures throw detailed error messages:

try {
    const client = new nats_realtime_client({
        servers: [], // Error: empty array
        retry: {
            enabled: true,
            backoff: 'invalid' // Error: invalid backoff strategy
        }
    })
} catch (error) {
    console.error('Configuration error:', error.message)
    // Output: Configuration error: servers must be a non-empty array, backoff must be "linear" or "exponential"
}

Important Notes

  1. Global Prefix: Server mode must set global prefix, otherwise throws error
  2. Route Registration: Routes must be registered before creating server instance
  3. Global Middleware: Global middleware must be registered before any route registration
  4. Connection Management: Client automatically connects, no need to manually call connect method
  5. Resource Cleanup: Should call close() method to clean up resources after use
  6. Error Listening: Always listen for error events to handle connection and message errors
  7. Queue Groups: Use queue groups for load balancing in multi-instance deployments
  8. PM2 Support: Native PM2 environment detection for unique ID generation
  9. Message Headers: Automatic request tracking with unique IDs and timestamps
  10. Request Options: Use requestOpts for request-specific configuration
  11. Message Size: Message size is controlled by NATS server configuration (default 1MB, configurable via --max_payload)

Best Practices

  1. Connection Management: Always listen for connection events and handle errors
  2. Route Organization: Use modular route files to organize code
  3. Error Handling: Add error handling for all asynchronous operations
  4. Reconnection Configuration: Enable reconnection mechanism in production environment
  5. Health Monitoring: Regularly check connection status and subscription counts
  6. Load Balancing: Use queue groups to implement multi-instance load balancing
  7. Message Acknowledgment: Use ACK mechanism for important messages to ensure reliability
  8. Global Middleware: Use global middleware for cross-cutting concerns like logging and authentication
  9. Request Tracking: Use requestOpts for better request tracking and debugging
  10. PM2 Deployment: Leverage PM2 integration for production deployments
  11. Correlation Tracking: Use correlation IDs (rcid, pcid, cid) for distributed tracing
  12. Language Support: Set appropriate language headers for international applications

License

ISC License

Author

lizhi

Changelog

v1.1.0

  • Error Handling Enhancement: Simplified error handling with friendly error messages, no large error stacks exposed
    • 503 No Responders: Automatically converted to friendly message "no responders"
    • Error Simplification: Remove code field, nested error objects (chainedError, api_error, etc.), and stack traces
    • Friendly Messages: Provide user-friendly error messages for common error codes (400, 404, 408, 409, 503)
    • Message Length Limit: Error messages limited to 200 characters to prevent information overload
    • Reference NATS Library: Error handling approach based on NATS library's consumer.js implementation
  • Message Size: Removed client-side message size validation (controlled by NATS server configuration)
    • Server Configuration: Message size now fully controlled by NATS server --max_payload parameter
    • No Client Limits: Client no longer enforces 1MB limit, allowing larger messages when server supports it
  • Node.js Compatibility: Ensured compatibility with Node.js 12+ (removed optional chaining operator)

Chinese

一个功能强大的 NATS 客户端封装库,提供实时消息传递、高级路由能力、内置消息头支持和完整的中间件系统。

特性

  • 🚀 实时消息处理: 完整的发布/订阅和请求/响应模式支持
  • 🛣️ 高级路由系统: Express/Koa 风格的路由注册,支持全局中间件
  • 🔄 自动重连: 可配置的重连策略,支持无限重连
  • 🔁 重试机制: 线性/指数退避重试策略,带 DoS 保护(仅适用于 publish 方法)
  • 📊 消息头支持: 内置 NATS 消息头支持,自动请求追踪
  • 🎯 双模式支持: 客户端和服务端模式,自动路由注册
  • 🔧 灵活配置: 丰富的配置选项,全面的参数验证
  • 📨 ACK 机制: 可靠的消息传递,支持确认机制
  • ⚖️ 负载均衡: 队列组和消息分发模式
  • 🆔 唯一 ID 生成: 多进程安全的唯一消息 ID 生成
  • 🌐 PM2 集成: 原生 PM2 环境检测和支持
  • 🔄 消息序列化/反序列化: 自动数据转换
  • 📏 消息大小: 由 NATS 服务器配置控制(客户端无限制)
  • 🛡️ 安全保护: 内置 DoS 防护,重试频率限制
  • 🏥 健康检查: 全面连接状态和订阅监控
  • 🚨 错误处理: 精简的错误处理,提供友好的错误消息,不暴露庞大的错误栈
  • 🔍 配置验证: 严格的输入选项检查和边界值验证

安装

npm install bandeng-nats

导入方式

库支持两种导入方式:

// 方式1:直接导入(推荐 - 保持完整注释)
const NatsClient = require('bandeng-nats')
const client = new NatsClient({...})

// 方式2:解构导入(向后兼容)
const {nats_realtime_client} = require('bandeng-nats')
const client = new nats_realtime_client({...})

注意:方式 1 保持所有 JSDoc 注释,在 IDE 中提供更好的智能提示支持。

依赖

  • Node.js >= 12.0.0
  • NATS 服务器

快速开始

1. 创建客户端

// 方式1:直接导入主类(推荐,保持完整注释)
const NatsClient = require('bandeng-nats')
const client = new NatsClient({
    servers: ['nats://localhost:4222'],
    clientType: 'client',
    requestOpts: {
        timeout: 15000,
        caller: 'my_app'
    },
    reconnect: {
        enabled: true,
        maxReconnectAttempts: 10,
        reconnectTimeWait: 2000
    }
})

// 方式2:解构导入(向后兼容)
const {nats_realtime_client} = require('bandeng-nats')
const client = new nats_realtime_client({
    servers: ['nats://localhost:4222'],
    clientType: 'client',
    requestOpts: {
        timeout: 15000,
        caller: 'my_app'
    },
    reconnect: {
        enabled: true,
        maxReconnectAttempts: 10,
        reconnectTimeWait: 2000
    }
})

// 监听连接事件
client.on('connected', () => {
    console.log('NATS 客户端已连接')
})

client.on('error', err => {
    console.error('NATS 客户端错误:', err)
})

2. 发送请求

// 发送请求并等待响应
client
    .request('api.test.Add', {a: 1, b: 2})
    .then(res => {
        console.log('响应:', res)
    })
    .catch(err => {
        console.error('错误:', err)
    })

// 发送带关联追踪的请求
client
    .request(
        'api.test.Add',
        {a: 1, b: 2},
        {
            rid: '123456',
            rcid: 'req-123',
            pcid: 'parent-456',
            cid: 'corr-789',
            clevel: '1',
            lang: 'EN'
        }
    )
    .then(res => {
        console.log('带关联的响应:', res)
    })
    .catch(err => {
        console.error('错误:', err)
    })

3. 创建服务端

const {nats_realtime_client, nats_router} = require('bandeng-nats')

// 设置全局前缀和中间件
const {SetGlobalPrefix, UseGlobalMiddlewares} = nats_router

SetGlobalPrefix('api')

UseGlobalMiddlewares(async (req, res, next) => {
    console.log('全局中间件执行')
    await next()
})

// 创建路由
const router = new NatsClient.Router()

// 注册服务
router.get('test.Add', (req, res, next) => {
    const {a, b} = req.body
    res.json({
        status: 'success',
        result: a + b
    })
})

// 创建服务端实例
const server = new NatsClient({
    servers: ['nats://localhost:4222'],
    clientType: 'server',
    frameworkStyle: 'express'
})

配置选项

基础配置

| 参数 | 类型 | 默认值 | 说明 | | ---------------- | --------------------------------------------- | --------------------------- | -------------------- | | servers | string[] | ['nats://localhost:4222'] | NATS 服务器地址列表 | | clientType | 'client'(客户端)\| 'server'(服务端) | 'client' | 实例类型 | | frameworkStyle | 'express'(Express风格)\| 'koa'(Koa风格) | 'express' | 框架风格(仅服务端) | | messageMode | 'ALL'(全部)\| 'ONCE'(单次) | 'ONCE' | 消息处理模式 |

请求选项(全局)

| 参数 | 类型 | 默认值 | 说明 | | --------------------- | -------- | ------- | -------------------- | | requestOpts.timeout | number | 15000 | 请求超时时间(毫秒) | | requestOpts.caller | string | '' | 调用者信息 |

重连配置

| 参数 | 类型 | 默认值 | 说明 | | -------------------------------- | --------- | ------- | ----------------------------- | | reconnect.enabled | boolean | false | 是否启用重连 | | reconnect.maxReconnectAttempts | number | 300 | 最大重连次数,-1 表示无限重连 | | reconnect.reconnectTimeWait | number | 2000 | 重连等待时间(毫秒) |

重试配置

重要说明:重试功能仅适用于 publish() 方法。request() 方法不支持重试功能,这是设计上的选择,因为请求-响应模式需要立即响应处理。

| 参数 | 类型 | 默认值 | 说明 | | ------------------ | ------------------------------------------ | ---------- | ------------------------------------- | | retry.enabled | boolean | false | 是否启用重试(仅适用于 publish 方法) | | retry.maxRetries | number | 3 | 最大重试次数 | | retry.retryDelay | number | 1000 | 重试延迟时间(毫秒) | | retry.backoff | 'linear'(线性)\| 'exponential'(指数) | 'linear' | 重试退避策略 |

连接配置

| 参数 | 类型 | 默认值 | 说明 | | --------------------------------- | --------- | -------- | ------------------------ | | connection.timeout | number | 15000 | 连接超时时间(毫秒) | | connection.pingInterval | number | 120000 | PING 间隔时间(毫秒) | | connection.maxPingOut | number | 2 | 最大 PING 超时次数 | | connection.maxOutstandingPings | number | 1 | 最大未响应 PING 数量 | | connection.waitOnFirstConnect | boolean | false | 首次连接是否等待 | | connection.ignoreClusterUpdates | boolean | false | 是否忽略集群更新 | | connection.inboxPrefix | string | '' | 临时订阅前缀 | | connection.noEcho | boolean | true | 禁用回声消息(性能优化) | | connection.noRandomize | boolean | false | 禁用服务器随机化 | | connection.noResolve | boolean | false | 禁用 DNS 解析 | | connection.noUseOldRequestStyle | boolean | false | 禁用旧请求风格 |

核心概念

消息模式

  • ONCE 模式: 使用队列组,消息只被一个进程处理(负载均衡模式)
  • ALL 模式: 不使用队列组,所有进程都接收消息(广播模式)

消息大小

客户端不强制执行消息大小限制。消息大小由 NATS 服务器配置控制(默认 1MB,可通过--max_payload配置)。

注意: 如果您的 NATS 服务器配置了更大的负载限制(例如 10MB),您可以发送最大到该限制的消息,无需客户端验证。

队列组

队列组用于实现负载均衡,确保消息只被一个订阅者处理:

// 方式1:直接导入主类(推荐)
const NatsClient = require('bandeng-nats')
const server = new NatsClient({
    clientType: 'server',
    messageMode: 'ONCE' // 使用队列组
    // 队列组名称默认为全局前缀
})

// 方式2:解构导入(向后兼容)
const {nats_realtime_client} = require('bandeng-nats')
const server = new nats_realtime_client({
    clientType: 'server',
    messageMode: 'ONCE' // 使用队列组
    // 队列组名称默认为全局前缀
})

回显机制

NATS 支持消息回显,这决定了客户端是否会接收自己发布的消息:

启用回显 (noEcho: false)

  • 客户端会接收自己发布到订阅主题的消息
  • 有助于调试和自我监控
  • 网络开销较高,因为会传递重复的消息

禁用回显 (noEcho: true) - 默认值

  • 客户端不会接收自己发布的消息
  • 减少网络流量和处理开销
  • 推荐用于生产环境
  • 防止消息处理中潜在的无限循环
  • 更好的性能和资源利用率
// 生产就绪配置(使用默认的 noEcho: true)
const client = new NatsRealtimeClient({
    // noEcho 默认为 true,提供更好的性能
})

// 调试时启用回显
const debugClient = new NatsRealtimeClient({
    connection: {
        noEcho: false // 启用回显便于调试
    }
})

消息头

内置支持 NATS 消息头,提供灵活的元数据传递机制:

自动添加的头部信息

每个请求都会自动添加以下头部信息:

  • rid: 请求 ID(自动生成或用户指定)
  • caller: 来源客户端信息
  • timestamp: 请求时间戳

自定义头部信息

现在支持传递任意自定义头部信息,所有值必须为字符串类型:

// 传递自定义头部信息
const response = await client.request(
    'api.test.Add',
    {a: 1, b: 2},
    {
        rid: 'custom-123',
        userId: 'user123',
        sessionId: 'session456',
        customHeader: 'customValue',
        timeout: '10000'
    }
)

// 服务端处理器中可以访问所有头部信息
router.get('test.Add', (req, res, next) => {
    console.log('请求ID:', req.headers.rid)
    console.log('用户ID:', req.headers.userId)
    console.log('会话ID:', req.headers.sessionId)
    console.log('自定义头部:', req.headers.customHeader)
    console.log('超时设置:', req.headers.timeout)
    console.log('来源客户端:', req.headers.caller)
    console.log('请求时间:', req.headers.timestamp)
    // ... 处理请求
})

头部信息机制说明

  • 灵活性: 可以传递任意键值对,不再限制在预定义的参数列表中
  • 类型要求: 所有值必须为字符串类型,非字符串值会被自动忽略
  • 自动合并: 自定义头部会与系统自动添加的头部信息合并
  • 完整传递: 所有头部信息都会完整传递到服务端,便于追踪和调试
  • 高效处理: 无需不必要的类型转换,系统直接使用字符串值并过滤非字符串值

ACK 机制

支持消息确认机制,确保可靠的消息传递:

// 发布带 ACK 的消息
await client.publish(
    'api.notify',
    {message: 'Hello'},
    {
        ack: true,
        ackTimeout: 5000,
        maxRetries: 3
    }
)

DoS 防护

内置的针对 DoS 攻击的保护,通过智能的重试频率限制(仅适用于启用了重试功能的 publish 操作):

const client = new NatsRealtimeClient({
    retry: {
        enabled: true, // 仅影响publish()方法
        maxRetries: 3,
        retryDelay: 1000,
        backoff: 'exponential'
    }
})

// 自动保护机制对publish操作启动:
// - 全局限制:每分钟100次重试
// - 主题级别限制:每个主题每分钟50次重试
// - 自动清理过期的重试记录

// 当超过限制时:
client.on('error', error => {
    if (error.context === 'retry_protection') {
        console.log('Publish重试因频率限制而被阻止')
    }
})

防护特性:

  • 频率限制:防止对 publish 操作过度的重试尝试
  • 自动清理:基于时间窗口的记录管理
  • 优雅降级:被阻止的重试会被记录但不会导致应用崩溃
  • 主题级别限制:单个主题的独立保护
  • 请求隔离:request 操作不受重试频率限制影响

全局中间件

支持全局中间件,应用于所有路由:

// 方式1:直接导入主类(推荐)
const NatsClient = require('bandeng-nats')
const {UseGlobalMiddlewares} = NatsClient

// 必须在任何路由注册之前调用
UseGlobalMiddlewares(
    async (req, res, next) => {
        console.log('全局中间件 1')
        await next()
    },
    async (req, res, next) => {
        console.log('全局中间件 2')
        await next()
    }
)

// 方式2:解构导入(向后兼容)
const {nats_router} = require('bandeng-nats')
// 也可以使用 nats_router.UseGlobalMiddlewares

API 文档

NatsRealtimeClient

构造函数

new NatsRealtimeClient(options)

事件

  • connected: 连接成功时触发
  • error: 发生错误时触发
  • close: 连接关闭时触发

方法

request(subject, data, options)

发送请求并等待响应。

重要: 请求操作不支持重试功能,这是设计上的选择,因为请求-响应模式需要立即响应处理,不能自动重试。

注意: options 参数非常灵活,接受任意键值对,所有值必须为字符串类型。这允许你传递自定义元数据、头部信息或配置选项,这些都将包含在 NATS 消息头部中。系统高效处理这些选项,直接使用字符串值并自动过滤非字符串值。

参数:

  • subject (string): 请求主题
  • data (any): 请求数据
  • options (object): 请求选项
    • timeout (number): 请求超时时间(毫秒),默认 15000
    • rid (string): 请求 ID,可选(未提供时自动生成)
    • rcid (string): 请求关联 ID,可选
    • pcid (string): 父关联 ID,可选
    • cid (string): 关联 ID,可选
    • clevel (string): 当前级别,可选
    • lang (string): 语言,可选 ('ZH' 或 'EN'),默认 'ZH'
    • 其他自定义头部:所有额外属性都将被转换为字符串并作为头部信息包含
publish(subject, data, options)

发布消息,支持可选的重试功能。

重试支持: 当 ack: true 启用时,该方法支持自动重试,并可配置退避策略。重试配置由客户端构造函数中的全局 retry 选项控制。

参数:

  • subject (string): 消息主题
  • data (any): 消息数据
  • options (object): 发布选项
    • ack (boolean): 是否需要确认,默认 false。启用时触发重试机制
    • ackTimeout (number): ACK 超时时间(毫秒),默认 5000。超时后将触发重试
    • maxRetries (number): 此发布操作的最大重试次数,覆盖全局重试设置
subscribe(subject, callback, options)

订阅消息。

await client.subscribe('api.test.notify', (message, msg) => {
    console.log('收到:', message)
})

参数:

  • subject (string): 消息主题
  • callback (function): 消息处理函数,接收 (message, msg)
  • options (object): 订阅选项
    • ack (boolean): 是否需要发送 ACK 确认消息,默认 false
unsubscribe(subject)

取消订阅。

await client.unsubscribe('api.test.notify')
close()

关闭连接。

await client.close()
healthCheck()

执行健康检查并获取连接状态。

const health = client.healthCheck()
console.log(health)
// 返回健康状态,包括:
// - 连接状态
// - 订阅计数
// - 整体健康状态('healthy'、'unhealthy')

路由系统

Router

// 方式1:直接导入主类(推荐)
const NatsClient = require('bandeng-nats')
const router = new NatsClient.Router()

// 方式2:解构导入(向后兼容)
const {Router} = require('bandeng-nats')
const router = new Router()

// 设置前缀
router.use('api')

// 注册 GET 路由
router.get('user.get', (req, res, next) => {
    res.json({status: 'success', data: userData})
})

// 注册 SET 路由
router.set('user.update', (req, res, next) => {
    res.json({status: 'success', message: 'Updated'})
})

全局函数

SetGlobalPrefix(prefix)

设置全局前缀。

// 方式1:直接导入主类(推荐)
const NatsClient = require('bandeng-nats')

NatsClient.SetGlobalPrefix('api')

// 方式2:解构导入(向后兼容)
// 也可以使用 require('bandeng-nats').SetGlobalPrefix
UseGlobalMiddlewares(...middlewares)

注册全局中间件。

// 方式1:直接导入主类(推荐)
const NatsClient = require('bandeng-nats')

NatsClient.UseGlobalMiddlewares(
    async (req, res, next) => {
        console.log('全局中间件 1')
        await next()
    },
    async (req, res, next) => {
        console.log('全局中间件 2')
        await next()
    }
)

// 方式2:解构导入(向后兼容)
// 也可以使用 require('bandeng-nats').UseGlobalMiddlewares

示例

服务端示例

// 方式1:直接导入主类(推荐)
const NatsClient = require('./index.js')

// 设置全局前缀
NatsClient.SetGlobalPrefix('api')

// 注册全局中间件
NatsClient.UseGlobalMiddlewares(async (req, res, next) => {
    console.log('全局中间件执行')
    await next()
})

// 创建路由
const router = new NatsClient.Router()

// 注册服务
router.get('test.Add', (req, res, next) => {
    const {a, b} = req.body
    res.json({
        status: 'success',
        result: a + b
    })
})

// 创建服务端实例
const server = new NatsClient({
    servers: ['nats://localhost:4222'],
    clientType: 'server',
    frameworkStyle: 'express'
})

server.on('connected', () => {
    console.log('NATS 服务器已启动')
})

// 方式2:解构导入(向后兼容)
const {nats_realtime_client, nats_router} = require('./index.js')
// 也可以使用 nats_realtime_client.SetGlobalPrefix 和 nats_realtime_client.UseGlobalMiddlewares
const {Router} = require('bandeng-nats')

客户端示例

// 方式1:直接导入主类(推荐)
const NatsClient = require('./index.js')

const client = new NatsClient({
    servers: ['nats://localhost:4222'],
    clientType: 'client',
    reconnect: {
        enabled: true,
        maxReconnectAttempts: 10,
        reconnectTimeWait: 2000
    }
})

client.on('connected', () => {
    console.log('NATS 客户端已连接')

    // 发送请求
    client
        .request('api.test.Add', {a: 1, b: 2})
        .then(res => {
            console.log('响应:', res)
        })
        .catch(err => {
            console.error('错误:', err)
        })
})

// 方式2:解构导入(向后兼容)
const {nats_realtime_client} = require('./index.js')

const client = new nats_realtime_client({
    servers: ['nats://localhost:4222'],
    clientType: 'client',
    reconnect: {
        enabled: true,
        maxReconnectAttempts: 10,
        reconnectTimeWait: 2000
    }
})

client.on('connected', () => {
    console.log('NATS 客户端已连接')

    // 发送请求
    client
        .request('api.test.Add', {a: 1, b: 2})
        .then(res => {
            console.log('响应:', res)
        })
        .catch(err => {
            console.error('错误:', err)
        })
})

错误处理

库提供了精简且用户友好的错误处理机制,自动包装 NATS 错误,避免向应用代码暴露庞大的错误栈和嵌套错误对象。

错误精简

所有 NATS 错误都会被自动精简处理:

  • 503 No Responders: 转换为友好消息 "no responders"
  • 400 Bad Request: 使用 API 错误描述或默认消息 "bad request"
  • 409 Conflict: 使用 API 错误描述或默认消息 "conflict"
  • 408 Timeout: 友好的超时消息 "request timeout"
  • 404 Not Found: 友好的未找到消息 "not found"
  • 其他错误: 精简处理,消息长度限制(最多 200 字符),默认 "request failed"

特性

  • 错误对象中不暴露 code 字段
  • 不包含嵌套错误对象(chainedErrorapi_error 等)
  • 不向应用代码暴露错误栈
  • 仅保留必要的上下文(contextsubjecttimestamp

连接错误

client.on('error', err => {
    console.error('连接错误:', err)
    // 错误对象已精简,不包含庞大的错误栈
})

请求错误

client
    .request('api.test.Add', {a: 1, b: 2})
    .then(res => {
        console.log('成功:', res)
    })
    .catch(err => {
        console.error('请求失败:', err.message)
        // 503错误示例输出: "no responders"
        // 超时错误示例输出: "request timeout"
        // 错误对象已精简,不包含嵌套属性
    })

错误对象结构

try {
    await client.request('api.nonexistent', {})
} catch (error) {
    console.log(error.message) // "no responders"
    console.log(error.context) // "request:api.nonexistent"
    console.log(error.timestamp) // "2025-01-27T07:11:50.909Z"
    console.log(error.subject) // "api.nonexistent" (如果可用)
    // 没有code、没有stack、没有嵌套错误
}

重试机制

在请求模式下无效

const client = new nats_realtime_client({
    retry: {
        enabled: true,
        maxRetries: 3,
        retryDelay: 1000,
        backoff: 'exponential'
    }
})

配置验证

自动验证配置参数以确保正确性。

验证规则

  • servers: 必须是非空数组,每个服务器地址必须符合 nats://host:port 格式
  • clientType: 必需,必须是 'client' 或 'server'
  • frameworkStyle: 当 clientType 为 'server' 时必需,必须是 'express' 或 'koa'
  • messageMode: 必需,必须是 'ONCE' 或 'ALL'
  • reconnect.enabled: 可选,默认为 false,必须是布尔值
  • reconnect.maxReconnectAttempts: 当 reconnect.enabled 为 true 时,必须是 -1 到 1000 之间的数字
  • reconnect.reconnectTimeWait: 当 reconnect.enabled 为 true 时,必须是 100 到 60000 之间的数字
  • retry.enabled: 可选,默认为 false,必须是布尔值
  • retry.maxRetries: 当 retry.enabled 为 true 时,必须是 0 到 100 之间的数字
  • retry.retryDelay: 当 retry.enabled 为 true 时,必须是 10 到 60000 之间的数字
  • retry.backoff: 当 retry.enabled 为 true 时必需,必须是 'linear' 或 'exponential'
  • connection.timeout: 可选,必须是 1 到 300000 之间的数字
  • requestOpts.timeout: 可选,必须是 1 到 300000 之间的数字
  • requestOpts.caller: 可选,必须是字符串,长度不超过 100 字符

错误处理

配置验证失败会抛出详细的错误消息:

try {
    const client = new nats_realtime_client({
        servers: [], // 错误: 空数组
        retry: {
            enabled: true,
            backoff: 'invalid' // 错误: 无效的退避策略
        }
    })
} catch (error) {
    console.error('配置错误:', error.message)
    // 输出: 配置错误: servers 必须是非空数组, backoff 必须是 "linear"(线性)或 "exponential"(指数)
}

重要注意事项

  1. 全局前缀: 服务端模式必须设置全局前缀,否则会抛出错误
  2. 路由注册: 路由必须在创建服务端实例之前注册
  3. 全局中间件: 全局中间件必须在使用任何路由注册之前注册
  4. 连接管理: 客户端会自动连接,无需手动调用 connect 方法
  5. 资源清理: 使用完毕后应调用 close() 方法清理资源
  6. 错误监听: 始终监听 error 事件以处理连接和消息错误
  7. 队列组: 多实例部署时使用队列组实现负载均衡
  8. PM2 支持: 原生 PM2 环境检测以实现唯一 ID 生成
  9. 消息头: 自动请求追踪,带唯一 ID 和时间戳
  10. 请求选项: 使用 requestOpts 进行请求特定配置
  11. 消息大小: 消息大小由 NATS 服务器配置控制(默认 1MB,可通过--max_payload配置)

最佳实践

  1. 连接管理: 始终监听连接事件并处理错误
  2. 路由组织: 使用模块化路由文件组织代码
  3. 错误处理: 为所有异步操作添加错误处理
  4. 重连配置: 在生产环境中启用重连机制
  5. 健康监控: 定期检查连接状态和订阅计数
  6. 负载均衡: 使用队列组实现多实例负载均衡
  7. 消息确认: 对重要消息使用 ACK 机制确保可靠性
  8. 全局中间件: 使用全局中间件处理横切关注点,如日志和认证
  9. 请求追踪: 使用 requestOpts 进行更好的请求追踪和调试
  10. PM2 部署: 利用 PM2 集成进行生产部署
  11. 关联追踪: 使用关联 ID (rcid, pcid, cid) 进行分布式追踪
  12. 语言支持: 为国际化应用设置适当的语言头

许可证

ISC 许可证

作者

lizhi

更新日志

v1.1.0

  • 错误处理增强: 精简的错误处理,提供友好的错误消息,不暴露庞大的错误栈
    • 503 No Responders: 自动转换为友好消息 "no responders"
    • 错误精简: 移除 code 字段、嵌套错误对象(chainedErrorapi_error 等)和错误栈
    • 友好消息: 为常见错误码(400、404、408、409、503)提供用户友好的错误消息
    • 消息长度限制: 错误消息限制为 200 字符,防止信息过载
    • 参考 NATS 库: 错误处理方式基于 NATS 库的 consumer.js 实现
  • 消息大小: 移除客户端消息大小校验(由 NATS 服务器配置控制)
    • 服务器配置: 消息大小现在完全由 NATS 服务器的 --max_payload 参数控制
    • 无客户端限制: 客户端不再强制执行 1MB 限制,当服务器支持时可以发送更大的消息
  • Node.js 兼容性: 确保兼容 Node.js 12+(移除了可选链操作符)