
[HUDI-4152] Flink offline compaction support compacting multi compaction plan at once #5677

Merged
FlinkCompactionConfig.java
@@ -23,6 +23,7 @@

import com.beust.jcommander.Parameter;
import org.apache.flink.configuration.Configuration;
import org.apache.hudi.sink.compact.strategy.SingleCompactionPlanSelectStrategy;

/**
* Configurations for Hoodie Flink compaction.
@@ -110,6 +111,21 @@ public class FlinkCompactionConfig extends Configuration {
description = "Min compaction interval of async compaction service, default 10 minutes")
public Integer minCompactionIntervalSeconds = 600;

@Parameter(names = {"--select-strategy"}, description = "The strategy define how to select compaction plan to compact.\n"
+ "1). SingleCompactionPlanSelectStrategy: Select first or last compaction plan."
+ "2). MultiCompactionPlanSelectStrategy: Select first or last n compaction plan (n is defined by compactionPlanMaxSelect)."
+ "3). AllPendingCompactionPlanSelectStrategy: Select all pending compaction plan"
+ "4). InstantCompactionPlanSelectStrategy: Select the compaction plan that instant is specified by compactionPlanInstant")
public String compactionPlanSelectStrategy = SingleCompactionPlanSelectStrategy.class.getName();

@Parameter(names = {"--select-max-number"}, description = "Max number of compaction plan would be selected in compaction."
+ "It's only effective for MultiCompactionPlanSelectStrategy.")
public Integer compactionPlanMaxSelect = 10;

@Parameter(names = {"--select-instant"}, description = "Specify the compaction plan instant to compact and you can specify more than"
+ "one instant in a time by using comma."
+ "It's only effective for InstantCompactionPlanSelectStrategy.")
public String compactionPlanInstant;
@Parameter(names = {"--spillable_map_path"}, description = "Default file path prefix for spillable map.", required = false)
public String spillableMapPath = HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.defaultValue();
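
Of the four strategies named above, only AllPendingCompactionPlanSelectStrategy and the CompactionPlanSelectStrategy interface appear later in this excerpt. For illustration, here is a minimal sketch of what the --select-instant handling could look like against that interface; the class body below is a hypothetical reconstruction, not the PR's actual code:

package org.apache.hudi.sink.compact.strategy;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.sink.compact.FlinkCompactionConfig;

/**
 * Hypothetical sketch: select the compaction plans whose instants are listed
 * (comma-separated) in --select-instant / compactionPlanInstant.
 */
public class InstantCompactionPlanSelectStrategy implements CompactionPlanSelectStrategy {
  @Override
  public List<HoodieInstant> select(HoodieTimeline pendingCompactionTimeline, FlinkCompactionConfig config) {
    // e.g. "20220530101530,20220530111530" -> two requested instant timestamps
    List<String> requested = Arrays.asList(config.compactionPlanInstant.split(","));
    return pendingCompactionTimeline.getInstants()
        .filter(instant -> requested.contains(instant.getTimestamp()))
        .collect(Collectors.toList());
  }
}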

HoodieFlinkCompactor.java
@@ -18,6 +18,11 @@

package org.apache.hudi.sink.compact;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.hudi.async.HoodieAsyncTableService;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.HoodieFlinkWriteClient;
@@ -26,9 +31,11 @@
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.compact.strategy.CompactionPlanSelectStrategy;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.StreamerUtil;
@@ -42,9 +49,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

/**
* Flink hudi compaction program that can be executed manually.
@@ -218,74 +228,104 @@ private void compact() throws Exception {
}

// fetch the instant based on the configured execution sequence
HoodieTimeline timeline = table.getActiveTimeline().filterPendingCompactionTimeline();
Option<HoodieInstant> requested = CompactionUtil.isLIFO(cfg.compactionSeq) ? timeline.lastInstant() : timeline.firstInstant();
if (!requested.isPresent()) {
HoodieTimeline timeline = table.getActiveTimeline();
List<HoodieInstant> requested = ((CompactionPlanSelectStrategy) ReflectionUtils.loadClass(cfg.compactionPlanSelectStrategy))
.select(timeline.filterPendingCompactionTimeline(), cfg);
if (requested.isEmpty()) {
// do nothing.
LOG.info("No compaction plan scheduled, turns on the compaction plan schedule with --schedule option");
return;
}

String compactionInstantTime = requested.get().getTimestamp();

HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime);
if (timeline.containsInstant(inflightInstant)) {
LOG.info("Rollback inflight compaction instant: [" + compactionInstantTime + "]");
table.rollbackInflightCompaction(inflightInstant);
table.getMetaClient().reloadActiveTimeline();
}
List<String> compactionInstantTimes = requested.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
compactionInstantTimes.forEach(timestamp -> {
HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(timestamp);
if (timeline.containsInstant(inflightInstant)) {
LOG.info("Rollback inflight compaction instant: [" + timestamp + "]");
table.rollbackInflightCompaction(inflightInstant);
table.getMetaClient().reloadActiveTimeline();
}
});

// generate compaction plan
// generate timestamp and compaction plan pair
// should support configurable commit metadata
HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(
table.getMetaClient(), compactionInstantTime);

if (compactionPlan == null || (compactionPlan.getOperations() == null)
|| (compactionPlan.getOperations().isEmpty())) {
List<Pair<String, HoodieCompactionPlan>> compactionPlans = compactionInstantTimes.stream()
.map(timestamp -> {
try {
return Pair.of(timestamp, CompactionUtils.getCompactionPlan(table.getMetaClient(), timestamp));
} catch (IOException e) {
throw new HoodieException(e);
}
})
// reject empty compaction plan
Member: Empty plans need some special handling, like committing an empty meta; otherwise the plan will stay there forever.

Contributor Author: Normally the compaction plan would not be empty, unless the process of generating the compaction plan hit some error. Maybe I should not filter out empty compaction plans; it could cause a misunderstanding.

.filter(pair -> !(pair.getRight() == null
|| pair.getRight().getOperations() == null
|| pair.getRight().getOperations().isEmpty()))
.collect(Collectors.toList());

if (compactionPlans.isEmpty()) {
// No compaction plan, do nothing and return.
LOG.info("No compaction plan for instant " + compactionInstantTime);
LOG.info("No compaction plan for instant " + String.join(",", compactionInstantTimes));
return;
}

HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
List<HoodieInstant> instants = compactionInstantTimes.stream().map(HoodieTimeline::getCompactionRequestedInstant).collect(Collectors.toList());
HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
if (!pendingCompactionTimeline.containsInstant(instant)) {
// this means that the compaction plan was written to auxiliary path(.tmp)
// but not the meta path(.hoodie); this usually happens when the job crashes
// exceptionally.

// clean the compaction plan in the auxiliary path and cancel the compaction.

LOG.warn("The compaction plan was fetched through the auxiliary path(.tmp) but not the meta path(.hoodie).\n"
+ "Clean the compaction plan in the auxiliary path and cancel the compaction");
CompactionUtil.cleanInstant(table.getMetaClient(), instant);
return;
for (HoodieInstant instant : instants) {
if (!pendingCompactionTimeline.containsInstant(instant)) {
// this means that the compaction plan was written to auxiliary path(.tmp)
// but not the meta path(.hoodie); this usually happens when the job crashes
// exceptionally.
// clean the compaction plan in the auxiliary path and cancel the compaction.
LOG.warn("The compaction plan was fetched through the auxiliary path(.tmp) but not the meta path(.hoodie).\n"
+ "Clean the compaction plan in the auxiliary path and cancel the compaction");
CompactionUtil.cleanInstant(table.getMetaClient(), instant);
return;
Contributor: Why 'return' here? E.g., if we clean one of the instants, it should be OK for the others to proceed.

Contributor Author: I think you are right. I don't know much about the operation here, so I kept the original logic. If you can confirm that removing this 'return' has no effect, I can remove it.

Contributor: Hi @lanyuanxiaoyao, I'm reviewing the HoodieFlinkCompactor code recently and I got confused here as well. As the comment above says (// this means that the compaction plan was written to auxiliary path(.tmp)), there could be a situation where a compaction plan is found in the .aux folder (not .tmp, which doesn't exist; it should be a typo) but not in the Hudi timeline. However, according to the code, instants comes from the list compactionInstantTimes, which in turn comes from the list requested. So the source of instants is also the Hudi timeline, which does not match the comment. I then checked the original PR implementing this HoodieFlinkCompactor, #3046. It also loaded this instant from the Hudi timeline, not the .aux folder. So I guess this part of the code should be deprecated now. Am I right?

Contributor: And I found a JIRA ticket about deprecating the .aux folder when writing and reading compaction plans: https://issues.apache.org/jira/browse/HUDI-546

}
}

// get compactionParallelism.
int compactionParallelism = conf.getInteger(FlinkOptions.COMPACTION_TASKS) == -1
? compactionPlan.getOperations().size() : conf.getInteger(FlinkOptions.COMPACTION_TASKS);
? Math.toIntExact(compactionPlans.stream().mapToLong(pair -> pair.getRight().getOperations().size()).sum())
Member: Is this the total number of file groups?

Contributor Author: I just followed the logic of the old code when rewriting this. It seems to use more parallelism to finish the compaction when COMPACTION_TASKS is set to -1, but that becomes unacceptable when there are many compaction plans. I think I could use the default value 4 of COMPACTION_TASKS here. WDYT?

: conf.getInteger(FlinkOptions.COMPACTION_TASKS);

LOG.info("Start to compaction for instant " + compactionInstantTime);
LOG.info("Start to compaction for instant " + compactionInstantTimes);

// Mark instant as compaction inflight
table.getActiveTimeline().transitionCompactionRequestedToInflight(instant);
for (HoodieInstant instant : instants) {
table.getActiveTimeline().transitionCompactionRequestedToInflight(instant);
}
table.getMetaClient().reloadActiveTimeline();

env.addSource(new CompactionPlanSourceFunction(compactionPlan, compactionInstantTime))
// use side-output to ensure that operations from the same plan are placed in the same stream
// keyBy() cannot guarantee that operations from different plans go to different streams
DataStream<CompactionPlanEvent> source = env.addSource(new MultiCompactionPlanSourceFunction(compactionPlans))
.name("compaction_source")
.uid("uid_compaction_source")
.rebalance()
.uid("uid_compaction_source");

SingleOutputStreamOperator<Void> operator = source.rebalance()
.transform("compact_task",
TypeInformation.of(CompactionCommitEvent.class),
new ProcessOperator<>(new CompactFunction(conf)))
.setParallelism(compactionParallelism)
.addSink(new CompactionCommitSink(conf))
.name("clean_commits")
.uid("uid_clean_commits")
.process(new ProcessFunction<CompactionCommitEvent, Void>() {
@Override
public void processElement(CompactionCommitEvent event, ProcessFunction<CompactionCommitEvent, Void>.Context context, Collector<Void> out) {
context.output(new OutputTag<>(event.getInstant(), TypeInformation.of(CompactionCommitEvent.class)), event);
}
})
.name("group_by_compaction_plan")
.uid("uid_group_by_compaction_plan")
.setParallelism(1);

env.execute("flink_hudi_compaction_" + compactionInstantTime);
compactionPlans.forEach(pair ->
operator.getSideOutput(new OutputTag<>(pair.getLeft(), TypeInformation.of(CompactionCommitEvent.class)))
.addSink(new CompactionCommitSink(conf))
Member: How could the sink function know when all the compaction instants are completed without having the list of compaction plans?

Member: My concern is that the sink function might end earlier than it is supposed to.

Contributor Author: The Flink job for offline compaction runs in batch mode, and Flink waits until all subtasks are finished. It will not end before all the sinks are done.

.name("clean_commits " + pair.getLeft())
.uid("uid_clean_commits_" + pair.getLeft())
.setParallelism(1));

env.execute("flink_hudi_compaction_" + String.join(",", compactionInstantTimes));
}
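
Regarding the parallelism fallback discussed in the review thread above, a quick worked example with hypothetical numbers: two selected plans containing 3 and 5 operations yield a compact_task parallelism of 8 when COMPACTION_TASKS is -1; any other configured value wins.

// Worked example with hypothetical numbers (two plans, 3 + 5 operations):
int configured = -1;              // conf.getInteger(FlinkOptions.COMPACTION_TASKS)
long totalOperations = 3 + 5;     // sum of getOperations().size() over the selected plans
int compactionParallelism = configured == -1
    ? Math.toIntExact(totalOperations)
    : configured;
// compactionParallelism == 8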

MultiCompactionPlanSourceFunction.java
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.sink.compact;

import static java.util.stream.Collectors.toList;

import java.util.List;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.util.collection.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Flink hudi compaction source function.
*
* <p>This function reads the compaction plans as {@link CompactionOperation}s and then assigns the compaction
* task events {@link CompactionPlanEvent} to downstream operators.
*
* <p>The compaction instant time is specified explicitly with strategies:
*
* <ul>
* <li>If the timeline has no inflight instants,
* use {@link org.apache.hudi.common.table.timeline.HoodieActiveTimeline#createNewInstantTime()}
* as the instant time;</li>
* <li>If the timeline has inflight instants,
* use the median instant time between [last complete instant time, earliest inflight instant time]
* as the instant time.</li>
* </ul>
*/
public class MultiCompactionPlanSourceFunction extends AbstractRichFunction implements SourceFunction<CompactionPlanEvent> {

protected static final Logger LOG = LoggerFactory.getLogger(MultiCompactionPlanSourceFunction.class);

/**
* compaction plan instant -> compaction plan
*/
private final List<Pair<String, HoodieCompactionPlan>> compactionPlans;

public MultiCompactionPlanSourceFunction(List<Pair<String, HoodieCompactionPlan>> compactionPlans) {
this.compactionPlans = compactionPlans;
}

@Override
public void open(Configuration parameters) throws Exception {
// no operation
}

@Override
public void run(SourceContext sourceContext) throws Exception {
for (Pair<String, HoodieCompactionPlan> pair : compactionPlans) {
HoodieCompactionPlan compactionPlan = pair.getRight();
List<CompactionOperation> operations = compactionPlan.getOperations().stream()
.map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
LOG.info("CompactionPlanFunction compacting " + operations + " files");
for (CompactionOperation operation : operations) {
sourceContext.collect(new CompactionPlanEvent(pair.getLeft(), operation));
}
}
}

@Override
public void close() throws Exception {
// no operation
}

@Override
public void cancel() {
// no operation
}
}
AllPendingCompactionPlanSelectStrategy.java
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.sink.compact.strategy;

import java.util.List;
import java.util.stream.Collectors;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.sink.compact.FlinkCompactionConfig;

/**
* Select all pending compaction plans to compact.
*/
public class AllPendingCompactionPlanSelectStrategy implements CompactionPlanSelectStrategy {
@Override
public List<HoodieInstant> select(HoodieTimeline pendingCompactionTimeline, FlinkCompactionConfig config) {
return pendingCompactionTimeline.getInstants().collect(Collectors.toList());
}
}
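
To tie this back to the HoodieFlinkCompactor change above: the strategy class is instantiated by reflection from the --select-strategy flag. A minimal usage sketch, with illustrative variable names:

// Mirrors the ReflectionUtils.loadClass call added in HoodieFlinkCompactor above.
FlinkCompactionConfig cfg = new FlinkCompactionConfig();
cfg.compactionPlanSelectStrategy = AllPendingCompactionPlanSelectStrategy.class.getName();
CompactionPlanSelectStrategy strategy =
    (CompactionPlanSelectStrategy) ReflectionUtils.loadClass(cfg.compactionPlanSelectStrategy);
List<HoodieInstant> requested =
    strategy.select(table.getActiveTimeline().filterPendingCompactionTimeline(), cfg);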
CompactionPlanSelectStrategy.java
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.sink.compact.strategy;

import java.util.List;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.sink.compact.FlinkCompactionConfig;

/**
* Strategy to select the compaction plans to compact.
*/
public interface CompactionPlanSelectStrategy {
/**
* Define how to select the compaction plans to compact.
*/
List<HoodieInstant> select(HoodieTimeline pendingCompactionTimeline, FlinkCompactionConfig config);
}
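
The default SingleCompactionPlanSelectStrategy is not shown in this excerpt. A plausible reconstruction, assuming it preserves the LIFO/FIFO logic that this PR lifts out of HoodieFlinkCompactor (an assumption, not the PR's actual code):

package org.apache.hudi.sink.compact.strategy;

import java.util.Collections;
import java.util.List;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.sink.compact.FlinkCompactionConfig;
import org.apache.hudi.util.CompactionUtil;

/**
 * Hypothetical sketch: select the first or last pending compaction plan,
 * depending on the configured execution sequence (LIFO or FIFO).
 */
public class SingleCompactionPlanSelectStrategy implements CompactionPlanSelectStrategy {
  @Override
  public List<HoodieInstant> select(HoodieTimeline pendingCompactionTimeline, FlinkCompactionConfig config) {
    // same LIFO/FIFO choice the compactor used before this PR
    Option<HoodieInstant> instant = CompactionUtil.isLIFO(config.compactionSeq)
        ? pendingCompactionTimeline.lastInstant()
        : pendingCompactionTimeline.firstInstant();
    return instant.isPresent()
        ? Collections.singletonList(instant.get())
        : Collections.emptyList();
  }
}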