@a-mehrabi/aranode-kafka-client
v1.0.0
Published
kafka client implementation for aranode flow engine.
Downloads
6
Readme
Kafka Client Module
kafka client implementation for aranode flow engine.
Table of Contents
Installation
To install this module in your aranode project, you just need to add the package using following command:
$ yarn add @a-mehrabi/aranode-kafka-clientAfter that, when you want to use it, you must include it in custom modules like the following:
path: .env
ARANODE_CUSTOM_MODULES=@a-mehrabi/aranode-kafka-clientUsage
For using Kafka client, you need three descriptions:
Kafka client config config description (kafkaClientConfig)
Kafka producer adapter description (kafkaProducer)
Kafka consumer adapter description (kafkaConsumer)
Kafka client config
Kafka Client config description enables you to define the config of the kafka client, including clientId, brokers config, etc.
You don't define configs of the producer and consumer in this config.
version: 1
kind: config
name: config-name
config:
kafka:
clientId: client-id
brokers:
- localhost:9092
retry:
retries: 8
initialRetryTime: 100
maxRetryTime: 2
factor: 3
multiplier: 4Kafka Producer
to publish messages to Kafka server you have to create a adapter description. kafka producer adapter description, sets a handler for specified kafka client.
flow example:
version: 1
kind: flow
name: flow-name
entryPoint: start
flow:
start:
transform:
path: 'dist/producer-transformer.js'
nextNode: kaka
kafka:
port:
name: kafka-client-out-port
type: outbound
terminal: trueConfigs example:
path: adapter.yml
version: 1
kind: adapter
name: adapter-name
adapter:
producer:
client: kafka-client-config-name
topic: topic-namepath: bind.yml
version: 1
kind: bindPro
name: bind-name
bind:
flow-name:
- type: outbound
port: port-name
adapter: adapter-nameKafka Consumer
kafka consumer allow a group of machines or processes to coordinate access to a list of topics, distributing the load among the consumers.
flow example:
version: 1
kind: flow
name: flow-name
entryPoint: start
flow:
start:
nextNode: consumeData
consumeData:
transform:
path: 'dist/consumer-transformer.js'
terminal: trueConfigs example:
path: adapter.yml
version: 1
kind: adapter
name: consumer-adapter-name
adapter:
consumer:
client: kafka-client-config-name
groupId: group-id
topic: topic-name
fromBeginning: truepath: bind.yml
version: 1
kind: bind
name: bind-name
bind:
flow-name:
- type: inbound
port: port-name
adapter: consumer-adapter-nameAPI
kafkaClientConfig
Type: config description
Options:
clientId (string), required
Kafka client id
brokers (string[]), required
Kafka client must be configured with at least one broker. The brokers on the list are considered seed brokers and are only used to bootstrap the client and load initial metadata
retry ([k: string]: number), optional
The retry option can be used to set the configuration of the retry mechanism, which is used to retry connections and API calls to Kafka (when using producers or consumers)
retries (number), optional, default =
5Max number of retries per call
initialRetryTime (number), optional, default =
300Initial value used to calculate the retry in milliseconds (This is still randomized following the randomization factor)
maxRetryTime (number), optional, default =
30000Maximum wait time for a retry in milliseconds
factor (number), optional, default =
0.2Randomization factor Name of the service that you want to load and use
kafkaProducer
Type: adapter description
Options:
client (string), required
Kafka client config name
topic (string), required, default =
nullTopic name
retry ([k: string]: number), optional
The retry option can be used to set the configuration of the retry mechanism, which is used to retry connections and API calls to Kafka (when using producers or consumers)
retries (number), optional, default =
5Max number of retries per call
initialRetryTime (number), optional, default =
300Initial value used to calculate the retry in milliseconds (This is still randomized following the randomization factor)
maxRetryTime (number), optional, default =
30000Maximum wait time for a retry in milliseconds
factor (number), optional, default =
0.2Randomization factor
multiplier (number), optional, default =
2Exponential factor
metadataMaxAge (number), optional, default =
300000The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions
allowAutoTopicCreation (boolean), optional, default =
trueAllow topic creation when querying metadata for non-existent topics
idempotent (boolean), optional, default =
falseExperimental. If enabled producer will ensure each message is written exactly once. Acks must be set to -1 ("all"). Retries will default to MAX_SAFE_INTEGER.
transactionalId (string), optional
transactionTimeout (number), optional, default =
60000The maximum amount of time in ms that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction.
If this value is larger than the transaction max timeout ms setting in the broker, the request will fail with a InvalidTransactionTimeout errormaxInFlightRequests (number), optional, default =
nullMax number of requests that may be in progress at any time. If falsey then no limit
kafkaConsumer
Type: adapter description
Options:
client (string), required
Kafka client config name
groupId (string), required
Group id
topic (string), required
Topic name
fromBeginning (boolean), required
The consumer group will use the latest committed offset when starting to fetch messages. If the offset is invalid or not defined, fromBeginning defines the behavior of the consumer group.
sessionTimeout (number), optional, default =
30000The consumer timeout time described in milliseconds
