[HUDI-3661] Flink async compaction is not thread safe when use watermark (#7399)
danny0405 authored and nsivabalan committed Dec 13, 2022
1 parent 2455743 commit 49e6976
Showing 9 changed files with 66 additions and 45 deletions.
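In short: the compaction task used to run as a CompactFunction (a ProcessFunction wrapped in Flink's ProcessOperator). With watermarks enabled, the wrapping operator forwards each watermark through the same Output that the async compaction executor uses to emit commit events from its own thread, and that Output is not thread safe, which is what the commit title points at. The fix turns the function into a dedicated CompactOperator that overrides processWatermark() as a no-op, so only the executor thread ever touches the collector. A minimal sketch of the pattern, with hypothetical names (AsyncTaskOperatorSketch is an illustration, not the Hudi code):

    import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    import org.apache.flink.streaming.api.watermark.Watermark;
    import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    import org.apache.flink.table.runtime.operators.TableStreamOperator;

    /** Sketch: an operator that hands work to a background thread and drops watermarks. */
    public class AsyncTaskOperatorSketch<IN, OUT> extends TableStreamOperator<OUT>
        implements OneInputStreamOperator<IN, OUT> {

      @Override
      public void processWatermark(Watermark mark) {
        // Deliberately empty: the inherited implementation would emit the
        // watermark downstream on the task thread, while the async executor
        // may be emitting results through the same output concurrently.
      }

      @Override
      public void processElement(StreamRecord<IN> record) throws Exception {
        // hand record.getValue() to a background executor, which emits the
        // result through the operator's output when the task completes
      }
    }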
@@ -60,6 +60,7 @@
 import org.apache.flink.streaming.api.operators.BoundedOneInput;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
@@ -159,7 +160,12 @@ public void open() throws Exception {
       this.executor = NonThrownExecutor.builder(LOG).build();
     }
 
-    collector = new StreamRecordCollector<>(output);
+    this.collector = new StreamRecordCollector<>(output);
   }
 
+  @Override
+  public void processWatermark(Watermark mark) {
+    // no need to propagate the watermark
+  }
+
   @Override
@@ -31,7 +31,11 @@
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+import org.apache.flink.table.runtime.util.StreamRecordCollector;
 import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,11 +44,12 @@
 import java.util.List;
 
 /**
- * Function to execute the actual compaction task assigned by the compaction plan task.
+ * Operator to execute the actual compaction task assigned by the compaction plan task.
  * In order to execute scalable, the input should shuffle by the compact event {@link CompactionPlanEvent}.
  */
-public class CompactFunction extends ProcessFunction<CompactionPlanEvent, CompactionCommitEvent> {
-  private static final Logger LOG = LoggerFactory.getLogger(CompactFunction.class);
+public class CompactOperator extends TableStreamOperator<CompactionCommitEvent>
+    implements OneInputStreamOperator<CompactionPlanEvent, CompactionCommitEvent> {
+  private static final Logger LOG = LoggerFactory.getLogger(CompactOperator.class);
@@ -71,22 +76,34 @@ public class CompactFunction extends ProcessFunction<CompactionPlanEvent, CompactionCommitEvent> {
    */
   private transient NonThrownExecutor executor;
 
-  public CompactFunction(Configuration conf) {
+  /**
+   * Output records collector.
+   */
+  private transient StreamRecordCollector<CompactionCommitEvent> collector;
+
+  public CompactOperator(Configuration conf) {
     this.conf = conf;
     this.asyncCompaction = OptionsResolver.needsAsyncCompaction(conf);
   }
 
   @Override
-  public void open(Configuration parameters) throws Exception {
+  public void open() throws Exception {
     this.taskID = getRuntimeContext().getIndexOfThisSubtask();
     this.writeClient = FlinkWriteClients.createWriteClient(conf, getRuntimeContext());
     if (this.asyncCompaction) {
       this.executor = NonThrownExecutor.builder(LOG).build();
     }
+    this.collector = new StreamRecordCollector<>(output);
   }
 
+  @Override
+  public void processWatermark(Watermark mark) {
+    // no need to propagate the watermark
+  }
+
   @Override
-  public void processElement(CompactionPlanEvent event, Context context, Collector<CompactionCommitEvent> collector) throws Exception {
+  public void processElement(StreamRecord<CompactionPlanEvent> record) throws Exception {
+    final CompactionPlanEvent event = record.getValue();
     final String instantTime = event.getCompactionInstantTime();
     final CompactionOperation compactionOperation = event.getOperation();
     if (asyncCompaction) {
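For orientation: the class javadoc above requires the input to be shuffled by the compaction plan event, so that a given file group is always compacted by the same subtask. Because CompactOperator is itself a OneInputStreamOperator, the call sites later in this diff pass it to transform() directly instead of wrapping a ProcessFunction. Condensed from those hunks; planStream and compactionParallelism are stand-ins for the surrounding code, so this fragment is not standalone:

    // `planStream` is a DataStream<CompactionPlanEvent>; `compactionParallelism`
    // comes from the surrounding configuration.
    DataStream<CompactionCommitEvent> commits = planStream
        .keyBy(plan -> plan.getOperation().getFileGroupId().getFileId())
        .transform("compact_task",
            TypeInformation.of(CompactionCommitEvent.class),
            // the operator is passed directly; the old code wrapped the
            // function as new ProcessOperator<>(new CompactFunction(conf))
            new CompactOperator(conf))
        .setParallelism(compactionParallelism);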
@@ -24,7 +24,7 @@
 import java.util.List;
 
 /**
- * Represents a commit event from the compaction task {@link CompactFunction}.
+ * Represents a commit event from the compaction task {@link CompactOperator}.
  */
 public class CompactionCommitEvent implements Serializable {
   private static final long serialVersionUID = 1L;
@@ -41,7 +41,6 @@
 import org.apache.flink.client.deployment.application.ApplicationExecutionException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.operators.ProcessOperator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -312,7 +311,7 @@ private void compact() throws Exception {
         .rebalance()
         .transform("compact_task",
            TypeInformation.of(CompactionCommitEvent.class),
-            new ProcessOperator<>(new CompactFunction(conf)))
+            new CompactOperator(conf))
         .setParallelism(compactionParallelism)
         .addSink(new CompactionCommitSink(conf))
         .name("compaction_commit")
@@ -39,7 +39,7 @@
 import org.apache.hudi.sink.clustering.ClusteringPlanEvent;
 import org.apache.hudi.sink.clustering.ClusteringPlanOperator;
 import org.apache.hudi.sink.common.WriteOperatorFactory;
-import org.apache.hudi.sink.compact.CompactFunction;
+import org.apache.hudi.sink.compact.CompactOperator;
 import org.apache.hudi.sink.compact.CompactionCommitEvent;
 import org.apache.hudi.sink.compact.CompactionCommitSink;
 import org.apache.hudi.sink.compact.CompactionPlanEvent;
@@ -57,7 +57,6 @@
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
-import org.apache.flink.streaming.api.operators.ProcessOperator;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
@@ -372,7 +371,7 @@ public static DataStreamSink<CompactionCommitEvent> compact(Configuration conf,
         .keyBy(plan -> plan.getOperation().getFileGroupId().getFileId())
         .transform("compact_task",
            TypeInformation.of(CompactionCommitEvent.class),
-            new ProcessOperator<>(new CompactFunction(conf)))
+            new CompactOperator(conf))
         .setParallelism(conf.getInteger(FlinkOptions.COMPACTION_TASKS))
         .addSink(new CompactionCommitSink(conf))
         .name("compact_commit")
@@ -39,7 +39,6 @@
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.operators.ProcessOperator;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
@@ -158,7 +157,7 @@ public void testHoodieFlinkCompactor(boolean enableChangelog) throws Exception {
         .rebalance()
         .transform("compact_task",
            TypeInformation.of(CompactionCommitEvent.class),
-            new ProcessOperator<>(new CompactFunction(conf)))
+            new CompactOperator(conf))
         .setParallelism(FlinkMiniCluster.DEFAULT_PARALLELISM)
         .addSink(new CompactionCommitSink(conf))
         .name("clean_commits")
@@ -282,7 +281,7 @@ public void testHoodieFlinkCompactorWithPlanSelectStrategy(boolean enableChangelog) throws Exception {
         .rebalance()
         .transform("compact_task",
            TypeInformation.of(CompactionCommitEvent.class),
-            new ProcessOperator<>(new CompactFunction(conf)))
+            new CompactOperator(conf))
         .setParallelism(1)
         .addSink(new CompactionCommitSink(conf))
         .name("compaction_commit")
@@ -69,14 +69,14 @@ public class ClusteringFunctionWrapper {
   private ClusteringCommitSink commitSink;
 
   public ClusteringFunctionWrapper(Configuration conf, StreamTask<?, ?> streamTask, StreamConfig streamConfig) {
-    this.conf = conf;
     this.ioManager = new IOManagerAsync();
     MockEnvironment environment = new MockEnvironmentBuilder()
         .setTaskName("mockTask")
         .setManagedMemorySize(4 * MemoryManager.DEFAULT_PAGE_SIZE)
         .setIOManager(ioManager)
         .build();
     this.runtimeContext = new MockStreamingRuntimeContext(false, 1, 0, environment);
+    this.conf = conf;
     this.streamTask = streamTask;
     this.streamConfig = streamConfig;
   }
@@ -19,7 +19,7 @@
 package org.apache.hudi.sink.utils;
 
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
-import org.apache.hudi.sink.compact.CompactFunction;
+import org.apache.hudi.sink.compact.CompactOperator;
 import org.apache.hudi.sink.compact.CompactionCommitEvent;
 import org.apache.hudi.sink.compact.CompactionCommitSink;
 import org.apache.hudi.sink.compact.CompactionPlanEvent;
@@ -33,55 +33,67 @@
 import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.util.Collector;
-
-import java.util.ArrayList;
-import java.util.List;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 
 /**
- * A wrapper class to manipulate the {@link org.apache.hudi.sink.compact.CompactFunction} instance for testing.
+ * A wrapper class to manipulate the {@link CompactOperator} instance for testing.
 */
 public class CompactFunctionWrapper {
   private final Configuration conf;
 
   private final IOManager ioManager;
   private final StreamingRuntimeContext runtimeContext;
 
+  private final StreamTask<?, ?> streamTask;
+  private final StreamConfig streamConfig;
+
   /**
    * Function that generates the {@link HoodieCompactionPlan}.
    */
   private CompactionPlanOperator compactionPlanOperator;
+  /**
+   * Output to collect the compaction commit events.
+   */
+  private CollectorOutput<CompactionCommitEvent> commitEventOutput;
   /**
    * Function that executes the compaction task.
    */
-  private CompactFunction compactFunction;
+  private CompactOperator compactOperator;
   /**
    * Stream sink to handle compaction commits.
    */
   private CompactionCommitSink commitSink;
 
-  public CompactFunctionWrapper(Configuration conf) throws Exception {
-    this.conf = conf;
+  public CompactFunctionWrapper(Configuration conf, StreamTask<?, ?> streamTask, StreamConfig streamConfig) {
     this.ioManager = new IOManagerAsync();
     MockEnvironment environment = new MockEnvironmentBuilder()
         .setTaskName("mockTask")
         .setManagedMemorySize(4 * MemoryManager.DEFAULT_PAGE_SIZE)
         .setIOManager(ioManager)
         .build();
     this.runtimeContext = new MockStreamingRuntimeContext(false, 1, 0, environment);
+    this.conf = conf;
+    this.streamTask = streamTask;
+    this.streamConfig = streamConfig;
   }
 
   public void openFunction() throws Exception {
     compactionPlanOperator = new CompactionPlanOperator(conf);
     compactionPlanOperator.open();
 
-    compactFunction = new CompactFunction(conf);
-    compactFunction.setRuntimeContext(runtimeContext);
-    compactFunction.open(conf);
+    compactOperator = new CompactOperator(conf);
+    // CAUTION: deprecated API used.
+    compactOperator.setProcessingTimeService(new TestProcessingTimeService());
+    commitEventOutput = new CollectorOutput<>();
+    compactOperator.setup(streamTask, streamConfig, commitEventOutput);
+    compactOperator.open();
     final NonThrownExecutor syncExecutor = new MockCoordinatorExecutor(
         new MockOperatorCoordinatorContext(new OperatorID(), 1));
-    compactFunction.setExecutor(syncExecutor);
+    compactOperator.setExecutor(syncExecutor);
 
     commitSink = new CompactionCommitSink(conf);
     commitSink.setRuntimeContext(runtimeContext);
@@ -94,22 +106,11 @@ public void compact(long checkpointID) throws Exception {
     compactionPlanOperator.setOutput(output);
     compactionPlanOperator.notifyCheckpointComplete(checkpointID);
     // collect the CompactCommitEvents
-    List<CompactionCommitEvent> compactCommitEvents = new ArrayList<>();
     for (CompactionPlanEvent event : output.getRecords()) {
-      compactFunction.processElement(event, null, new Collector<CompactionCommitEvent>() {
-        @Override
-        public void collect(CompactionCommitEvent event) {
-          compactCommitEvents.add(event);
-        }
-
-        @Override
-        public void close() {
-
-        }
-      });
+      compactOperator.processElement(new StreamRecord<>(event));
     }
     // handle and commit the compaction
-    for (CompactionCommitEvent event : compactCommitEvents) {
+    for (CompactionCommitEvent event : commitEventOutput.getRecords()) {
       commitSink.invoke(event, null);
     }
   }
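The wrapper above now drives the operator through its real lifecycle (setProcessingTimeService, setup, open) and reads results back from a CollectorOutput. That class lives in Hudi's test utilities and is not shown in this diff; a minimal sketch of what such a collecting Output can look like against the Flink 1.15-era interfaces, which may differ in detail from the actual class:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.streaming.api.operators.Output;
    import org.apache.flink.streaming.api.watermark.Watermark;
    import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
    import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
    import org.apache.flink.util.OutputTag;

    /** Sketch: buffers everything the operator emits so a test can assert on it. */
    public class CollectorOutputSketch<T> implements Output<StreamRecord<T>> {
      private final List<T> records = new ArrayList<>();

      public List<T> getRecords() {
        return records;
      }

      @Override
      public void collect(StreamRecord<T> record) {
        records.add(record.getValue());
      }

      @Override
      public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
        // side outputs are not exercised by these tests
      }

      @Override
      public void emitWatermark(Watermark mark) {
        // ignored: CompactOperator never propagates watermarks anyway
      }

      @Override
      public void emitWatermarkStatus(WatermarkStatus watermarkStatus) {
        // not needed for the sketch
      }

      @Override
      public void emitLatencyMarker(LatencyMarker latencyMarker) {
        // not needed for the sketch
      }

      @Override
      public void close() {
      }
    }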
@@ -117,7 +117,6 @@ public StreamWriteFunctionWrapper(String tablePath, Configuration conf) throws Exception {
     // one function
     this.coordinatorContext = new MockOperatorCoordinatorContext(new OperatorID(), 1);
     this.coordinator = new StreamWriteOperatorCoordinator(conf, this.coordinatorContext);
-    this.compactFunctionWrapper = new CompactFunctionWrapper(this.conf);
     this.bucketAssignFunctionContext = new MockBucketAssignFunctionContext();
     this.stateInitializationContext = new MockStateInitializationContext();
     this.asyncCompaction = OptionsResolver.needsAsyncCompaction(conf);
@@ -127,6 +126,7 @@ public StreamWriteFunctionWrapper(String tablePath, Configuration conf) throws Exception {
         .setConfig(new StreamConfig(conf))
         .setExecutionConfig(new ExecutionConfig().enableObjectReuse())
         .build();
+    this.compactFunctionWrapper = new CompactFunctionWrapper(this.conf, this.streamTask, this.streamConfig);
   }
 
   public void openFunction() throws Exception {
