samanbayaka
v0.0.15
Moleculer Gateway service with kafka transporter
samanbayaka
This project is a modular, production-ready microservices framework built on Moleculer and the latest Moleculer-Web. It uses NATS as a high-performance messaging transporter and AJV for schema validation. It provides a scalable foundation for distributed Node.js applications, with an emphasis on fault tolerance, clean architecture, and efficient service communication. The system supports flexible orchestration of services while keeping validation and reliability consistent across components.
Features
- Moleculer-based microservices architecture
- NATS transporter for high-performance messaging
- Redpanda transporter for persistent, robust, and scalable messaging
- In-memory L1 cache and Redis L2 cache
- Schema validation using AJV
- Dynamic service loading (Gateway / Feature / Demo)
- Graceful shutdown handling (SIGINT, SIGTERM)
- Centralized error handling middleware
- REPL support for debugging
- Config-driven initialization
- Metrics and Log shipping for Grafana
Installation
pnpm install samanbayaka
Once the package is installed, you can load the library with either import or require:
import sbk from "samanbayaka"
Prerequisites
It is assumed that NATS, Redpanda, and Redis are installed and properly configured. All services should be discoverable through the /etc/hosts file.
Minimum required Node.js version: >= 22.x.x
node -v
nvm use 22
Additionally, OpenObserve is used to view logs and telemetry.
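The Node.js version requirement above can also be checked from code before any service is loaded. The following is a minimal sketch, not part of the samanbayaka API: the names parseMajor, meetsMinimum, and assertNodeVersion are hypothetical helpers introduced only for illustration.

```javascript
// Hypothetical startup guard; these helpers are NOT provided by samanbayaka.
const MIN_NODE_MAJOR = 22;

// Extract the major version from strings such as "22.3.0" or "v22.3.0".
function parseMajor(version) {
  const major = Number.parseInt(String(version).replace(/^v/, "").split(".")[0], 10);
  if (Number.isNaN(major)) {
    throw new Error(`Unrecognized Node.js version: ${version}`);
  }
  return major;
}

// True when the given version satisfies the minimum major version.
function meetsMinimum(version, minMajor = MIN_NODE_MAJOR) {
  return parseMajor(version) >= minMajor;
}

// Throws a descriptive error when the running Node.js is too old.
function assertNodeVersion(version = process.versions.node) {
  if (!meetsMinimum(version)) {
    throw new Error(`Node.js >= ${MIN_NODE_MAJOR} is required, found ${version}`);
  }
}
```

Calling assertNodeVersion() at the top of an index.mjs file would stop the process early with a readable message instead of failing later on an unsupported runtime.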
Environment variables
To configure all required environment variables, create a Bash file:
sudo nano /etc/profile.d/sbk.sh
Paste this:
## Configuration files path
export SBK_CONFIG_PATH=/usr/local/etc/<your_folder>
## Web server port 8760-8769
export SBK_PORT=8765
## Log level allowed values are fatal | error | warn | info | debug | trace
export SBK_LOG_LEVEL=info
## Maximum allowed TTL for the memoryLRU (L1) cache; the value is capped at 600 seconds.
export MAX_L1_TTL=120
## Maximum allowed TTL for the redis (L2) cache; the value is capped at 86400 seconds.
export MAX_L2_TTL=600
## Enabling telemetry middleware – Set the value to true; the default is false.
export SBK_TELEMETRY_ENABLE=false
Set the environment variables in the current Bash session:
source /etc/profile.d/sbk.sh
Optionally, verify that the environment variables are set:
echo $SBK_PORT
Configuration files
Create a directory to store the configuration files, then copy all configuration files from the current project path into this directory and update them as needed. Before copying, ensure that your current working directory is the project directory.
pwd
sudo mkdir -p /usr/local/etc/<your_folder>
sudo cp config/*.* /usr/local/etc/<your_folder>
Log files
For production, it is recommended to create a new directory named "samanbayaka" under /var/log to store logs.
sudo mkdir -p /var/log/samanbayaka
sudo chown -R $USER:$(id -gn) /var/log/samanbayaka
sudo chmod 755 /var/log/samanbayaka
Empty your log files if they become very large, as follows:
truncate -s 0 /var/log/samanbayaka/sbk-*.log
Set up logrotate (optional)
Create a config file:
sudo nano /etc/logrotate.d/sbk
Paste this:
/var/log/samanbayaka/sbk-*.log {
    daily
    rotate 14
    compress
    delaycompress
    missingok
    notifempty
    copytruncate
    create 0640 node node
}
API Docs
sbk.Errors
sbk.Errors exposes the Moleculer Errors object.
sbk.getConfig
sbk.getConfig is a function that returns a configuration object for a given file name.
The file is loaded from the configuration directory defined by the SBK_CONFIG_PATH environment variable.
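To illustrate how a file name could map onto the configuration directory, here is a minimal sketch. It is not the implementation of sbk.getConfig, which this README does not show: resolveConfigFile is a hypothetical helper, and the file name used in the usage note is an assumption.

```javascript
// Hypothetical helper; it only illustrates the lookup described above and
// is NOT the real sbk.getConfig implementation.
function resolveConfigFile(fileName, env = process.env) {
  const dir = env.SBK_CONFIG_PATH;
  if (!dir) {
    throw new Error("SBK_CONFIG_PATH is not set");
  }
  // Reject names that would escape the configuration directory.
  if (fileName.includes("/") || fileName.includes("\\")) {
    throw new Error(`Invalid configuration file name: ${fileName}`);
  }
  // Drop trailing slashes so the joined path has exactly one separator.
  return `${dir.replace(/\/+$/, "")}/${fileName}`;
}
```

For example, with SBK_CONFIG_PATH set to /usr/local/etc/sbk, a call such as resolveConfigFile("gateway.json") would yield /usr/local/etc/sbk/gateway.json. Both the directory and the file name here are illustrative.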
sbk.loadDemo
sbk.loadDemo is an asynchronous function that creates a demo service to help understand the project.
sbk.loadGatewayService
sbk.loadGatewayService is an asynchronous function that creates a gateway service to expose REST APIs.
sbk.loadFeatureService
sbk.loadFeatureService is an asynchronous function that creates a feature service.
Function Signature
sbk.loadFeatureService(schema)
Parameter
schema - An object that follows the Moleculer service schema structure.
Example:
{
  name: "math",
  actions: {
    add(ctx) {
      return Number(ctx.params.a) + Number(ctx.params.b);
    },
    sub(ctx) {
      return Number(ctx.params.a) - Number(ctx.params.b);
    }
  }
}
sbk.auxBrokerService
sbk.auxBrokerService is a function used to initialize and manage message brokers such as Kafka, MQTT, and RabbitMQ.
Function Signature
sbk.auxBrokerService(
  type,
  brokerOpts = {},
  callback = () => {}
)
Parameters
type (string) - Specifies the broker type to initialize. Supported values: "kafka", "mqtt", "amqp"
Example:
"kafka"
brokerOpts (object) - Configuration object used to initialize the broker client, producer, and consumer. Example structure:
{
  name: "<your_service_name>",
  client: {},
  producer: {},
  consumer: {
    groupId: "all",
    interMessageDelayMs: 10,
  },
  logLevel: "INFO",
  gzip: true,
  msgPack: true,
  rest: false,
}
Configuration Options
| Property | Type | Description |
|---|---|---|
| name | string | Only lowercase letters (a–z), digits (0–9), and hyphens (-) are allowed. |
| client | object | Broker client configuration |
| producer | object | Producer configuration |
| consumer | object | Consumer configuration |
| consumer.groupId | string | Consumer group identifier is mandatory for the consumer |
| consumer.interMessageDelayMs | number | Optional delay between messages in milliseconds |
| logLevel | string | Logging level (NOTHING, ERROR, WARN, INFO, DEBUG), default NOTHING |
| gzip | boolean | Enables message compression; only GZIP is supported, default true |
| msgPack | boolean | Enables MessagePack serialization, default true |
| rest | boolean | Enables the REST endpoint, default false |
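The rules in the table above can be sketched as a small validation step applied before the options are passed on. This is an illustration only: normalizeBrokerOpts is a hypothetical helper, not part of samanbayaka, and it assumes that groupId is required whenever a consumer object is supplied. It also does not handle names with a trailing "*".

```javascript
// Hypothetical helper that mirrors the documented rules; NOT part of samanbayaka.
const NAME_PATTERN = /^[a-z0-9-]+$/; // lowercase letters, digits, hyphens
const LOG_LEVELS = ["NOTHING", "ERROR", "WARN", "INFO", "DEBUG"];

function normalizeBrokerOpts(opts = {}) {
  if (typeof opts.name !== "string" || !NAME_PATTERN.test(opts.name)) {
    throw new Error(`Invalid service name: ${opts.name}`);
  }
  // Assumption: a supplied consumer object must carry a groupId.
  if (opts.consumer !== undefined && !opts.consumer.groupId) {
    throw new Error("consumer.groupId is mandatory for a consumer");
  }
  // Apply the defaults stated in the table, then let the caller override them.
  const normalized = {
    logLevel: "NOTHING",
    gzip: true,
    msgPack: true,
    rest: false,
    ...opts,
  };
  if (!LOG_LEVELS.includes(normalized.logLevel)) {
    throw new Error(`Invalid logLevel: ${normalized.logLevel}`);
  }
  return normalized;
}
```

Under these assumptions, normalizeBrokerOpts({ name: "producer-kafka-student" }) returns the options with logLevel "NOTHING", gzip and msgPack enabled, and rest disabled, while a name such as "Bad_Name" is rejected.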
callback (function) - Callback function executed when a message/event is received.
Arguments
| Argument | Description |
|---|---|
| ctx | Broker context |
| payload | Incoming message payload |
Example:
(ctx, payload) => {
  ctx.broker.logger.debug(payload)
}
Usage
Create gateway service
The Gateway is a critical service that exposes all endpoints as REST APIs. At least one Gateway service must be running to make the REST APIs accessible.
mkdir gateway
cd gateway
pnpm init
pnpm install samanbayaka
touch index.mjs
Open index.mjs and paste the following:
import sbk from "samanbayaka"
await sbk.loadGatewayService()
Create feature services
mkdir <your_service_name>
cd <your_service_name>
pnpm init
pnpm install samanbayaka
touch index.mjs
A feature service, such as hello, can be created as follows:
mkdir hello
cd hello
pnpm init
pnpm install samanbayaka
touch index.mjs
Open index.mjs and paste the following:
import sbk from "samanbayaka"
await sbk.loadFeatureService({
  name: "hello",
  version: "v1",
  actions: {
    hello: {
      rest: {
        method: "GET",
        path: "/hello"
      },
      /**
       * Enable caching for this action
       */
      cache: {
        /**
         * Cache entries expire after 30 seconds.
         * For L1 caching, TTL should be defined as [L2_TTL, L1_TTL].
         * Example: ttl: [60, 10] → L2 TTL is 60 seconds and L1 TTL is 10 seconds.
         */
        ttl: 30
      },
      handler(ctx) {
        return "Hello Samanbayaka"
      }
    },
    welcome: {
      rest: "GET /welcome",
      handler(ctx) {
        return `Welcome, ${ctx.params.query?.name || "Guest"} - ${ctx.broker.nodeID}`
      }
    },
  },
})
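The cache ttl forms used above, together with the MAX_L1_TTL and MAX_L2_TTL limits from the environment section, could interact roughly as in the sketch below. This is an assumption-laden illustration rather than the library's actual logic: resolveTtl and readMax are hypothetical names, a scalar ttl is assumed to apply to L2 only, and a missing or invalid environment value is assumed to fall back to the hard cap.

```javascript
// Hypothetical sketch of TTL capping; NOT the samanbayaka implementation.
const HARD_CAP_L1 = 600;   // documented cap for MAX_L1_TTL, in seconds
const HARD_CAP_L2 = 86400; // documented cap for MAX_L2_TTL, in seconds

// Read a maximum from the environment, never exceeding its hard cap.
// Assumption: missing or invalid values fall back to the hard cap.
function readMax(raw, hardCap) {
  const value = Number(raw);
  if (!Number.isFinite(value) || value <= 0) return hardCap;
  return Math.min(value, hardCap);
}

// Accepts either a number (assumed L2 only) or [L2_TTL, L1_TTL].
function resolveTtl(ttl, env = process.env) {
  const maxL1 = readMax(env.MAX_L1_TTL, HARD_CAP_L1);
  const maxL2 = readMax(env.MAX_L2_TTL, HARD_CAP_L2);
  const [l2, l1] = Array.isArray(ttl) ? ttl : [ttl, undefined];
  return {
    l2: Math.min(l2, maxL2),
    l1: l1 === undefined ? null : Math.min(l1, maxL1),
  };
}
```

With MAX_L1_TTL=120 and MAX_L2_TTL=600, this sketch leaves ttl: [60, 10] unchanged and reduces ttl: [5000, 900] to 600 seconds for L2 and 120 seconds for L1.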
Create auxiliary broker services
for producer
mkdir <your_service_name>-<type>-<topic>
cd <your_service_name>-<type>-<topic>
pnpm init
pnpm install samanbayaka
touch index.mjs
An auxiliary service, such as a producer that publishes messages to the STUDENT topic, can be created as follows:
mkdir producer-kafka-student
cd producer-kafka-student
pnpm init
pnpm install samanbayaka
touch index.mjs
Open index.mjs and paste the following:
import sbk from "samanbayaka"
await sbk.auxBrokerService(
  "kafka",
  {
    name: "producer-kafka-student",
    rest: true,
    ...
  },
  () => {}
)
If the service name ends with "*" (for example, producer-kafka-student*), the producer can publish to all topics that start with STUDENT, such as STUDENT.SCIENCE and STUDENT.ARTS.
name: "producer-kafka-student*"
If you do not want to expose the service as a REST API and prefer to run it as a private/internal service, either remove the rest property or set it to false.
rest: false
for consumer
mkdir <your_service_name>-<type>-<topic>-<group>
cd <your_service_name>-<type>-<topic>-<group>
pnpm init
pnpm install samanbayaka
touch index.mjs
An auxiliary service, such as a consumer that subscribes to the STUDENT topic with the group name junior, can be created as follows:
mkdir consumer-kafka-student-junior
cd consumer-kafka-student-junior
pnpm init
pnpm install samanbayaka
touch index.mjs
Open index.mjs and paste the following:
import sbk from "samanbayaka"
await sbk.auxBrokerService(
  "kafka",
  {
    name: "consumer-kafka-student",
    consumer: {
      groupId: "junior",
      interMessageDelayMs: 10,
      ...
    },
    ...
  },
  () => {}
)
The interMessageDelayMs option introduces a delay, in milliseconds, between two consecutive message/event consumptions. If you do not need any delay between messages, simply remove the interMessageDelayMs property from the consumer options.
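The effect of an inter-message delay can be pictured with the following self-contained sketch. It does not use the Kafka client or samanbayaka at all: consumeSequentially and sleep are hypothetical helpers, and where exactly the library inserts its delay is an assumption.

```javascript
// Hypothetical illustration of pacing between messages; NOT library code.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Handle messages one at a time, pausing between consecutive messages.
// When interMessageDelayMs is undefined or 0, no pause is inserted.
async function consumeSequentially(messages, handler, interMessageDelayMs) {
  const handledAt = [];
  for (let i = 0; i < messages.length; i += 1) {
    if (i > 0 && interMessageDelayMs > 0) {
      await sleep(interMessageDelayMs);
    }
    handledAt.push(Date.now()); // record when each message was handed over
    await handler(messages[i]);
  }
  return handledAt;
}
```

A call such as consumeSequentially(batch, handler, 10) processes the batch in order with a pause of roughly 10 ms between messages, while omitting the third argument processes the messages back to back.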
Run the services
- development/testing
cd <your_path>/gateway
node index.mjs
cd <your_path>/hello
node index.mjs
- production
cd <your_path>/gateway
node index.mjs >> /var/log/samanbayaka/sbk-${HOSTNAME}-$$.log 2>&1
cd <your_path>/hello
node index.mjs >> /var/log/samanbayaka/sbk-${HOSTNAME}-$$.log 2>&1
Demo
You can also run the demo service to better understand how microservices work in the Moleculer ecosystem and how REST APIs are exposed.
mkdir demo
cd demo
pnpm init
pnpm install samanbayaka
touch demo.mjs
Open the demo.mjs file and edit it as follows:
import sbk from "samanbayaka"
await sbk.loadDemo()
Run the demo service:
node demo.mjs