diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java index e2d2972a0de43..92c73e2b9f5f1 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java @@ -173,6 +173,8 @@ public AsyncCompactionService(FlinkCompactionConfig cfg, Configuration conf, Str // set table schema CompactionUtil.setAvroSchema(conf, metaClient); + CompactionUtil.setPreCombineField(conf, metaClient); + // infer changelog mode CompactionUtil.inferChangelogMode(conf, metaClient); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java index 3d386cf8cc175..b0bac433b60e0 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java @@ -119,6 +119,20 @@ public static void setAvroSchema(HoodieWriteConfig writeConfig, HoodieTableMetaC writeConfig.setSchema(tableAvroSchema.toString()); } + /** + * Sets up the preCombine field into the given configuration {@code conf} + * through reading from the hoodie table metadata. + * + * This value is non-null as compaction can only be performed on MOR tables. + * Of which, MOR tables will have non-null precombine fields. + * + * @param conf The configuration + */ + public static void setPreCombineField(Configuration conf, HoodieTableMetaClient metaClient) { + String preCombineField = metaClient.getTableConfig().getPreCombineField(); + conf.setString(FlinkOptions.PRECOMBINE_FIELD, preCombineField); + } + /** * Infers the changelog mode based on the data file schema(including metadata fields). *