adonis-pulsar
v0.2.2
AdonisJS v7 wrapper for Apache Pulsar
adonis-pulsar
An AdonisJS v7 package for Apache Pulsar, built on top of the official pulsar-client Node.js library.
It provides:
- A provider that manages the Pulsar client lifecycle
- A dispatch method to publish messages to any topic
- An abstract Consumer class to handle incoming messages
- A generator (make:consumer) to scaffold new consumers
- A configure hook for node ace add adonis-pulsar
Requirements
| Dependency | Version |
|---|-------------|
| Node.js | >= 24.0.0 |
| AdonisJS | ^7.0.0 |
| pulsar-client | ^1.11.0 |
Installation
node ace add adonis-pulsar
This will:
- Install the adonis-pulsar npm package
- Create config/pulsar.ts
- Add PULSAR_SERVICE_URL to your .env and start/env.ts
- Register the provider and commands in adonisrc.ts
Manual installation
npm install adonis-pulsar
node ace configure adonis-pulsar
Configuration
The configuration file is located at config/pulsar.ts:
import env from '#start/env'
import { defineConfig } from 'adonis-pulsar'
const pulsarConfig = defineConfig({
  serviceUrl: env.get('PULSAR_SERVICE_URL'),
  // Optional: JWT token for authentication
  token: env.get('PULSAR_TOKEN', ''),
  // Optional: default tenant and namespace used to resolve short topic names
  tenant: env.get('PULSAR_TENANT', 'public'),
  namespace: env.get('PULSAR_NAMESPACE', 'default'),
  // Optional: extra options forwarded to new Pulsar.Client()
  client: {
    operationTimeoutSeconds: 30,
  },
  // Optional: default producer options applied to every topic
  producer: {
    sendTimeoutMs: 30000,
  },
  // Consumers started automatically on app boot
  consumers: [
    () => import('#consumers/order_consumer'),
  ],
})
export default pulsarConfig
Environment variables
PULSAR_SERVICE_URL=pulsar://localhost:6650
# Optional: JWT authentication token
PULSAR_TOKEN=
# Optional: default tenant and namespace for short topic name resolution
PULSAR_TENANT=public
PULSAR_NAMESPACE=default
For TLS connections:
PULSAR_SERVICE_URL=pulsar+ssl://broker.example.com:6651
PULSAR_TOKEN=eyJhbGciOiJSUzI1NiJ9...
Creating a consumer
node ace make:consumer Order
This generates app/consumers/order_consumer.ts:
import type Pulsar from 'pulsar-client'
import { Consumer } from 'adonis-pulsar'
export default class OrderConsumer extends Consumer {
  static topic = 'order'
  static subscription = 'order-subscription'
  // static tenant = 'public' // overrides config.tenant for this consumer
  // static namespace = 'default' // overrides config.namespace for this consumer
  async handle(message: Pulsar.Message, consumer: Pulsar.Consumer): Promise<void> {
    const data = message.getData().toString()
    console.log('Received message:', data)
    consumer.acknowledge(message)
  }
}
Tenant and namespace resolution
When topic is a short name (e.g. 'order'), the full topic URL is resolved at subscribe time using this priority order:
- Per-consumer static tenant / static namespace (highest priority)
- Global config tenant / namespace
- If neither is set, the short name is passed to Pulsar as-is (or use a full URL directly)
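The priority order above can be sketched as a small pure function. This is only an illustration of the documented rules, not the package's actual implementation: resolveTopic and the Scope type are hypothetical names, the persistent:// scheme is taken from the example URLs in this README, and the sketch assumes a level only applies when both tenant and namespace are set.

```typescript
// Hypothetical type and helper, for illustration only.
interface Scope {
  tenant?: string
  namespace?: string
}

function resolveTopic(topic: string, consumer: Scope, config: Scope): string {
  // A full URL (e.g. persistent://public/default/order) is used as-is.
  if (topic.includes('://')) return topic

  // Per-consumer statics win over global config; a level counts only
  // when both tenant and namespace are present (assumption).
  const scope = [consumer, config].find((s) => s.tenant && s.namespace)

  // Neither level is set: hand the short name to Pulsar unchanged.
  if (!scope) return topic

  return `persistent://${scope.tenant}/${scope.namespace}/${topic}`
}

console.log(
  resolveTopic('order', { tenant: 'legacy', namespace: 'v1' }, { tenant: 'public', namespace: 'default' })
)
// persistent://legacy/v1/order
```

Keeping the resolution in one pure function makes the precedence easy to unit test without a running broker.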
To set global defaults, add them to config/pulsar.ts:
const pulsarConfig = defineConfig({
  serviceUrl: env.get('PULSAR_SERVICE_URL'),
  tenant: env.get('PULSAR_TENANT'), // e.g. 'public'
  namespace: env.get('PULSAR_NAMESPACE'), // e.g. 'default'
  consumers: [...],
})
To override for a specific consumer, declare static tenant and static namespace on the class:
export default class LegacyOrderConsumer extends Consumer {
  static topic = 'order'
  static subscription = 'order-subscription'
  static tenant = 'legacy'
  static namespace = 'v1'
  // Resolves to persistent://legacy/v1/order regardless of global config
}
Then register it in config/pulsar.ts:
consumers: [
  () => import('#consumers/order_consumer'),
],
Consumer options
| Static property | Type | Default | Description |
|---|---|---|---|
| topic | string | — | Required. Topic name — short form (e.g. order) or full URL (e.g. persistent://public/default/order) |
| subscription | string | — | Required. Subscription name |
| subscriptionType | SubscriptionType | 'Shared' | Exclusive, Shared, Failover, or KeyShared |
| maxRedeliverCount | number | 0 | Maximum number of redeliveries before rescue() is called. 0 disables the dead-letter policy. |
| tenant | string | undefined | Overrides config.tenant for this consumer. Requires namespace to also be set. |
| namespace | string | undefined | Overrides config.namespace for this consumer. Requires tenant to also be set. |
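The defaults and constraints in the table can be summarised in code. The sketch below is a hypothetical helper (normalizeStatics and ConsumerStatics are not exported by adonis-pulsar, and the package may validate differently); it only restates the table: topic and subscription are required, subscriptionType defaults to 'Shared', maxRedeliverCount defaults to 0, and tenant and namespace must be set together.

```typescript
// Hypothetical shape mirroring the static properties in the table above.
interface ConsumerStatics {
  topic?: string
  subscription?: string
  subscriptionType?: 'Exclusive' | 'Shared' | 'Failover' | 'KeyShared'
  maxRedeliverCount?: number
  tenant?: string
  namespace?: string
}

// Hypothetical helper: applies the documented defaults and checks the
// documented constraints.
function normalizeStatics(statics: ConsumerStatics) {
  if (!statics.topic) throw new Error('topic is required')
  if (!statics.subscription) throw new Error('subscription is required')
  // tenant and namespace only make sense as a pair.
  if ((statics.tenant === undefined) !== (statics.namespace === undefined)) {
    throw new Error('tenant and namespace must be set together')
  }
  return {
    ...statics,
    topic: statics.topic,
    subscription: statics.subscription,
    subscriptionType: statics.subscriptionType ?? 'Shared',
    maxRedeliverCount: statics.maxRedeliverCount ?? 0,
  }
}

const normalized = normalizeStatics({ topic: 'order', subscription: 'order-subscription' })
console.log(normalized.subscriptionType, normalized.maxRedeliverCount) // Shared 0
```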
Handling errors
Override onError to customise error handling. The default implementation calls negativeAcknowledge and re-throws. The override below logs the error and negatively acknowledges the message without re-throwing:
async onError(
  message: Pulsar.Message,
  consumer: Pulsar.Consumer,
  error: Error
): Promise<void> {
  console.error('Failed to process message', error)
  consumer.negativeAcknowledge(message)
}
Rescuing failed messages
When maxRedeliverCount is set, the package automatically configures Pulsar's Dead Letter Policy on the subscription. Once a message has been redelivered maxRedeliverCount times and still fails, the rescue() method is called instead of onError(), and the message is then routed to the dead-letter topic (<topic>-<subscription>-DLQ).
Override rescue() to perform cleanup, alerting, or manual persistence before the message is routed to the dead-letter topic:
import type Pulsar from 'pulsar-client'
import { Consumer } from 'adonis-pulsar'
export default class OrderConsumer extends Consumer {
  static topic = 'order'
  static subscription = 'order-subscription'
  static maxRedeliverCount = 3 // rescue() fires on the 4th failure
  async handle(message: Pulsar.Message, consumer: Pulsar.Consumer): Promise<void> {
    const order = JSON.parse(message.getData().toString())
    await processOrder(order)
    consumer.acknowledge(message)
  }
  async rescue(
    message: Pulsar.Message,
    consumer: Pulsar.Consumer,
    error: Error
  ): Promise<void> {
    const data = message.getData().toString()
    // Log to an external system, send an alert, store in a fallback DB…
    console.error(`Message permanently failed after ${OrderConsumer.maxRedeliverCount} retries`, {
      data,
      error: error.message,
      redeliveryCount: message.getRedeliveryCount(),
    })
  }
}
Note: rescue() does not need to call acknowledge. The package does it automatically after rescue() resolves, so the broker can forward the message to the dead-letter topic. If rescue() itself throws, the error is logged and the message is still acknowledged to prevent an infinite loop.
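The failure flow described in this section can be sketched in isolation. Both helpers below are hypothetical and are not the package's internals; the sketch assumes the redelivery count is 0 on the first delivery, so with maxRedeliverCount = 3 the fourth failure arrives with a count of 3.

```typescript
type Hook = 'onError' | 'rescue'

// Hypothetical helper: which hook runs after handle() throws.
function hookForFailure(redeliveryCount: number, maxRedeliverCount: number): Hook {
  // 0 disables the dead-letter policy, so rescue() is never used.
  if (maxRedeliverCount <= 0) return 'onError'
  return redeliveryCount >= maxRedeliverCount ? 'rescue' : 'onError'
}

// Hypothetical helper: run rescue(), then acknowledge even if it throws.
async function rescueThenAck(
  rescue: () => Promise<void>,
  acknowledge: () => Promise<void>,
  logError: (error: unknown) => void
): Promise<void> {
  try {
    await rescue()
  } catch (error) {
    logError(error) // logged, not re-thrown, to avoid an infinite loop
  }
  await acknowledge()
}

console.log([0, 1, 2, 3].map((count) => hookForFailure(count, 3)).join(', '))
// onError, onError, onError, rescue
```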
Starting the listener
Consumers start automatically when the HTTP server boots (node ace serve). Each consumer runs its own independent receive loop; a crash in one loop is logged and does not affect the others.
The provider is registered for the web environment only, so it is not loaded during generic Ace console commands (for example node ace make:migration posts).
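The isolation between receive loops can be illustrated with a small sketch. runIsolated and ReceiveLoop are hypothetical names and this is not the package's implementation; it only shows one way a crash in one loop can be logged without stopping the others.

```typescript
type ReceiveLoop = () => Promise<void>

// Hypothetical helper: start every loop, log a crash, keep the rest running.
async function runIsolated(
  loops: Record<string, ReceiveLoop>,
  logError: (name: string, error: unknown) => void
): Promise<string[]> {
  const crashed: string[] = []
  await Promise.all(
    Object.keys(loops).map(async (name) => {
      try {
        await loops[name]()
      } catch (error) {
        // The failure stays inside this loop's own try/catch.
        crashed.push(name)
        logError(name, error)
      }
    })
  )
  return crashed
}

// Usage: the failing loop is reported, the healthy one still completes.
runIsolated(
  {
    orders: async () => { throw new Error('broker unavailable') },
    payments: async () => {},
  },
  (name, error) => console.error(`consumer ${name} crashed:`, error)
).then((crashed) => console.log(crashed.join(','))) // orders
```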
Publishing messages
Using the service singleton
import pulsar from 'adonis-pulsar/services/main'
// Short name (resolved using config.tenant / config.namespace)
await pulsar.dispatch('order', JSON.stringify({ id: 1 }))
// Full URL: always used as-is
await pulsar.dispatch('persistent://public/default/order', Buffer.from('hello'))
Using the container directly
const pulsar = await app.container.make('adonis-pulsar/manager')
await pulsar.dispatch('order', 'hello')
Dispatch options
await pulsar.dispatch('order', payload, {
  // Custom message properties (key/value string map)
  properties: {
    correlationId: '123',
    source: 'api',
  },
  // Delay delivery by N milliseconds from now
  deliverAfter: 5000,
  // Or deliver at a specific Unix timestamp (ms); normally used instead of deliverAfter
  deliverAt: Date.now() + 60_000,
  // Partition key: messages with the same key are routed to the same partition
  partitionKey: 'tenant-42',
})
dispatch returns a Pulsar.MessageId that can be used for deduplication or tracing.
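Because properties is a key/value string map, non-string metadata has to be converted before it is passed to dispatch. A minimal sketch, assuming a hypothetical toProperties helper that is not part of adonis-pulsar:

```typescript
// Hypothetical helper: coerce arbitrary metadata into a string map.
function toProperties(meta: Record<string, unknown>): Record<string, string> {
  const properties: Record<string, string> = {}
  for (const key of Object.keys(meta)) {
    const value = meta[key]
    // Skip missing values rather than sending "undefined" or "null".
    if (value === undefined || value === null) continue
    properties[key] = typeof value === 'string' ? value : JSON.stringify(value)
  }
  return properties
}

console.log(toProperties({ correlationId: 123, source: 'api', retry: undefined }))
// { correlationId: '123', source: 'api' }
```

The result could then be passed as the properties option shown above.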
Registering consumers programmatically
Outside of the config file, you can register consumers directly on the manager — useful in tests or conditional scenarios:
const pulsar = await app.container.make('adonis-pulsar/manager')
pulsar.register(OrderConsumer, PaymentConsumer)
await pulsar.listen()
TypeScript
The package ships full type declarations. The manager is registered in the AdonisJS IoC container with its proper type:
// src/types/extended.ts augments ContainerBindings
import type { PulsarManager } from 'adonis-pulsar'
// Fully typed — no cast needed
const manager = await app.container.make('adonis-pulsar/manager')
await manager.dispatch(...)
Package exports
| Export path | Description |
|---|---|
| adonis-pulsar | defineConfig, Consumer, configure, stubsRoot |
| adonis-pulsar/types | PulsarConfig, ConsumerConstructor, DispatchOptions |
| adonis-pulsar/pulsar_provider | AdonisJS service provider |
| adonis-pulsar/commands | MakeConsumer |
| adonis-pulsar/services/main | Pre-resolved PulsarManager singleton |
