Skip to content

Commit

Permalink
Don't override existing consumer config values KafkaIO (#32443)
Browse files Browse the repository at this point in the history
Signed-off-by: Jeffrey Kinard <jeff@thekinards.com>
  • Loading branch information
Polber committed Sep 15, 2024
1 parent 7a6121a commit e259f47
Showing 1 changed file with 4 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -149,10 +149,10 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
Map<String, Object> consumerConfigs =
new HashMap<>(
MoreObjects.firstNonNull(configuration.getConsumerConfigUpdates(), new HashMap<>()));
consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka-read-provider-" + groupId);
consumerConfigs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
consumerConfigs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
consumerConfigs.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "kafka-read-provider-" + groupId);
consumerConfigs.putIfAbsent(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
consumerConfigs.putIfAbsent(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
consumerConfigs.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);

String format = configuration.getFormat();
boolean handleErrors = ErrorHandling.hasOutput(configuration.getErrorHandling());
Expand Down

0 comments on commit e259f47

Please sign in to comment.