Kafka - transactional.id for horizontally scalable consume-transform-produce #2821
Consider a (Quarkus) application that implements a consume-transform-produce pattern in Kafka using smallrye-reactive-messaging. As I understand it, in this scenario the transactional.id value should contain the partition id to be able to:
I found the same problem statement described in detail in the following SO post: https://stackoverflow.com/questions/50335227/how-to-pick-a-kafka-transaction-id

Looking at how Spring Kafka provides this functionality, it seems that they previously had a producer instance per partition and worked with a transactional.id prefix, resulting in transactional.id:

Using smallrye-reactive-messaging, can I achieve a horizontally scalable consume-transform-produce loop? How should I configure the transactional.id to achieve this?

Thanks in advance for any input.
Replies: 1 comment 3 replies
Thanks for the very relevant question. That is something we need to document more.

First of all, the Kafka connector's exactly-once processing (EOP) support is already based on KIP-447, so you do not need a `transactional.id` suffixed with the partition id.

Reactive Messaging sets up a transactional producer and calls `initTransactions` when it sees a `transactional.id` set. In Quarkus, when a `KafkaTransactions` is used, the `transactional.id` for that channel configuration defaults to `${quarkus.application.name}-${channelName}`, which is of course configurable.

For horizontal scaling, this needs to be unique between your processor instances, to avoid fencing. However, it does not have to be the same between instance restarts. For example, in Kubernetes, you can suffix the `transactional.id` …

For vertical scaling, it is a little more complicated, and limited in Reactive Messaging. In theory, you could use the concurrency of your EOP consumer to run multiple transactions concurrently in a single instance. However, this is not currently possible, because the concurrent transactions would run on the same Kafka producer instance, and a single producer can only have one transaction open at a time. To support this, the `KafkaTransactions` outgoing channel would need to be backed by a kafka-producer-per-topic-partition producer, creating and managing a Kafka producer per topic-partition. This shouldn't be very complicated.

Lastly, note that concurrent transactions, whether they are vertically or horizontally scaled, still need to write to separate topic-partitions. This is why …
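To make the horizontal-scaling advice concrete, here is a minimal sketch of deriving a per-instance `transactional.id`. It is plain Java with no Reactive Messaging or Quarkus API involved. The class `TransactionalIds` and its methods are hypothetical names introduced for illustration, and taking the suffix from the `HOSTNAME` environment variable (usually the pod name in Kubernetes) is an assumption, not something the connector does for you. Because the id does not have to survive restarts, the sketch falls back to a random UUID when no hostname is available.

```java
import java.util.Map;
import java.util.UUID;

/** Hypothetical helper: builds a transactional.id that is unique per running instance. */
final class TransactionalIds {

    private TransactionalIds() {
    }

    /** Mirrors the default mentioned above: ${quarkus.application.name}-${channelName}. */
    static String defaultId(String applicationName, String channelName) {
        return applicationName + "-" + channelName;
    }

    /** Appends an instance-specific suffix so that two instances never share an id. */
    static String forInstance(String applicationName, String channelName, String instanceSuffix) {
        if (instanceSuffix == null || instanceSuffix.isBlank()) {
            throw new IllegalArgumentException("instanceSuffix must not be blank");
        }
        return defaultId(applicationName, channelName) + "-" + instanceSuffix;
    }

    /**
     * Assumption: HOSTNAME identifies the instance (e.g. the pod name in Kubernetes).
     * Falls back to a random UUID, which is acceptable because the id only has to be
     * unique between live instances, not stable across restarts.
     */
    static String instanceSuffix(Map<String, String> env) {
        String hostname = env.get("HOSTNAME");
        if (hostname == null || hostname.isBlank()) {
            return UUID.randomUUID().toString();
        }
        return hostname;
    }

    public static void main(String[] args) {
        // "my-app" and "tx-out" are placeholder application and channel names.
        String id = forInstance("my-app", "tx-out", instanceSuffix(System.getenv()));
        System.out.println(id);
    }
}
```

The resulting string is what you would supply as the `transactional.id` of the outgoing channel backing `KafkaTransactions`, in place of the default value.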