Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-4717] CompactionCommitEvent message corrupted when sent by compact_task #6524

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import java.util.List;

/**
* Represents a commit event from the clustering task {@link ClusteringOperator}.
* Represents a commit event from the clustering task {@link ClusteringFunction}.
*/
public class ClusteringCommitEvent implements Serializable {
private static final long serialVersionUID = 1L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,30 +53,37 @@
import org.apache.avro.generic.IndexedRecord;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.operators.YieldingOperatorFactory;
import org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorFactory;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator;
import org.apache.flink.table.runtime.generated.NormalizedKeyComputer;
import org.apache.flink.table.runtime.generated.RecordComparator;
import org.apache.flink.table.runtime.operators.TableStreamOperator;
import org.apache.flink.table.runtime.operators.sort.BinaryExternalSorter;
import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer;
import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.runtime.util.StreamRecordCollector;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
Expand All @@ -87,12 +94,11 @@
import static org.apache.hudi.table.format.FormatUtils.buildAvroRecordBySchema;

/**
* Operator to execute the actual clustering task assigned by the clustering plan task.
* Function to execute the actual clustering task assigned by the clustering plan task.
* To execute scalably, the input should be shuffled by the clustering event {@link ClusteringPlanEvent}.
*/
public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEvent> implements
OneInputStreamOperator<ClusteringPlanEvent, ClusteringCommitEvent>, BoundedOneInput {
private static final Logger LOG = LoggerFactory.getLogger(ClusteringOperator.class);
public class ClusteringFunction extends RichAsyncFunction<ClusteringPlanEvent, ClusteringCommitEvent> {
private static final Logger LOG = LoggerFactory.getLogger(ClusteringFunction.class);

private final Configuration conf;
private final RowType rowType;
Expand All @@ -107,7 +113,6 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven
private transient BulkInsertWriterHelper writerHelper;

private transient BinaryExternalSorter sorter;
private transient StreamRecordCollector<ClusteringCommitEvent> collector;
private transient BinaryRowDataSerializer binarySerializer;

/**
Expand All @@ -125,16 +130,25 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven
*/
private transient NonThrownExecutor executor;

public ClusteringOperator(Configuration conf, RowType rowType) {
/**
* Parameters for this function to access the context of the containing StreamOperator.
*/
private transient StreamOperatorParameters<ClusteringCommitEvent> parameters;

/**
* Creates the clustering function. The constructor is private; within this file the only
* caller is the nested {@code ClusteringOperatorFactory}.
*
* @param conf the configuration kept by this function; also used to resolve the
* async-clustering and sort-clustering flags through {@code OptionsResolver}
* @param rowType the row type kept by this function
*/
private ClusteringFunction(Configuration conf, RowType rowType) {
this.conf = conf;
this.rowType = rowType;
this.asyncClustering = OptionsResolver.needsAsyncClustering(conf);
this.sortClusteringEnabled = OptionsResolver.sortClusteringEnabled(conf);
}

/**
* Stores the parameters of the stream operator that wraps this function.
* The sorter initialization and the memory size computation read the containing task
* and the stream config from them, so this must be called before either of those runs.
*
* @param parameters the parameters handed to the operator factory when the operator is created
*/
public void setStreamOperatorParameters(StreamOperatorParameters<ClusteringCommitEvent> parameters) {
this.parameters = parameters;
}

@Override
public void open() throws Exception {
super.open();
public void open(Configuration parameters) throws Exception {
super.open(parameters);

this.taskID = getRuntimeContext().getIndexOfThisSubtask();
this.writeConfig = StreamerUtil.getHoodieClientConfig(this.conf);
Expand All @@ -155,24 +169,21 @@ public void open() throws Exception {
if (this.asyncClustering) {
this.executor = NonThrownExecutor.builder(LOG).build();
}

collector = new StreamRecordCollector<>(output);
}

@Override
public void processElement(StreamRecord<ClusteringPlanEvent> element) throws Exception {
ClusteringPlanEvent event = element.getValue();
public void asyncInvoke(final ClusteringPlanEvent event, final ResultFuture<ClusteringCommitEvent> resultFuture) throws Exception {
final String instantTime = event.getClusteringInstantTime();
if (this.asyncClustering) {
// executes the clustering task asynchronously to not block the checkpoint barrier propagation.
executor.execute(
() -> doClustering(instantTime, event),
(errMsg, t) -> collector.collect(new ClusteringCommitEvent(instantTime, taskID)),
() -> doClustering(instantTime, event, resultFuture),
(errMsg, t) -> resultFuture.complete(Collections.singletonList(new ClusteringCommitEvent(instantTime, taskID))),
"Execute clustering for instant %s from task %d", instantTime, taskID);
} else {
// executes the clustering task synchronously for batch mode.
LOG.info("Execute clustering for instant {} from task {}", instantTime, taskID);
doClustering(instantTime, event);
doClustering(instantTime, event, resultFuture);
}
}

Expand All @@ -182,20 +193,17 @@ public void close() {
this.writeClient.cleanHandlesGracefully();
this.writeClient.close();
}
}

/**
* End input action for batch source.
*/
public void endInput() {
// no operation
if (this.sorter != null) {
this.sorter.close();
}
}

// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------

private void doClustering(String instantTime, ClusteringPlanEvent event) throws Exception {
private void doClustering(String instantTime, ClusteringPlanEvent event,
ResultFuture<ClusteringCommitEvent> resultFuture) throws Exception {
final ClusteringGroupInfo clusteringGroupInfo = event.getClusteringGroupInfo();

initWriterHelper(instantTime);
Expand Down Expand Up @@ -232,7 +240,7 @@ private void doClustering(String instantTime, ClusteringPlanEvent event) throws
}

List<WriteStatus> writeStatuses = this.writerHelper.getWriteStatuses(this.taskID);
collector.collect(new ClusteringCommitEvent(instantTime, writeStatuses, this.taskID));
resultFuture.complete(Collections.singletonList(new ClusteringCommitEvent(instantTime, writeStatuses, this.taskID)));
this.writerHelper = null;
}

Expand Down Expand Up @@ -334,28 +342,39 @@ private int[] getRequiredPositions() {
}

private void initSorter() {
ClassLoader cl = getContainingTask().getUserCodeClassLoader();
StreamTask<?, ?> containingTask = parameters.getContainingTask();
Environment environment = containingTask.getEnvironment();
ClassLoader cl = containingTask.getUserCodeClassLoader();
NormalizedKeyComputer computer = createSortCodeGenerator().generateNormalizedKeyComputer("SortComputer").newInstance(cl);
RecordComparator comparator = createSortCodeGenerator().generateRecordComparator("SortComparator").newInstance(cl);

MemoryManager memManager = getContainingTask().getEnvironment().getMemoryManager();
MemoryManager memManager = environment.getMemoryManager();
this.sorter =
new BinaryExternalSorter(
this.getContainingTask(),
containingTask,
memManager,
computeMemorySize(),
this.getContainingTask().getEnvironment().getIOManager(),
environment.getIOManager(),
(AbstractRowDataSerializer) binarySerializer,
binarySerializer,
computer,
comparator,
getContainingTask().getJobConfiguration());
containingTask.getJobConfiguration());
this.sorter.startThreads();

// register the metrics.
getMetricGroup().gauge("memoryUsedSizeInBytes", (Gauge<Long>) sorter::getUsedMemoryInBytes);
getMetricGroup().gauge("numSpillFiles", (Gauge<Long>) sorter::getNumSpillFiles);
getMetricGroup().gauge("spillInBytes", (Gauge<Long>) sorter::getSpillInBytes);
environment.getMetricGroup().gauge("memoryUsedSizeInBytes", (Gauge<Long>) sorter::getUsedMemoryInBytes);
environment.getMetricGroup().gauge("numSpillFiles", (Gauge<Long>) sorter::getNumSpillFiles);
environment.getMetricGroup().gauge("spillInBytes", (Gauge<Long>) sorter::getSpillInBytes);
}

/**
 * Computes the memory size available to the sorter.
 *
 * <p>The size comes from the memory manager of the containing task's environment, using the
 * managed memory fraction that the stream config reports for
 * {@link ManagedMemoryUseCase#OPERATOR}.
 *
 * @return the memory size returned by the memory manager for that fraction
 */
private long computeMemorySize() {
  final Environment env = parameters.getContainingTask().getEnvironment();
  final MemoryManager memoryManager = env.getMemoryManager();
  final double operatorFraction = parameters.getStreamConfig().getManagedMemoryFractionOperatorUseCaseOfSlot(
      ManagedMemoryUseCase.OPERATOR,
      env.getTaskManagerInfo().getConfiguration(),
      env.getUserCodeClassLoader().asClassLoader());
  return memoryManager.computeMemorySize(operatorFraction);
}

private SortCodeGenerator createSortCodeGenerator() {
Expand All @@ -369,8 +388,26 @@ public void setExecutor(NonThrownExecutor executor) {
this.executor = executor;
}

@VisibleForTesting
public void setOutput(Output<StreamRecord<ClusteringCommitEvent>> output) {
this.output = output;
/**
 * Operator factory that wraps a {@link ClusteringFunction} into Flink's async wait operator
 * and hands the {@link StreamOperatorParameters} of the created operator over to the function.
 */
public static class ClusteringOperatorFactory extends AsyncWaitOperatorFactory<ClusteringPlanEvent, ClusteringCommitEvent>
    implements OneInputStreamOperatorFactory<ClusteringPlanEvent, ClusteringCommitEvent>, YieldingOperatorFactory<ClusteringCommitEvent> {

  /**
   * Creates the factory around a new {@link ClusteringFunction}.
   *
   * @param conf    the configuration forwarded to the function
   * @param rowType the row type forwarded to the function
   */
  public ClusteringOperatorFactory(Configuration conf, RowType rowType) {
    // NOTE(review): per AsyncWaitOperatorFactory(function, timeout, capacity, outputMode) the 0 is
    // the timeout and the 1 is the queue capacity; confirm against the Flink version in use.
    super(new ClusteringFunction(conf, rowType), 0, 1, AsyncDataStream.OutputMode.ORDERED);
  }

  /**
   * Creates the operator through the parent factory, then passes {@code parameters} on to the
   * wrapped {@link ClusteringFunction}.
   *
   * @param parameters the parameters of the operator being created
   * @return the operator created by the parent factory
   */
  @SuppressWarnings("unchecked")
  @Override
  public <T extends StreamOperator<ClusteringCommitEvent>> T createStreamOperator(
      StreamOperatorParameters<ClusteringCommitEvent> parameters) {
    final AbstractUdfStreamOperator<ClusteringCommitEvent, ClusteringFunction> operator =
        super.createStreamOperator(parameters);
    operator.getUserFunction().setStreamOperatorParameters(parameters);
    return (T) operator;
  }

  /**
   * Delegates to the parent factory without changes.
   */
  @Override
  public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
    return super.getStreamOperatorClass(classLoader);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ private void cluster() throws Exception {
.rebalance()
.transform("clustering_task",
TypeInformation.of(ClusteringCommitEvent.class),
new ClusteringOperator(conf, rowType))
new ClusteringFunction.ClusteringOperatorFactory(conf, rowType))
.setParallelism(clusteringParallelism);

ExecNodeUtil.setManagedMemoryWeight(dataStream.getTransformation(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,20 @@

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

/**
* Function to execute the actual compaction task assigned by the compaction plan task.
* To execute scalably, the input should be shuffled by the compact event {@link CompactionPlanEvent}.
*/
public class CompactFunction extends ProcessFunction<CompactionPlanEvent, CompactionCommitEvent> {
public class CompactFunction extends RichAsyncFunction<CompactionPlanEvent, CompactionCommitEvent> {
private static final Logger LOG = LoggerFactory.getLogger(CompactFunction.class);

/**
Expand Down Expand Up @@ -86,25 +87,27 @@ public void open(Configuration parameters) throws Exception {
}

@Override
public void processElement(CompactionPlanEvent event, Context context, Collector<CompactionCommitEvent> collector) throws Exception {
public void asyncInvoke(final CompactionPlanEvent event, final ResultFuture<CompactionCommitEvent> resultFuture) throws Exception {
final String instantTime = event.getCompactionInstantTime();
final CompactionOperation compactionOperation = event.getOperation();
if (asyncCompaction) {
// executes the compaction task asynchronously to not block the checkpoint barrier propagation.
executor.execute(
() -> doCompaction(instantTime, compactionOperation, collector, reloadWriteConfig()),
(errMsg, t) -> collector.collect(new CompactionCommitEvent(instantTime, compactionOperation.getFileId(), taskID)),
() -> doCompaction(instantTime, compactionOperation, resultFuture, reloadWriteConfig()),
(errMsg, t) -> resultFuture.complete(
Collections.singletonList(new CompactionCommitEvent(instantTime, compactionOperation.getFileId(), taskID))
),
"Execute compaction for instant %s from task %d", instantTime, taskID);
} else {
// executes the compaction task synchronously for batch mode.
LOG.info("Execute compaction for instant {} from task {}", instantTime, taskID);
doCompaction(instantTime, compactionOperation, collector, writeClient.getConfig());
doCompaction(instantTime, compactionOperation, resultFuture, writeClient.getConfig());
}
}

private void doCompaction(String instantTime,
CompactionOperation compactionOperation,
Collector<CompactionCommitEvent> collector,
ResultFuture<CompactionCommitEvent> resultFuture,
HoodieWriteConfig writeConfig) throws IOException {
HoodieFlinkMergeOnReadTableCompactor<?> compactor = new HoodieFlinkMergeOnReadTableCompactor<>();
List<WriteStatus> writeStatuses = compactor.compact(
Expand All @@ -117,7 +120,9 @@ private void doCompaction(String instantTime,
compactionOperation,
instantTime,
writeClient.getHoodieTable().getTaskContextSupplier());
collector.collect(new CompactionCommitEvent(instantTime, compactionOperation.getFileId(), writeStatuses, taskID));
resultFuture.complete(
Collections.singletonList(new CompactionCommitEvent(instantTime, compactionOperation.getFileId(), writeStatuses, taskID))
);
}

private HoodieWriteConfig reloadWriteConfig() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -295,7 +296,7 @@ private void compact() throws Exception {
.rebalance()
.transform("compact_task",
TypeInformation.of(CompactionCommitEvent.class),
new ProcessOperator<>(new CompactFunction(conf)))
new AsyncWaitOperatorFactory<>(new CompactFunction(conf), 0, 1, AsyncDataStream.OutputMode.ORDERED))
.setParallelism(compactionParallelism)
.addSink(new CompactionCommitSink(conf))
.name("compaction_commit")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
import org.apache.hudi.sink.clustering.ClusteringCommitEvent;
import org.apache.hudi.sink.clustering.ClusteringCommitSink;
import org.apache.hudi.sink.clustering.ClusteringOperator;
import org.apache.hudi.sink.clustering.ClusteringFunction;
import org.apache.hudi.sink.clustering.ClusteringPlanEvent;
import org.apache.hudi.sink.clustering.ClusteringPlanOperator;
import org.apache.hudi.sink.common.WriteOperatorFactory;
Expand All @@ -53,11 +53,12 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorFactory;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
Expand Down Expand Up @@ -372,7 +373,7 @@ public static DataStreamSink<CompactionCommitEvent> compact(Configuration conf,
.keyBy(plan -> plan.getOperation().getFileGroupId().getFileId())
.transform("compact_task",
TypeInformation.of(CompactionCommitEvent.class),
new ProcessOperator<>(new CompactFunction(conf)))
new AsyncWaitOperatorFactory<>(new CompactFunction(conf), 0, 1, AsyncDataStream.OutputMode.ORDERED))
.setParallelism(conf.getInteger(FlinkOptions.COMPACTION_TASKS))
.addSink(new CompactionCommitSink(conf))
.name("compact_commit")
Expand Down Expand Up @@ -412,7 +413,7 @@ public static DataStreamSink<ClusteringCommitEvent> cluster(Configuration conf,
.stream().map(ClusteringOperation::getFileId).collect(Collectors.joining()))
.transform("clustering_task",
TypeInformation.of(ClusteringCommitEvent.class),
new ClusteringOperator(conf, rowType))
new ClusteringFunction.ClusteringOperatorFactory(conf, rowType))
.setParallelism(conf.getInteger(FlinkOptions.CLUSTERING_TASKS))
.addSink(new ClusteringCommitSink(conf))
.name("clustering_commit")
Expand Down
Loading