From 1db6864ef968478893a50e533c89ffe324582514 Mon Sep 17 00:00:00 2001
From: lanyuanxiaoyao
Date: Wed, 25 May 2022 11:54:53 +0800
Subject: [PATCH 1/6] [HUDI-4152] Allow Flink offline compaction to compact
 multiple compaction plans at once

---
 .../sink/compact/FlinkCompactionConfig.java   |  16 +++
 .../sink/compact/HoodieFlinkCompactor.java    | 130 ++++++++++++------
 ...llPendingCompactionPlanSelectStrategy.java |  35 +++++
 .../CompactionPlanSelectStrategy.java         |  34 +++++
 .../InstantCompactionPlanSelectStrategy.java  |  40 ++++++
 .../MultiCompactionPlanSelectStrategy.java    |  42 ++++++
 .../SingleCompactionPlanSelectStrategy.java   |  43 ++++++
 7 files changed, 299 insertions(+), 41 deletions(-)
 create mode 100644 hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/AllPendingCompactionPlanSelectStrategy.java
 create mode 100644 hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/CompactionPlanSelectStrategy.java
 create mode 100644 hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/InstantCompactionPlanSelectStrategy.java
 create mode 100644 hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/MultiCompactionPlanSelectStrategy.java
 create mode 100644 hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/SingleCompactionPlanSelectStrategy.java

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java
index 4f3faadb92f01..3ff9ffb9ca6f4 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java
@@ -22,6 +22,7 @@
 
 import com.beust.jcommander.Parameter;
 import org.apache.flink.configuration.Configuration;
+import org.apache.hudi.sink.compact.strategy.SingleCompactionPlanSelectStrategy;
 
 /**
  * Configurations for Hoodie Flink compaction.
@@ -109,6 +110,21 @@ public class FlinkCompactionConfig extends Configuration {
       description = "Min compaction interval of async compaction service, default 10 minutes")
   public Integer minCompactionIntervalSeconds = 600;
 
+  @Parameter(names = {"--select-strategy"}, description = "The strategy defines how to select compaction plans to compact.\n"
+      + "1). SingleCompactionPlanSelectStrategy: Select the first or last compaction plan."
+      + "2). MultiCompactionPlanSelectStrategy: Select the first or last n compaction plans (n is defined by compactionPlanMaxSelect)."
+      + "3). AllPendingCompactionPlanSelectStrategy: Select all pending compaction plans."
+      + "4). InstantCompactionPlanSelectStrategy: Select the compaction plan whose instant is specified by compactionPlanInstant.")
+  public String compactionPlanSelectStrategy = SingleCompactionPlanSelectStrategy.class.getName();
+
+  @Parameter(names = {"--select-max-number"}, description = "Max number of compaction plans to be selected in one compaction run. "
+      + "It's only effective for MultiCompactionPlanSelectStrategy.")
+  public Integer compactionPlanMaxSelect = 10;
+
+  @Parameter(names = {"--select-instant"}, description = "Specify the compaction plan instant to compact."
+      + "It's only effective for InstantCompactionPlanSelectStrategy.")
+  public String compactionPlanInstant;
+
   /**
    * Transforms a {@code HoodieFlinkCompaction.config} into {@code Configuration}.
* The latter is more suitable for the table APIs. It reads all the properties diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java index 546136e416b7f..7a0cd9cf63761 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java @@ -18,6 +18,11 @@ package org.apache.hudi.sink.compact; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.util.Collector; +import org.apache.flink.util.OutputTag; import org.apache.hudi.async.HoodieAsyncTableService; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.client.HoodieFlinkWriteClient; @@ -26,9 +31,11 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.CompactionUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.sink.compact.strategy.CompactionPlanSelectStrategy; import org.apache.hudi.table.HoodieFlinkTable; import org.apache.hudi.util.CompactionUtil; import org.apache.hudi.util.StreamerUtil; @@ -42,9 +49,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.stream.Collectors; /** * Flink hudi compaction program that can be executed manually. @@ -218,74 +228,112 @@ private void compact() throws Exception { } // fetch the instant based on the configured execution sequence - HoodieTimeline timeline = table.getActiveTimeline().filterPendingCompactionTimeline(); - Option requested = CompactionUtil.isLIFO(cfg.compactionSeq) ? timeline.lastInstant() : timeline.firstInstant(); - if (!requested.isPresent()) { + HoodieTimeline timeline = table.getActiveTimeline(); + List requested = ((CompactionPlanSelectStrategy) ReflectionUtils.loadClass(cfg.compactionPlanSelectStrategy)) + .select(timeline.filterPendingCompactionTimeline(), cfg); + if (requested.isEmpty()) { // do nothing. 
LOG.info("No compaction plan scheduled, turns on the compaction plan schedule with --schedule option"); return; } - String compactionInstantTime = requested.get().getTimestamp(); - - HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime); - if (timeline.containsInstant(inflightInstant)) { - LOG.info("Rollback inflight compaction instant: [" + compactionInstantTime + "]"); - table.rollbackInflightCompaction(inflightInstant); - table.getMetaClient().reloadActiveTimeline(); - } + List compactionInstantTimes = requested.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList()); + compactionInstantTimes.forEach(timestamp -> { + HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(timestamp); + if (timeline.containsInstant(inflightInstant)) { + LOG.info("Rollback inflight compaction instant: [" + timestamp + "]"); + table.rollbackInflightCompaction(inflightInstant); + table.getMetaClient().reloadActiveTimeline(); + } + }); - // generate compaction plan + // generate timestamp and compaction plan pair // should support configurable commit metadata - HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan( - table.getMetaClient(), compactionInstantTime); - - if (compactionPlan == null || (compactionPlan.getOperations() == null) - || (compactionPlan.getOperations().isEmpty())) { + List> compactionPlans = compactionInstantTimes.stream() + .map(timestamp -> { + try { + return Pair.of(timestamp, CompactionUtils.getCompactionPlan(table.getMetaClient(), timestamp)); + } catch (IOException e) { + throw new HoodieException(e); + } + }) + // reject empty compaction plan + .filter(pair -> !(pair.getRight() == null + || pair.getRight().getOperations() == null + || pair.getRight().getOperations().isEmpty())) + .collect(Collectors.toList()); + + if (compactionPlans.isEmpty()) { // No compaction plan, do nothing and return. - LOG.info("No compaction plan for instant " + compactionInstantTime); + LOG.info("No compaction plan for instant " + String.join(",", compactionInstantTimes)); return; } - HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime); + List instants = compactionInstantTimes.stream().map(HoodieTimeline::getCompactionRequestedInstant).collect(Collectors.toList()); HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline(); - if (!pendingCompactionTimeline.containsInstant(instant)) { - // this means that the compaction plan was written to auxiliary path(.tmp) - // but not the meta path(.hoodie), this usually happens when the job crush - // exceptionally. - - // clean the compaction plan in auxiliary path and cancels the compaction. - - LOG.warn("The compaction plan was fetched through the auxiliary path(.tmp) but not the meta path(.hoodie).\n" - + "Clean the compaction plan in auxiliary path and cancels the compaction"); - CompactionUtil.cleanInstant(table.getMetaClient(), instant); - return; + for (HoodieInstant instant : instants) { + if (!pendingCompactionTimeline.containsInstant(instant)) { + // this means that the compaction plan was written to auxiliary path(.tmp) + // but not the meta path(.hoodie), this usually happens when the job crush + // exceptionally. + // clean the compaction plan in auxiliary path and cancels the compaction. 
+ LOG.warn("The compaction plan was fetched through the auxiliary path(.tmp) but not the meta path(.hoodie).\n" + + "Clean the compaction plan in auxiliary path and cancels the compaction"); + CompactionUtil.cleanInstant(table.getMetaClient(), instant); + return; + } } // get compactionParallelism. int compactionParallelism = conf.getInteger(FlinkOptions.COMPACTION_TASKS) == -1 - ? compactionPlan.getOperations().size() : conf.getInteger(FlinkOptions.COMPACTION_TASKS); + ? Math.toIntExact(compactionPlans.stream().mapToLong(pair -> pair.getRight().getOperations().size()).sum()) + : conf.getInteger(FlinkOptions.COMPACTION_TASKS); - LOG.info("Start to compaction for instant " + compactionInstantTime); + LOG.info("Start to compaction for instant " + compactionInstantTimes); // Mark instant as compaction inflight - table.getActiveTimeline().transitionCompactionRequestedToInflight(instant); + for (HoodieInstant instant : instants) { + table.getActiveTimeline().transitionCompactionRequestedToInflight(instant); + } table.getMetaClient().reloadActiveTimeline(); - env.addSource(new CompactionPlanSourceFunction(compactionPlan, compactionInstantTime)) - .name("compaction_source") - .uid("uid_compaction_source") - .rebalance() + // use side-output to make operations that is in the same plan to be placed in the same stream + // keyby() cannot sure that different operations are in the different stream + Pair firstPlan = compactionPlans.get(0); + DataStream source = env.addSource(new CompactionPlanSourceFunction(firstPlan.getRight(), firstPlan.getLeft())) + .name("compaction_source " + firstPlan.getLeft()) + .uid("uid_compaction_source " + firstPlan.getLeft()); + if (compactionPlans.size() > 1) { + for (Pair pair : compactionPlans.subList(1, compactionPlans.size())) { + source = source.union(env.addSource(new CompactionPlanSourceFunction(pair.getRight(), pair.getLeft())) + .name("compaction_source " + pair.getLeft()) + .uid("uid_compaction_source " + pair.getLeft())); + } + } + + SingleOutputStreamOperator operator = source.rebalance() .transform("compact_task", TypeInformation.of(CompactionCommitEvent.class), new ProcessOperator<>(new CompactFunction(conf))) .setParallelism(compactionParallelism) - .addSink(new CompactionCommitSink(conf)) - .name("clean_commits") - .uid("uid_clean_commits") + .process(new ProcessFunction() { + @Override + public void processElement(CompactionCommitEvent event, ProcessFunction.Context context, Collector out) { + context.output(new OutputTag<>(event.getInstant(), TypeInformation.of(CompactionCommitEvent.class)), event); + } + }) + .name("group_by_compaction_plan") + .uid("uid_group_by_compaction_plan") .setParallelism(1); - env.execute("flink_hudi_compaction_" + compactionInstantTime); + compactionPlans.forEach(pair -> + operator.getSideOutput(new OutputTag<>(pair.getLeft(), TypeInformation.of(CompactionCommitEvent.class))) + .addSink(new CompactionCommitSink(conf)) + .name("clean_commits") + .uid("uid_clean_commits") + .setParallelism(1)); + + env.execute("flink_hudi_compaction_" + String.join(",", compactionInstantTimes)); } /** diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/AllPendingCompactionPlanSelectStrategy.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/AllPendingCompactionPlanSelectStrategy.java new file mode 100644 index 0000000000000..23b6708ff304c --- /dev/null +++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/AllPendingCompactionPlanSelectStrategy.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.compact.strategy;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.sink.compact.FlinkCompactionConfig;
+
+/**
+ * Select all pending compaction plans to compact.
+ */
+public class AllPendingCompactionPlanSelectStrategy implements CompactionPlanSelectStrategy {
+  @Override
+  public List select(HoodieTimeline pendingCompactionTimeline, FlinkCompactionConfig config) {
+    return pendingCompactionTimeline.getInstants().collect(Collectors.toList());
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/CompactionPlanSelectStrategy.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/CompactionPlanSelectStrategy.java
new file mode 100644
index 0000000000000..a41fcef198139
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/CompactionPlanSelectStrategy.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.compact.strategy;
+
+import java.util.List;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.sink.compact.FlinkCompactionConfig;
+
+/**
+ * Strategy that defines how to select the compaction plans to compact.
+ */
+public interface CompactionPlanSelectStrategy {
+  /**
+   * Define how to select the compaction plans to compact.
+   */
+  List select(HoodieTimeline pendingCompactionTimeline, FlinkCompactionConfig config);
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/InstantCompactionPlanSelectStrategy.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/InstantCompactionPlanSelectStrategy.java
new file mode 100644
index 0000000000000..2cf73b421a08f
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/InstantCompactionPlanSelectStrategy.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.compact.strategy;
+
+import java.util.Collections;
+import java.util.List;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.sink.compact.FlinkCompactionConfig;
+
+/**
+ * Specify the compaction plan instant to compact
+ */
+public class InstantCompactionPlanSelectStrategy implements CompactionPlanSelectStrategy {
+  @Override
+  public List select(HoodieTimeline pendingCompactionTimeline, FlinkCompactionConfig config) {
+    HoodieInstant specifiedInstant = pendingCompactionTimeline.getInstants()
+        .filter(instant -> config.compactionPlanInstant.equals(instant.getTimestamp()))
+        .findFirst()
+        .orElseThrow(() -> new HoodieException("The instant " + config.compactionPlanInstant + " is not found in timeline"));
+    return Collections.singletonList(specifiedInstant);
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/MultiCompactionPlanSelectStrategy.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/MultiCompactionPlanSelectStrategy.java
new file mode 100644
index 0000000000000..37ba1ac9376fc
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/MultiCompactionPlanSelectStrategy.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.compact.strategy;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.sink.compact.FlinkCompactionConfig;
+import org.apache.hudi.util.CompactionUtil;
+
+/**
+ * Select multiple compaction plans to compact.
+ */
+public class MultiCompactionPlanSelectStrategy implements CompactionPlanSelectStrategy {
+  @Override
+  public List select(HoodieTimeline pendingCompactionTimeline, FlinkCompactionConfig config) {
+    List pendingCompactionPlanInstants = pendingCompactionTimeline.getInstants().collect(Collectors.toList());
+    if (!CompactionUtil.isLIFO(config.compactionSeq)) {
+      Collections.reverse(pendingCompactionPlanInstants);
+    }
+    int range = Math.min(config.compactionPlanMaxSelect, pendingCompactionPlanInstants.size());
+    return pendingCompactionPlanInstants.subList(0, range);
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/SingleCompactionPlanSelectStrategy.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/SingleCompactionPlanSelectStrategy.java
new file mode 100644
index 0000000000000..d03ef4ef27d14
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/SingleCompactionPlanSelectStrategy.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.compact.strategy;
+
+import java.util.Collections;
+import java.util.List;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.sink.compact.FlinkCompactionConfig;
+import org.apache.hudi.util.CompactionUtil;
+
+/**
+ * Select one compaction plan to compact.
+ */
+public class SingleCompactionPlanSelectStrategy implements CompactionPlanSelectStrategy {
+  @Override
+  public List select(HoodieTimeline pendingCompactionTimeline, FlinkCompactionConfig config) {
+    Option compactionPlanInstant = CompactionUtil.isLIFO(config.compactionSeq)
+        ? pendingCompactionTimeline.firstInstant()
+        : pendingCompactionTimeline.lastInstant();
+    if (compactionPlanInstant.isPresent()) {
+      return Collections.singletonList(compactionPlanInstant.get());
+    }
+    return Collections.emptyList();
+  }
+}

From 538fdfbd216be49ea3652af8460333622a68d0ff Mon Sep 17 00:00:00 2001
From: lanyuanxiaoyao
Date: Thu, 26 May 2022 09:48:44 +0800
Subject: [PATCH 2/6] [HUDI-4152] Fix exception for duplicated uid when
 multiple compaction plans are compacted

---
 .../org/apache/hudi/sink/compact/HoodieFlinkCompactor.java | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
index 7a0cd9cf63761..17e83fb55c129 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
@@ -307,7 +307,7 @@ private void compact() throws Exception {
       for (Pair pair : compactionPlans.subList(1, compactionPlans.size())) {
         source = source.union(env.addSource(new CompactionPlanSourceFunction(pair.getRight(), pair.getLeft()))
             .name("compaction_source " + pair.getLeft())
-            .uid("uid_compaction_source " + pair.getLeft()));
+            .uid("uid_compaction_source_" + pair.getLeft()));
       }
     }
 
@@ -329,8 +329,8 @@ public void processElement(CompactionCommitEvent event, ProcessFunction
     compactionPlans.forEach(pair ->
         operator.getSideOutput(new OutputTag<>(pair.getLeft(), TypeInformation.of(CompactionCommitEvent.class)))
             .addSink(new CompactionCommitSink(conf))
-            .name("clean_commits")
-            .uid("uid_clean_commits")
+            .name("clean_commits " + pair.getLeft())
+            .uid("uid_clean_commits_" + pair.getLeft())
             .setParallelism(1));
 
     env.execute("flink_hudi_compaction_" + String.join(",", compactionInstantTimes));

From 6df6164388f94b22835df7a2b31cdd8af4acc07e Mon Sep 17 00:00:00 2001
From: lanyuanxiaoyao
Date: Wed, 15 Jun 2022 13:58:55 +0800
Subject: [PATCH 3/6] [HUDI-4152] Provide UT & IT for compacting multiple
 compaction plans

---
 .../compact/ITTestHoodieFlinkCompactor.java   | 125 +++++++++++++++
 .../TestCompactionPlanSelectStrategy.java     | 145 ++++++++++++++++++
 2 files changed, 270 insertions(+)
 create mode 100644 hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/TestCompactionPlanSelectStrategy.java

diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
index e1e86ce32bd82..53f086549bac5 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java @@ -18,6 +18,11 @@ package org.apache.hudi.sink.compact; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.util.Collector; +import org.apache.flink.util.OutputTag; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.common.table.HoodieTableMetaClient; @@ -25,6 +30,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.CompactionUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.table.HoodieFlinkTable; import org.apache.hudi.util.CompactionUtil; @@ -49,6 +55,7 @@ import java.io.File; import java.util.Arrays; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -67,6 +74,8 @@ public class ITTestHoodieFlinkCompactor { private static final Map> EXPECTED2 = new HashMap<>(); + private static final Map> EXPECTED3 = new HashMap<>(); + static { EXPECTED1.put("par1", Arrays.asList("id1,par1,id1,Danny,23,1000,par1", "id2,par1,id2,Stephen,33,2000,par1")); EXPECTED1.put("par2", Arrays.asList("id3,par2,id3,Julian,53,3000,par2", "id4,par2,id4,Fabian,31,4000,par2")); @@ -77,6 +86,12 @@ public class ITTestHoodieFlinkCompactor { EXPECTED2.put("par2", Arrays.asList("id3,par2,id3,Julian,54,3000,par2", "id4,par2,id4,Fabian,32,4000,par2")); EXPECTED2.put("par3", Arrays.asList("id5,par3,id5,Sophia,18,5000,par3", "id6,par3,id6,Emma,20,6000,par3", "id9,par3,id9,Jane,19,6000,par3")); EXPECTED2.put("par4", Arrays.asList("id7,par4,id7,Bob,44,7000,par4", "id8,par4,id8,Han,56,8000,par4", "id10,par4,id10,Ella,38,7000,par4", "id11,par4,id11,Phoebe,52,8000,par4")); + + EXPECTED3.put("par1", Arrays.asList("id1,par1,id1,Danny,23,1000,par1", "id2,par1,id2,Stephen,33,2000,par1")); + EXPECTED3.put("par2", Arrays.asList("id3,par2,id3,Julian,53,3000,par2", "id4,par2,id4,Fabian,31,4000,par2")); + EXPECTED3.put("par3", Arrays.asList("id5,par3,id5,Sophia,18,5000,par3", "id6,par3,id6,Emma,20,6000,par3")); + EXPECTED3.put("par4", Arrays.asList("id7,par4,id7,Bob,44,7000,par4", "id8,par4,id8,Han,56,8000,par4")); + EXPECTED3.put("par5", Arrays.asList("id12,par5,id12,Tony,27,9000,par5", "id13,par5,id13,Jenny,72,10000,par5")); } @TempDir @@ -203,4 +218,114 @@ public void testHoodieFlinkCompactorService(boolean enableChangelog) throws Exce TestData.checkWrittenFullData(tempFile, EXPECTED2); } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testHoodieFlinkCompactorWithPlanSelectStrategy(boolean enableChangelog) throws Exception { + // Create hoodie table and insert into data. 
+    EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
+    TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
+    tableEnv.getConfig().getConfiguration()
+        .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+    Map options = new HashMap<>();
+    options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
+    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+    options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
+    options.put(FlinkOptions.CHANGELOG_ENABLED.key(), enableChangelog + "");
+    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
+    tableEnv.executeSql(hoodieTableDDL);
+    tableEnv.executeSql(TestSQL.INSERT_T1).await();
+
+    TimeUnit.SECONDS.sleep(3);
+
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+    FlinkCompactionConfig cfg = new FlinkCompactionConfig();
+    cfg.path = tempFile.getAbsolutePath();
+    Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg);
+    conf.setString(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
+
+    HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+    conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
+    CompactionUtil.setAvroSchema(conf, metaClient);
+    CompactionUtil.inferChangelogMode(conf, metaClient);
+
+    List compactionInstantTimeList = new ArrayList<>(2);
+
+    HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf);
+
+    compactionInstantTimeList.add(scheduleCompactionPlan(metaClient, writeClient));
+
+    // insert new records into a new partition so that we can generate a new compaction plan
+    String insertT1ForNewPartition = "insert into t1 values\n"
+        + "('id12','Tony',27,TIMESTAMP '1970-01-01 00:00:09','par5'),\n"
+        + "('id13','Jenny',72,TIMESTAMP '1970-01-01 00:00:10','par5')";
+    tableEnv.executeSql(insertT1ForNewPartition).await();
+
+    TimeUnit.SECONDS.sleep(3);
+
+    compactionInstantTimeList.add(scheduleCompactionPlan(metaClient, writeClient));
+
+    HoodieFlinkTable table = writeClient.getHoodieTable();
+
+    List> compactionPlans = new ArrayList<>(2);
+    for (String compactionInstantTime : compactionInstantTimeList) {
+      HoodieCompactionPlan plan = CompactionUtils.getCompactionPlan(table.getMetaClient(), compactionInstantTime);
+      compactionPlans.add(Pair.of(compactionInstantTime, plan));
+    }
+
+    // Mark instant as compaction inflight
+    for (String compactionInstantTime : compactionInstantTimeList) {
+      HoodieInstant hoodieInstant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
+      table.getActiveTimeline().transitionCompactionRequestedToInflight(hoodieInstant);
+    }
+    table.getMetaClient().reloadActiveTimeline();
+
+    Pair firstPlan = compactionPlans.get(0);
+    DataStream source = env.addSource(new CompactionPlanSourceFunction(firstPlan.getRight(), firstPlan.getLeft()))
+        .name("compaction_source " + firstPlan.getLeft())
+        .uid("uid_compaction_source " + firstPlan.getLeft());
+    if (compactionPlans.size() > 1) {
+      for (Pair pair : compactionPlans.subList(1, compactionPlans.size())) {
+        source = source.union(env.addSource(new CompactionPlanSourceFunction(pair.getRight(), pair.getLeft()))
+            .name("compaction_source " + pair.getLeft())
+            .uid("uid_compaction_source " + pair.getLeft()));
+      }
+    }
+    SingleOutputStreamOperator operator = source.rebalance()
+        .transform("compact_task",
+            TypeInformation.of(CompactionCommitEvent.class),
+            new ProcessOperator<>(new CompactFunction(conf)))
+        .setParallelism(1)
+        .process(new ProcessFunction() {
+          @Override
+          public void processElement(CompactionCommitEvent event, ProcessFunction.Context context, Collector out) {
+            context.output(new OutputTag<>(event.getInstant(), TypeInformation.of(CompactionCommitEvent.class)), event);
+          }
+        })
+        .name("group_by_compaction_plan")
+        .uid("uid_group_by_compaction_plan")
+        .setParallelism(1);
+    compactionPlans.forEach(pair ->
+        operator.getSideOutput(new OutputTag<>(pair.getLeft(), TypeInformation.of(CompactionCommitEvent.class)))
+            .addSink(new CompactionCommitSink(conf))
+            .name("clean_commits " + pair.getLeft())
+            .uid("uid_clean_commits_" + pair.getLeft())
+            .setParallelism(1));
+
+    env.execute("flink_hudi_compaction");
+    writeClient.close();
+    TestData.checkWrittenFullData(tempFile, EXPECTED3);
+  }
+
+  private String scheduleCompactionPlan(HoodieTableMetaClient metaClient, HoodieFlinkWriteClient writeClient) {
+    boolean scheduled = false;
+    // check whether there is an operation to compact:
+    // compute the compaction instant time and schedule the compaction.
+    Option compactionInstantTimeOption = CompactionUtil.getCompactionInstantTime(metaClient);
+    if (compactionInstantTimeOption.isPresent()) {
+      scheduled = writeClient.scheduleCompactionAtInstant(compactionInstantTimeOption.get(), Option.empty());
+    }
+    assertTrue(scheduled, "The compaction plan should be scheduled");
+    return compactionInstantTimeOption.get();
+  }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/TestCompactionPlanSelectStrategy.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/TestCompactionPlanSelectStrategy.java
new file mode 100644
index 0000000000000..7403874764544
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/TestCompactionPlanSelectStrategy.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.compact;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.sink.compact.strategy.AllPendingCompactionPlanSelectStrategy;
+import org.apache.hudi.sink.compact.strategy.CompactionPlanSelectStrategy;
+import org.apache.hudi.sink.compact.strategy.InstantCompactionPlanSelectStrategy;
+import org.apache.hudi.sink.compact.strategy.MultiCompactionPlanSelectStrategy;
+import org.apache.hudi.sink.compact.strategy.SingleCompactionPlanSelectStrategy;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Test cases for every {@link CompactionPlanSelectStrategy} implementation.
+ */
+public class TestCompactionPlanSelectStrategy {
+  private HoodieTimeline timeline;
+  private HoodieTimeline emptyTimeline;
+  private HoodieTimeline allCompleteTimeline;
+
+  private static final HoodieInstant INSTANT_001 = new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "001");
+  private static final HoodieInstant INSTANT_002 = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, "002");
+  private static final HoodieInstant INSTANT_003 = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "003");
+  private static final HoodieInstant INSTANT_004 = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "004");
+  private static final HoodieInstant INSTANT_005 = new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMPACTION_ACTION, "005");
+  private static final HoodieInstant INSTANT_006 = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "006");
+
+  @BeforeEach
+  public void beforeEach() {
+    timeline = new MockHoodieActiveTimeline(INSTANT_001, INSTANT_002, INSTANT_003, INSTANT_004, INSTANT_005, INSTANT_006);
+    emptyTimeline = new MockHoodieActiveTimeline();
+    allCompleteTimeline = new MockHoodieActiveTimeline(INSTANT_001, INSTANT_005);
+  }
+
+  @Test
+  void testSingleCompactionPlanSelectStrategy() {
+    HoodieTimeline pendingCompactionTimeline = this.timeline.filterPendingCompactionTimeline();
+    FlinkCompactionConfig compactionConfig = new FlinkCompactionConfig();
+
+    SingleCompactionPlanSelectStrategy strategy = new SingleCompactionPlanSelectStrategy();
+    assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_006}, strategy.select(pendingCompactionTimeline, compactionConfig));
+
+    compactionConfig.compactionSeq = FlinkCompactionConfig.SEQ_FIFO;
+    assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_002}, strategy.select(pendingCompactionTimeline, compactionConfig));
+
+    HoodieTimeline emptyPendingCompactionTimeline = emptyTimeline.filterPendingCompactionTimeline();
+    assertHoodieInstantsEquals(new HoodieInstant[]{}, strategy.select(emptyPendingCompactionTimeline, compactionConfig));
+
+    HoodieTimeline allCompleteCompactionTimeline = allCompleteTimeline.filterPendingCompactionTimeline();
+    assertHoodieInstantsEquals(new HoodieInstant[]{}, strategy.select(allCompleteCompactionTimeline, compactionConfig));
+  }
+
+  @Test
+  void testMultiCompactionPlanSelectStrategy() {
+    HoodieTimeline pendingCompactionTimeline =
this.timeline.filterPendingCompactionTimeline(); + FlinkCompactionConfig compactionConfig = new FlinkCompactionConfig(); + compactionConfig.compactionPlanMaxSelect = 2; + + MultiCompactionPlanSelectStrategy strategy = new MultiCompactionPlanSelectStrategy(); + assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_006, INSTANT_004}, strategy.select(pendingCompactionTimeline, compactionConfig)); + + compactionConfig.compactionSeq = FlinkCompactionConfig.SEQ_FIFO; + assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_002, INSTANT_003}, strategy.select(pendingCompactionTimeline, compactionConfig)); + + HoodieTimeline emptyPendingCompactionTimeline = emptyTimeline.filterPendingCompactionTimeline(); + assertHoodieInstantsEquals(new HoodieInstant[]{}, strategy.select(emptyPendingCompactionTimeline, compactionConfig)); + + HoodieTimeline allCompleteCompactionTimeline = allCompleteTimeline.filterPendingCompactionTimeline(); + assertHoodieInstantsEquals(new HoodieInstant[]{}, strategy.select(allCompleteCompactionTimeline, compactionConfig)); + } + + @Test + void testAllPendingCompactionPlanSelectStrategy() { + HoodieTimeline pendingCompactionTimeline = this.timeline.filterPendingCompactionTimeline(); + FlinkCompactionConfig compactionConfig = new FlinkCompactionConfig(); + + AllPendingCompactionPlanSelectStrategy strategy = new AllPendingCompactionPlanSelectStrategy(); + assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_002, INSTANT_003, INSTANT_004, INSTANT_006}, + strategy.select(pendingCompactionTimeline, compactionConfig)); + + HoodieTimeline emptyPendingCompactionTimeline = emptyTimeline.filterPendingCompactionTimeline(); + assertHoodieInstantsEquals(new HoodieInstant[]{}, strategy.select(emptyPendingCompactionTimeline, compactionConfig)); + + HoodieTimeline allCompleteCompactionTimeline = allCompleteTimeline.filterPendingCompactionTimeline(); + assertHoodieInstantsEquals(new HoodieInstant[]{}, strategy.select(allCompleteCompactionTimeline, compactionConfig)); + } + + @Test + void testInstantCompactionPlanSelectStrategy() { + HoodieTimeline pendingCompactionTimeline = this.timeline.filterPendingCompactionTimeline(); + FlinkCompactionConfig compactionConfig = new FlinkCompactionConfig(); + compactionConfig.compactionPlanInstant = "004"; + + InstantCompactionPlanSelectStrategy strategy = new InstantCompactionPlanSelectStrategy(); + assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_004}, strategy.select(pendingCompactionTimeline, compactionConfig)); + + compactionConfig.compactionPlanInstant = "005"; + assertThrows(HoodieException.class, () -> strategy.select(pendingCompactionTimeline, compactionConfig)); + } + + private void assertHoodieInstantsEquals(HoodieInstant[] expected, List actual) { + assertEquals(expected.length, actual.size()); + for (int index = 0; index < expected.length; index++) { + assertHoodieInstantEquals(expected[index], actual.get(index)); + } + } + + private void assertHoodieInstantEquals(HoodieInstant expected, HoodieInstant actual) { + assertEquals(expected.getState(), actual.getState()); + assertEquals(expected.getAction(), actual.getAction()); + assertEquals(expected.getTimestamp(), actual.getTimestamp()); + } + + private static final class MockHoodieActiveTimeline extends HoodieActiveTimeline { + public MockHoodieActiveTimeline(HoodieInstant... 
instants) {
+      super();
+      setInstants(Arrays.asList(instants));
+    }
+  }
+}

From 87e2821ae0098d7ceb2aef5b1ec5360d625474d7 Mon Sep 17 00:00:00 2001
From: lanyuanxiaoyao
Date: Wed, 6 Jul 2022 18:37:22 +0800
Subject: [PATCH 4/6] [HUDI-4152] Put multiple compaction plans into one
 compaction plan source

---
 .../sink/compact/HoodieFlinkCompactor.java    | 14 +--
 .../MultiCompactionPlanSourceFunction.java    | 90 +++++++++++++++
 .../compact/ITTestHoodieFlinkCompactor.java   | 14 +--
 3 files changed, 96 insertions(+), 22 deletions(-)
 create mode 100644 hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/MultiCompactionPlanSourceFunction.java

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
index 17e83fb55c129..f56b5a2f0fb1d 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
@@ -299,17 +299,9 @@ private void compact() throws Exception {
 
     // use side-output so that operations from the same plan are placed in the same stream;
     // keyBy() cannot guarantee that operations from different plans end up in different streams
-    Pair firstPlan = compactionPlans.get(0);
-    DataStream source = env.addSource(new CompactionPlanSourceFunction(firstPlan.getRight(), firstPlan.getLeft()))
-        .name("compaction_source " + firstPlan.getLeft())
-        .uid("uid_compaction_source " + firstPlan.getLeft());
-    if (compactionPlans.size() > 1) {
-      for (Pair pair : compactionPlans.subList(1, compactionPlans.size())) {
-        source = source.union(env.addSource(new CompactionPlanSourceFunction(pair.getRight(), pair.getLeft()))
-            .name("compaction_source " + pair.getLeft())
-            .uid("uid_compaction_source_" + pair.getLeft()));
-      }
-    }
+    DataStream source = env.addSource(new MultiCompactionPlanSourceFunction(compactionPlans))
+        .name("compaction_source")
+        .uid("uid_compaction_source");
 
     SingleOutputStreamOperator operator = source.rebalance()
         .transform("compact_task",
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/MultiCompactionPlanSourceFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/MultiCompactionPlanSourceFunction.java
new file mode 100644
index 0000000000000..8a8c3f6b4eeb3
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/MultiCompactionPlanSourceFunction.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.compact;
+
+import static java.util.stream.Collectors.toList;
+
+import java.util.List;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.common.model.CompactionOperation;
+import org.apache.hudi.common.util.collection.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Flink hudi compaction source function.
+ *
+ * <p>This function reads the compaction plans as {@link CompactionOperation}s and assigns the compaction task
+ * event {@link CompactionPlanEvent} to downstream operators.
+ *
+ * <p>The compaction instant time is specified explicitly with strategies:
+ *
+ * <ul>
+ *   <li>If the timeline has no inflight instants,
+ *   use {@link org.apache.hudi.common.table.timeline.HoodieActiveTimeline#createNewInstantTime()}
+ *   as the instant time;</li>
+ *   <li>If the timeline has inflight instants,
+ *   use the median instant time between [last complete instant time, earliest inflight instant time]
+ *   as the instant time.</li>
+ * </ul>
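+ *
+ * <p>All operations from the given plans are emitted by this single source; every
+ * {@link CompactionPlanEvent} carries the instant time of the plan it belongs to, so
+ * the downstream operators can group the commit events back by plan.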
+ */
+public class MultiCompactionPlanSourceFunction extends AbstractRichFunction implements SourceFunction {
+
+  protected static final Logger LOG = LoggerFactory.getLogger(MultiCompactionPlanSourceFunction.class);
+
+  /**
+   * compaction plan instant -> compaction plan
+   */
+  private final List> compactionPlans;
+
+  public MultiCompactionPlanSourceFunction(List> compactionPlans) {
+    this.compactionPlans = compactionPlans;
+  }
+
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    // no operation
+  }
+
+  @Override
+  public void run(SourceContext sourceContext) throws Exception {
+    for (Pair pair : compactionPlans) {
+      HoodieCompactionPlan compactionPlan = pair.getRight();
+      List operations = compactionPlan.getOperations().stream()
+          .map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
+      LOG.info("MultiCompactionPlanSourceFunction compacting " + operations + " files");
+      for (CompactionOperation operation : operations) {
+        sourceContext.collect(new CompactionPlanEvent(pair.getLeft(), operation));
+      }
+    }
+  }
+
+  @Override
+  public void close() throws Exception {
+    // no operation
+  }
+
+  @Override
+  public void cancel() {
+    // no operation
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
index 53f086549bac5..43e4ed511452d 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
@@ -280,17 +280,9 @@ public void testHoodieFlinkCompactorWithPlanSelectStrategy(boolean enableChangel
     }
     table.getMetaClient().reloadActiveTimeline();
 
-    Pair firstPlan = compactionPlans.get(0);
-    DataStream source = env.addSource(new CompactionPlanSourceFunction(firstPlan.getRight(), firstPlan.getLeft()))
-        .name("compaction_source " + firstPlan.getLeft())
-        .uid("uid_compaction_source " + firstPlan.getLeft());
-    if (compactionPlans.size() > 1) {
-      for (Pair pair : compactionPlans.subList(1, compactionPlans.size())) {
-        source = source.union(env.addSource(new CompactionPlanSourceFunction(pair.getRight(), pair.getLeft()))
-            .name("compaction_source " + pair.getLeft())
-            .uid("uid_compaction_source " + pair.getLeft()));
-      }
-    }
+    DataStream source = env.addSource(new MultiCompactionPlanSourceFunction(compactionPlans))
+        .name("compaction_source")
+        .uid("uid_compaction_source");
     SingleOutputStreamOperator operator = source.rebalance()
         .transform("compact_task",
             TypeInformation.of(CompactionCommitEvent.class),

From 8d0647a11d7294eeec4d04e241f3bee0fc8fef3d Mon Sep 17 00:00:00 2001
From: lanyuanxiaoyao
Date: Wed, 6 Jul 2022 18:51:23 +0800
Subject: [PATCH 5/6] [HUDI-4152] InstantCompactionPlanSelectStrategy allows
 multiple instants separated by commas

---
 .../hudi/sink/compact/FlinkCompactionConfig.java  |  3 ++-
 .../InstantCompactionPlanSelectStrategy.java      | 15 ++++++++++++++-
 2 files changed, 16 insertions(+), 2 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java
index be1c3211b03e7..02041690f1dec 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/FlinkCompactionConfig.java
@@ -122,7 +122,8 @@ public class FlinkCompactionConfig extends Configuration {
       + "It's only effective for MultiCompactionPlanSelectStrategy.")
   public Integer compactionPlanMaxSelect = 10;
 
-  @Parameter(names = {"--select-instant"}, description = "Specify the compaction plan instant to compact."
+  @Parameter(names = {"--select-instant"}, description = "Specify the compaction plan instant to compact. More than one instant "
+      + "can be specified at a time, separated by commas."
       + "It's only effective for InstantCompactionPlanSelectStrategy.")
   public String compactionPlanInstant;
 
   @Parameter(names = {"--spillable_map_path"}, description = "Default file path prefix for spillable map.", required = false)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/InstantCompactionPlanSelectStrategy.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/InstantCompactionPlanSelectStrategy.java
index 2cf73b421a08f..c680caddf1ccf 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/InstantCompactionPlanSelectStrategy.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/InstantCompactionPlanSelectStrategy.java
@@ -18,21 +18,34 @@
 
 package org.apache.hudi.sink.compact.strategy;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.stream.Stream;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.sink.compact.FlinkCompactionConfig;
+import org.apache.hudi.sink.compact.HoodieFlinkCompactor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Specify the compaction plan instant to compact
  */
 public class InstantCompactionPlanSelectStrategy implements CompactionPlanSelectStrategy {
+  protected static final Logger LOG = LoggerFactory.getLogger(HoodieFlinkCompactor.class);
+
   @Override
   public List select(HoodieTimeline pendingCompactionTimeline, FlinkCompactionConfig config) {
+    if (StringUtils.isNullOrEmpty(config.compactionPlanInstant)) {
+      LOG.warn("No instant is selected");
+      return Collections.emptyList();
+    }
+    Stream instants = Arrays.stream(config.compactionPlanInstant.split(","));
     HoodieInstant specifiedInstant = pendingCompactionTimeline.getInstants()
-        .filter(instant -> config.compactionPlanInstant.equals(instant.getTimestamp()))
+        .filter(instant -> instants.anyMatch(i -> i.equals(instant.getTimestamp())))
         .findFirst()
         .orElseThrow(() -> new HoodieException("The instant " + config.compactionPlanInstant + " is not found in timeline"));
     return Collections.singletonList(specifiedInstant);

From 6fa4f84fc911fe895335a52911c1c78f14e9abbe Mon Sep 17 00:00:00 2001
From: lanyuanxiaoyao
Date: Thu, 7 Jul 2022 00:13:07 +0800
Subject: [PATCH 6/6] [HUDI-4152] Add IT for InstantCompactionPlanSelectStrategy

---
 .../InstantCompactionPlanSelectStrategy.java        | 13 +++++--------
 .../strategy/MultiCompactionPlanSelectStrategy.java |  2 +-
 .../SingleCompactionPlanSelectStrategy.java         |  4 ++--
 .../compact/TestCompactionPlanSelectStrategy.java   | 10 +++++++---
 4 files changed, 15 insertions(+), 14 deletions(-)

diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/InstantCompactionPlanSelectStrategy.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/InstantCompactionPlanSelectStrategy.java
index c680caddf1ccf..45382b70c4def 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/InstantCompactionPlanSelectStrategy.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/InstantCompactionPlanSelectStrategy.java
@@ -21,11 +21,10 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.stream.Stream;
+import java.util.stream.Collectors;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.StringUtils;
-import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.sink.compact.FlinkCompactionConfig;
 import org.apache.hudi.sink.compact.HoodieFlinkCompactor;
 import org.slf4j.Logger;
@@ -43,11 +42,9 @@ public List select(HoodieTimeline pendingCompactionTimeline, Flin
       LOG.warn("No instant is selected");
       return Collections.emptyList();
     }
-    Stream instants = Arrays.stream(config.compactionPlanInstant.split(","));
-    HoodieInstant specifiedInstant = pendingCompactionTimeline.getInstants()
-        .filter(instant -> instants.anyMatch(i -> i.equals(instant.getTimestamp())))
-        .findFirst()
-        .orElseThrow(() -> new HoodieException("The instant " + config.compactionPlanInstant + " is not found in timeline"));
-    return Collections.singletonList(specifiedInstant);
+    List instants = Arrays.asList(config.compactionPlanInstant.split(","));
+    return pendingCompactionTimeline.getInstants()
+        .filter(instant -> instants.contains(instant.getTimestamp()))
+        .collect(Collectors.toList());
   }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/MultiCompactionPlanSelectStrategy.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/MultiCompactionPlanSelectStrategy.java
index 37ba1ac9376fc..ee0e93653f87d 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/MultiCompactionPlanSelectStrategy.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/MultiCompactionPlanSelectStrategy.java
@@ -33,7 +33,7 @@ public class MultiCompactionPlanSelectStrategy implements CompactionPlanSelectSt
   @Override
   public List select(HoodieTimeline pendingCompactionTimeline, FlinkCompactionConfig config) {
     List pendingCompactionPlanInstants = pendingCompactionTimeline.getInstants().collect(Collectors.toList());
-    if (!CompactionUtil.isLIFO(config.compactionSeq)) {
+    if (CompactionUtil.isLIFO(config.compactionSeq)) {
       Collections.reverse(pendingCompactionPlanInstants);
     }
     int range = Math.min(config.compactionPlanMaxSelect, pendingCompactionPlanInstants.size());
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/SingleCompactionPlanSelectStrategy.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/SingleCompactionPlanSelectStrategy.java
index d03ef4ef27d14..7ca939866ceec 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/SingleCompactionPlanSelectStrategy.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/strategy/SingleCompactionPlanSelectStrategy.java @@ -33,8 +33,8 @@ public class SingleCompactionPlanSelectStrategy implements CompactionPlanSelectS @Override public List select(HoodieTimeline pendingCompactionTimeline, FlinkCompactionConfig config) { Option compactionPlanInstant = CompactionUtil.isLIFO(config.compactionSeq) - ? pendingCompactionTimeline.firstInstant() - : pendingCompactionTimeline.lastInstant(); + ? pendingCompactionTimeline.lastInstant() + : pendingCompactionTimeline.firstInstant(); if (compactionPlanInstant.isPresent()) { return Collections.singletonList(compactionPlanInstant.get()); } diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/TestCompactionPlanSelectStrategy.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/TestCompactionPlanSelectStrategy.java index 7403874764544..3ac9f6c6663ef 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/TestCompactionPlanSelectStrategy.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/TestCompactionPlanSelectStrategy.java @@ -19,14 +19,12 @@ package org.apache.hudi.sink.compact; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; import java.util.Arrays; import java.util.List; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; -import org.apache.hudi.exception.HoodieException; import org.apache.hudi.sink.compact.strategy.AllPendingCompactionPlanSelectStrategy; import org.apache.hudi.sink.compact.strategy.CompactionPlanSelectStrategy; import org.apache.hudi.sink.compact.strategy.InstantCompactionPlanSelectStrategy; @@ -119,8 +117,14 @@ void testInstantCompactionPlanSelectStrategy() { InstantCompactionPlanSelectStrategy strategy = new InstantCompactionPlanSelectStrategy(); assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_004}, strategy.select(pendingCompactionTimeline, compactionConfig)); + compactionConfig.compactionPlanInstant = "002,003"; + assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_002, INSTANT_003}, strategy.select(pendingCompactionTimeline, compactionConfig)); + + compactionConfig.compactionPlanInstant = "002,005"; + assertHoodieInstantsEquals(new HoodieInstant[]{INSTANT_002}, strategy.select(pendingCompactionTimeline, compactionConfig)); + compactionConfig.compactionPlanInstant = "005"; - assertThrows(HoodieException.class, () -> strategy.select(pendingCompactionTimeline, compactionConfig)); + assertHoodieInstantsEquals(new HoodieInstant[]{}, strategy.select(pendingCompactionTimeline, compactionConfig)); } private void assertHoodieInstantsEquals(HoodieInstant[] expected, List actual) {
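
---

Note on usage: the strategies above are instantiated reflectively from the --select-strategy flag, so
switching between them requires no compactor code changes. The sketch below mirrors the selection step
of HoodieFlinkCompactor#compact after this series; the wrapper class, the table argument, and the chosen
configuration values are illustrative assumptions, not code from the patches:

    import java.util.List;
    import org.apache.hudi.common.table.timeline.HoodieInstant;
    import org.apache.hudi.common.util.ReflectionUtils;
    import org.apache.hudi.sink.compact.FlinkCompactionConfig;
    import org.apache.hudi.sink.compact.strategy.CompactionPlanSelectStrategy;
    import org.apache.hudi.sink.compact.strategy.MultiCompactionPlanSelectStrategy;
    import org.apache.hudi.table.HoodieFlinkTable;

    public class SelectPendingPlansSketch {
      // Returns the pending compaction instants that one offline compaction run would pick up.
      public static List<HoodieInstant> selectPendingPlans(HoodieFlinkTable<?> table) {
        FlinkCompactionConfig cfg = new FlinkCompactionConfig();
        // equivalent to: --select-strategy ...MultiCompactionPlanSelectStrategy --select-max-number 2
        cfg.compactionPlanSelectStrategy = MultiCompactionPlanSelectStrategy.class.getName();
        cfg.compactionPlanMaxSelect = 2;
        // the same reflective dispatch that HoodieFlinkCompactor#compact uses
        CompactionPlanSelectStrategy strategy =
            (CompactionPlanSelectStrategy) ReflectionUtils.loadClass(cfg.compactionPlanSelectStrategy);
        return strategy.select(table.getActiveTimeline().filterPendingCompactionTimeline(), cfg);
      }
    }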