[minor] following 4152, refactor the clazz about plan selection strategy (#6060)
danny0405 authored Jul 8, 2022
1 parent c744848 commit a998586
Showing 12 changed files with 168 additions and 399 deletions.
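In short: plan selection moves from reflection-loaded strategy classes (SingleCompactionPlanSelectStrategy and friends, configured by fully qualified class name) to short strategy names resolved through a CompactionPlanStrategies factory; CompactionPlanSourceFunction now accepts a list of (instant, compaction plan) pairs, so a single source serves any number of selected plans and the separate multi-plan source becomes redundant; and the Flink job commits every plan through one CompactionCommitSink instead of one side-output branch per plan.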
CompactionPlanSourceFunction.java

@@ -20,6 +20,7 @@

 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.common.model.CompactionOperation;
+import org.apache.hudi.common.util.collection.Pair;

 import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.configuration.Configuration;
@@ -28,8 +29,7 @@
 import org.slf4j.LoggerFactory;

 import java.util.List;
-
-import static java.util.stream.Collectors.toList;
+import java.util.stream.Collectors;

 /**
  * Flink hudi compaction source function.
@@ -53,18 +53,12 @@ public class CompactionPlanSourceFunction extends AbstractRichFunction implement
   protected static final Logger LOG = LoggerFactory.getLogger(CompactionPlanSourceFunction.class);

   /**
-   * Compaction instant time.
-   */
-  private final String compactionInstantTime;
-
-  /**
-   * The compaction plan.
+   * compaction plan instant -> compaction plan
    */
-  private final HoodieCompactionPlan compactionPlan;
+  private final List<Pair<String, HoodieCompactionPlan>> compactionPlans;

-  public CompactionPlanSourceFunction(HoodieCompactionPlan compactionPlan, String compactionInstantTime) {
-    this.compactionPlan = compactionPlan;
-    this.compactionInstantTime = compactionInstantTime;
+  public CompactionPlanSourceFunction(List<Pair<String, HoodieCompactionPlan>> compactionPlans) {
+    this.compactionPlans = compactionPlans;
   }

   @Override
@@ -74,11 +68,14 @@ public void open(Configuration parameters) throws Exception {

   @Override
   public void run(SourceContext sourceContext) throws Exception {
-    List<CompactionOperation> operations = this.compactionPlan.getOperations().stream()
-        .map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
-    LOG.info("CompactionPlanFunction compacting " + operations + " files");
-    for (CompactionOperation operation : operations) {
-      sourceContext.collect(new CompactionPlanEvent(compactionInstantTime, operation));
+    for (Pair<String, HoodieCompactionPlan> pair : compactionPlans) {
+      HoodieCompactionPlan compactionPlan = pair.getRight();
+      List<CompactionOperation> operations = compactionPlan.getOperations().stream()
+          .map(CompactionOperation::convertFromAvroRecordInstance).collect(Collectors.toList());
+      LOG.info("CompactionPlanFunction compacting " + operations + " files");
+      for (CompactionOperation operation : operations) {
+        sourceContext.collect(new CompactionPlanEvent(pair.getLeft(), operation));
+      }
     }
   }

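With this change each emitted CompactionPlanEvent carries the instant of the plan it belongs to (pair.getLeft()), so downstream operators no longer need a per-instant source. A minimal usage sketch, assuming env is a Flink StreamExecutionEnvironment and plan1/plan2 are hypothetical HoodieCompactionPlan instances (the instant times are made up for illustration):

    // Sketch only: feed two pending compaction plans through one source.
    List<Pair<String, HoodieCompactionPlan>> plans = Arrays.asList(
        Pair.of("20220708093000", plan1),  // hypothetical instant -> plan
        Pair.of("20220708100000", plan2));
    env.addSource(new CompactionPlanSourceFunction(plans))
        .name("compaction_source")
        .uid("uid_compaction_source");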
FlinkCompactionConfig.java

@@ -20,10 +20,10 @@

 import org.apache.hudi.config.HoodieMemoryConfig;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.sink.compact.strategy.CompactionPlanStrategy;

 import com.beust.jcommander.Parameter;
 import org.apache.flink.configuration.Configuration;
-import org.apache.hudi.sink.compact.strategy.SingleCompactionPlanSelectStrategy;

 /**
  * Configurations for Hoodie Flink compaction.
@@ -102,7 +102,7 @@ public class FlinkCompactionConfig extends Configuration {
   @Parameter(names = {"--seq"}, description = "Compaction plan execution sequence, two options are supported:\n"
       + "1). FIFO: execute the oldest plan first;\n"
       + "2). LIFO: execute the latest plan first, by default LIFO", required = false)
-  public String compactionSeq = SEQ_LIFO;
+  public String compactionSeq = SEQ_FIFO;

   @Parameter(names = {"--service"}, description = "Flink Compaction runs in service mode, disable by default")
   public Boolean serviceMode = false;
@@ -111,21 +111,21 @@ public class FlinkCompactionConfig extends Configuration {
       description = "Min compaction interval of async compaction service, default 10 minutes")
   public Integer minCompactionIntervalSeconds = 600;

-  @Parameter(names = {"--select-strategy"}, description = "The strategy define how to select compaction plan to compact.\n"
-      + "1). SingleCompactionPlanSelectStrategy: Select first or last compaction plan."
-      + "2). MultiCompactionPlanSelectStrategy: Select first or last n compaction plan (n is defined by compactionPlanMaxSelect)."
-      + "3). AllPendingCompactionPlanSelectStrategy: Select all pending compaction plan"
-      + "4). InstantCompactionPlanSelectStrategy: Select the compaction plan that instant is specified by compactionPlanInstant")
-  public String compactionPlanSelectStrategy = SingleCompactionPlanSelectStrategy.class.getName();
+  @Parameter(names = {"--plan-select-strategy"}, description = "The strategy define how to select compaction plan to compact.\n"
+      + "1). num_instants: select plans by specific number of instants, it's the default strategy with 1 instant at a time;\n"
+      + "3). all: Select all pending compaction plan;\n"
+      + "4). instants: Select the compaction plan by specific instants")
+  public String compactionPlanSelectStrategy = CompactionPlanStrategy.NUM_INSTANTS;

-  @Parameter(names = {"--select-max-number"}, description = "Max number of compaction plan would be selected in compaction."
+  @Parameter(names = {"--max-num-plans"}, description = "Max number of compaction plan would be selected in compaction."
       + "It's only effective for MultiCompactionPlanSelectStrategy.")
-  public Integer compactionPlanMaxSelect = 10;
+  public Integer maxNumCompactionPlans = 1;

-  @Parameter(names = {"--select-instant"}, description = "Specify the compaction plan instant to compact and you can specify more than"
-      + "one instant in a time by using comma."
-      + "It's only effective for InstantCompactionPlanSelectStrategy.")
+  @Parameter(names = {"--target-instants"}, description = "Specify the compaction plan instants to compact,\n"
+      + "Multiple instants are supported by comma separated instant time.\n"
+      + "It's only effective for 'instants' plan selection strategy.")
   public String compactionPlanInstant;

   @Parameter(names = {"--spillable_map_path"}, description = "Default file path prefix for spillable map.", required = false)
   public String spillableMapPath = HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.defaultValue();

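The CLI surface changes with the refactor: --select-strategy becomes --plan-select-strategy and takes the short names num_instants, all, or instants rather than fully qualified class names; --select-max-number becomes --max-num-plans, with the default dropping from 10 to 1; --select-instant becomes --target-instants; and the default execution sequence flips from LIFO to FIFO. A job targeting two specific plans would therefore pass something like (instant times hypothetical):

    --plan-select-strategy instants --target-instants 20220708093000,20220708100000

Two apparent leftovers in the new help text: the option list is numbered 1)/3)/4), and the --max-num-plans description still cites the deleted MultiCompactionPlanSelectStrategy — presumably it now pairs with the num_instants strategy.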
HoodieFlinkCompactor.java

@@ -18,11 +18,6 @@

 package org.apache.hudi.sink.compact;

-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.functions.ProcessFunction;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.OutputTag;
 import org.apache.hudi.async.HoodieAsyncTableService;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
@@ -31,11 +26,10 @@
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.sink.compact.strategy.CompactionPlanSelectStrategy;
+import org.apache.hudi.sink.compact.strategy.CompactionPlanStrategies;
 import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.util.CompactionUtil;
 import org.apache.hudi.util.StreamerUtil;
@@ -228,9 +222,8 @@ private void compact() throws Exception {
     }

     // fetch the instant based on the configured execution sequence
-    HoodieTimeline timeline = table.getActiveTimeline();
-    List<HoodieInstant> requested = ((CompactionPlanSelectStrategy) ReflectionUtils.loadClass(cfg.compactionPlanSelectStrategy))
-        .select(timeline.filterPendingCompactionTimeline(), cfg);
+    HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
+    List<HoodieInstant> requested = CompactionPlanStrategies.getStrategy(cfg).select(pendingCompactionTimeline);
     if (requested.isEmpty()) {
       // do nothing.
       LOG.info("No compaction plan scheduled, turns on the compaction plan schedule with --schedule option");
@@ -240,7 +233,7 @@ private void compact() throws Exception {
     List<String> compactionInstantTimes = requested.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
     compactionInstantTimes.forEach(timestamp -> {
       HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(timestamp);
-      if (timeline.containsInstant(inflightInstant)) {
+      if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
         LOG.info("Rollback inflight compaction instant: [" + timestamp + "]");
         table.rollbackInflightCompaction(inflightInstant);
         table.getMetaClient().reloadActiveTimeline();
@@ -254,13 +247,11 @@ private void compact() throws Exception {
           try {
             return Pair.of(timestamp, CompactionUtils.getCompactionPlan(table.getMetaClient(), timestamp));
           } catch (IOException e) {
-            throw new HoodieException(e);
+            throw new HoodieException("Get compaction plan at instant " + timestamp + " error", e);
           }
         })
         // reject empty compaction plan
-        .filter(pair -> !(pair.getRight() == null
-            || pair.getRight().getOperations() == null
-            || pair.getRight().getOperations().isEmpty()))
+        .filter(pair -> validCompactionPlan(pair.getRight()))
         .collect(Collectors.toList());

     if (compactionPlans.isEmpty()) {
@@ -270,7 +261,6 @@ private void compact() throws Exception {
     }

     List<HoodieInstant> instants = compactionInstantTimes.stream().map(HoodieTimeline::getCompactionRequestedInstant).collect(Collectors.toList());
-    HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
     for (HoodieInstant instant : instants) {
       if (!pendingCompactionTimeline.containsInstant(instant)) {
         // this means that the compaction plan was written to auxiliary path(.tmp)
@@ -297,34 +287,19 @@ private void compact() throws Exception {
     }
     table.getMetaClient().reloadActiveTimeline();

-    // use side-output to make operations that is in the same plan to be placed in the same stream
-    // keyby() cannot sure that different operations are in the different stream
-    DataStream<CompactionPlanEvent> source = env.addSource(new MultiCompactionPlanSourceFunction(compactionPlans))
+    env.addSource(new CompactionPlanSourceFunction(compactionPlans))
         .name("compaction_source")
-        .uid("uid_compaction_source");
-
-    SingleOutputStreamOperator<Void> operator = source.rebalance()
+        .uid("uid_compaction_source")
+        .rebalance()
         .transform("compact_task",
             TypeInformation.of(CompactionCommitEvent.class),
             new ProcessOperator<>(new CompactFunction(conf)))
         .setParallelism(compactionParallelism)
-        .process(new ProcessFunction<CompactionCommitEvent, Void>() {
-          @Override
-          public void processElement(CompactionCommitEvent event, ProcessFunction<CompactionCommitEvent, Void>.Context context, Collector<Void> out) {
-            context.output(new OutputTag<>(event.getInstant(), TypeInformation.of(CompactionCommitEvent.class)), event);
-          }
-        })
-        .name("group_by_compaction_plan")
-        .uid("uid_group_by_compaction_plan")
+        .addSink(new CompactionCommitSink(conf))
+        .name("compaction_commit")
+        .uid("uid_compaction_commit")
         .setParallelism(1);

-    compactionPlans.forEach(pair ->
-        operator.getSideOutput(new OutputTag<>(pair.getLeft(), TypeInformation.of(CompactionCommitEvent.class)))
-            .addSink(new CompactionCommitSink(conf))
-            .name("clean_commits " + pair.getLeft())
-            .uid("uid_clean_commits_" + pair.getLeft())
-            .setParallelism(1));
-
     env.execute("flink_hudi_compaction_" + String.join(",", compactionInstantTimes));
   }
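The removed ProcessFunction existed only to fan CompactionCommitEvent records out to one side-output sink per plan, routed by event.getInstant(). Since each commit event already names its instant, a single CompactionCommitSink can presumably separate commits per plan on its own, which eliminates the per-plan clean_commits operators and shrinks the job graph.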

Expand All @@ -342,4 +317,8 @@ public void shutDown() {
shutdownAsyncService(false);
}
}

private static boolean validCompactionPlan(HoodieCompactionPlan plan) {
return plan != null && plan.getOperations() != null && plan.getOperations().size() > 0;
}
}

(Two files were deleted in this commit; the diffs for the remaining changed files did not load in this view.)
