Skip to content

Commit

Permalink
[#30870]: support consumer polling timeout in KafkaIO expansion servi…
Browse files Browse the repository at this point in the history
…ce (#30915)

* [#30870]: support consumer polling timeout in KafkaIO expansion service

* fixed spotless complains

* fixed python format complains

* Update sdks/python/apache_beam/io/kafka.py

Co-authored-by: Jonathan Sabbagh <108473809+jbsabbagh@users.noreply.github.com>

* Update sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java

Co-authored-by: Jonathan Sabbagh <108473809+jbsabbagh@users.noreply.github.com>

* Update sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java

Co-authored-by: Jonathan Sabbagh <108473809+jbsabbagh@users.noreply.github.com>

* fixed formating issue

* fixed pylint and pydoc issues

* shorten the variable name

* fixed format and upgrade test

* fixed test

* fixed test

---------

Co-authored-by: Jonathan Sabbagh <108473809+jbsabbagh@users.noreply.github.com>
  • Loading branch information
xianhualiu and jbsabbagh authored Apr 16, 2024
1 parent fb6bfc3 commit a44c4f1
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -831,6 +831,16 @@ static <K, V> void setupExternalBuilder(
// We can expose dynamic read to external build when ReadFromKafkaDoFn is the default
// implementation.
builder.setDynamicRead(false);

if (config.consumerPollingTimeout != null) {
if (config.consumerPollingTimeout <= 0) {
throw new IllegalArgumentException("consumerPollingTimeout should be > 0.");
}
builder.setConsumerPollingTimeout(
Duration.standardSeconds(config.consumerPollingTimeout));
} else {
builder.setConsumerPollingTimeout(Duration.standardSeconds(2L));
}
}

private static <T> Coder<T> resolveCoder(Class<Deserializer<T>> deserializer) {
Expand Down Expand Up @@ -893,6 +903,7 @@ public static class Configuration {
private Long maxNumRecords;
private Long maxReadTime;
private Boolean commitOffsetInFinalize;
private Long consumerPollingTimeout;
private String timestampPolicy;

public void setConsumerConfig(Map<String, String> consumerConfig) {
Expand Down Expand Up @@ -934,6 +945,10 @@ public void setCommitOffsetInFinalize(Boolean commitOffsetInFinalize) {
public void setTimestampPolicy(String timestampPolicy) {
this.timestampPolicy = timestampPolicy;
}

public void setConsumerPollingTimeout(Long consumerPollingTimeout) {
this.consumerPollingTimeout = consumerPollingTimeout;
}
}
}

Expand Down Expand Up @@ -1341,8 +1356,9 @@ public Read<K, V> withBadRecordErrorHandler(ErrorHandler<BadRecord, ?> badRecord
}

/**
* Sets the timeout time for Kafka consumer polling request in the {@link ReadFromKafkaDoFn}.
* The default is 2 second.
* Sets the timeout time for Kafka consumer polling request in the {@link ReadFromKafkaDoFn}. A
* lower timeout optimizes for latency. Increase the timeout if the consumer is not fetching
* enough (or any) records. The default is 2 seconds.
*/
public Read<K, V> withConsumerPollingTimeout(Duration duration) {
checkState(
Expand Down Expand Up @@ -2386,8 +2402,9 @@ public ReadSourceDescriptors<K, V> withBadRecordErrorHandler(
}

/**
* Sets the timeout time for Kafka consumer polling request in the {@link ReadFromKafkaDoFn}.
* The default is 2 second.
* Sets the timeout time for Kafka consumer polling request in the {@link ReadFromKafkaDoFn}. A
* lower timeout optimizes for latency. Increase the timeout if the consumer is not fetching
* enough (or any) records. The default is 2 seconds.
*/
public ReadSourceDescriptors<K, V> withConsumerPollingTimeout(@Nullable Duration duration) {
return toBuilder().setConsumerPollingTimeout(duration).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,16 @@ public void testConstructKafkaRead() throws Exception {
Field.of("value_deserializer", FieldType.STRING),
Field.of("start_read_time", FieldType.INT64),
Field.of("commit_offset_in_finalize", FieldType.BOOLEAN),
Field.of("timestamp_policy", FieldType.STRING)))
Field.of("timestamp_policy", FieldType.STRING),
Field.of("consumer_polling_timeout", FieldType.INT64)))
.withFieldValue("topics", topics)
.withFieldValue("consumer_config", consumerConfig)
.withFieldValue("key_deserializer", keyDeserializer)
.withFieldValue("value_deserializer", valueDeserializer)
.withFieldValue("start_read_time", startReadTime)
.withFieldValue("commit_offset_in_finalize", false)
.withFieldValue("timestamp_policy", "ProcessingTime")
.withFieldValue("consumer_polling_timeout", 5L)
.build());

RunnerApi.Components defaultInstance = RunnerApi.Components.getDefaultInstance();
Expand Down Expand Up @@ -265,6 +267,7 @@ public void testConstructKafkaReadWithoutMetadata() throws Exception {
expansionService.expand(request, observer);
ExpansionApi.ExpansionResponse result = observer.result;
RunnerApi.PTransform transform = result.getTransform();

assertThat(
transform.getSubtransformsList(),
Matchers.hasItem(MatchesPattern.matchesPattern(".*KafkaIO-Read.*")));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public class KafkaIOTranslationTest {
READ_TRANSFORM_SCHEMA_MAPPING.put(
"getValueDeserializerProvider", "value_deserializer_provider");
READ_TRANSFORM_SCHEMA_MAPPING.put("getCheckStopReadingFn", "check_stop_reading_fn");
READ_TRANSFORM_SCHEMA_MAPPING.put("getConsumerPollingTimeout", "consumer_polling_timeout");
}

// A mapping from Write transform builder methods to the corresponding schema fields in
Expand Down
11 changes: 9 additions & 2 deletions sdks/python/apache_beam/io/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@
('value_deserializer', str), ('start_read_time', typing.Optional[int]),
('max_num_records', typing.Optional[int]),
('max_read_time', typing.Optional[int]),
('commit_offset_in_finalize', bool), ('timestamp_policy', str)])
('commit_offset_in_finalize', bool), ('timestamp_policy', str),
('consumer_polling_timeout', typing.Optional[int])])


def default_io_expansion_service(append_args=None):
Expand Down Expand Up @@ -134,6 +135,7 @@ def __init__(
max_read_time=None,
commit_offset_in_finalize=False,
timestamp_policy=processing_time_policy,
consumer_polling_timeout=None,
with_metadata=False,
expansion_service=None,
):
Expand All @@ -159,6 +161,10 @@ def __init__(
:param commit_offset_in_finalize: Whether to commit offsets when finalizing.
:param timestamp_policy: The built-in timestamp policy which is used for
extracting timestamp from KafkaRecord.
:param consumer_polling_timeout: Kafka client polling request
timeout time in seconds. A lower timeout optimizes for latency. Increase
the timeout if the consumer is not fetching any records. Default is 2
seconds.
:param with_metadata: whether the returned PCollection should contain
Kafka related metadata or not. If False (default), elements of the
returned PCollection will be of type 'bytes' if True, elements of the
Expand Down Expand Up @@ -186,7 +192,8 @@ def __init__(
max_read_time=max_read_time,
start_read_time=start_read_time,
commit_offset_in_finalize=commit_offset_in_finalize,
timestamp_policy=timestamp_policy)),
timestamp_policy=timestamp_policy,
consumer_polling_timeout=consumer_polling_timeout)),
expansion_service or default_io_expansion_service())


Expand Down

0 comments on commit a44c4f1

Please sign in to comment.