From c4b0d353ae0c6570c85328e15f9895c102861968 Mon Sep 17 00:00:00 2001 From: Tao Qin Date: Fri, 13 Oct 2023 22:36:17 -0700 Subject: [PATCH] Refine --- .../kafka/validator/TopicNameValidator.java | 8 +- .../kafka/validator/TopicValidatorBase.java | 10 +-- .../kafka/validator/TopicValidators.java | 79 ++++++++++++++++--- .../kafka/validator/TopicValidatorsTest.java | 56 +++++++++++-- .../apache/gobblin/util/ExecutorsUtils.java | 15 ++++ 5 files changed, 138 insertions(+), 30 deletions(-) diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicNameValidator.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicNameValidator.java index 37cf7377023..c8dd8223f3d 100644 --- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicNameValidator.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicNameValidator.java @@ -16,7 +16,7 @@ */ package org.apache.gobblin.source.extractor.extract.kafka.validator; -import org.apache.gobblin.configuration.SourceState; +import org.apache.gobblin.configuration.State; import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic; /** @@ -25,8 +25,8 @@ public class TopicNameValidator extends TopicValidatorBase { private static final String DOT = "."; - public TopicNameValidator(SourceState sourceState) { - super(sourceState); + public TopicNameValidator(State state) { + super(state); } /** @@ -36,7 +36,7 @@ public TopicNameValidator(SourceState sourceState) { * @return true if the topic name is valid (aka. doesn't contain ".") */ @Override - public boolean validate(KafkaTopic topic) { + public boolean validate(KafkaTopic topic) throws Exception { return !topic.getName().contains(DOT); } } diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicValidatorBase.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicValidatorBase.java index 6637819aade..69c5bc92a65 100644 --- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicValidatorBase.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicValidatorBase.java @@ -16,18 +16,18 @@ */ package org.apache.gobblin.source.extractor.extract.kafka.validator; -import org.apache.gobblin.configuration.SourceState; +import org.apache.gobblin.configuration.State; import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic; /** * The base class of a topic validator */ public abstract class TopicValidatorBase { - protected SourceState sourceState; + protected State state; - public TopicValidatorBase(SourceState sourceState) { - this.sourceState = sourceState; + public TopicValidatorBase(State sourceState) { + this.state = sourceState; } - public abstract boolean validate(KafkaTopic topic); + public abstract boolean validate(KafkaTopic topic) throws Exception; } diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicValidators.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicValidators.java index 8597d83ef59..fbed07c7609 100644 --- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicValidators.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicValidators.java @@ -16,15 +16,24 @@ */ package org.apache.gobblin.source.extractor.extract.kafka.validator; +import com.google.common.base.Optional; +import com.google.common.base.Stopwatch; import java.util.ArrayList; import java.util.List; -import java.util.stream.Collectors; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.StringUtils; -import org.apache.gobblin.configuration.SourceState; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.configuration.State; import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic; +import org.apache.gobblin.util.ExecutorsUtils; import org.apache.gobblin.util.reflection.GobblinConstructorUtils; + /** * The TopicValidators contains a list of {@link TopicValidatorBase} that validate topics. * To enable it, add below settings in the config: @@ -34,11 +43,16 @@ public class TopicValidators { public static final String VALIDATOR_CLASSES_KEY = "gobblin.kafka.topicValidators"; - public static final String VALIDATOR_CLASS_DELIMITER = ","; + private static long DEFAULTL_TIMEOUT = 10L; + + private static TimeUnit DEFAULT_TIMEOUT_UNIT = TimeUnit.MINUTES; private final List validators = new ArrayList<>(); - public TopicValidators(SourceState state) { + private final State state; + + public TopicValidators(State state) { + this.state = state; for (String validatorClassName : state.getPropAsList(VALIDATOR_CLASSES_KEY, StringUtils.EMPTY)) { try { this.validators.add(GobblinConstructorUtils.invokeConstructor(TopicValidatorBase.class, validatorClassName, @@ -50,26 +64,65 @@ public TopicValidators(SourceState state) { } /** - * Validate topics with all the internal validators. - * Note: the validations for every topic run in parallel. + * Validate topics with all the internal validators. The default timeout is set to 1 hour. + * Note: + * 1. the validations for every topic run in parallel. + * 2. when timeout happens, un-validated topics are still treated as "valid". * @param topics the topics to be validated * @return the topics that pass all the validators */ public List validate(List topics) { - // Validate the topics in parallel - return topics.parallelStream() - .filter(this::validate) - .collect(Collectors.toList()); + return validate(topics, DEFAULTL_TIMEOUT, DEFAULT_TIMEOUT_UNIT); + } + + /** + * Validate topics with all the internal validators. + * Note: + * 1. the validations for every topic run in parallel. + * 2. when timeout happens, un-validated topics are still treated as "valid". + * @param topics the topics to be validated + * @param timeout the timeout for the validation + * @param timeoutUnit the time unit for the timeout + * @return the topics that pass all the validators + */ + public List validate(List topics, long timeout, TimeUnit timeoutUnit) { + int numOfThreads = state.getPropAsInt(ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_THREADS, + ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_DEFAULT_THREAD_COUNT); + + // Tasks running in the thread pool will have the same access control and class loader settings as current thread + ExecutorService threadPool = Executors.newFixedThreadPool(numOfThreads, ExecutorsUtils.newPrivilegedThreadFactory( + Optional.of(log))); + + List> results = new ArrayList<>(); + Stopwatch stopwatch = Stopwatch.createStarted(); + for (KafkaTopic topic : topics) { + results.add(threadPool.submit(() -> validate(topic))); + } + ExecutorsUtils.shutdownExecutorService(threadPool, Optional.of(log), timeout, timeoutUnit); + log.info(String.format("Validate %d topics in %d seconds", topics.size(), stopwatch.elapsed(TimeUnit.SECONDS))); + + List validTopics = new ArrayList<>(); + for (int i = 0; i < results.size(); ++i) { + try { + if (results.get(i).get()) { + validTopics.add(topics.get(i)); + } + } catch (InterruptedException | ExecutionException e) { + log.warn("Failed to validate topic: {}, treat it as a valid topic", topics.get(i)); + validTopics.add(topics.get(i)); + } + } + return validTopics; } /** * Validates a single topic with all the internal validators */ - private boolean validate(KafkaTopic topic) { - log.debug("Validating topic {} in thread: {}", topic, Thread.currentThread().getName()); + private boolean validate(KafkaTopic topic) throws Exception { + log.info("Validating topic {} in thread: {}", topic, Thread.currentThread().getName()); for (TopicValidatorBase validator : this.validators) { if (!validator.validate(topic)) { - log.info("Skip KafkaTopic: {}, by validator: {}", topic, validator.getClass().getName()); + log.warn("KafkaTopic: {} doesn't pass the validator: {}", topic, validator.getClass().getName()); return false; } } diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicValidatorsTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicValidatorsTest.java index d75fdc6662e..2691ae112c2 100644 --- a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicValidatorsTest.java +++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicValidatorsTest.java @@ -22,8 +22,9 @@ import java.util.Collections; import java.util.List; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import org.apache.gobblin.configuration.SourceState; +import org.apache.gobblin.configuration.State; import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic; import org.testng.Assert; import org.testng.annotations.Test; @@ -36,18 +37,17 @@ public void testTopicValidators() { "topic1", "topic2", // allowed "topic-with.period-in_middle", ".topic-with-period-at-start", "topicWithPeriodAtEnd.", // bad topics "topic3", "topic4"); // in deny list - List topics = allTopics.stream() - .map(topicName -> new KafkaTopic(topicName, Collections.emptyList())).collect(Collectors.toList()); + List topics = buildKafkaTopics(allTopics); - SourceState state = new SourceState(); + State state = new State(); // Without any topic validators List validTopics = new TopicValidators(state).validate(topics); Assert.assertEquals(validTopics.size(), 7); // Use 2 topic validators: TopicNameValidator and DenyListValidator - String validatorsToUse = String.join(TopicValidators.VALIDATOR_CLASS_DELIMITER, - ImmutableList.of(TopicNameValidator.class.getName(), DenyListValidator.class.getName())); + String validatorsToUse = String.join(",", ImmutableList.of( + TopicNameValidator.class.getName(), DenyListValidator.class.getName())); state.setProp(TopicValidators.VALIDATOR_CLASSES_KEY, validatorsToUse); validTopics = new TopicValidators(state).validate(topics); @@ -56,12 +56,29 @@ public void testTopicValidators() { Assert.assertTrue(validTopics.stream().anyMatch(topic -> topic.getName().equals("topic2"))); } + @Test + public void testValidatorTimeout() { + List allTopics = Arrays.asList("topic1", "topic2", "topic3"); + List topics = buildKafkaTopics(allTopics); + State state = new State(); + state.setProp(TopicValidators.VALIDATOR_CLASSES_KEY, RejectEverythingValidator.class.getName()); + List validTopics = new TopicValidators(state).validate(topics, 5, TimeUnit.SECONDS); + Assert.assertEquals(validTopics.size(), 1); // topic 2 times out, it should be treated as a valid topic + Assert.assertEquals(validTopics.get(0).getName(), "topic2"); + } + + private List buildKafkaTopics(List topics) { + return topics.stream() + .map(topicName -> new KafkaTopic(topicName, Collections.emptyList())) + .collect(Collectors.toList()); + } + // A TopicValidator class to mimic a deny list public static class DenyListValidator extends TopicValidatorBase { Set denyList = ImmutableSet.of("topic3", "topic4"); - public DenyListValidator(SourceState sourceState) { - super(sourceState); + public DenyListValidator(State state) { + super(state); } @Override @@ -69,4 +86,27 @@ public boolean validate(KafkaTopic topic) { return !this.denyList.contains(topic.getName()); } } + + // A validator that always returns false when validate() is called. + // Sleep for 5 sec when processing topic2 to simulate a slow validation. + public static class RejectEverythingValidator extends TopicValidatorBase { + + public RejectEverythingValidator(State state) { + super(state); + } + + @Override + public boolean validate(KafkaTopic topic) { + if (!topic.getName().equals("topic2")) { + return false; + } + + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return false; + } + } } diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/ExecutorsUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/ExecutorsUtils.java index b05674a9592..09f2f00b49a 100644 --- a/gobblin-utility/src/main/java/org/apache/gobblin/util/ExecutorsUtils.java +++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/ExecutorsUtils.java @@ -103,6 +103,21 @@ public static ThreadFactory newDaemonThreadFactory(Optional logger, Opti return newThreadFactory(new ThreadFactoryBuilder().setDaemon(true), logger, nameFormat); } + /** + * Get a new {@link ThreadFactory} that uses a {@link LoggingUncaughtExceptionHandler} + * to handle uncaught exceptions. + * Tasks running within such threads will have the same access control and class loader settings as the + * thread that invokes this method. + * + * @param logger an {@link Optional} wrapping the {@link Logger} that the + * {@link LoggingUncaughtExceptionHandler} uses to log uncaught exceptions thrown in threads + * @return a new {@link ThreadFactory} + */ + public static ThreadFactory newPrivilegedThreadFactory(Optional logger) { + return newThreadFactory(new ThreadFactoryBuilder().setThreadFactory(Executors.privilegedThreadFactory()), logger, + Optional.absent()); + } + private static ThreadFactory newThreadFactory(ThreadFactoryBuilder builder, Optional logger, Optional nameFormat) { if (nameFormat.isPresent()) {