Skip to content

Commit

Permalink
[apache#30870]: support consumer polling timeout in KafkaIO expansion…
Browse files Browse the repository at this point in the history
… service
  • Loading branch information
xianhualiu committed Apr 10, 2024
1 parent 0658874 commit a647f88
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -831,6 +831,17 @@ static <K, V> void setupExternalBuilder(
// We can expose dynamic read to external build when ReadFromKafkaDoFn is the default
// implementation.
builder.setDynamicRead(false);

if(config.consumerPollingTimeoutSeconds != null) {
if(config.consumerPollingTimeoutSeconds <= 0) {
throw new IllegalArgumentException("consumerPollingTimeoutSeconds should be > 0.");
}
builder.setConsumerPollingTimeout(
Duration.standardSeconds(config.consumerPollingTimeoutSeconds));
} else {
builder.setConsumerPollingTimeout(
Duration.standardSeconds(2L));
}
}

private static <T> Coder<T> resolveCoder(Class<Deserializer<T>> deserializer) {
Expand Down Expand Up @@ -893,6 +904,7 @@ public static class Configuration {
private Long maxNumRecords;
private Long maxReadTime;
private Boolean commitOffsetInFinalize;
private Long consumerPollingTimeoutSeconds;
private String timestampPolicy;

public void setConsumerConfig(Map<String, String> consumerConfig) {
Expand Down Expand Up @@ -934,6 +946,10 @@ public void setCommitOffsetInFinalize(Boolean commitOffsetInFinalize) {
public void setTimestampPolicy(String timestampPolicy) {
this.timestampPolicy = timestampPolicy;
}

public void setConsumerPollingTimeoutSeconds(Long consumerPollingTimeoutSeconds) {
this.consumerPollingTimeoutSeconds = consumerPollingTimeoutSeconds;
}
}
}

Expand Down Expand Up @@ -1342,7 +1358,7 @@ public Read<K, V> withBadRecordErrorHandler(ErrorHandler<BadRecord, ?> badRecord

/**
* Sets the timeout time for Kafka consumer polling request in the {@link ReadFromKafkaDoFn}.
* The default is 2 second.
* The default is 2 seconds.
*/
public Read<K, V> withConsumerPollingTimeout(Duration duration) {
checkState(
Expand Down Expand Up @@ -2387,7 +2403,7 @@ public ReadSourceDescriptors<K, V> withBadRecordErrorHandler(

/**
* Sets the timeout time for Kafka consumer polling request in the {@link ReadFromKafkaDoFn}.
* The default is 2 second.
* The default is 2 seconds.
*/
public ReadSourceDescriptors<K, V> withConsumerPollingTimeout(@Nullable Duration duration) {
return toBuilder().setConsumerPollingTimeout(duration).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,16 @@ public void testConstructKafkaRead() throws Exception {
Field.of("value_deserializer", FieldType.STRING),
Field.of("start_read_time", FieldType.INT64),
Field.of("commit_offset_in_finalize", FieldType.BOOLEAN),
Field.of("timestamp_policy", FieldType.STRING)))
Field.of("timestamp_policy", FieldType.STRING),
Field.of("consumer_polling_timeout_seconds", FieldType.INT64)))
.withFieldValue("topics", topics)
.withFieldValue("consumer_config", consumerConfig)
.withFieldValue("key_deserializer", keyDeserializer)
.withFieldValue("value_deserializer", valueDeserializer)
.withFieldValue("start_read_time", startReadTime)
.withFieldValue("commit_offset_in_finalize", false)
.withFieldValue("timestamp_policy", "ProcessingTime")
.withFieldValue("consumer_polling_timeout_seconds", 5L)
.build());

RunnerApi.Components defaultInstance = RunnerApi.Components.getDefaultInstance();
Expand Down Expand Up @@ -265,6 +267,7 @@ public void testConstructKafkaReadWithoutMetadata() throws Exception {
expansionService.expand(request, observer);
ExpansionApi.ExpansionResponse result = observer.result;
RunnerApi.PTransform transform = result.getTransform();

assertThat(
transform.getSubtransformsList(),
Matchers.hasItem(MatchesPattern.matchesPattern(".*KafkaIO-Read.*")));
Expand Down
9 changes: 7 additions & 2 deletions sdks/python/apache_beam/io/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@
('value_deserializer', str), ('start_read_time', typing.Optional[int]),
('max_num_records', typing.Optional[int]),
('max_read_time', typing.Optional[int]),
('commit_offset_in_finalize', bool), ('timestamp_policy', str)])
('commit_offset_in_finalize', bool), ('timestamp_policy', str),
('consumer_polling_timeout_seconds', typing.Optional[int])])


def default_io_expansion_service(append_args=None):
Expand Down Expand Up @@ -134,6 +135,7 @@ def __init__(
max_read_time=None,
commit_offset_in_finalize=False,
timestamp_policy=processing_time_policy,
consumer_polling_timeout_seconds=None,
with_metadata=False,
expansion_service=None,
):
Expand All @@ -159,6 +161,8 @@ def __init__(
:param commit_offset_in_finalize: Whether to commit offsets when finalizing.
:param timestamp_policy: The built-in timestamp policy which is used for
extracting timestamp from KafkaRecord.
:param consumer_polling_timeout_seconds: Kafka client polling request
timeout time in seconds. Default is 2 seconds.
:param with_metadata: whether the returned PCollection should contain
Kafka related metadata or not. If False (default), elements of the
returned PCollection will be of type 'bytes' if True, elements of the
Expand Down Expand Up @@ -186,7 +190,8 @@ def __init__(
max_read_time=max_read_time,
start_read_time=start_read_time,
commit_offset_in_finalize=commit_offset_in_finalize,
timestamp_policy=timestamp_policy)),
timestamp_policy=timestamp_policy,
consumer_polling_timeout_seconds=consumer_polling_timeout_seconds)),
expansion_service or default_io_expansion_service())


Expand Down

0 comments on commit a647f88

Please sign in to comment.