From 03e3807dda0048edfbdac22c034a61cf4ac35107 Mon Sep 17 00:00:00 2001 From: Danny Chan Date: Wed, 7 Dec 2022 18:31:26 +0800 Subject: [PATCH] [HUDI-3661] Flink async compaction is not thread safe when use watermark (#7399) --- .../sink/clustering/ClusteringOperator.java | 8 ++- ...pactFunction.java => CompactOperator.java} | 31 ++++++++--- .../sink/compact/CompactionCommitEvent.java | 2 +- .../sink/compact/HoodieFlinkCompactor.java | 3 +- .../org/apache/hudi/sink/utils/Pipelines.java | 5 +- .../compact/ITTestHoodieFlinkCompactor.java | 5 +- .../sink/utils/ClusteringFunctionWrapper.java | 2 +- .../sink/utils/CompactFunctionWrapper.java | 53 ++++++++++--------- .../utils/StreamWriteFunctionWrapper.java | 2 +- 9 files changed, 66 insertions(+), 45 deletions(-) rename hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/{CompactFunction.java => CompactOperator.java} (82%) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java index ca1cd54c1fed3..e7bde41ca8b0a 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java @@ -60,6 +60,7 @@ import org.apache.flink.streaming.api.operators.BoundedOneInput; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.binary.BinaryRowData; @@ -159,7 +160,12 @@ public void open() throws Exception { this.executor = NonThrownExecutor.builder(LOG).build(); } - collector = new StreamRecordCollector<>(output); + this.collector = new StreamRecordCollector<>(output); + } + + @Override + public void processWatermark(Watermark mark) { + // no need to propagate the watermark } @Override diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java similarity index 82% rename from hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java rename to hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java index db05b0dbabeee..65f70ad6aaf0f 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java @@ -32,7 +32,11 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.runtime.operators.TableStreamOperator; +import org.apache.flink.table.runtime.util.StreamRecordCollector; import org.apache.flink.util.Collector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,11 +45,12 @@ import java.util.List; /** - * Function to execute the actual compaction task assigned by the compaction plan task. + * Operator to execute the actual compaction task assigned by the compaction plan task. * In order to execute scalable, the input should shuffle by the compact event {@link CompactionPlanEvent}. */ -public class CompactFunction extends ProcessFunction { - private static final Logger LOG = LoggerFactory.getLogger(CompactFunction.class); +public class CompactOperator extends TableStreamOperator + implements OneInputStreamOperator { + private static final Logger LOG = LoggerFactory.getLogger(CompactOperator.class); /** * Config options. @@ -72,22 +77,34 @@ public class CompactFunction extends ProcessFunction collector; + + public CompactOperator(Configuration conf) { this.conf = conf; this.asyncCompaction = OptionsResolver.needsAsyncCompaction(conf); } @Override - public void open(Configuration parameters) throws Exception { + public void open() throws Exception { this.taskID = getRuntimeContext().getIndexOfThisSubtask(); this.writeClient = FlinkWriteClients.createWriteClient(conf, getRuntimeContext()); if (this.asyncCompaction) { this.executor = NonThrownExecutor.builder(LOG).build(); } + this.collector = new StreamRecordCollector<>(output); + } + + @Override + public void processWatermark(Watermark mark) { + // no need to propagate the watermark } @Override - public void processElement(CompactionPlanEvent event, Context context, Collector collector) throws Exception { + public void processElement(StreamRecord record) throws Exception { + final CompactionPlanEvent event = record.getValue(); final String instantTime = event.getCompactionInstantTime(); final CompactionOperation compactionOperation = event.getOperation(); if (asyncCompaction) { diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitEvent.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitEvent.java index 398dfcf6195fb..faad4c2338d1b 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitEvent.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitEvent.java @@ -24,7 +24,7 @@ import java.util.List; /** - * Represents a commit event from the compaction task {@link CompactFunction}. + * Represents a commit event from the compaction task {@link CompactOperator}. */ public class CompactionCommitEvent implements Serializable { private static final long serialVersionUID = 1L; diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java index 34576cfb0174e..1475a493c1a38 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java @@ -41,7 +41,6 @@ import org.apache.flink.client.deployment.application.ApplicationExecutionException; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.operators.ProcessOperator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -311,7 +310,7 @@ private void compact() throws Exception { .rebalance() .transform("compact_task", TypeInformation.of(CompactionCommitEvent.class), - new ProcessOperator<>(new CompactFunction(conf))) + new CompactOperator(conf)) .setParallelism(compactionParallelism) .addSink(new CompactionCommitSink(conf)) .name("compaction_commit") diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java index a045a9276c544..d17213dcc0493 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java @@ -39,7 +39,7 @@ import org.apache.hudi.sink.clustering.ClusteringPlanEvent; import org.apache.hudi.sink.clustering.ClusteringPlanOperator; import org.apache.hudi.sink.common.WriteOperatorFactory; -import org.apache.hudi.sink.compact.CompactFunction; +import org.apache.hudi.sink.compact.CompactOperator; import org.apache.hudi.sink.compact.CompactionCommitEvent; import org.apache.hudi.sink.compact.CompactionCommitSink; import org.apache.hudi.sink.compact.CompactionPlanEvent; @@ -57,7 +57,6 @@ import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.operators.KeyedProcessOperator; -import org.apache.flink.streaming.api.operators.ProcessOperator; import org.apache.flink.table.data.RowData; import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; @@ -372,7 +371,7 @@ public static DataStreamSink compact(Configuration conf, .keyBy(plan -> plan.getOperation().getFileGroupId().getFileId()) .transform("compact_task", TypeInformation.of(CompactionCommitEvent.class), - new ProcessOperator<>(new CompactFunction(conf))) + new CompactOperator(conf)) .setParallelism(conf.getInteger(FlinkOptions.COMPACTION_TASKS)) .addSink(new CompactionCommitSink(conf)) .name("compact_commit") diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java index 711b738288fe8..6157b5e901130 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java @@ -39,7 +39,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.operators.ProcessOperator; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.config.ExecutionConfigOptions; @@ -158,7 +157,7 @@ public void testHoodieFlinkCompactor(boolean enableChangelog) throws Exception { .rebalance() .transform("compact_task", TypeInformation.of(CompactionCommitEvent.class), - new ProcessOperator<>(new CompactFunction(conf))) + new CompactOperator(conf)) .setParallelism(FlinkMiniCluster.DEFAULT_PARALLELISM) .addSink(new CompactionCommitSink(conf)) .name("clean_commits") @@ -282,7 +281,7 @@ public void testHoodieFlinkCompactorWithPlanSelectStrategy(boolean enableChangel .rebalance() .transform("compact_task", TypeInformation.of(CompactionCommitEvent.class), - new ProcessOperator<>(new CompactFunction(conf))) + new CompactOperator(conf)) .setParallelism(1) .addSink(new CompactionCommitSink(conf)) .name("compaction_commit") diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/ClusteringFunctionWrapper.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/ClusteringFunctionWrapper.java index 55a79915d4755..e3b75cbf6379c 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/ClusteringFunctionWrapper.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/ClusteringFunctionWrapper.java @@ -69,6 +69,7 @@ public class ClusteringFunctionWrapper { private ClusteringCommitSink commitSink; public ClusteringFunctionWrapper(Configuration conf, StreamTask streamTask, StreamConfig streamConfig) { + this.conf = conf; this.ioManager = new IOManagerAsync(); MockEnvironment environment = new MockEnvironmentBuilder() .setTaskName("mockTask") @@ -76,7 +77,6 @@ public ClusteringFunctionWrapper(Configuration conf, StreamTask streamTask .setIOManager(ioManager) .build(); this.runtimeContext = new MockStreamingRuntimeContext(false, 1, 0, environment); - this.conf = conf; this.streamTask = streamTask; this.streamConfig = streamConfig; } diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java index 1dba81ce2b7b6..78a8305c9c51b 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java @@ -19,7 +19,7 @@ package org.apache.hudi.sink.utils; import org.apache.hudi.avro.model.HoodieCompactionPlan; -import org.apache.hudi.sink.compact.CompactFunction; +import org.apache.hudi.sink.compact.CompactOperator; import org.apache.hudi.sink.compact.CompactionCommitEvent; import org.apache.hudi.sink.compact.CompactionCommitSink; import org.apache.hudi.sink.compact.CompactionPlanEvent; @@ -33,14 +33,14 @@ import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext; import org.apache.flink.runtime.operators.testutils.MockEnvironment; import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; +import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; -import org.apache.flink.util.Collector; - -import java.util.ArrayList; -import java.util.List; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; /** - * A wrapper class to manipulate the {@link org.apache.hudi.sink.compact.CompactFunction} instance for testing. + * A wrapper class to manipulate the {@link CompactOperator} instance for testing. */ public class CompactFunctionWrapper { private final Configuration conf; @@ -48,20 +48,28 @@ public class CompactFunctionWrapper { private final IOManager ioManager; private final StreamingRuntimeContext runtimeContext; + private final StreamTask streamTask; + private final StreamConfig streamConfig; + /** * Function that generates the {@link HoodieCompactionPlan}. */ private CompactionPlanOperator compactionPlanOperator; + /** + * Output to collect the compaction commit events. + */ + private CollectorOutput commitEventOutput; /** * Function that executes the compaction task. */ - private CompactFunction compactFunction; + private CompactOperator compactOperator; /** * Stream sink to handle compaction commits. */ private CompactionCommitSink commitSink; - public CompactFunctionWrapper(Configuration conf) throws Exception { + public CompactFunctionWrapper(Configuration conf, StreamTask streamTask, StreamConfig streamConfig) { + this.conf = conf; this.ioManager = new IOManagerAsync(); MockEnvironment environment = new MockEnvironmentBuilder() .setTaskName("mockTask") @@ -69,19 +77,23 @@ public CompactFunctionWrapper(Configuration conf) throws Exception { .setIOManager(ioManager) .build(); this.runtimeContext = new MockStreamingRuntimeContext(false, 1, 0, environment); - this.conf = conf; + this.streamTask = streamTask; + this.streamConfig = streamConfig; } public void openFunction() throws Exception { compactionPlanOperator = new CompactionPlanOperator(conf); compactionPlanOperator.open(); - compactFunction = new CompactFunction(conf); - compactFunction.setRuntimeContext(runtimeContext); - compactFunction.open(conf); + compactOperator = new CompactOperator(conf); + // CAUTION: deprecated API used. + compactOperator.setProcessingTimeService(new TestProcessingTimeService()); + commitEventOutput = new CollectorOutput<>(); + compactOperator.setup(streamTask, streamConfig, commitEventOutput); + compactOperator.open(); final NonThrownExecutor syncExecutor = new MockCoordinatorExecutor( new MockOperatorCoordinatorContext(new OperatorID(), 1)); - compactFunction.setExecutor(syncExecutor); + compactOperator.setExecutor(syncExecutor); commitSink = new CompactionCommitSink(conf); commitSink.setRuntimeContext(runtimeContext); @@ -94,22 +106,11 @@ public void compact(long checkpointID) throws Exception { compactionPlanOperator.setOutput(output); compactionPlanOperator.notifyCheckpointComplete(checkpointID); // collect the CompactCommitEvents - List compactCommitEvents = new ArrayList<>(); for (CompactionPlanEvent event : output.getRecords()) { - compactFunction.processElement(event, null, new Collector() { - @Override - public void collect(CompactionCommitEvent event) { - compactCommitEvents.add(event); - } - - @Override - public void close() { - - } - }); + compactOperator.processElement(new StreamRecord<>(event)); } // handle and commit the compaction - for (CompactionCommitEvent event : compactCommitEvents) { + for (CompactionCommitEvent event : commitEventOutput.getRecords()) { commitSink.invoke(event, null); } } diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java index a1a14456e3c59..db8ff36962b34 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java @@ -117,7 +117,6 @@ public StreamWriteFunctionWrapper(String tablePath, Configuration conf) throws E // one function this.coordinatorContext = new MockOperatorCoordinatorContext(new OperatorID(), 1); this.coordinator = new StreamWriteOperatorCoordinator(conf, this.coordinatorContext); - this.compactFunctionWrapper = new CompactFunctionWrapper(this.conf); this.bucketAssignFunctionContext = new MockBucketAssignFunctionContext(); this.stateInitializationContext = new MockStateInitializationContext(); this.asyncCompaction = OptionsResolver.needsAsyncCompaction(conf); @@ -127,6 +126,7 @@ public StreamWriteFunctionWrapper(String tablePath, Configuration conf) throws E .setConfig(new StreamConfig(conf)) .setExecutionConfig(new ExecutionConfig().enableObjectReuse()) .build(); + this.compactFunctionWrapper = new CompactFunctionWrapper(this.conf, this.streamTask, this.streamConfig); } public void openFunction() throws Exception {