From 9607a566bc35ac7908e4f4abf550924f1d1cad49 Mon Sep 17 00:00:00 2001 From: "yuzhao.cyz" Date: Wed, 6 Jul 2022 12:25:56 +0800 Subject: [PATCH] [HUDI-4366] Synchronous cleaning for flink bounded source --- .../org/apache/hudi/configuration/OptionsResolver.java | 8 ++++++++ .../src/main/java/org/apache/hudi/sink/CleanFunction.java | 4 ++-- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java index 4cfa0bc92aa40..64bd91f480e32 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java @@ -164,4 +164,12 @@ public static boolean needsScheduleClustering(Configuration conf) { public static boolean sortClusteringEnabled(Configuration conf) { return !StringUtils.isNullOrEmpty(conf.getString(FlinkOptions.CLUSTERING_SORT_COLUMNS)); } + + /** + * Returns whether the operation is INSERT OVERWRITE (table or partition). + */ + public static boolean isInsertOverwrite(Configuration conf) { + return conf.getString(FlinkOptions.OPERATION).equals(WriteOperationType.INSERT_OVERWRITE_TABLE.value()) + || conf.getString(FlinkOptions.OPERATION).equals(WriteOperationType.INSERT_OVERWRITE.value()); + } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java index 65f07d7c7a83e..1c827517ff144 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java @@ -19,9 +19,9 @@ package org.apache.hudi.sink; import org.apache.hudi.client.HoodieFlinkWriteClient; -import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.configuration.OptionsResolver; import org.apache.hudi.sink.utils.NonThrownExecutor; import org.apache.hudi.util.StreamerUtil; @@ -65,7 +65,7 @@ public void open(Configuration parameters) throws Exception { this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext()); this.executor = NonThrownExecutor.builder(LOG).waitForTasksFinish(true).build(); - if (conf.getString(FlinkOptions.OPERATION).equals(WriteOperationType.INSERT_OVERWRITE_TABLE.value())) { + if (OptionsResolver.isInsertOverwrite(conf)) { String instantTime = HoodieActiveTimeline.createNewInstantTime(); LOG.info(String.format("exec sync clean with instant time %s...", instantTime)); executor.execute(() -> writeClient.clean(instantTime), "wait for sync cleaning finish");