Skip to content

Commit

Permalink
[HUDI-4397] Flink Inline Cluster and Compact plan distribute strategy…
Browse files Browse the repository at this point in the history
… changed from rebalance to hash to avoid potential multiple threads accessing the same file (#6106)

Co-authored-by: jerryyue <jerryyue@didiglobal.com>
  • Loading branch information
JerryYue-M and jerryyue authored Jul 15, 2022
1 parent 4898ea5 commit b781b31
Showing 1 changed file with 8 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hudi.sink.utils;

import org.apache.hudi.common.model.ClusteringOperation;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.configuration.FlinkOptions;
Expand Down Expand Up @@ -65,6 +66,7 @@

import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

/**
* Utilities to generate all kinds of sub-pipelines.
Expand Down Expand Up @@ -357,7 +359,7 @@ public static DataStream<Object> hoodieStreamWrite(Configuration conf, int defau
*
* <pre>
* /=== | task1 | ===\
* | plan generation | ===> re-balance | commit |
* | plan generation | ===> hash | commit |
* \=== | task2 | ===/
*
* Note: both the compaction plan generation task and commission task are singleton.
Expand All @@ -372,7 +374,7 @@ public static DataStreamSink<CompactionCommitEvent> compact(Configuration conf,
TypeInformation.of(CompactionPlanEvent.class),
new CompactionPlanOperator(conf))
.setParallelism(1) // plan generate must be singleton
.rebalance()
.keyBy(plan -> plan.getOperation().getFileGroupId().getFileId())
.transform("compact_task",
TypeInformation.of(CompactionCommitEvent.class),
new ProcessOperator<>(new CompactFunction(conf)))
Expand All @@ -392,7 +394,7 @@ public static DataStreamSink<CompactionCommitEvent> compact(Configuration conf,
*
* <pre>
* /=== | task1 | ===\
* | plan generation | ===> re-balance | commit |
* | plan generation | ===> hash | commit |
* \=== | task2 | ===/
*
* Note: both the clustering plan generation task and commission task are singleton.
Expand All @@ -408,7 +410,9 @@ public static DataStreamSink<ClusteringCommitEvent> cluster(Configuration conf,
TypeInformation.of(ClusteringPlanEvent.class),
new ClusteringPlanOperator(conf))
.setParallelism(1) // plan generate must be singleton
.rebalance()
.keyBy(plan -> plan.getClusteringGroupInfo().getOperations()
.stream().map(ClusteringOperation::getFileId)
.collect(Collectors.joining()))
.transform("clustering_task",
TypeInformation.of(ClusteringCommitEvent.class),
new ClusteringOperator(conf, rowType))
Expand Down

0 comments on commit b781b31

Please sign in to comment.