Skip to content

Commit

Permalink
Involve CheckStopReadingFn class adding a chance to release resources…
Browse files Browse the repository at this point in the history
… gracefully (apache#29100)

* involve CheckStopReadingFn class adding a chance to release resources gracefully

* involve CheckStopReadingFnWrapper to avoid break changes
  • Loading branch information
gabrywu authored Nov 10, 2023
1 parent 56739a0 commit c713425
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 19 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.kafka;

import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.kafka.common.TopicPartition;

public interface CheckStopReadingFn extends SerializableFunction<TopicPartition, Boolean> {
default void setup() {}

default void teardown() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.kafka;

import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.kafka.common.TopicPartition;
import org.checkerframework.checker.nullness.qual.Nullable;

public class CheckStopReadingFnWrapper implements CheckStopReadingFn {
private final SerializableFunction<TopicPartition, Boolean> serializableFunction;

private CheckStopReadingFnWrapper(
SerializableFunction<TopicPartition, Boolean> serializableFunction) {
this.serializableFunction = serializableFunction;
}

public static @Nullable CheckStopReadingFnWrapper of(
@Nullable SerializableFunction<TopicPartition, Boolean> serializableFunction) {
return serializableFunction != null
? new CheckStopReadingFnWrapper(serializableFunction)
: null;
}

@Override
public Boolean apply(TopicPartition input) {
return serializableFunction.apply(input);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -685,7 +685,7 @@ public abstract static class Read<K, V>
abstract @Nullable DeserializerProvider<V> getValueDeserializerProvider();

@Pure
abstract @Nullable SerializableFunction<TopicPartition, Boolean> getCheckStopReadingFn();
abstract @Nullable CheckStopReadingFn getCheckStopReadingFn();

abstract Builder<K, V> toBuilder();

Expand Down Expand Up @@ -733,8 +733,12 @@ abstract Builder<K, V> setKeyDeserializerProvider(
abstract Builder<K, V> setValueDeserializerProvider(
DeserializerProvider<V> deserializerProvider);

abstract Builder<K, V> setCheckStopReadingFn(
SerializableFunction<TopicPartition, Boolean> checkStopReadingFn);
abstract Builder<K, V> setCheckStopReadingFn(@Nullable CheckStopReadingFn checkStopReadingFn);

Builder<K, V> setCheckStopReadingFn(
@Nullable SerializableFunction<TopicPartition, Boolean> checkStopReadingFn) {
return setCheckStopReadingFn(CheckStopReadingFnWrapper.of(checkStopReadingFn));
}

abstract Read<K, V> build();

Expand Down Expand Up @@ -1269,13 +1273,23 @@ public Read<K, V> withConsumerConfigUpdates(Map<String, Object> configUpdates) {
return toBuilder().setConsumerConfig(config).build();
}

/**
* A custom {@link CheckStopReadingFn} that determines whether the {@link ReadFromKafkaDoFn}
* should stop reading from the given {@link TopicPartition}.
*/
public Read<K, V> withCheckStopReadingFn(CheckStopReadingFn checkStopReadingFn) {
return toBuilder().setCheckStopReadingFn(checkStopReadingFn).build();
}

/**
* A custom {@link SerializableFunction} that determines whether the {@link ReadFromKafkaDoFn}
* should stop reading from the given {@link TopicPartition}.
*/
public Read<K, V> withCheckStopReadingFn(
SerializableFunction<TopicPartition, Boolean> checkStopReadingFn) {
return toBuilder().setCheckStopReadingFn(checkStopReadingFn).build();
return toBuilder()
.setCheckStopReadingFn(CheckStopReadingFnWrapper.of(checkStopReadingFn))
.build();
}

/** Returns a {@link PTransform} for PCollection of {@link KV}, dropping Kafka metatdata. */
Expand Down Expand Up @@ -1947,7 +1961,7 @@ public abstract static class ReadSourceDescriptors<K, V>
getConsumerFactoryFn();

@Pure
abstract @Nullable SerializableFunction<TopicPartition, Boolean> getCheckStopReadingFn();
abstract @Nullable CheckStopReadingFn getCheckStopReadingFn();

@Pure
abstract @Nullable SerializableFunction<KafkaRecord<K, V>, Instant>
Expand Down Expand Up @@ -1978,7 +1992,12 @@ abstract ReadSourceDescriptors.Builder<K, V> setConsumerFactoryFn(
SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn);

abstract ReadSourceDescriptors.Builder<K, V> setCheckStopReadingFn(
@Nullable SerializableFunction<TopicPartition, Boolean> checkStopReadingFn);
@Nullable CheckStopReadingFn checkStopReadingFn);

ReadSourceDescriptors.Builder<K, V> setCheckStopReadingFn(
@Nullable SerializableFunction<TopicPartition, Boolean> checkStopReadingFn) {
return setCheckStopReadingFn(CheckStopReadingFnWrapper.of(checkStopReadingFn));
}

abstract ReadSourceDescriptors.Builder<K, V> setKeyDeserializerProvider(
@Nullable DeserializerProvider<K> deserializerProvider);
Expand Down Expand Up @@ -2096,13 +2115,24 @@ public ReadSourceDescriptors<K, V> withConsumerFactoryFn(
return toBuilder().setConsumerFactoryFn(consumerFactoryFn).build();
}

/**
* A custom {@link CheckStopReadingFn} that determines whether the {@link ReadFromKafkaDoFn}
* should stop reading from the given {@link TopicPartition}.
*/
public ReadSourceDescriptors<K, V> withCheckStopReadingFn(
@Nullable CheckStopReadingFn checkStopReadingFn) {
return toBuilder().setCheckStopReadingFn(checkStopReadingFn).build();
}

/**
* A custom {@link SerializableFunction} that determines whether the {@link ReadFromKafkaDoFn}
* should stop reading from the given {@link TopicPartition}.
*/
public ReadSourceDescriptors<K, V> withCheckStopReadingFn(
@Nullable SerializableFunction<TopicPartition, Boolean> checkStopReadingFn) {
return toBuilder().setCheckStopReadingFn(checkStopReadingFn).build();
return toBuilder()
.setCheckStopReadingFn(CheckStopReadingFnWrapper.of(checkStopReadingFn))
.build();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ private ReadFromKafkaDoFn(ReadSourceDescriptors<K, V> transform) {

private final @Nullable Map<String, Object> offsetConsumerConfig;

private final @Nullable SerializableFunction<TopicPartition, Boolean> checkStopReadingFn;
private final @Nullable CheckStopReadingFn checkStopReadingFn;

private final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>
consumerFactoryFn;
Expand Down Expand Up @@ -514,6 +514,9 @@ public AverageRecordSize load(TopicPartition topicPartition) throws Exception {
keyDeserializerInstance = keyDeserializerProvider.getDeserializer(consumerConfig, true);
valueDeserializerInstance = valueDeserializerProvider.getDeserializer(consumerConfig, false);
offsetEstimatorCache = new HashMap<>();
if (checkStopReadingFn != null) {
checkStopReadingFn.setup();
}
}

@Teardown
Expand All @@ -532,6 +535,9 @@ public void teardown() throws Exception {
if (offsetEstimatorCache != null) {
offsetEstimatorCache.clear();
}
if (checkStopReadingFn != null) {
checkStopReadingFn.teardown();
}
}

private Map<String, Object> overrideBootstrapServersConfig(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class WatchForKafkaTopicPartitions extends PTransform<PBegin, PCollection<KafkaS
private final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>
kafkaConsumerFactoryFn;
private final Map<String, Object> kafkaConsumerConfig;
private final @Nullable SerializableFunction<TopicPartition, Boolean> checkStopReadingFn;
private final @Nullable CheckStopReadingFn checkStopReadingFn;
private final Set<String> topics;
private final @Nullable Pattern topicPattern;
private final @Nullable Instant startReadTime;
Expand All @@ -73,7 +73,7 @@ public WatchForKafkaTopicPartitions(
@Nullable Duration checkDuration,
SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> kafkaConsumerFactoryFn,
Map<String, Object> kafkaConsumerConfig,
@Nullable SerializableFunction<TopicPartition, Boolean> checkStopReadingFn,
@Nullable CheckStopReadingFn checkStopReadingFn,
Set<String> topics,
@Nullable Pattern topicPattern,
@Nullable Instant startReadTime,
Expand Down Expand Up @@ -104,12 +104,12 @@ public PCollection<KafkaSourceDescriptor> expand(PBegin input) {
private static class ConvertToDescriptor
extends DoFn<KV<byte[], TopicPartition>, KafkaSourceDescriptor> {

private final @Nullable SerializableFunction<TopicPartition, Boolean> checkStopReadingFn;
private final @Nullable CheckStopReadingFn checkStopReadingFn;
private final @Nullable Instant startReadTime;
private final @Nullable Instant stopReadTime;

private ConvertToDescriptor(
@Nullable SerializableFunction<TopicPartition, Boolean> checkStopReadingFn,
@Nullable CheckStopReadingFn checkStopReadingFn,
@Nullable Instant startReadTime,
@Nullable Instant stopReadTime) {
this.checkStopReadingFn = checkStopReadingFn;
Expand All @@ -131,6 +131,20 @@ public void processElement(
topicPartition, null, startReadTime, null, stopReadTime, null));
}
}

@Setup
public void setup() throws Exception {
if (checkStopReadingFn != null) {
checkStopReadingFn.setup();
}
}

@Teardown
public void teardown() throws Exception {
if (checkStopReadingFn != null) {
checkStopReadingFn.teardown();
}
}
}

private static class WatchPartitionFn extends PollFn<byte[], TopicPartition> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@
import org.apache.beam.sdk.transforms.Keys;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.windowing.CalendarWindows;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
Expand Down Expand Up @@ -483,14 +482,14 @@ public void testKafkaWithDynamicPartitions() throws IOException {

@Test
public void testKafkaWithStopReadingFunction() {
CheckStopReadingFn checkStopReadingFn = new CheckStopReadingFn();
AlwaysStopCheckStopReadingFn checkStopReadingFn = new AlwaysStopCheckStopReadingFn();

PipelineResult readResult = runWithStopReadingFn(checkStopReadingFn, "stop-reading");

assertEquals(-1, readElementMetric(readResult, NAMESPACE, READ_ELEMENT_METRIC_NAME));
}

private static class CheckStopReadingFn implements SerializableFunction<TopicPartition, Boolean> {
private static class AlwaysStopCheckStopReadingFn implements CheckStopReadingFn {
@Override
public Boolean apply(TopicPartition input) {
return true;
Expand Down Expand Up @@ -640,8 +639,7 @@ public void runReadWriteKafkaViaSchemaTransforms(
assertEquals(PipelineResult.State.DONE, readResult.getState());
}

private static class DelayedCheckStopReadingFn
implements SerializableFunction<TopicPartition, Boolean> {
private static class DelayedCheckStopReadingFn implements CheckStopReadingFn {
int checkCount = 0;

@Override
Expand All @@ -654,8 +652,7 @@ public Boolean apply(TopicPartition input) {
}
}

private PipelineResult runWithStopReadingFn(
SerializableFunction<TopicPartition, Boolean> function, String topicSuffix) {
private PipelineResult runWithStopReadingFn(CheckStopReadingFn function, String topicSuffix) {
writePipeline
.apply("Generate records", Read.from(new SyntheticBoundedSource(sourceOptions)))
.apply("Measure write time", ParDo.of(new TimeMonitor<>(NAMESPACE, WRITE_TIME_METRIC_NAME)))
Expand Down

0 comments on commit c713425

Please sign in to comment.