[HUDI-5042] Fix clustering schedule problem in Flink when schedule clustering is enabled and async clustering is disabled.
hbg committed Oct 20, 2022
1 parent 779a965 commit 89e7924
Showing 6 changed files with 227 additions and 21 deletions.
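Background for the change: in the old createClusteringPlan(), the delta-commit threshold was only enforced inside "if (config.inlineClusteringEnabled() && ...)" and "if (config.isAsyncClusteringEnabled() && ...)". A Flink job that schedules clustering plans while both flags are off satisfied neither guard, so a plan was generated on every commit and the configured commit threshold was ignored. The first diff below collapses the two guards into a single check whose threshold falls back to the async max-commits setting when inline clustering is disabled. A minimal standalone sketch of that gating rule (a hypothetical helper for illustration only, not Hudi's API):

public class ClusteringGateSketch {
  // Hypothetical distillation of the fixed gate: the threshold falls back to the
  // async setting when inline clustering is disabled, so a schedule-only job
  // (both flags off) is still gated by the commit count.
  static boolean shouldSchedule(boolean inlineEnabled, int inlineMaxCommits,
                                int asyncMaxCommits, int commitsSinceLastClustering) {
    int clusterMaxCommit = inlineEnabled ? inlineMaxCommits : asyncMaxCommits;
    return clusterMaxCommit <= commitsSinceLastClustering;
  }

  public static void main(String[] args) {
    // With a threshold of 2 (e.g. clustering.delta_commits = 2 in the Flink test below):
    System.out.println(shouldSchedule(false, 5, 2, 1)); // false: only 1 commit since last clustering
    System.out.println(shouldSchedule(false, 5, 2, 2)); // true: threshold reached
  }
}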
@@ -23,6 +23,7 @@
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
@@ -56,35 +57,36 @@ public ClusteringPlanActionExecutor(HoodieEngineContext context,
this.extraMetadata = extraMetadata;
}

-  protected Option<HoodieClusteringPlan> createClusteringPlan() {
+  protected boolean isScheduleClustering(HoodieActiveTimeline activeTimeline) {
    LOG.info("Checking if clustering needs to be run on " + config.getBasePath());
-    Option<HoodieInstant> lastClusteringInstant = table.getActiveTimeline()
+    Option<HoodieInstant> lastClusteringInstant = activeTimeline
        .filter(s -> s.getAction().equalsIgnoreCase(HoodieTimeline.REPLACE_COMMIT_ACTION)).lastInstant();

-    int commitsSinceLastClustering = table.getActiveTimeline().getCommitsTimeline().filterCompletedInstants()
+    int commitsSinceLastClustering = activeTimeline.getCommitsTimeline().filterCompletedInstants()
        .findInstantsAfter(lastClusteringInstant.map(HoodieInstant::getTimestamp).orElse("0"), Integer.MAX_VALUE)
        .countInstants();

-    if (config.inlineClusteringEnabled() && config.getInlineClusterMaxCommits() > commitsSinceLastClustering) {
-      LOG.info("Not scheduling inline clustering as only " + commitsSinceLastClustering
+    int clusterMaxCommit = config.inlineClusteringEnabled() ? config.getInlineClusterMaxCommits() : config.getAsyncClusterMaxCommits();
+    if (clusterMaxCommit > commitsSinceLastClustering) {
+      LOG.info("Not scheduling clustering as only " + commitsSinceLastClustering
          + " commits was found since last clustering " + lastClusteringInstant + ". Waiting for "
-          + config.getInlineClusterMaxCommits());
-      return Option.empty();
+          + clusterMaxCommit);
+      return false;
+    } else {
+      LOG.info("Generating clustering plan for table " + config.getBasePath());
+      return true;
    }
+  }

-    if (config.isAsyncClusteringEnabled() && config.getAsyncClusterMaxCommits() > commitsSinceLastClustering) {
-      LOG.info("Not scheduling async clustering as only " + commitsSinceLastClustering
-          + " commits was found since last clustering " + lastClusteringInstant + ". Waiting for "
-          + config.getAsyncClusterMaxCommits());
-      return Option.empty();
-    }
-
-    LOG.info("Generating clustering plan for table " + config.getBasePath());
-    ClusteringPlanStrategy strategy = (ClusteringPlanStrategy) ReflectionUtils.loadClass(
-        ClusteringPlanStrategy.checkAndGetClusteringPlanStrategy(config),
-        new Class<?>[] {HoodieTable.class, HoodieEngineContext.class, HoodieWriteConfig.class}, table, context, config);
-
-    return strategy.generateClusteringPlan();
+  protected Option<HoodieClusteringPlan> createClusteringPlan() {
+    if (isScheduleClustering(table.getActiveTimeline())) {
+      ClusteringPlanStrategy strategy = (ClusteringPlanStrategy) ReflectionUtils.loadClass(
+          ClusteringPlanStrategy.checkAndGetClusteringPlanStrategy(config),
+          new Class<?>[] {HoodieTable.class, HoodieEngineContext.class, HoodieWriteConfig.class}, table, context, config);
+      return strategy.generateClusteringPlan();
+    } else {
+      return Option.empty();
+    }
  }

@Override
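Note that isScheduleClustering now takes the active timeline as a parameter instead of reading table.getActiveTimeline() internally; that is what allows the new unit test in the next file to drive the gate with a MockHoodieTimeline rather than a full table fixture.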
@@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.table.action.cluster;

import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.testutils.MockHoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieWriteConfig;

import org.apache.hadoop.conf.Configuration;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.Arrays;
import java.util.List;

import static org.apache.hudi.common.model.ActionType.commit;
import static org.apache.hudi.common.model.ActionType.replacecommit;

class TestClusteringPlanActionExecutor {
@Test
public void testDeltaCommitsClusteringPlanScheduling() {
ClusteringPlanActionExecutor inlineExecutor = getClusteringPlanActionExecutor(true);
ClusteringPlanActionExecutor notInlineExecutor = getClusteringPlanActionExecutor(false);

List<HoodieInstant> instants =
Arrays.asList(
new HoodieInstant(HoodieInstant.State.COMPLETED, commit.name(), "1"),
new HoodieInstant(HoodieInstant.State.COMPLETED, commit.name(), "2")
);
HoodieActiveTimeline activeTimeline = new MockHoodieTimeline(instants);
Assertions.assertFalse(inlineExecutor.isScheduleClustering(activeTimeline),
"Enable inline clustering, 2 delta commits, should not schedule clustering");
Assertions.assertFalse(notInlineExecutor.isScheduleClustering(activeTimeline),
"Disable inline clustering, 2 delta commits, should not schedule clustering");

instants =
Arrays.asList(
new HoodieInstant(HoodieInstant.State.COMPLETED, commit.name(), "1"),
new HoodieInstant(HoodieInstant.State.COMPLETED, commit.name(), "2"),
new HoodieInstant(HoodieInstant.State.COMPLETED, commit.name(), "3")
);
activeTimeline = new MockHoodieTimeline(instants);
Assertions.assertFalse(inlineExecutor.isScheduleClustering(activeTimeline),
"Enable inline clustering, 3 delta commits, should not schedule clustering");
Assertions.assertTrue(notInlineExecutor.isScheduleClustering(activeTimeline),
"Disable inline clustering, 3 delta commits, should schedule clustering");

instants =
Arrays.asList(
new HoodieInstant(HoodieInstant.State.COMPLETED, commit.name(), "1"),
new HoodieInstant(HoodieInstant.State.COMPLETED, commit.name(), "2"),
new HoodieInstant(HoodieInstant.State.COMPLETED, replacecommit.name(), "3"),
new HoodieInstant(HoodieInstant.State.COMPLETED, commit.name(), "4"),
new HoodieInstant(HoodieInstant.State.COMPLETED, commit.name(), "5"),
new HoodieInstant(HoodieInstant.State.COMPLETED, commit.name(), "6")
);
activeTimeline = new MockHoodieTimeline(instants);
Assertions.assertFalse(inlineExecutor.isScheduleClustering(activeTimeline),
"Enable inline clustering, 3 delta commits after replacecommit, should not schedule clustering");
Assertions.assertTrue(notInlineExecutor.isScheduleClustering(activeTimeline),
"Disable inline clustering, 3 delta commits after replacecommit, should schedule clustering");

instants =
Arrays.asList(
new HoodieInstant(HoodieInstant.State.COMPLETED, commit.name(), "1"),
new HoodieInstant(HoodieInstant.State.COMPLETED, commit.name(), "2"),
new HoodieInstant(HoodieInstant.State.COMPLETED, commit.name(), "3"),
new HoodieInstant(HoodieInstant.State.COMPLETED, commit.name(), "4"),
new HoodieInstant(HoodieInstant.State.COMPLETED, commit.name(), "5")
);
activeTimeline = new MockHoodieTimeline(instants);
Assertions.assertTrue(inlineExecutor.isScheduleClustering(activeTimeline),
"Enable inline clustering, 5 delta commits, should schedule clustering");
Assertions.assertTrue(notInlineExecutor.isScheduleClustering(activeTimeline),
"Disable inline clustering, 5 delta commits, should schedule clustering");
}

private ClusteringPlanActionExecutor getClusteringPlanActionExecutor(boolean isInlineClustering) {
HoodieEngineContext engineContext = new HoodieLocalEngineContext(new Configuration());
String instantTime = "4";
HoodieWriteConfig config =
HoodieWriteConfig.newBuilder()
.withPath("/db/tbl")
.withClusteringConfig(
HoodieClusteringConfig.newBuilder()
.withAsyncClustering(false)
.withInlineClustering(isInlineClustering)
.withAsyncClusteringMaxCommits(3)
.withInlineClusteringNumCommits(5)
.build())
.build();

return new ClusteringPlanActionExecutor(engineContext,
config,
null,
instantTime,
Option.empty());
}
}
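A reading note on the test matrix above: getClusteringPlanActionExecutor sets withInlineClusteringNumCommits(5) and withAsyncClusteringMaxCommits(3), so the inline executor schedules only once 5 commits have completed since the last clustering, while the non-inline executor schedules at 3; the replacecommit case shows that only commits after the last clustering instant are counted.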
@@ -107,7 +107,9 @@ public void setup(int maxFileSize, Map<String, String> options) throws IOException
.withClusteringConfig(HoodieClusteringConfig.newBuilder()
.withClusteringPlanStrategyClass(SparkConsistentBucketClusteringPlanStrategy.class.getName())
.withClusteringExecutionStrategyClass(SparkConsistentBucketClusteringExecutionStrategy.class.getName())
-        .withClusteringUpdatesStrategy(SparkConsistentBucketDuplicateUpdateStrategy.class.getName()).build())
+        .withClusteringUpdatesStrategy(SparkConsistentBucketDuplicateUpdateStrategy.class.getName())
+        .withInlineClusteringNumCommits(0)
+        .withAsyncClusteringMaxCommits(0).build())
.build();

writeClient = getHoodieWriteClient(config);
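These consistent-bucket tests are not about the commit-count gate, so both thresholds are pinned to 0: since 0 > commitsSinceLastClustering never holds for a non-negative count, isScheduleClustering always returns true and the tests can schedule clustering immediately, preserving their pre-fix behavior.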
@@ -63,6 +63,7 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;

+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
@@ -220,4 +221,69 @@ public void testHoodieFlinkClusteringService() throws Exception {

TestData.checkWrittenData(tempFile, EXPECTED, 4);
}

@Test
public void testHoodieFlinkClusteringSchedule() throws Exception {
// Create hoodie table and insert into data.
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());

// use append mode
options.put(FlinkOptions.OPERATION.key(), WriteOperationType.INSERT.value());
options.put(FlinkOptions.INSERT_CLUSTER.key(), "false");

String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
tableEnv.executeSql(hoodieTableDDL);
tableEnv.executeSql(TestSQL.INSERT_T1).await();

// wait for the asynchronous commit to finish
TimeUnit.SECONDS.sleep(3);

// Make configuration and setAvroSchema.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkClusteringConfig cfg = new FlinkClusteringConfig();
cfg.path = tempFile.getAbsolutePath();
Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg);

// create metaClient
HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);

// set the table name
conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());

// set record key field
conf.setString(FlinkOptions.RECORD_KEY_FIELD, metaClient.getTableConfig().getRecordKeyFieldProp());
// set partition field
conf.setString(FlinkOptions.PARTITION_PATH_FIELD, metaClient.getTableConfig().getPartitionFieldProp());

long ckpTimeout = env.getCheckpointConfig().getCheckpointTimeout();
conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
conf.setString(FlinkOptions.PARTITION_PATH_FIELD, "partition");
conf.setInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS, 2);
conf.setBoolean(FlinkOptions.CLUSTERING_ASYNC_ENABLED, false);

// set table schema
CompactionUtil.setAvroSchema(conf, metaClient);

// To compute the clustering instant time.
String clusteringInstantTime = HoodieActiveTimeline.createNewInstantTime();

HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf);

boolean scheduled = writeClient.scheduleClusteringAtInstant(clusteringInstantTime, Option.empty());

assertFalse(scheduled, "1 delta commit, the clustering plan should not be scheduled");

tableEnv.executeSql(TestSQL.INSERT_T1).await();
// wait for the asynchronous commit to finish
TimeUnit.SECONDS.sleep(3);

clusteringInstantTime = HoodieActiveTimeline.createNewInstantTime();

scheduled = writeClient.scheduleClusteringAtInstant(clusteringInstantTime, Option.empty());

assertTrue(scheduled, "2 delta commits, the clustering plan should be scheduled");
}
}
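This Flink test pins clustering.delta_commits to 2 with async clustering disabled, exactly the configuration that previously bypassed the gate: the first scheduleClusteringAtInstant call sees one completed commit and is rejected, and the call after the second insert succeeds.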
@@ -75,6 +75,12 @@ class RunClusteringProcedure extends BaseProcedure
val basePath: String = getBasePath(tableName, tablePath)
val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
var conf: Map[String, String] = Map.empty

+    conf = conf ++ Map(
+      HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMITS.key() -> "0",
+      HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key() -> "0"
+    )

predicate match {
case Some(p) =>
val prunedPartitions = prunePartition(metaClient, p.asInstanceOf[String])
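By seeding both max-commit settings to 0 before other options are merged into conf, the run_clustering procedure keeps its prior behavior of scheduling a plan unconditionally; assuming user-supplied options are appended to conf afterwards, as the surrounding code suggests, an explicit threshold would still override these defaults, since later additions to a Scala Map win on key collisions.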
@@ -22,6 +22,7 @@ package org.apache.spark.sql.hudi.procedure
import org.apache.hadoop.fs.Path
import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant, HoodieTimeline}
import org.apache.hudi.common.util.{Option => HOption}
+import org.apache.hudi.config.HoodieClusteringConfig
import org.apache.hudi.{HoodieCLIUtils, HoodieDataSourceHelpers}

import scala.collection.JavaConverters.asScalaIteratorConverter
@@ -52,7 +53,11 @@ class TestClusteringProcedure extends HoodieSparkProcedureTestBase {
spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
-    val client = HoodieCLIUtils.createHoodieClientFromPath(spark, basePath, Map.empty)
+
+    val configMap = Map(
+      HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMITS.key() -> "0"
+    )
+    val client = HoodieCLIUtils.createHoodieClientFromPath(spark, basePath, configMap)
// Generate the first clustering plan
val firstScheduleInstant = HoodieActiveTimeline.createNewInstantTime
client.scheduleClusteringAtInstant(firstScheduleInstant, HOption.empty())
@@ -153,7 +158,11 @@ class TestClusteringProcedure extends HoodieSparkProcedureTestBase {
spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
-    val client = HoodieCLIUtils.createHoodieClientFromPath(spark, basePath, Map.empty)
+
+    val configMap = Map(
+      HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMITS.key() -> "0"
+    )
+    val client = HoodieCLIUtils.createHoodieClientFromPath(spark, basePath, configMap)
// Generate the first clustering plan
val firstScheduleInstant = HoodieActiveTimeline.createNewInstantTime
client.scheduleClusteringAtInstant(firstScheduleInstant, HOption.empty())
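The two test changes in this file follow the same reasoning as the procedure change above: HoodieCLIUtils.createHoodieClientFromPath does not go through RunClusteringProcedure, so these tests must pass ASYNC_CLUSTERING_MAX_COMMITS -> "0" themselves to keep scheduleClusteringAtInstant unconditional now that the gate also applies when async clustering is disabled.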
