@nitra/nats
v4.3.3
Published
nats helper
Readme
@nitra/nats
NATS JetStream helper для Node.js.1 Простий API для публікації, обробки та моніторингу повідомлень у черзі з гнучким управлінням consumer-ами через конфігурацію та CLI інструменти.
Встановлення
bun add @nitra/nats
# або
npm install @nitra/natsНалаштування змінних середовища
Пакет використовує такі змінні середовища:
NATS_URL— адреса сервера NATS (наприклад,nats://localhost:4222)NATS_STREAM— назва stream (за замовчуваннямdev)
Формат subject
Всі функції працюють із subject у форматі:
projectName:subjectNameПриклади:
myProject:jobsservice:notifications
Публікація повідомлення
import { publish } from '@nitra/nats'
// Публікація повідомлення
await publish('project:subject', { id: 1, foo: 'bar' })
await publish('service:notifications', { message: 'Hello!' })- Повідомлення публікується у subject
dev.project:subject(або${NATS_STREAM}.project:subject) - Consumer-и потрібно створювати окремо через
ensureConsumerабо CLI
Управління Consumer-ами
Програмне створення consumer-а
import { ensureConsumer } from '@nitra/nats'
// Створення простого consumer-а
await ensureConsumer({
streamName: 'dev',
durableName: 'project:subject',
filterSubjects: ['project:subject'],
deliverPolicy: 'all',
ackPolicy: 'explicit'
})
// Створення групового consumer-а для кількох subject-ів
await ensureConsumer({
streamName: 'dev',
durableName: 'worker-group',
filterSubjects: ['project:orders', 'project:payments'],
deliverPolicy: 'all',
ackPolicy: 'explicit'
})CLI для роботи з YAML конфігураціями
Створіть YAML файл з конфігурацією consumer-а:
# consumer.yaml
apiVersion: jetstream.nats.io/v1beta2
kind: Consumer
metadata:
name: 'nats:test'
namespace: dev
spec:
streamName: dev
durableName: 'nats:test'
filterSubjects:
- 'nats:subject'
- 'nats:subject2'
deliverPolicy: all
ackPolicy: explicitЗастосуйте конфігурацію:
# З змінними середовища
NODE_ENV=development NATS_URL=nats://localhost:4222 NATS_STREAM=dev node cli.js consumer.yaml
# Або через npx після публікації
npx @nitra/nats consumer.yamlОбробка повідомлення (worker)
import { read, finish } from '@nitra/nats'
// Читання для стандартного durable consumer (durable_name = subject)
const data = await read('project:subject')
// ...обробка data...
await finish() // підтвердження (ack) повідомлення
// Читання для кастомного durable consumer
const data2 = await read('worker-group')
await finish()- Якщо не викликати
finish(), повідомлення буде повернуто у чергу (nak) при завершенні процесу або помилці. - Durable consumer створюється автоматично при першій публікації.
Кількість непрочитаних повідомлень для durable consumer
import { getPendingCount } from '@nitra/nats'
const count = await getPendingCount('project:subject') // для стандартного durable
console.log('pending:', count)
const count2 = await getPendingCount('worker-group') // для кастомного durable
console.log('pending for group:', count2)Як це працює
publish(subject, data):
- Публікує повідомлення у subject
${stream}.${subject} - Перевіряє формат subject (має бути
project:subject)
- Публікує повідомлення у subject
ensureConsumer(spec):
- Створює consumer якщо не існує
- Оновлює
filter_subjectsякщо вони змінились - Перестворює consumer якщо змінились
deliverPolicyабоackPolicy - Автоматично створює stream якщо потрібно
read(durableName):
- Читає одне повідомлення з черги для durable consumer
finish():
- Підтверджує (ack) повідомлення
getPendingCount(durableName):
- Повертає кількість непрочитаних повідомлень для durable consumer
Важливо
- STREAM у NATS за замовчуванням
dev(або значення змінноїNATS_STREAM) - Consumer-и потрібно створювати явно через
ensureConsumerабо CLI - Subject має відповідати формату
project:subject - Пакет розрахований на single-message workflow (одне повідомлення на читання за раз)
ensureConsumerрозумно оновлює конфігурацію без втрати повідомлень- CLI підтримує YAML конфігурації для декларативного управління consumer-ами
Приклад повного workflow
import { publish, ensureConsumer, read, finish, getPendingCount } from '@nitra/nats'
// 1. Створення consumer-ів
await ensureConsumer({
streamName: 'dev',
durableName: 'project:subject',
filterSubjects: ['project:subject'],
deliverPolicy: 'all',
ackPolicy: 'explicit'
})
await ensureConsumer({
streamName: 'dev',
durableName: 'worker-group',
filterSubjects: ['project:subject', 'project:orders'],
deliverPolicy: 'all',
ackPolicy: 'explicit'
})
// 2. Публікація повідомлень
await publish('project:subject', { id: 1, action: 'create' })
await publish('project:orders', { orderId: 123, amount: 100 })
// 3. Перевірка pending повідомлень
const count1 = await getPendingCount('project:subject')
const count2 = await getPendingCount('worker-group')
console.log(`pending: ${count1}, worker-group: ${count2}`)
// 4. Обробка повідомлень
const data1 = await read('project:subject')
console.log('received:', data1)
await finish()
const data2 = await read('worker-group')
console.log('group received:', data2)
await finish()CLI Інструмент
CLI підтримує роботу з YAML конфігураціями consumer-ів у форматі JetStream Consumer API.
Використання
# Застосування конфігурації consumer-а з YAML файлу
NATS_URL=nats://localhost:4222 node cli.js consumer.yaml
# Через npx після публікації пакету
npx @nitra/nats consumer.yamlФормат YAML конфігурації
apiVersion: jetstream.nats.io/v1beta2
kind: Consumer
metadata:
name: my-consumer
namespace: dev
spec:
streamName: dev
durableName: my-consumer
filterSubjects:
- project:orders
- project:payments
deliverPolicy: all
ackPolicy: explicitCLI автоматично:
- Створить consumer якщо не існує
- Оновить filter_subjects якщо вони змінились
- Перестворить consumer якщо змінились deliverPolicy або ackPolicy
- Створить stream якщо потрібно
Ліцензія
MIT
