From e259f472addfd8daf429508c887e5288f43ae0f8 Mon Sep 17 00:00:00 2001 From: Jeff Kinard Date: Sun, 15 Sep 2024 16:22:55 -0400 Subject: [PATCH] Don't override existing consumer config values KafkaIO (#32443) Signed-off-by: Jeffrey Kinard --- .../sdk/io/kafka/KafkaReadSchemaTransformProvider.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java index b2eeb1a54d1da..e87669ab2b0a4 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java @@ -149,10 +149,10 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { Map consumerConfigs = new HashMap<>( MoreObjects.firstNonNull(configuration.getConsumerConfigUpdates(), new HashMap<>())); - consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka-read-provider-" + groupId); - consumerConfigs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); - consumerConfigs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100); - consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); + consumerConfigs.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "kafka-read-provider-" + groupId); + consumerConfigs.putIfAbsent(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); + consumerConfigs.putIfAbsent(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100); + consumerConfigs.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); String format = configuration.getFormat(); boolean handleErrors = ErrorHandling.hasOutput(configuration.getErrorHandling());