Skip to content

Commit

Permalink
[apache#30941]fix upgrade test due to missed config ConsumerPollingTi…
Browse files Browse the repository at this point in the history
…meout in KafkaIOTranslation
  • Loading branch information
xianhualiu committed Apr 16, 2024
1 parent 7907c78 commit 95e46c0
Showing 1 changed file with 9 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@ public Row toConfigRow(Read<?, ?> transform) {
if (transform.getStopReadTime() != null) {
fieldValues.put("stop_read_time", transform.getStopReadTime());
}
if (transform.getConsumerPollingTimeout() != null) {
fieldValues.put("consumer_polling_timeout", transform.getConsumerPollingTimeout());
}

fieldValues.put(
"is_commit_offset_finalize_enabled", transform.isCommitOffsetsInFinalizeEnabled());
Expand Down Expand Up @@ -321,6 +324,12 @@ public Row toConfigRow(Read<?, ?> transform) {
transform =
transform.withMaxReadTime(org.joda.time.Duration.millis(maxReadTime.toMillis()));
}
Duration consumerPollingTimeout = configRow.getValue("consumer_polling_timeout");
if (consumerPollingTimeout != null) {
transform =
transform.withConsumerPollingTimeout(
org.joda.time.Duration.millis(consumerPollingTimeout.toMillis()));
}
Instant startReadTime = configRow.getValue("start_read_time");
if (startReadTime != null) {
transform = transform.withStartReadTime(startReadTime);
Expand Down

0 comments on commit 95e46c0

Please sign in to comment.