Skip to content

Commit

Permalink
[bug30870]: make consumer polling timeout configurable for KafkaIO.Re…
Browse files Browse the repository at this point in the history
…ad (apache#30877)

* [bug30870]: make consumer polling timeout configurable for KafkaIO.Read

* fixed spotless complains

* fixed unit tests

* added logs and increased default polling timeout from 1 to 2 seconds.

* spotless apply changes

* Update CHANGES.md

updated changes.md with changes to make consumer polling timeout configurable for KafkaIO.Read

* Update CHANGES.md

* Update CHANGES.md

added break changes

* Update CHANGES.md
  • Loading branch information
xianhualiu authored Apr 9, 2024
1 parent 282ff87 commit 3f4b256
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
## Breaking Changes

* X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).
* Default consumer polling timeout for KafkaIO.Read was increased from 1 second to 2 seconds. Use KafkaIO.read().withConsumerPollingTimeout(Duration duration) to configure this timeout value when necessary ([#30870](https://github.com/apache/beam/issues/30870)).

## Deprecations

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,7 @@ public static <K, V> Read<K, V> read() {
.setCommitOffsetsInFinalizeEnabled(false)
.setDynamicRead(false)
.setTimestampPolicyFactory(TimestampPolicyFactory.withProcessingTime())
.setConsumerPollingTimeout(Duration.standardSeconds(2L))
.build();
}

Expand Down Expand Up @@ -706,6 +707,9 @@ public abstract static class Read<K, V>
@Pure
public abstract @Nullable ErrorHandler<BadRecord, ?> getBadRecordErrorHandler();

@Pure
public abstract @Nullable Duration getConsumerPollingTimeout();

abstract Builder<K, V> toBuilder();

@AutoValue.Builder
Expand Down Expand Up @@ -762,6 +766,8 @@ Builder<K, V> setCheckStopReadingFn(
return setCheckStopReadingFn(CheckStopReadingFnWrapper.of(checkStopReadingFn));
}

abstract Builder<K, V> setConsumerPollingTimeout(Duration consumerPollingTimeout);

abstract Read<K, V> build();

static <K, V> void setupExternalBuilder(
Expand Down Expand Up @@ -1334,6 +1340,17 @@ public Read<K, V> withBadRecordErrorHandler(ErrorHandler<BadRecord, ?> badRecord
return toBuilder().setBadRecordErrorHandler(badRecordErrorHandler).build();
}

/**
* Sets the timeout time for Kafka consumer polling request in the {@link ReadFromKafkaDoFn}.
* The default is 2 second.
*/
public Read<K, V> withConsumerPollingTimeout(Duration duration) {
checkState(
duration == null || duration.compareTo(Duration.ZERO) > 0,
"Consumer polling timeout must be greater than 0.");
return toBuilder().setConsumerPollingTimeout(duration).build();
}

/** Returns a {@link PTransform} for PCollection of {@link KV}, dropping Kafka metatdata. */
public PTransform<PBegin, PCollection<KV<K, V>>> withoutMetadata() {
return new TypedWithoutMetadata<>(this);
Expand Down Expand Up @@ -1596,7 +1613,8 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
.withValueDeserializerProvider(kafkaRead.getValueDeserializerProvider())
.withManualWatermarkEstimator()
.withTimestampPolicyFactory(kafkaRead.getTimestampPolicyFactory())
.withCheckStopReadingFn(kafkaRead.getCheckStopReadingFn());
.withCheckStopReadingFn(kafkaRead.getCheckStopReadingFn())
.withConsumerPollingTimeout(kafkaRead.getConsumerPollingTimeout());
if (kafkaRead.isCommitOffsetsInFinalizeEnabled()) {
readTransform = readTransform.commitOffsets();
}
Expand Down Expand Up @@ -2036,6 +2054,9 @@ public abstract static class ReadSourceDescriptors<K, V>
@Pure
abstract ErrorHandler<BadRecord, ?> getBadRecordErrorHandler();

@Pure
abstract @Nullable Duration getConsumerPollingTimeout();

abstract boolean isBounded();

abstract ReadSourceDescriptors.Builder<K, V> toBuilder();
Expand Down Expand Up @@ -2086,6 +2107,9 @@ abstract ReadSourceDescriptors.Builder<K, V> setBadRecordRouter(
abstract ReadSourceDescriptors.Builder<K, V> setBadRecordErrorHandler(
ErrorHandler<BadRecord, ?> badRecordErrorHandler);

abstract ReadSourceDescriptors.Builder<K, V> setConsumerPollingTimeout(
@Nullable Duration duration);

abstract ReadSourceDescriptors.Builder<K, V> setBounded(boolean bounded);

abstract ReadSourceDescriptors<K, V> build();
Expand All @@ -2099,6 +2123,7 @@ public static <K, V> ReadSourceDescriptors<K, V> read() {
.setBounded(false)
.setBadRecordRouter(BadRecordRouter.THROWING_ROUTER)
.setBadRecordErrorHandler(new ErrorHandler.DefaultErrorHandler<>())
.setConsumerPollingTimeout(Duration.standardSeconds(2L))
.build()
.withProcessingTime()
.withMonotonicallyIncreasingWatermarkEstimator();
Expand Down Expand Up @@ -2360,6 +2385,14 @@ public ReadSourceDescriptors<K, V> withBadRecordErrorHandler(
.build();
}

/**
* Sets the timeout time for Kafka consumer polling request in the {@link ReadFromKafkaDoFn}.
* The default is 2 second.
*/
public ReadSourceDescriptors<K, V> withConsumerPollingTimeout(@Nullable Duration duration) {
return toBuilder().setConsumerPollingTimeout(duration).build();
}

ReadAllFromRow<K, V> forExternalBuild() {
return new ReadAllFromRow<>(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ Object getDefaultValue() {
VALUE_DESERIALIZER_PROVIDER,
CHECK_STOP_READING_FN(SDF),
BAD_RECORD_ERROR_HANDLER(SDF),
CONSUMER_POLLING_TIMEOUT,
;

@Nonnull private final ImmutableSet<KafkaIOReadImplementation> supportedImplementations;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,12 @@ private ReadFromKafkaDoFn(
this.checkStopReadingFn = transform.getCheckStopReadingFn();
this.badRecordRouter = transform.getBadRecordRouter();
this.recordTag = recordTag;
if (transform.getConsumerPollingTimeout() != null) {
this.consumerPollingTimeout =
java.time.Duration.ofMillis(transform.getConsumerPollingTimeout().getMillis());
} else {
this.consumerPollingTimeout = KAFKA_POLL_TIMEOUT;
}
}

private static final Logger LOG = LoggerFactory.getLogger(ReadFromKafkaDoFn.class);
Expand All @@ -217,8 +223,9 @@ private ReadFromKafkaDoFn(

private transient @Nullable LoadingCache<TopicPartition, AverageRecordSize> avgRecordSize;

private static final java.time.Duration KAFKA_POLL_TIMEOUT = java.time.Duration.ofSeconds(1);
private static final java.time.Duration KAFKA_POLL_TIMEOUT = java.time.Duration.ofSeconds(2);

@VisibleForTesting final java.time.Duration consumerPollingTimeout;
@VisibleForTesting final DeserializerProvider<K> keyDeserializerProvider;
@VisibleForTesting final DeserializerProvider<V> valueDeserializerProvider;
@VisibleForTesting final Map<String, Object> consumerConfig;
Expand Down Expand Up @@ -508,7 +515,7 @@ private ConsumerRecords<byte[], byte[]> poll(
java.time.Duration elapsed = java.time.Duration.ZERO;
while (true) {
final ConsumerRecords<byte[], byte[]> rawRecords =
consumer.poll(KAFKA_POLL_TIMEOUT.minus(elapsed));
consumer.poll(consumerPollingTimeout.minus(elapsed));
if (!rawRecords.isEmpty()) {
// return as we have found some entries
return rawRecords;
Expand All @@ -518,8 +525,11 @@ private ConsumerRecords<byte[], byte[]> poll(
return rawRecords;
}
elapsed = sw.elapsed();
if (elapsed.toMillis() >= KAFKA_POLL_TIMEOUT.toMillis()) {
if (elapsed.toMillis() >= consumerPollingTimeout.toMillis()) {
// timeout is over
LOG.warn(
"No messages retrieved with polling timeout {} seconds. Consider increasing the consumer polling timeout using withConsumerPollingTimeout method.",
consumerPollingTimeout.getSeconds());
return rawRecords;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2121,6 +2121,18 @@ public void testSinkMetrics() throws Exception {
}
}

@Test(expected = IllegalStateException.class)
public void testWithInvalidConsumerPollingTimeout() {
KafkaIO.<Integer, Long>read().withConsumerPollingTimeout(Duration.standardSeconds(-5));
}

@Test
public void testWithValidConsumerPollingTimeout() {
KafkaIO.Read<Integer, Long> reader =
KafkaIO.<Integer, Long>read().withConsumerPollingTimeout(Duration.standardSeconds(15));
assertEquals(15, reader.getConsumerPollingTimeout().getStandardSeconds());
}

private static void verifyProducerRecords(
MockProducer<Integer, Long> mockProducer,
String topic,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,20 @@ public void testUnbounded() {
Assert.assertNotEquals(0, visitor.unboundedPCollections.size());
}

@Test
public void testConstructorWithPollTimeout() {
ReadSourceDescriptors<String, String> descriptors = makeReadSourceDescriptor(consumer);
// default poll timeout = 1 scond
ReadFromKafkaDoFn<String, String> dofnInstance = ReadFromKafkaDoFn.create(descriptors, RECORDS);
Assert.assertEquals(Duration.ofSeconds(2L), dofnInstance.consumerPollingTimeout);
// updated timeout = 5 seconds
descriptors =
descriptors.withConsumerPollingTimeout(org.joda.time.Duration.standardSeconds(5L));
ReadFromKafkaDoFn<String, String> dofnInstanceNew =
ReadFromKafkaDoFn.create(descriptors, RECORDS);
Assert.assertEquals(Duration.ofSeconds(5L), dofnInstanceNew.consumerPollingTimeout);
}

private BoundednessVisitor testBoundedness(
Function<ReadSourceDescriptors<String, String>, ReadSourceDescriptors<String, String>>
readSourceDescriptorsDecorator) {
Expand Down

0 comments on commit 3f4b256

Please sign in to comment.