Commit

[HUDI-7090] Set the maxParallelism for singleton operator (apache#10090)
hehuiyuan authored Nov 17, 2023
1 parent faa73e9 commit f06ff5b
Showing 4 changed files with 18 additions and 5 deletions.
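
The change follows one pattern in all four files: each operator that must run as a singleton keeps .setParallelism(1) and now also pins its max parallelism to 1, either directly (where a SingleOutputStreamOperator is available) or through getTransformation().setMaxParallelism(1) where only a DataStreamSink handle is exposed. Below is a minimal, self-contained sketch of that pattern; it is not code from this commit, and it assumes a recent Flink DataStream API with PrintSinkFunction standing in for the real commit sinks.

// Illustrative sketch only -- not part of this commit.
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;

public class SingletonOperatorExample {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<String> events = env.fromElements("commit-1", "commit-2");

    DataStreamSink<String> commitSink = events
        .addSink(new PrintSinkFunction<>())  // stand-in for ClusteringCommitSink / CompactionCommitSink
        .name("singleton_commit")
        .setParallelism(1);                  // the commit sink must be a singleton
    // Pin the max parallelism on the underlying transformation, mirroring what
    // the diff below does for the sink operators.
    commitSink.getTransformation().setMaxParallelism(1);

    env.execute("singleton-max-parallelism-example");
  }
}

Pinning the max parallelism caps these operators at a single subtask even if the job is rescaled (for example by Flink's adaptive scheduler); for operators that return a SingleOutputStreamOperator, such as the split_monitor source in the last file, the diff calls .setMaxParallelism(1) directly instead.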
File 1 of 4

@@ -348,7 +348,9 @@ private void cluster() throws Exception {
         .addSink(new ClusteringCommitSink(conf))
         .name("clustering_commit")
         .uid("uid_clustering_commit")
-        .setParallelism(1);
+        .setParallelism(1)
+        .getTransformation()
+        .setMaxParallelism(1);
 
     env.execute("flink_hudi_clustering_" + clusteringInstant.getTimestamp());
   }
File 2 of 4

@@ -298,7 +298,9 @@ private void compact() throws Exception {
         .addSink(new CompactionCommitSink(conf))
         .name("compaction_commit")
         .uid("uid_compaction_commit")
-        .setParallelism(1);
+        .setParallelism(1)
+        .getTransformation()
+        .setMaxParallelism(1);
 
     env.execute("flink_hudi_compaction_" + String.join(",", compactionInstantTimes));
   }
File 3 of 4

@@ -410,10 +410,11 @@ public static DataStream<Object> hoodieStreamWrite(Configuration conf, DataStrea
    * @return the compaction pipeline
    */
   public static DataStreamSink<CompactionCommitEvent> compact(Configuration conf, DataStream<Object> dataStream) {
-    return dataStream.transform("compact_plan_generate",
+    DataStreamSink<CompactionCommitEvent> compactionCommitEventDataStream = dataStream.transform("compact_plan_generate",
         TypeInformation.of(CompactionPlanEvent.class),
         new CompactionPlanOperator(conf))
         .setParallelism(1) // plan generate must be singleton
+        .setMaxParallelism(1)
         // make the distribution strategy deterministic to avoid concurrent modifications
         // on the same bucket files
         .keyBy(plan -> plan.getOperation().getFileGroupId().getFileId())
@@ -424,6 +425,8 @@ public static DataStreamSink<CompactionCommitEvent> compact(Configuration conf,
         .addSink(new CompactionCommitSink(conf))
         .name("compact_commit")
         .setParallelism(1); // compaction commit should be singleton
+    compactionCommitEventDataStream.getTransformation().setMaxParallelism(1);
+    return compactionCommitEventDataStream;
   }
 
   /**
@@ -452,6 +455,7 @@ public static DataStreamSink<ClusteringCommitEvent> cluster(Configuration conf,
         TypeInformation.of(ClusteringPlanEvent.class),
         new ClusteringPlanOperator(conf))
         .setParallelism(1) // plan generate must be singleton
+        .setMaxParallelism(1) // plan generate must be singleton
         .keyBy(plan ->
             // make the distribution strategy deterministic to avoid concurrent modifications
             // on the same bucket files
@@ -465,15 +469,19 @@ public static DataStreamSink<ClusteringCommitEvent> cluster(Configuration conf,
       ExecNodeUtil.setManagedMemoryWeight(clusteringStream.getTransformation(),
           conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
     }
-    return clusteringStream.addSink(new ClusteringCommitSink(conf))
+    DataStreamSink<ClusteringCommitEvent> clusteringCommitEventDataStream = clusteringStream.addSink(new ClusteringCommitSink(conf))
         .name("clustering_commit")
         .setParallelism(1); // clustering commit should be singleton
+    clusteringCommitEventDataStream.getTransformation().setMaxParallelism(1);
+    return clusteringCommitEventDataStream;
   }
 
   public static DataStreamSink<Object> clean(Configuration conf, DataStream<Object> dataStream) {
-    return dataStream.addSink(new CleanFunction<>(conf))
+    DataStreamSink<Object> cleanCommitDataStream = dataStream.addSink(new CleanFunction<>(conf))
         .setParallelism(1)
         .name("clean_commits");
+    cleanCommitDataStream.getTransformation().setMaxParallelism(1);
+    return cleanCommitDataStream;
   }
 
   public static DataStreamSink<Object> dummySink(DataStream<Object> dataStream) {
File 4 of 4

@@ -207,6 +207,7 @@ public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv)
     SingleOutputStreamOperator<RowData> source = execEnv.addSource(monitoringFunction, getSourceOperatorName("split_monitor"))
         .uid(Pipelines.opUID("split_monitor", conf))
         .setParallelism(1)
+        .setMaxParallelism(1)
         .keyBy(MergeOnReadInputSplit::getFileId)
         .transform("split_reader", typeInfo, factory)
         .uid(Pipelines.opUID("split_reader", conf))
