diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index cdacefbf173e4..86be092ce883e 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -171,7 +171,7 @@ public static HoodieWriteConfig getHoodieClientConfig( .withMergeAllowDuplicateOnInserts(OptionsResolver.insertClustering(conf)) .withClusteringConfig( HoodieClusteringConfig.newBuilder() - .withAsyncClustering(conf.getBoolean(FlinkOptions.CLUSTERING_ASYNC_ENABLED)) + .withAsyncClustering(conf.getBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED)) .withClusteringPlanStrategyClass(conf.getString(FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS)) .withClusteringPlanPartitionFilterMode( ClusteringPlanPartitionFilterMode.valueOf(conf.getString(FlinkOptions.CLUSTERING_PLAN_PARTITION_FILTER_MODE_NAME))) diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java index a0073d8a3703d..f50f5748be702 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java @@ -63,6 +63,7 @@ import java.util.Map; import java.util.concurrent.TimeUnit; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; /** @@ -220,4 +221,70 @@ public void testHoodieFlinkClusteringService() throws Exception { TestData.checkWrittenData(tempFile, EXPECTED, 4); } + + @Test + public void testHoodieFlinkClusteringSchedule() throws Exception { + // Create hoodie table and insert into data. + EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build(); + TableEnvironment tableEnv = TableEnvironmentImpl.create(settings); + Map options = new HashMap<>(); + options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath()); + + // use append mode + options.put(FlinkOptions.OPERATION.key(), WriteOperationType.INSERT.value()); + options.put(FlinkOptions.INSERT_CLUSTER.key(), "false"); + + String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options); + tableEnv.executeSql(hoodieTableDDL); + tableEnv.executeSql(TestSQL.INSERT_T1).await(); + + // wait for the asynchronous commit to finish + TimeUnit.SECONDS.sleep(3); + + // Make configuration and setAvroSchema. + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + FlinkClusteringConfig cfg = new FlinkClusteringConfig(); + cfg.path = tempFile.getAbsolutePath(); + Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg); + + // create metaClient + HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf); + + // set the table name + conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName()); + + // set record key field + conf.setString(FlinkOptions.RECORD_KEY_FIELD, metaClient.getTableConfig().getRecordKeyFieldProp()); + // set partition field + conf.setString(FlinkOptions.PARTITION_PATH_FIELD, metaClient.getTableConfig().getPartitionFieldProp()); + + long ckpTimeout = env.getCheckpointConfig().getCheckpointTimeout(); + conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout); + conf.setString(FlinkOptions.PARTITION_PATH_FIELD, "partition"); + conf.setInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS, 2); + conf.setBoolean(FlinkOptions.CLUSTERING_ASYNC_ENABLED, false); + conf.setBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true); + + // set table schema + CompactionUtil.setAvroSchema(conf, metaClient); + + // To compute the clustering instant time. + String clusteringInstantTime = HoodieActiveTimeline.createNewInstantTime(); + + HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf); + + boolean scheduled = writeClient.scheduleClusteringAtInstant(clusteringInstantTime, Option.empty()); + + assertFalse(scheduled, "1 delta commit, the clustering plan should not be scheduled"); + + tableEnv.executeSql(TestSQL.INSERT_T1).await(); + // wait for the asynchronous commit to finish + TimeUnit.SECONDS.sleep(3); + + clusteringInstantTime = HoodieActiveTimeline.createNewInstantTime(); + + scheduled = writeClient.scheduleClusteringAtInstant(clusteringInstantTime, Option.empty()); + + assertTrue(scheduled, "2 delta commits, the clustering plan should be scheduled"); + } }