From 37ad3196d81c6dc71f8e1e516ed6eda072b98752 Mon Sep 17 00:00:00 2001 From: Chamikara Jayalath Date: Wed, 21 Aug 2024 09:42:09 -0700 Subject: [PATCH] Fixes a breakage related to Kafka upgrade (#32262) --- .../io/kafka/upgrade/KafkaIOTranslation.java | 28 ++++++++++--------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java b/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java index db7b172170a1a..841236969d257 100644 --- a/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java +++ b/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java @@ -332,19 +332,21 @@ public Row toConfigRow(Read transform) { transform = transform.withMaxNumRecords(maxNumRecords); } - Boolean isRedistributed = configRow.getBoolean("redistribute"); - if (isRedistributed != null && isRedistributed) { - transform = transform.withRedistribute(); - Integer redistributeNumKeys = - configRow.getValue("redistribute_num_keys") == null - ? Integer.valueOf(0) - : configRow.getInt32("redistribute_num_keys"); - if (redistributeNumKeys != null && !redistributeNumKeys.equals(0)) { - transform = transform.withRedistributeNumKeys(redistributeNumKeys); - } - Boolean allowDuplicates = configRow.getBoolean("allows_duplicates"); - if (allowDuplicates != null && allowDuplicates) { - transform = transform.withAllowDuplicates(allowDuplicates); + if (TransformUpgrader.compareVersions(updateCompatibilityBeamVersion, "2.58.0") >= 0) { + Boolean isRedistributed = configRow.getBoolean("redistribute"); + if (isRedistributed != null && isRedistributed) { + transform = transform.withRedistribute(); + Integer redistributeNumKeys = + configRow.getValue("redistribute_num_keys") == null + ? Integer.valueOf(0) + : configRow.getInt32("redistribute_num_keys"); + if (redistributeNumKeys != null && !redistributeNumKeys.equals(0)) { + transform = transform.withRedistributeNumKeys(redistributeNumKeys); + } + Boolean allowDuplicates = configRow.getBoolean("allows_duplicates"); + if (allowDuplicates != null && allowDuplicates) { + transform = transform.withAllowDuplicates(allowDuplicates); + } } } Duration maxReadTime = configRow.getValue("max_read_time");