
Commit

[GOBBLIN-1922] Add function in Kafka Source to recompute workUnits for filtered partitions (#3798)

* add function in Kafka Source to recompute workUnits for filtered partitions

* address comments

* set default min container value to 1

* add condition when creating empty work units

* update the condition
hanghangliu authored Oct 26, 2023
1 parent 9a516d3 commit 918716f
Showing 3 changed files with 204 additions and 41 deletions.
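
For orientation, here is a minimal usage sketch (not part of this commit) showing how a caller might drive the new entry point on an already-configured KafkaSource; the wrapper class, topic name, partition ids, and minimum container count below are all hypothetical.

// Minimal usage sketch (not part of this commit). The topic name, partition ids, and
// minimum container count are invented; the KafkaSource and SourceState are assumed to
// be configured by the surrounding Gobblin job.
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.google.common.base.Optional;

import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaSource;
import org.apache.gobblin.source.workunit.WorkUnit;

public class FilteredPartitionPlanner {
  public static List<WorkUnit> plan(KafkaSource<?, ?> source, SourceState state) {
    // Recompute work units only for partitions 0 and 3 of "myTopic".
    Map<String, List<Integer>> filteredTopicPartition = new HashMap<>();
    filteredTopicPartition.put("myTopic", Arrays.asList(0, 3));

    // Ask for at least one container; how the hint is applied is up to the packer.
    return source.getWorkunitsForFilteredPartitions(
        state, Optional.of(filteredTopicPartition), Optional.of(1));
  }
}

Passing Optional.absent() for both optional arguments reproduces the behavior of the unchanged getWorkunits(SourceState) path, which discovers topics and partitions from the job state.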
@@ -157,7 +157,6 @@ protected String canonicalMetricName(String metricGroup, Collection<String> metr
return processedName;
}


/**
* Get a list of all kafka topics
*/
@@ -18,6 +18,10 @@
package org.apache.gobblin.source.extractor.extract.kafka;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -29,6 +33,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;

import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -60,6 +65,7 @@
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.event.lineage.LineageInfo;
import org.apache.gobblin.source.extractor.extract.EventBasedSource;
import org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaTopicGroupingWorkUnitPacker;
import org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker;
import org.apache.gobblin.source.extractor.extract.kafka.validator.TopicValidators;
import org.apache.gobblin.source.extractor.limiter.LimiterConfigurationKeys;
@@ -190,10 +196,27 @@ private void setLimiterReportKeyListToWorkUnits(List<WorkUnit> workUnits, List<S

@Override
public List<WorkUnit> getWorkunits(SourceState state) {
return this.getWorkunitsForFilteredPartitions(state, Optional.absent(), Optional.absent());
}

/**
* Compute work units for Kafka topics. If filteredTopicPartition is present, respect this map and only compute the
* provided topics and filtered partitions. Otherwise, use the state to discover Kafka topics and all available partitions.
*
* @param filteredTopicPartition optional map that, when present, restricts computation to the given topic-partitions.
* @param minContainer an option to specify a minimum container count. Note that how it is used is determined by the
* concrete {@link KafkaWorkUnitPacker} implementation.
*
* TODO: Utilize the minContainer in {@link KafkaTopicGroupingWorkUnitPacker#pack(Map, int)}, as the numContainers variable
* is currently unused.
*/
public List<WorkUnit> getWorkunitsForFilteredPartitions(SourceState state,
Optional<Map<String, List<Integer>>> filteredTopicPartition, Optional<Integer> minContainer) {
this.metricContext = Instrumented.getMetricContext(state, KafkaSource.class);
this.lineageInfo = LineageInfo.getLineageInfo(state.getBroker());

Map<String, List<WorkUnit>> workUnits = Maps.newConcurrentMap();
Map<String, List<Integer>> filteredTopicPartitionMap = filteredTopicPartition.or(new HashMap<>());
Map<String, List<WorkUnit>> kafkaTopicWorkunitMap = Maps.newConcurrentMap();
if (state.getPropAsBoolean(KafkaSource.GOBBLIN_KAFKA_EXTRACT_ALLOW_TABLE_TYPE_NAMESPACE_CUSTOMIZATION)) {
String tableTypeStr =
state.getProp(ConfigurationKeys.EXTRACT_TABLE_TYPE_KEY, KafkaSource.DEFAULT_TABLE_TYPE.toString());
@@ -213,18 +236,22 @@ public List<WorkUnit> getWorkunits(SourceState state) {
try {
Config config = ConfigUtils.propertiesToConfig(state.getProperties());
GobblinKafkaConsumerClientFactory kafkaConsumerClientFactory = kafkaConsumerClientResolver
.resolveClass(
state.getProp(GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS,
DEFAULT_GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS)).newInstance();
.resolveClass(
state.getProp(GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS,
DEFAULT_GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS)).newInstance();

this.kafkaConsumerClient.set(kafkaConsumerClientFactory.create(config));

List<KafkaTopic> topics = getValidTopics(getFilteredTopics(state), state);
Collection<KafkaTopic> topics;
if(filteredTopicPartition.isPresent()) {
// If filteredTopicPartition is present, use it to construct the whitelist pattern while leaving the blacklist empty
topics = this.kafkaConsumerClient.get().getFilteredTopics(Collections.emptyList(),
filteredTopicPartitionMap.keySet().stream().map(Pattern::compile).collect(Collectors.toList()));
} else {
topics = getValidTopics(getFilteredTopics(state), state);
}
this.topicsToProcess = topics.stream().map(KafkaTopic::getName).collect(toSet());

for (String topic : this.topicsToProcess) {
LOG.info("Discovered topic " + topic);
}
Map<String, State> topicSpecificStateMap =
DatasetUtils.getDatasetSpecificProps(Iterables.transform(topics, new Function<KafkaTopic, String>() {

@@ -234,20 +261,13 @@ public String apply(KafkaTopic topic) {
}
}), state);

for (KafkaTopic topic : topics) {
if (topic.getTopicSpecificState().isPresent()) {
topicSpecificStateMap.computeIfAbsent(topic.getName(), k -> new State())
.addAllIfNotExist(topic.getTopicSpecificState().get());
}
}

int numOfThreads = state.getPropAsInt(ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_THREADS,
ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_DEFAULT_THREAD_COUNT);
ExecutorService threadPool =
Executors.newFixedThreadPool(numOfThreads, ExecutorsUtils.newThreadFactory(Optional.of(LOG)));

if (state.getPropAsBoolean(ConfigurationKeys.KAFKA_SOURCE_SHARE_CONSUMER_CLIENT,
ConfigurationKeys.DEFAULT_KAFKA_SOURCE_SHARE_CONSUMER_CLIENT)) {
ConfigurationKeys.DEFAULT_KAFKA_SOURCE_SHARE_CONSUMER_CLIENT)) {
this.sharedKafkaConsumerClient = this.kafkaConsumerClient.get();
} else {
// preallocate one client per thread
@@ -257,32 +277,44 @@ public String apply(KafkaTopic topic) {
Stopwatch createWorkUnitStopwatch = Stopwatch.createStarted();

for (KafkaTopic topic : topics) {
LOG.info("Discovered topic " + topic);
if (topic.getTopicSpecificState().isPresent()) {
topicSpecificStateMap.computeIfAbsent(topic.getName(), k -> new State())
.addAllIfNotExist(topic.getTopicSpecificState().get());
}
Optional<Set<Integer>> partitionIDSet = Optional.absent();
if(filteredTopicPartition.isPresent()) {
List<Integer> list = java.util.Optional.ofNullable(filteredTopicPartitionMap.get(topic.getName()))
.orElse(new ArrayList<>());
partitionIDSet = Optional.of(new HashSet<>(list));
LOG.info("Compute the workunit for topic {} with num of filtered partitions: {}",
topic.getName(), list.size());
}

threadPool.submit(
new WorkUnitCreator(topic, state, Optional.fromNullable(topicSpecificStateMap.get(topic.getName())),
workUnits));
kafkaTopicWorkunitMap, partitionIDSet));
}

ExecutorsUtils.shutdownExecutorService(threadPool, Optional.of(LOG), 1L, TimeUnit.HOURS);
LOG.info(String.format("Created workunits for %d topics in %d seconds", workUnits.size(),
LOG.info(String.format("Created workunits for %d topics in %d seconds", kafkaTopicWorkunitMap.size(),
createWorkUnitStopwatch.elapsed(TimeUnit.SECONDS)));

// Create empty WorkUnits for skipped partitions (i.e., partitions that have previous offsets,
// but aren't processed).
createEmptyWorkUnitsForSkippedPartitions(workUnits, topicSpecificStateMap, state);
//determine the number of mappers
int maxMapperNum =
state.getPropAsInt(ConfigurationKeys.MR_JOB_MAX_MAPPERS_KEY, ConfigurationKeys.DEFAULT_MR_JOB_MAX_MAPPERS);
// but aren't processed). When filteredTopicPartition is present, only the filtered topic-partitions are needed, so skip this call
if(!filteredTopicPartition.isPresent()) {
createEmptyWorkUnitsForSkippedPartitions(kafkaTopicWorkunitMap, topicSpecificStateMap, state);
}

KafkaWorkUnitPacker kafkaWorkUnitPacker = KafkaWorkUnitPacker.getInstance(this, state, Optional.of(this.metricContext));
int numOfMultiWorkunits = maxMapperNum;
if(state.contains(ConfigurationKeys.MR_TARGET_MAPPER_SIZE)) {
double totalEstDataSize = kafkaWorkUnitPacker.setWorkUnitEstSizes(workUnits);
LOG.info(String.format("The total estimated data size is %.2f", totalEstDataSize));
double targetMapperSize = state.getPropAsDouble(ConfigurationKeys.MR_TARGET_MAPPER_SIZE);
numOfMultiWorkunits = (int) (totalEstDataSize / targetMapperSize) + 1;
numOfMultiWorkunits = Math.min(numOfMultiWorkunits, maxMapperNum);
int numOfMultiWorkunits = minContainer.or(1);
if(state.contains(ConfigurationKeys.MR_JOB_MAX_MAPPERS_KEY)) {
numOfMultiWorkunits = Math.max(numOfMultiWorkunits,
calculateNumMappersForPacker(state, kafkaWorkUnitPacker, kafkaTopicWorkunitMap));
}
addTopicSpecificPropsToWorkUnits(workUnits, topicSpecificStateMap);
List<WorkUnit> workUnitList = kafkaWorkUnitPacker.pack(workUnits, numOfMultiWorkunits);

addTopicSpecificPropsToWorkUnits(kafkaTopicWorkunitMap, topicSpecificStateMap);
List<WorkUnit> workUnitList = kafkaWorkUnitPacker.pack(kafkaTopicWorkunitMap, numOfMultiWorkunits);
setLimiterReportKeyListToWorkUnits(workUnitList, getLimiterExtractorReportKeys());
return workUnitList;
} catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
@@ -305,6 +337,7 @@ public String apply(KafkaTopic topic) {
LOG.error("Exception {} encountered closing GobblinKafkaConsumerClient ", t);
}
}

}

protected void populateClientPool(int count,
@@ -377,17 +410,37 @@ private void createEmptyWorkUnitsForSkippedPartitions(Map<String, List<WorkUnit>
}
}

// Determine the number of mappers/containers for the workunit packer
private int calculateNumMappersForPacker(SourceState state,
KafkaWorkUnitPacker kafkaWorkUnitPacker, Map<String, List<WorkUnit>> workUnits) {
int maxMapperNum =
state.getPropAsInt(ConfigurationKeys.MR_JOB_MAX_MAPPERS_KEY, ConfigurationKeys.DEFAULT_MR_JOB_MAX_MAPPERS);
int numContainers = maxMapperNum;
if(state.contains(ConfigurationKeys.MR_TARGET_MAPPER_SIZE)) {
double totalEstDataSize = kafkaWorkUnitPacker.setWorkUnitEstSizes(workUnits);
LOG.info(String.format("The total estimated data size is %.2f", totalEstDataSize));
double targetMapperSize = state.getPropAsDouble(ConfigurationKeys.MR_TARGET_MAPPER_SIZE);
numContainers = (int) (totalEstDataSize / targetMapperSize) + 1;
numContainers = Math.min(numContainers, maxMapperNum);
}
return numContainers;
}

/*
* This function needs to be thread safe since it is called in the Runnable
*/
private List<WorkUnit> getWorkUnitsForTopic(KafkaTopic topic, SourceState state, Optional<State> topicSpecificState) {
private List<WorkUnit> getWorkUnitsForTopic(KafkaTopic topic, SourceState state,
Optional<State> topicSpecificState, Optional<Set<Integer>> filteredPartitions) {
Timer.Context context = this.metricContext.timer("isTopicQualifiedTimer").time();
boolean topicQualified = isTopicQualified(topic);
context.close();

List<WorkUnit> workUnits = Lists.newArrayList();
List<KafkaPartition> topicPartitions = topic.getPartitions();
for (KafkaPartition partition : topicPartitions) {
if(filteredPartitions.isPresent() && !filteredPartitions.get().contains(partition.getId())) {
continue;
}
WorkUnit workUnit = getWorkUnitForTopicPartition(partition, state, topicSpecificState);
if (workUnit != null) {
// For disqualified topics, for each of its workunits set the high watermark to be the same
Expand Down Expand Up @@ -895,13 +948,20 @@ private class WorkUnitCreator implements Runnable {
private final SourceState state;
private final Optional<State> topicSpecificState;
private final Map<String, List<WorkUnit>> allTopicWorkUnits;
private final Optional<Set<Integer>> filteredPartitionsId;

WorkUnitCreator(KafkaTopic topic, SourceState state, Optional<State> topicSpecificState,
Map<String, List<WorkUnit>> workUnits) {
this(topic, state, topicSpecificState, workUnits, Optional.absent());
}

WorkUnitCreator(KafkaTopic topic, SourceState state, Optional<State> topicSpecificState,
Map<String, List<WorkUnit>> workUnits, Optional<Set<Integer>> filteredPartitionsId) {
this.topic = topic;
this.state = state;
this.topicSpecificState = topicSpecificState;
this.allTopicWorkUnits = workUnits;
this.filteredPartitionsId = filteredPartitionsId;
}

@Override
@@ -917,7 +977,7 @@ public void run() {
}

this.allTopicWorkUnits.put(this.topic.getName(),
KafkaSource.this.getWorkUnitsForTopic(this.topic, this.state, this.topicSpecificState));
KafkaSource.this.getWorkUnitsForTopic(this.topic, this.state, this.topicSpecificState, this.filteredPartitionsId));
} catch (Throwable t) {
LOG.error("Caught error in creating work unit for " + this.topic.getName(), t);
throw new RuntimeException(t);
@@ -930,4 +990,4 @@ public void run() {
}
}
}
}
}
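
For reference, a small self-contained sketch (not from the changed files) of the container-count arithmetic introduced above; all property values are invented examples, and it assumes ConfigurationKeys.MR_JOB_MAX_MAPPERS_KEY is set in the job state.

// Hedged sketch of the new container-count math; the inputs are invented example values.
public class ContainerCountSketch {
  public static void main(String[] args) {
    int minContainer = 1;             // default when the caller passes Optional.absent()
    int maxMapperNum = 100;           // from ConfigurationKeys.MR_JOB_MAX_MAPPERS_KEY
    double totalEstDataSize = 2500.0; // from kafkaWorkUnitPacker.setWorkUnitEstSizes(...)
    double targetMapperSize = 300.0;  // from ConfigurationKeys.MR_TARGET_MAPPER_SIZE

    // calculateNumMappersForPacker: size-based estimate, capped at the configured maximum.
    int numContainers = Math.min((int) (totalEstDataSize / targetMapperSize) + 1, maxMapperNum);

    // getWorkunitsForFilteredPartitions: never drop below the requested minimum.
    int numOfMultiWorkunits = Math.max(minContainer, numContainers);
    System.out.println(numOfMultiWorkunits); // prints 9 for these example values
  }
}

If MR_TARGET_MAPPER_SIZE is not set, calculateNumMappersForPacker simply returns the configured maximum mapper count, and the minimum-container floor still applies.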