From 54df0acef9c32b435b53998350558673b02b083d Mon Sep 17 00:00:00 2001
From: jerryyue
Date: Thu, 14 Jul 2022 17:27:05 +0800
Subject: [PATCH] [HUDI-4397] Flink Inline Cluster and Compact plan distribute
 strategy changed from rebalance to hash to avoid potential multiple threads
 accessing the same file

---
 .../java/org/apache/hudi/sink/utils/Pipelines.java | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 43d476cf2aa8..87a6551986d4 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.sink.utils;
 
+import org.apache.hudi.common.model.ClusteringOperation;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.configuration.FlinkOptions;
@@ -65,6 +66,7 @@
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * Utilities to generate all kinds of sub-pipelines.
@@ -357,7 +359,7 @@ public static DataStream<Object> hoodieStreamWrite(Configuration conf, int defau
    *
    * <pre>
    *                                           /=== | task1 | ===\
-   *      | plan generation | ===> re-balance                      | commit |
+   *      | plan generation | ===> hash                           | commit |
    *                                           \=== | task2 | ===/
    *
    *      Note: both the compaction plan generation task and commission task are singleton.
@@ -372,7 +374,7 @@ public static DataStreamSink<CompactionCommitEvent> compact(Configuration conf,
             TypeInformation.of(CompactionPlanEvent.class),
             new CompactionPlanOperator(conf))
         .setParallelism(1) // plan generate must be singleton
-        .rebalance()
+        .keyBy(plan -> plan.getOperation().getFileGroupId().getFileId())
         .transform("compact_task",
             TypeInformation.of(CompactionCommitEvent.class),
             new ProcessOperator<>(new CompactFunction(conf)))
@@ -392,7 +394,7 @@ public static DataStreamSink<CompactionCommitEvent> compact(Configuration conf,
    *
    * <pre>
    *                                           /=== | task1 | ===\
-   *      | plan generation | ===> re-balance                      | commit |
+   *      | plan generation | ===> hash                           | commit |
    *                                           \=== | task2 | ===/
    *
    *      Note: both the clustering plan generation task and commission task are singleton.
@@ -408,7 +410,9 @@ public static DataStreamSink<ClusteringCommitEvent> cluster(Configuration conf,
             TypeInformation.of(ClusteringPlanEvent.class),
             new ClusteringPlanOperator(conf))
         .setParallelism(1) // plan generate must be singleton
-        .rebalance()
+        .keyBy(plan -> plan.getClusteringGroupInfo().getOperations()
+          .stream().map(ClusteringOperation::getFileId)
+          .collect(Collectors.joining()))
         .transform("clustering_task",
             TypeInformation.of(ClusteringCommitEvent.class),
             new ClusteringOperator(conf, rowType))
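
Note on the change: keyBy() hash-partitions the plan events by file id, so every sub-plan that touches a given file group is routed to the same compaction/clustering subtask and executed there sequentially, whereas rebalance() distributes sub-plans round-robin and two sub-plans for the same file group can run concurrently on different subtasks. The sketch below is a minimal plain-Java illustration of that routing difference under stated assumptions; it is not Hudi or Flink code, and KeyedDistributionSketch, keyedSubtask(), the sample file ids and the modulo hash are simplified stand-ins for Flink's internal key-group assignment.

import java.util.Arrays;
import java.util.List;

public class KeyedDistributionSketch {

  // Simplified stand-in for hash partitioning of a keyed stream:
  // events that share a key always map to the same subtask index.
  static int keyedSubtask(String fileId, int parallelism) {
    return Math.floorMod(fileId.hashCode(), parallelism);
  }

  public static void main(String[] args) {
    int parallelism = 2;
    // Two sub-plans that happen to target the same file group, plus one other.
    List<String> planFileIds = Arrays.asList("file-0001", "file-0001", "file-0002");

    // Round-robin (rebalance-like) routing: the two "file-0001" plans can
    // land on different subtasks and touch the same file concurrently.
    for (int i = 0; i < planFileIds.size(); i++) {
      System.out.printf("round-robin: %s -> subtask %d%n", planFileIds.get(i), i % parallelism);
    }

    // Keyed (hash) routing: both "file-0001" plans always go to the same
    // subtask, so only one task ever works on that file group at a time.
    for (String fileId : planFileIds) {
      System.out.printf("keyed:       %s -> subtask %d%n", fileId, keyedSubtask(fileId, parallelism));
    }
  }
}

For clustering, a single plan event can cover several file groups, which is why the patch builds the key by concatenating all file ids of the group via Collectors.joining(), keeping every operation of that group on one subtask.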