@algoan/pubsub
v6.3.0
PubSub Library for algoan
PubSub
This is a generic PubSub Factory exposing a listen and an emit method.
NOTE: Today, only Google Cloud PubSub has been added.
Installation
npm install --save @algoan/pubsub
Usage
Google Cloud PubSub
Run tests
To run tests or to try the PubSubFactory class, you need a Google account and the gcloud SDK installed.
Then, to install the Google PubSub simulator, run:
gcloud components install pubsub-emulator
gcloud components install beta
gcloud components update
Then run the tests:
npm test
This launches a Google PubSub emulator using the google-pubsub-emulator library.
Example
To create a PubSub instance using Google Cloud:
import { EmittedMessage, GCPubSub, PubSubFactory, Transport } from '@algoan/pubsub';

const pubsub: GCPubSub = PubSubFactory.create({
  transport: Transport.GOOGLE_PUBSUB,
  options: {
    projectId: 'test',
    // And all other Google PubSub properties
  },
});
const topicName: string = 'some_topic';

await pubsub.listen(topicName, {
  options: {
    // Transport-specific options (see pubsub.listen in the API section)
    autoAck: true,
  },
  onMessage: (data: EmittedMessage<{ foo: string }>) => {
    console.log(data.parsedData); // { foo: 'bar', time: {Date.now}, _eventName: 'some_topic' }
    // do whatever you want. The message has already been acknowledged
  },
  onError: (error: Error) => {
    // Handle error as you wish
  },
});

await pubsub.emit(topicName, { foo: 'bar' });
Contribution
Thank you for your future contribution 😁 Please follow these instructions before opening a pull request!
API
PubSubFactory.create({ transport, options })
The only static method from the PubSubFactory class. It initiates a new PubSub instance depending on the transport. By default, it connects to Google Cloud PubSub.
- transport: PubSub technology to use. Only GOOGLE_PUBSUB is available for now.
- options: Options related to the transport.
  - If transport === Transport.GOOGLE_PUBSUB, then have a look at the Google Cloud PubSub config client.
  - debug: Display logs if it is set to true. It uses a pino logger and pino-pretty if NODE_ENV is not equal to production.
  - pinoOptions: If debug is set to true, set the pino logger options. Defaults to level: debug and prettyPrint: true if NODE_ENV is not equal to production.
  - topicsPrefix: Add a prefix to all created topics. Example: topicsPrefix: 'my-topic', all topics will begin with my-topic+{your topic name}.
  - topicsSeparator: Customize the separator between topicsPrefix and the topic name. Example: topicsSeparator: '-', all topics will be {topic prefix}-{your topic name} (defaults to '+').
  - subscriptionsPrefix: Add a prefix to all created subscriptions. Example: subscriptionsPrefix: 'my-sub', all subscriptions will begin with my-sub%{your topic name}.
  - subscriptionsSeparator: Customize the separator between subscriptionsPrefix and the topic name. Example: subscriptionsSeparator: '-', all subscriptions will be {subscription prefix}-{your topic name} (defaults to '%').
  - namespace: Add a namespace property to Message attributes when publishing on a topic.
  - environment: Add an environment property to Message attributes when publishing on a topic.
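The prefix and separator options above can be illustrated with a small sketch. The helper prefixedName is hypothetical and not part of @algoan/pubsub; it only mirrors the naming rule described in the list, and it assumes a name is left unchanged when no prefix is configured.

```typescript
// Hypothetical helper, NOT part of @algoan/pubsub: it only mirrors the
// documented rule "{prefix}{separator}{name}".
// Assumption: when no prefix is configured, the name is used as-is.
function prefixedName(baseName: string, prefix?: string, separator: string = '+'): string {
  return prefix === undefined ? baseName : `${prefix}${separator}${baseName}`;
}

// topicsPrefix: 'my-topic' with the default topic separator '+'
const prefixedTopic = prefixedName('some_topic', 'my-topic');
// subscriptionsPrefix: 'my-sub' with the default subscription separator '%'
const prefixedSubscription = prefixedName('some_topic', 'my-sub', '%');
// topicsSeparator: '-'
const dashedTopic = prefixedName('some_topic', 'my-topic', '-');
// no prefix configured
const bareTopic = prefixedName('some_topic');

console.log(prefixedTopic, prefixedSubscription, dashedTopic, bareTopic);
```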
pubsub.listen(event, opts)
Listen to a specific event.
NOTE: It only uses the Google Cloud subscription pull delivery for now.
- event: Name of the event.
- opts: Options related to the Listener method
  - onMessage: Method called when receiving a message
  - onError: Method called when an error occurs
  - options: Option related to the chosen transport
If the chosen transport is Google Cloud PubSub, then options would be:
- autoAck: Automatically ACK an event as soon as it is received (defaults to true)
- subscriptionOptions: Options related to the created Subscription:
  - name: Custom name for the subscription. Default: event (also equal to the topic name)
  - get: Options applied to the getSubscription method (have a look at Subscription options)
  - sub: Options applied to the subscription instance (see also the setOptions method)
  - create: Options applied to the createSubscription method (have a look at Create Subscription options)
  - deadLetterTopicName: Per-subscription override for the dead-letter topic name (see Dead Letter Topics)
- topicOptions: Options applied to the created topic (have a look at Topic options)
- topicName: Set the topic name. By default, it uses the default name with a prefix.
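As a rough illustration of the defaults listed above, the sketch below resolves autoAck and the subscription name from a partial options object. The interfaces and the resolveListenDefaults function are local stand-ins introduced for this example, not the library's real types, and prefix handling is deliberately ignored.

```typescript
// Local stand-in types that mirror the documented option names.
// Assumption: the real @algoan/pubsub types may be shaped differently.
interface SubscriptionOptionsSketch {
  name?: string;
  deadLetterTopicName?: string;
}

interface GoogleListenOptionsSketch {
  autoAck?: boolean;
  subscriptionOptions?: SubscriptionOptionsSketch;
  topicName?: string;
}

// Hypothetical helper: applies the documented defaults (autoAck defaults to
// true, the subscription name defaults to the event name). Prefixes and
// separators are left out to keep the sketch short.
function resolveListenDefaults(eventName: string, opts: GoogleListenOptionsSketch = {}) {
  return {
    autoAck: opts.autoAck ?? true,
    subscriptionName: opts.subscriptionOptions?.name ?? eventName,
    topicName: opts.topicName ?? eventName,
  };
}

const listenDefaults = resolveListenDefaults('some_topic');
const listenCustomized = resolveListenDefaults('some_topic', {
  autoAck: false,
  subscriptionOptions: { name: 'billing-worker' },
});

console.log(listenDefaults, listenCustomized);
```

In a real call, this object would be passed as the options property of the second argument to pubsub.listen.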
pubsub.emit(event, payload, opts)
Emit a specific event with a payload. It adds attributes to the message if you set a namespace or an environment when creating the PubSub instance. It also adds an _eventName and a time property to the emitted payload.
- event: Name of the event to emit.
- payload: Payload to send. It will be buffered by Google, and then parsed by the listen method.
- opts: Options related to the Emit method
  - metadata: Custom metadata added to the message
  - options: Option related to the chosen transport
If the chosen transport is Google Cloud PubSub, then options would be:
- topicOptions: Options applied to the created topic (have a look at Topic options)
- publishOptions: Publish options set to the topic after its creation. Refer to Publish Options
- messageOptions: Additional message options added to the message. Refer to Message Options
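To make the payload enrichment described for emit more concrete, here is a minimal sketch of what the emitted message could look like. buildMessageSketch and EmitContextSketch are hypothetical names introduced for this example; the serialization step uses plain JSON as a stand-in for the buffering done by the Google client, and the exact field handling in the library may differ.

```typescript
// Hypothetical stand-in for the instance-level namespace/environment settings.
interface EmitContextSketch {
  namespace?: string;
  environment?: string;
}

// Hypothetical helper, NOT the library's implementation: adds _eventName and
// time to the payload, and copies namespace/environment into the attributes.
function buildMessageSketch<T extends object>(
  eventName: string,
  payload: T,
  context: EmitContextSketch = {},
  now: number = Date.now(),
) {
  const body = { ...payload, _eventName: eventName, time: now };
  const attributes: Record<string, string> = {};
  if (context.namespace !== undefined) {
    attributes['namespace'] = context.namespace;
  }
  if (context.environment !== undefined) {
    attributes['environment'] = context.environment;
  }
  // JSON text stands in for the buffered message data.
  return { data: JSON.stringify(body), attributes };
}

// A fixed timestamp keeps the example deterministic.
const outgoingMessage = buildMessageSketch('some_topic', { foo: 'bar' }, { namespace: 'billing' }, 1700000000000);
// What a listener would see after parsing the message data.
const parsedBack = JSON.parse(outgoingMessage.data);

console.log(parsedBack, outgoingMessage.attributes);
```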
pubsub.unsubscribe(event)
Stop the server connection for a given subscription.
- event: Name of the event to stop listening for.
Dead Letter Topics
When deadLetterOptions is set in the constructor options, the library automatically:
- Creates the dead-letter topic (if it does not exist)
- Creates a drain subscription on the dead-letter topic (to prevent GCP from discarding undelivered messages)
- Grants roles/pubsub.publisher on the dead-letter topic to the GCP Pub/Sub service account
- Grants roles/pubsub.subscriber on the source subscription to the GCP Pub/Sub service account
- Applies the deadLetterPolicy to the created subscription
This setup only happens once per new subscription. Reconnecting to an already-existing subscription skips it, so no extra setup calls are made.
Mode 1: Per-subscription isolated dead-letter topics (recommended for multi-consumer systems)
Omit deadLetterTopicName. Each subscription automatically gets its own <subscriptionName>-deadletter topic, so failed messages from different consumers are isolated and can be replayed independently.
const pubsub: GCPubSub = PubSubFactory.create({
  transport: Transport.GOOGLE_PUBSUB,
  options: {
    projectId: 'my-project',
    deadLetterOptions: {
      maxDeliveryAttempts: 10, // optional, defaults to 5
    },
  },
});

// Each listen() call creates its own isolated dead-letter topic:
// - "my-orders-sub-deadletter" for the first subscription
// - "my-payments-sub-deadletter" for the second
await pubsub.listen('my-orders-sub');
await pubsub.listen('my-payments-sub');
Mode 2: Shared dead-letter topic (single DLT for all subscriptions)
Provide deadLetterTopicName to route all failed messages to one shared topic. The topic must already exist in GCP.
const pubsub: GCPubSub = PubSubFactory.create({
  transport: Transport.GOOGLE_PUBSUB,
  options: {
    projectId: 'my-project',
    deadLetterOptions: {
      deadLetterTopicName: 'projects/my-project/topics/my-dead-letter', // fully-qualified or short name
      maxDeliveryAttempts: 5,
    },
  },
});

await pubsub.listen('my-orders-sub'); // routes to "my-dead-letter"
await pubsub.listen('my-payments-sub'); // routes to "my-dead-letter"
Mode 3: Per-subscription override
Override the dead-letter topic for a specific subscription via subscriptionOptions.deadLetterTopicName. If it is not set, the library falls back to the instance-level deadLetterTopicName, and auto-derives the name if neither is set.
await pubsub.listen('my-special-sub', {
  options: {
    subscriptionOptions: {
      deadLetterTopicName: 'projects/my-project/topics/special-dead-letter',
    },
  },
});
No dead-letter topic
If deadLetterOptions is not set, no dead-letter resources are created and no IAM calls are made, so existing setups keep working unchanged.
const pubsub: GCPubSub = PubSubFactory.create({
  transport: Transport.GOOGLE_PUBSUB,
  options: {
    projectId: 'my-project',
    // no deadLetterOptions: existing behavior unchanged
  },
});
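The three modes and the "no dead-letter topic" case can be summarized as a single resolution rule. The sketch below is an illustration only: resolveDeadLetterPolicy, toTopicPath, and the two interfaces are hypothetical names, and the sketch assumes that a per-subscription override has no effect when deadLetterOptions is absent, and that the auto-derived name is built from the plain subscription name. The returned object uses the deadLetterTopic and maxDeliveryAttempts fields of a Google Cloud Pub/Sub dead-letter policy.

```typescript
// Local stand-ins for the documented options; not the library's real types.
interface DeadLetterOptionsSketch {
  deadLetterTopicName?: string;
  maxDeliveryAttempts?: number;
}

interface DeadLetterPolicySketch {
  deadLetterTopic: string;
  maxDeliveryAttempts: number;
}

// Accepts either a short topic name or a fully-qualified one.
function toTopicPath(projectId: string, topicName: string): string {
  return topicName.indexOf('projects/') === 0 ? topicName : `projects/${projectId}/topics/${topicName}`;
}

// Hypothetical helper implementing the documented fallback order:
// per-subscription override, then instance-level name, then "<subscriptionName>-deadletter".
function resolveDeadLetterPolicy(
  projectId: string,
  subscriptionName: string,
  instanceOptions?: DeadLetterOptionsSketch,
  perSubscriptionTopicName?: string,
): DeadLetterPolicySketch | undefined {
  // Assumption: without deadLetterOptions nothing is configured at all.
  if (instanceOptions === undefined) {
    return undefined;
  }
  const topicName =
    perSubscriptionTopicName ?? instanceOptions.deadLetterTopicName ?? `${subscriptionName}-deadletter`;
  return {
    deadLetterTopic: toTopicPath(projectId, topicName),
    maxDeliveryAttempts: instanceOptions.maxDeliveryAttempts ?? 5,
  };
}

const isolatedPolicy = resolveDeadLetterPolicy('my-project', 'my-orders-sub', { maxDeliveryAttempts: 10 });
const sharedPolicy = resolveDeadLetterPolicy('my-project', 'my-orders-sub', { deadLetterTopicName: 'my-dead-letter' });
const overriddenPolicy = resolveDeadLetterPolicy(
  'my-project',
  'my-special-sub',
  { deadLetterTopicName: 'my-dead-letter' },
  'projects/my-project/topics/special-dead-letter',
);
const noPolicy = resolveDeadLetterPolicy('my-project', 'my-orders-sub');

console.log(isolatedPolicy, sharedPolicy, overriddenPolicy, noPolicy);
```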