Skip to content

Commit

Permalink
Fixes a breakage related to Kafka upgrade (#32262)
Browse files Browse the repository at this point in the history
  • Loading branch information
chamikaramj committed Aug 21, 2024
1 parent c3432b7 commit 37ad319
Showing 1 changed file with 15 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -332,19 +332,21 @@ public Row toConfigRow(Read<?, ?> transform) {
transform = transform.withMaxNumRecords(maxNumRecords);
}

Boolean isRedistributed = configRow.getBoolean("redistribute");
if (isRedistributed != null && isRedistributed) {
transform = transform.withRedistribute();
Integer redistributeNumKeys =
configRow.getValue("redistribute_num_keys") == null
? Integer.valueOf(0)
: configRow.getInt32("redistribute_num_keys");
if (redistributeNumKeys != null && !redistributeNumKeys.equals(0)) {
transform = transform.withRedistributeNumKeys(redistributeNumKeys);
}
Boolean allowDuplicates = configRow.getBoolean("allows_duplicates");
if (allowDuplicates != null && allowDuplicates) {
transform = transform.withAllowDuplicates(allowDuplicates);
if (TransformUpgrader.compareVersions(updateCompatibilityBeamVersion, "2.58.0") >= 0) {
Boolean isRedistributed = configRow.getBoolean("redistribute");
if (isRedistributed != null && isRedistributed) {
transform = transform.withRedistribute();
Integer redistributeNumKeys =
configRow.getValue("redistribute_num_keys") == null
? Integer.valueOf(0)
: configRow.getInt32("redistribute_num_keys");
if (redistributeNumKeys != null && !redistributeNumKeys.equals(0)) {
transform = transform.withRedistributeNumKeys(redistributeNumKeys);
}
Boolean allowDuplicates = configRow.getBoolean("allows_duplicates");
if (allowDuplicates != null && allowDuplicates) {
transform = transform.withAllowDuplicates(allowDuplicates);
}
}
}
Duration maxReadTime = configRow.getValue("max_read_time");
Expand Down

0 comments on commit 37ad319

Please sign in to comment.