diff --git a/CHANGES.md b/CHANGES.md
index 5824c71a98dc..941ba23a7573 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -73,6 +73,7 @@
 ## Breaking Changes

 * X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).
+* Default consumer polling timeout for KafkaIO.Read was increased from 1 second to 2 seconds. Use `KafkaIO.read().withConsumerPollingTimeout(Duration duration)` to configure this timeout value when necessary ([#30870](https://github.com/apache/beam/issues/30870)).

 ## Deprecations
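For context on the changelog entry, a minimal usage sketch of the new knob on the high-level read path; the pipeline object, broker address, topic, and deserializer classes below are placeholders rather than anything introduced by this patch, and `Duration` is `org.joda.time.Duration` as elsewhere in KafkaIO:

```java
// Sketch only: "broker-1:9092", "my-topic", and the deserializer classes are
// hypothetical stand-ins. The timeout bounds each Kafka consumer poll request
// issued by ReadFromKafkaDoFn.
pipeline.apply(
    KafkaIO.<Long, String>read()
        .withBootstrapServers("broker-1:9092")
        .withTopic("my-topic")
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .withConsumerPollingTimeout(Duration.standardSeconds(5))
        .withoutMetadata());
```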
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 231a1b9e49e1..c56071e85adb 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -587,6 +587,7 @@ public static <K, V> Read<K, V> read() {
         .setCommitOffsetsInFinalizeEnabled(false)
         .setDynamicRead(false)
         .setTimestampPolicyFactory(TimestampPolicyFactory.withProcessingTime())
+        .setConsumerPollingTimeout(Duration.standardSeconds(2L))
         .build();
   }

@@ -706,6 +707,9 @@ public abstract static class Read<K, V>
     @Pure
     public abstract @Nullable ErrorHandler<BadRecord, ?> getBadRecordErrorHandler();

+    @Pure
+    public abstract @Nullable Duration getConsumerPollingTimeout();
+
     abstract Builder<K, V> toBuilder();

     @AutoValue.Builder
@@ -762,6 +766,8 @@ Builder<K, V> setCheckStopReadingFn(
         return setCheckStopReadingFn(CheckStopReadingFnWrapper.of(checkStopReadingFn));
       }

+      abstract Builder<K, V> setConsumerPollingTimeout(Duration consumerPollingTimeout);
+
       abstract Read<K, V> build();

       static void setupExternalBuilder(
@@ -1334,6 +1340,17 @@ public Read<K, V> withBadRecordErrorHandler(ErrorHandler<BadRecord, ?> badRecord
       return toBuilder().setBadRecordErrorHandler(badRecordErrorHandler).build();
     }

+    /**
+     * Sets the timeout for the Kafka consumer polling request in the {@link ReadFromKafkaDoFn}.
+     * The default is 2 seconds.
+     */
+    public Read<K, V> withConsumerPollingTimeout(Duration duration) {
+      checkState(
+          duration == null || duration.compareTo(Duration.ZERO) > 0,
+          "Consumer polling timeout must be greater than 0.");
+      return toBuilder().setConsumerPollingTimeout(duration).build();
+    }
+
     /** Returns a {@link PTransform} for PCollection of {@link KV}, dropping Kafka metadata. */
     public PTransform<PBegin, PCollection<KV<K, V>>> withoutMetadata() {
       return new TypedWithoutMetadata<>(this);
     }
@@ -1596,7 +1613,8 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
             .withValueDeserializerProvider(kafkaRead.getValueDeserializerProvider())
             .withManualWatermarkEstimator()
             .withTimestampPolicyFactory(kafkaRead.getTimestampPolicyFactory())
-            .withCheckStopReadingFn(kafkaRead.getCheckStopReadingFn());
+            .withCheckStopReadingFn(kafkaRead.getCheckStopReadingFn())
+            .withConsumerPollingTimeout(kafkaRead.getConsumerPollingTimeout());
       if (kafkaRead.isCommitOffsetsInFinalizeEnabled()) {
         readTransform = readTransform.commitOffsets();
       }
@@ -2036,6 +2054,9 @@ public abstract static class ReadSourceDescriptors<K, V>
     @Pure
     abstract ErrorHandler<BadRecord, ?> getBadRecordErrorHandler();

+    @Pure
+    abstract @Nullable Duration getConsumerPollingTimeout();
+
     abstract boolean isBounded();

     abstract ReadSourceDescriptors.Builder<K, V> toBuilder();

@@ -2086,6 +2107,9 @@ abstract ReadSourceDescriptors.Builder<K, V> setBadRecordRouter(
       abstract ReadSourceDescriptors.Builder<K, V> setBadRecordErrorHandler(
           ErrorHandler<BadRecord, ?> badRecordErrorHandler);

+      abstract ReadSourceDescriptors.Builder<K, V> setConsumerPollingTimeout(
+          @Nullable Duration duration);
+
       abstract ReadSourceDescriptors.Builder<K, V> setBounded(boolean bounded);

       abstract ReadSourceDescriptors<K, V> build();

@@ -2099,6 +2123,7 @@ public static <K, V> ReadSourceDescriptors<K, V> read() {
           .setBounded(false)
           .setBadRecordRouter(BadRecordRouter.THROWING_ROUTER)
           .setBadRecordErrorHandler(new ErrorHandler.DefaultErrorHandler<>())
+          .setConsumerPollingTimeout(Duration.standardSeconds(2L))
           .build()
           .withProcessingTime()
           .withMonotonicallyIncreasingWatermarkEstimator();
@@ -2360,6 +2385,14 @@ public ReadSourceDescriptors<K, V> withBadRecordErrorHandler(
           .build();
     }

+    /**
+     * Sets the timeout for the Kafka consumer polling request in the {@link ReadFromKafkaDoFn}.
+     * The default is 2 seconds.
+     */
+    public ReadSourceDescriptors<K, V> withConsumerPollingTimeout(@Nullable Duration duration) {
+      return toBuilder().setConsumerPollingTimeout(duration).build();
+    }
+
     ReadAllFromRow<K, V> forExternalBuild() {
       return new ReadAllFromRow<>(this);
     }
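The same option is plumbed through the `ReadSourceDescriptors` (SDF) path above; a hedged sketch of configuring it there, assuming placeholder topic, partition, broker, and deserializers (the `null` arguments to `KafkaSourceDescriptor.of` leave start/stop offsets, read times, and the bootstrap-server list unset):

```java
// Sketch only: descriptor fields and deserializers are hypothetical stand-ins.
pipeline
    .apply(
        Create.of(
            KafkaSourceDescriptor.of(
                new TopicPartition("my-topic", 0), null, null, null, null, null)))
    .apply(
        KafkaIO.<Long, String>readSourceDescriptors()
            .withBootstrapServers("broker-1:9092")
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withConsumerPollingTimeout(Duration.standardSeconds(5)));
```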
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java
index a2cc9aaeb4d9..7e54407300d4 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java
@@ -112,6 +112,7 @@ Object getDefaultValue() {
     VALUE_DESERIALIZER_PROVIDER,
     CHECK_STOP_READING_FN(SDF),
     BAD_RECORD_ERROR_HANDLER(SDF),
+    CONSUMER_POLLING_TIMEOUT,
     ;

     @Nonnull private final ImmutableSet<KafkaIOReadImplementation> supportedImplementations;
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
index 924833290f13..3a821ef9519e 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
@@ -191,6 +191,12 @@ private ReadFromKafkaDoFn(
     this.checkStopReadingFn = transform.getCheckStopReadingFn();
     this.badRecordRouter = transform.getBadRecordRouter();
     this.recordTag = recordTag;
+    if (transform.getConsumerPollingTimeout() != null) {
+      this.consumerPollingTimeout =
+          java.time.Duration.ofMillis(transform.getConsumerPollingTimeout().getMillis());
+    } else {
+      this.consumerPollingTimeout = KAFKA_POLL_TIMEOUT;
+    }
   }

   private static final Logger LOG = LoggerFactory.getLogger(ReadFromKafkaDoFn.class);
@@ -217,8 +223,9 @@ private ReadFromKafkaDoFn(
   private transient @Nullable LoadingCache<TopicPartition, AverageRecordSize> avgRecordSize;

-  private static final java.time.Duration KAFKA_POLL_TIMEOUT = java.time.Duration.ofSeconds(1);
+  private static final java.time.Duration KAFKA_POLL_TIMEOUT = java.time.Duration.ofSeconds(2);

+  @VisibleForTesting final java.time.Duration consumerPollingTimeout;
   @VisibleForTesting final DeserializerProvider<K> keyDeserializerProvider;
   @VisibleForTesting final DeserializerProvider<V> valueDeserializerProvider;
   @VisibleForTesting final Map<String, Object> consumerConfig;
@@ -508,7 +515,7 @@ private ConsumerRecords<byte[], byte[]> poll(
     java.time.Duration elapsed = java.time.Duration.ZERO;
     while (true) {
       final ConsumerRecords<byte[], byte[]> rawRecords =
-          consumer.poll(KAFKA_POLL_TIMEOUT.minus(elapsed));
+          consumer.poll(consumerPollingTimeout.minus(elapsed));
       if (!rawRecords.isEmpty()) {
         // return as we have found some entries
         return rawRecords;
       }
@@ -518,8 +525,11 @@ private ConsumerRecords<byte[], byte[]> poll(
         return rawRecords;
       }
       elapsed = sw.elapsed();
-      if (elapsed.toMillis() >= KAFKA_POLL_TIMEOUT.toMillis()) {
+      if (elapsed.toMillis() >= consumerPollingTimeout.toMillis()) {
         // timeout is over
+        LOG.warn(
+            "No messages retrieved with polling timeout {} seconds. Consider increasing the "
+                + "consumer polling timeout using the withConsumerPollingTimeout method.",
+            consumerPollingTimeout.getSeconds());
         return rawRecords;
       }
     }
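One detail worth calling out in the constructor change above: the public API takes an `org.joda.time.Duration` (consistent with the rest of KafkaIO), while `KafkaConsumer#poll` expects a `java.time.Duration`, so the DoFn bridges the two via milliseconds. A minimal illustration of that conversion (variable names are arbitrary):

```java
// What the ReadFromKafkaDoFn constructor does with a configured timeout.
org.joda.time.Duration configured = org.joda.time.Duration.standardSeconds(5);
java.time.Duration pollTimeout = java.time.Duration.ofMillis(configured.getMillis());
// pollTimeout.getSeconds() == 5
```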
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 9b15b86051f5..44c028f08a27 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -2121,6 +2121,18 @@ public void testSinkMetrics() throws Exception {
     }
   }

+  @Test(expected = IllegalStateException.class)
+  public void testWithInvalidConsumerPollingTimeout() {
+    KafkaIO.<Integer, Long>read().withConsumerPollingTimeout(Duration.standardSeconds(-5));
+  }
+
+  @Test
+  public void testWithValidConsumerPollingTimeout() {
+    KafkaIO.Read<Integer, Long> reader =
+        KafkaIO.<Integer, Long>read().withConsumerPollingTimeout(Duration.standardSeconds(15));
+    assertEquals(15, reader.getConsumerPollingTimeout().getStandardSeconds());
+  }
+
   private static void verifyProducerRecords(
       MockProducer<Integer, Long> mockProducer,
       String topic,
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
index 48b5b060a295..8902f22164bc 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
@@ -641,6 +641,20 @@ public void testUnbounded() {
     Assert.assertNotEquals(0, visitor.unboundedPCollections.size());
   }

+  @Test
+  public void testConstructorWithPollTimeout() {
+    ReadSourceDescriptors<String, String> descriptors = makeReadSourceDescriptor(consumer);
+    // default poll timeout = 2 seconds
+    ReadFromKafkaDoFn<String, String> dofnInstance = ReadFromKafkaDoFn.create(descriptors, RECORDS);
+    Assert.assertEquals(Duration.ofSeconds(2L), dofnInstance.consumerPollingTimeout);
+    // updated timeout = 5 seconds
+    descriptors =
+        descriptors.withConsumerPollingTimeout(org.joda.time.Duration.standardSeconds(5L));
+    ReadFromKafkaDoFn<String, String> dofnInstanceNew =
+        ReadFromKafkaDoFn.create(descriptors, RECORDS);
+    Assert.assertEquals(Duration.ofSeconds(5L), dofnInstanceNew.consumerPollingTimeout);
+  }
+
   private BoundednessVisitor testBoundedness(
       Function<ReadSourceDescriptors<String, String>, ReadSourceDescriptors<String, String>>
           readSourceDescriptorsDecorator) {
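Since the new 2-second default is listed under Breaking Changes, a pipeline that depends on the previous behavior can pin the old value explicitly; a minimal sketch:

```java
// Restores the pre-change 1-second polling timeout
// (Duration is org.joda.time.Duration, as elsewhere in KafkaIO).
KafkaIO.<Long, String>read().withConsumerPollingTimeout(Duration.standardSeconds(1));
```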