
Commit

[HUDI-3963] Use Lock-Free Message Queue Disruptor Improving Hoodie Writing Efficiency (apache#5416)

https://issues.apache.org/jira/browse/HUDI-3963
RFC design: apache#5567

Add a lock-free executor to improve Hoodie write throughput and optimize execution efficiency.
Disruptor documentation: https://lmax-exchange.github.io/disruptor/user-guide/index.html#_introduction. The existing BoundedInMemory executor remains the default; users can enable the Disruptor executor on a need basis.
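For illustration only, a minimal sketch of how the new executor could be switched on through the builder methods added in this commit. HoodieWriteConfig.newBuilder() and withPath() are assumed to be the usual entry points, basePath is a placeholder, and the buffer size and wait strategy shown are just the defaults, not tuning advice.

import org.apache.hudi.config.HoodieWriteConfig;

// Sketch (not part of this commit): opt into the experimental DISRUPTOR executor.
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
    .withPath(basePath)                       // table base path (placeholder)
    .withExecutorType("DISRUPTOR")            // hoodie.write.executor.type (default: BOUNDED_IN_MEMORY)
    .withWriteBufferSize(1024)                // hoodie.write.executor.disruptor.buffer.size, must be a power of 2
    .withWriteWaitStrategy("BLOCKING_WAIT")   // hoodie.write.executor.disruptor.wait.strategy
    .build();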

Co-authored-by: yuezhang <yuezhang@freewheel.tv>
2 people authored and fengjian committed Apr 5, 2023
1 parent ab1d14a commit 75de5b8
Showing 34 changed files with 1,631 additions and 178 deletions.
@@ -49,6 +49,7 @@
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.queue.ExecutorType;
import org.apache.hudi.config.metrics.HoodieMetricsCloudWatchConfig;
import org.apache.hudi.config.metrics.HoodieMetricsConfig;
import org.apache.hudi.config.metrics.HoodieMetricsDatadogConfig;
@@ -84,12 +85,14 @@
import java.io.InputStream;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.apache.hudi.common.util.queue.ExecutorType.BOUNDED_IN_MEMORY;
import static org.apache.hudi.config.HoodieCleanConfig.CLEANER_POLICY;

/**
@@ -132,6 +135,14 @@ public class HoodieWriteConfig extends HoodieConfig {
.withDocumentation("Key generator class, that implements `org.apache.hudi.keygen.KeyGenerator` "
+ "extract a key out of incoming records.");

public static final ConfigProperty<String> EXECUTOR_TYPE = ConfigProperty
.key("hoodie.write.executor.type")
.defaultValue(BOUNDED_IN_MEMORY.name())
.withDocumentation("Set executor which orchestrates concurrent producers and consumers communicating through a message queue."
+ "default value is BOUNDED_IN_MEMORY which use a bounded in-memory queue using LinkedBlockingQueue."
+ "Also users could use DISRUPTOR, which use disruptor as a lock free message queue "
+ "to gain better writing performance if lock was the bottleneck. Although DISRUPTOR_EXECUTOR is still an experimental feature.");

public static final ConfigProperty<String> KEYGENERATOR_TYPE = ConfigProperty
.key("hoodie.datasource.write.keygenerator.type")
.defaultValue(KeyGeneratorType.SIMPLE.name())
@@ -233,6 +244,19 @@ public class HoodieWriteConfig extends HoodieConfig {
.defaultValue(String.valueOf(4 * 1024 * 1024))
.withDocumentation("Size of in-memory buffer used for parallelizing network reads and lake storage writes.");

public static final ConfigProperty<String> WRITE_DISRUPTOR_BUFFER_SIZE = ConfigProperty
.key("hoodie.write.executor.disruptor.buffer.size")
.defaultValue(String.valueOf(1024))
.withDocumentation("The size of the Disruptor Executor ring buffer, must be power of 2");

public static final ConfigProperty<String> WRITE_WAIT_STRATEGY = ConfigProperty
.key("hoodie.write.executor.disruptor.wait.strategy")
.defaultValue("BLOCKING_WAIT")
.withDocumentation("Strategy employed for making Disruptor Executor wait on a cursor. Other options are "
+ "SLEEPING_WAIT, it attempts to be conservative with CPU usage by using a simple busy wait loop"
+ "YIELDING_WAIT, it is designed for cases where there is the option to burn CPU cycles with the goal of improving latency"
+ "BUSY_SPIN_WAIT, it can be used in low-latency systems, but puts the highest constraints on the deployment environment");

public static final ConfigProperty<String> COMBINE_BEFORE_INSERT = ConfigProperty
.key("hoodie.combine.before.insert")
.defaultValue("false")
@@ -975,6 +999,10 @@ public String getKeyGeneratorClass() {
return getString(KEYGENERATOR_CLASS_NAME);
}

public ExecutorType getExecutorType() {
return ExecutorType.valueOf(getString(EXECUTOR_TYPE).toUpperCase(Locale.ROOT));
}

public boolean isCDCEnabled() {
return getBooleanOrDefault(
HoodieTableConfig.CDC_ENABLED, HoodieTableConfig.CDC_ENABLED.defaultValue());
@@ -1040,6 +1068,14 @@ public int getWriteBufferLimitBytes() {
return Integer.parseInt(getStringOrDefault(WRITE_BUFFER_LIMIT_BYTES_VALUE));
}

public Option<String> getWriteExecutorWaitStrategy() {
return Option.of(getString(WRITE_WAIT_STRATEGY));
}

public Option<Integer> getDisruptorWriteBufferSize() {
return Option.of(Integer.parseInt(getStringOrDefault(WRITE_DISRUPTOR_BUFFER_SIZE)));
}

public boolean shouldCombineBeforeInsert() {
return getBoolean(COMBINE_BEFORE_INSERT);
}
@@ -2287,6 +2323,11 @@ public Builder withKeyGenerator(String keyGeneratorClass) {
return this;
}

public Builder withExecutorType(String executorClass) {
writeConfig.setValue(EXECUTOR_TYPE, executorClass);
return this;
}

public Builder withTimelineLayoutVersion(int version) {
writeConfig.setValue(TIMELINE_LAYOUT_VERSION_NUM, String.valueOf(version));
return this;
@@ -2333,6 +2374,16 @@ public Builder withWriteBufferLimitBytes(int writeBufferLimit) {
return this;
}

public Builder withWriteWaitStrategy(String waitStrategy) {
writeConfig.setValue(WRITE_WAIT_STRATEGY, String.valueOf(waitStrategy));
return this;
}

public Builder withWriteBufferSize(int size) {
writeConfig.setValue(WRITE_DISRUPTOR_BUFFER_SIZE, String.valueOf(size));
return this;
}

public Builder combineInput(boolean onInsert, boolean onUpsert) {
writeConfig.setValue(COMBINE_BEFORE_INSERT, String.valueOf(onInsert));
writeConfig.setValue(COMBINE_BEFORE_UPSERT, String.valueOf(onUpsert));
@@ -22,7 +22,7 @@
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
import org.apache.hudi.common.util.queue.IteratorBasedQueueConsumer;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.execution.HoodieLazyInsertIterable.HoodieInsertValueGenResult;
import org.apache.hudi.io.HoodieWriteHandle;
@@ -38,7 +38,7 @@
* Consumes stream of hoodie records from in-memory queue and writes to one or more create-handles.
*/
public class CopyOnWriteInsertHandler<T extends HoodieRecordPayload>
extends BoundedInMemoryQueueConsumer<HoodieInsertValueGenResult<HoodieRecord>, List<WriteStatus>> {
extends IteratorBasedQueueConsumer<HoodieInsertValueGenResult<HoodieRecord>, List<WriteStatus>> {

private HoodieWriteConfig config;
private String instantTime;
@@ -20,7 +20,7 @@

import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
import org.apache.hudi.common.util.queue.IteratorBasedQueueConsumer;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.HoodieBootstrapHandle;

@@ -29,7 +29,7 @@
/**
* Consumer that dequeues records from queue and sends to Merge Handle for writing.
*/
public class BootstrapRecordConsumer extends BoundedInMemoryQueueConsumer<HoodieRecord, Void> {
public class BootstrapRecordConsumer extends IteratorBasedQueueConsumer<HoodieRecord, Void> {

private final HoodieBootstrapHandle bootstrapHandle;

@@ -38,7 +38,7 @@ public BootstrapRecordConsumer(HoodieBootstrapHandle bootstrapHandle) {
}

@Override
protected void consumeOneRecord(HoodieRecord record) {
public void consumeOneRecord(HoodieRecord record) {
try {
bootstrapHandle.write(record, ((HoodieRecordPayload) record.getData())
.getInsertValue(bootstrapHandle.getWriterSchemaWithMetaFields()));
@@ -48,7 +48,7 @@ protected void consumeOneRecord(HoodieRecord record) {
}

@Override
protected void finish() {}
public void finish() {}

@Override
protected Void getResult() {
@@ -22,7 +22,7 @@
import org.apache.hudi.client.utils.MergingIterator;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
import org.apache.hudi.common.util.queue.IteratorBasedQueueConsumer;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.io.storage.HoodieFileReader;
@@ -109,7 +109,7 @@ protected Iterator<GenericRecord> getMergingIterator(HoodieTable<T, I, K, O> tab
/**
* Consumer that dequeues records from queue and sends to Merge Handle.
*/
protected static class UpdateHandler extends BoundedInMemoryQueueConsumer<GenericRecord, Void> {
protected static class UpdateHandler extends IteratorBasedQueueConsumer<GenericRecord, Void> {

private final HoodieMergeHandle upsertHandle;

@@ -118,12 +118,12 @@ protected UpdateHandler(HoodieMergeHandle upsertHandle) {
}

@Override
protected void consumeOneRecord(GenericRecord record) {
public void consumeOneRecord(GenericRecord record) {
upsertHandle.write(record);
}

@Override
protected void finish() {}
public void finish() {}

@Override
protected Void getResult() {
@@ -21,7 +21,7 @@
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
import org.apache.hudi.common.util.queue.IteratorBasedQueueConsumer;
import org.apache.hudi.io.HoodieWriteHandle;

import java.util.ArrayList;
@@ -31,7 +31,7 @@
* Consumes stream of hoodie records from in-memory queue and writes to one explicit create handle.
*/
public class ExplicitWriteHandler<T extends HoodieRecordPayload>
extends BoundedInMemoryQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, List<WriteStatus>> {
extends IteratorBasedQueueConsumer<HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord>, List<WriteStatus>> {

private final List<WriteStatus> statuses = new ArrayList<>();

@@ -23,13 +23,14 @@
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
import org.apache.hudi.common.util.queue.HoodieExecutor;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.io.WriteHandleFactory;
import org.apache.hudi.table.HoodieTable;

import org.apache.avro.Schema;
import org.apache.hudi.util.QueueBasedExecutorFactory;

import java.util.Iterator;
import java.util.List;
@@ -77,16 +78,16 @@ public SparkLazyInsertIterable(Iterator<HoodieRecord<T>> recordItr,
@Override
protected List<WriteStatus> computeNext() {
// Executor service used for launching writer thread.
BoundedInMemoryExecutor<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>, List<WriteStatus>> bufferedIteratorExecutor =
null;
HoodieExecutor<?, ?, List<WriteStatus>> bufferedIteratorExecutor = null;
try {
Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema());
if (useWriterSchema) {
schema = HoodieAvroUtils.addMetadataFields(schema);
}
bufferedIteratorExecutor =
new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), inputItr, getInsertHandler(),
getTransformFunction(schema, hoodieConfig), hoodieTable.getPreExecuteRunnable());

bufferedIteratorExecutor = QueueBasedExecutorFactory.create(hoodieConfig, inputItr, getInsertHandler(),
getTransformFunction(schema, hoodieConfig), hoodieTable.getPreExecuteRunnable());

final List<WriteStatus> result = bufferedIteratorExecutor.execute();
assert result != null && !result.isEmpty() && !bufferedIteratorExecutor.isRemaining();
return result;
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.util;

import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
import org.apache.hudi.common.util.queue.DisruptorExecutor;
import org.apache.hudi.common.util.queue.ExecutorType;
import org.apache.hudi.common.util.queue.HoodieExecutor;
import org.apache.hudi.common.util.queue.IteratorBasedQueueConsumer;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;

import java.util.Iterator;
import java.util.function.Function;

public class QueueBasedExecutorFactory {

/**
* Create a new hoodie executor instance on demand.
*/
public static <I, O, E> HoodieExecutor create(HoodieWriteConfig hoodieConfig, Iterator<I> inputItr, IteratorBasedQueueConsumer<O, E> consumer,
Function<I, O> transformFunction, Runnable preExecuteRunnable) {
ExecutorType executorType = hoodieConfig.getExecutorType();

switch (executorType) {
case BOUNDED_IN_MEMORY:
return new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), inputItr, consumer,
transformFunction, preExecuteRunnable);
case DISRUPTOR:
return new DisruptorExecutor<>(hoodieConfig.getDisruptorWriteBufferSize(), inputItr, consumer,
transformFunction, hoodieConfig.getWriteExecutorWaitStrategy(), preExecuteRunnable);
default:
throw new HoodieException("Unsupported Executor Type " + executorType);
}
}
}
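For reference, a sketch of the same switch expressed as raw write options. The keys are the ones introduced in this commit; how the map is actually handed to the writer (Spark datasource options, a properties file, etc.) is deployment-specific and only assumed here.

import java.util.HashMap;
import java.util.Map;

// Sketch (not part of this commit): the equivalent raw key/value options.
Map<String, String> writeOptions = new HashMap<>();
writeOptions.put("hoodie.write.executor.type", "DISRUPTOR");
writeOptions.put("hoodie.write.executor.disruptor.buffer.size", "1024");            // must be a power of 2
writeOptions.put("hoodie.write.executor.disruptor.wait.strategy", "SLEEPING_WAIT"); // or BLOCKING_WAIT (default), YIELDING_WAIT, BUSY_SPIN_WAIT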