Transactions
KafkaJS provides a simple interface to support Kafka transactions.
Note: Transactions require Kafka version >= v0.11.
You initialize a transaction by making an async call to producer.transaction(). The returned transaction object has the methods send and sendBatch, with signatures identical to those of the producer. When you are done, call transaction.commit() or transaction.abort() to end the transaction. A transactionally aware consumer will only read messages that were committed.
Note: Kafka requires that the transactional producer have the following configuration to guarantee EoS (exactly-once semantics):
- The producer must have at most 1 in-flight request
- The producer must wait for acknowledgement from all replicas (acks=-1)
- The producer must have unlimited retries
Configure the producer client with maxInFlightRequests: 1, idempotent: true and a transactionalId to guarantee EoS. Setting these options enables the settings listed above.
const { Kafka } = require('kafkajs')

const client = new Kafka({
  clientId: 'transactional-client',
  brokers: ['kafka1:9092', 'kafka2:9092'],
})

const producer = client.producer({
  transactionalId: 'my-transactional-producer',
  maxInFlightRequests: 1,
  idempotent: true,
})
Within a transaction, you can produce one or more messages. If transaction.abort() is called, all messages will be rolled back.
const transaction = await producer.transaction()

try {
  await transaction.send({ topic, messages })
  await transaction.commit()
} catch (e) {
  // Roll back everything sent in this transaction
  await transaction.abort()
}
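The commit-or-abort pattern above can be wrapped in a small helper. The following is a minimal sketch, not part of KafkaJS: runInTransaction is a hypothetical name, and producer is assumed to be a connected producer configured with a transactionalId. Unlike the snippet above, this sketch rethrows after aborting so that the caller can see the failure.

```javascript
// Hypothetical helper (not a KafkaJS API): runs `work` inside a transaction.
// Assumes `producer` is a connected, transactional KafkaJS producer.
async function runInTransaction(producer, work) {
  const transaction = await producer.transaction()
  try {
    // `work` receives the transaction and may call send/sendBatch on it
    const result = await work(transaction)
    await transaction.commit()
    return result
  } catch (e) {
    // Roll back, then rethrow so the caller can decide whether to retry
    await transaction.abort()
    throw e
  }
}

// Usage, inside an async function:
//   await runInTransaction(producer, (tx) => tx.send({ topic, messages }))
```

Rethrowing is a design choice made for this sketch; swallowing the error, as in the snippet above, may be appropriate when a failed transaction needs no further handling.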
The transactionalId allows Kafka to fence out zombie instances: it rejects writes from older producers that share a transactionalId and accepts only writes from the most recently registered producer. To ensure EoS in a stream processing application, it is important that the transactionalId is always the same for a given input topic and partition in the read-process-write cycle.
The simplest way to achieve this is to encode the topic and partition in the transactionalId itself, using a scheme such as "myapp-producer-" + topic + "-" + partition.
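As an illustration of that naming scheme, the sketch below derives the transactionalId from the input topic and partition and keeps one producer per ID. The names transactionalIdFor and producerFor are hypothetical, and client is assumed to be a Kafka instance like the one configured earlier; the returned producer would still need to be connected before use.

```javascript
// Hypothetical helper: builds the ID using the scheme described in the text
function transactionalIdFor(topic, partition, prefix = 'myapp-producer-') {
  return prefix + topic + '-' + partition
}

// One producer per input topic-partition, so the ID stays stable
// across the read-process-write cycle (assumption: `client` is a Kafka instance)
const producers = new Map()

function producerFor(client, topic, partition) {
  const transactionalId = transactionalIdFor(topic, partition)
  if (!producers.has(transactionalId)) {
    producers.set(
      transactionalId,
      client.producer({ transactionalId, maxInFlightRequests: 1, idempotent: true })
    )
  }
  return producers.get(transactionalId)
}

console.log(transactionalIdFor('orders', 3)) // prints "myapp-producer-orders-3"
```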
This article from Confluent goes into much greater detail on how transactions work and is a recommended read before deciding on a transactionalId.
To send offsets as part of a transaction, meaning they will be committed only if the transaction succeeds, use the transaction.sendOffsets() method. This is necessary whenever a transaction produces messages derived from consumed messages, in a "consume-transform-produce" loop.
await transaction.sendOffsets({
  consumerGroupId, topics
})
topics has the following structure:
[{
  topic: <String>,
  partitions: [{
    partition: <Number>,
    offset: <String>
  }]
}]
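One step of a consume-transform-produce loop might look like the sketch below. The names offsetsAfter and processMessage are hypothetical, the uppercase transform is a placeholder, and producer is assumed to be a connected transactional producer. The sketch also assumes the usual Kafka convention that the committed offset is the offset of the next message to read (last processed offset + 1), and that the consumer does not commit these offsets on its own.

```javascript
// Hypothetical helper: builds the `topics` argument for transaction.sendOffsets().
// Assumption: the offset to commit is the last processed offset + 1, as a string.
function offsetsAfter(topic, partition, lastProcessedOffset) {
  const next = (BigInt(lastProcessedOffset) + 1n).toString()
  return [{ topic, partitions: [{ partition, offset: next }] }]
}

// Sketch of one consume-transform-produce step for a single consumed message
async function processMessage({ producer, consumerGroupId, topic, partition, message, outputTopic }) {
  const transaction = await producer.transaction()
  try {
    // Placeholder transform: uppercase the consumed value
    await transaction.send({
      topic: outputTopic,
      messages: [{ value: String(message.value).toUpperCase() }],
    })
    // The consumed offset is committed only if the transaction commits
    await transaction.sendOffsets({
      consumerGroupId,
      topics: offsetsAfter(topic, partition, message.offset),
    })
    await transaction.commit()
  } catch (e) {
    // Neither the output message nor the offset becomes visible
    await transaction.abort()
    throw e
  }
}
```

Because the output message and the input offset are committed together, a failed step leaves the input message to be processed again rather than producing a duplicate or skipping it.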