From 75de5b83931ca6d73c2ae68aeaa2b9bacd2b663f Mon Sep 17 00:00:00 2001 From: YueZhang <69956021+zhangyue19921010@users.noreply.github.com> Date: Thu, 3 Nov 2022 08:02:18 +0800 Subject: [PATCH] [HUDI-3963] Use Lock-Free Message Queue Disruptor Improving Hoodie Writing Efficiency (#5416) https://issues.apache.org/jira/browse/HUDI-3963 RFC design : #5567 Add Lock-Free executor to improve hoodie writing throughput and optimize execution efficiency. Disruptor linked: https://lmax-exchange.github.io/disruptor/user-guide/index.html#_introduction. Existing BoundedInMemory is the default. Users can enable on a need basis. Co-authored-by: yuezhang --- .../apache/hudi/config/HoodieWriteConfig.java | 51 +++ .../execution/CopyOnWriteInsertHandler.java | 4 +- .../bootstrap/BootstrapRecordConsumer.java | 8 +- .../table/action/commit/BaseMergeHelper.java | 8 +- .../hudi/execution/ExplicitWriteHandler.java | 4 +- .../execution/SparkLazyInsertIterable.java | 13 +- .../hudi/util/QueueBasedExecutorFactory.java | 52 +++ .../TestBoundedInMemoryExecutorInSpark.java | 37 +- .../execution/TestBoundedInMemoryQueue.java | 26 +- .../TestDisruptorExecutionInSpark.java | 159 +++++++++ .../execution/TestDisruptorMessageQueue.java | 337 ++++++++++++++++++ hudi-common/pom.xml | 6 + .../common/util/CustomizedThreadFactory.java | 22 +- .../common/util/ParquetReaderIterator.java | 4 +- .../util/queue/BoundedInMemoryExecutor.java | 143 +++----- ...java => BoundedInMemoryQueueIterable.java} | 24 +- .../common/util/queue/DisruptorExecutor.java | 129 +++++++ .../util/queue/DisruptorMessageQueue.java | 136 +++++++ .../util/queue/DisruptorWaitStrategyType.java | 64 ++++ .../hudi/common/util/queue/ExecutorType.java | 49 +++ .../queue/FunctionBasedQueueProducer.java | 8 +- .../common/util/queue/HoodieConsumer.java | 30 ++ .../common/util/queue/HoodieExecutor.java | 44 +++ .../common/util/queue/HoodieExecutorBase.java | 143 ++++++++ .../queue/HoodieIterableMessageQueue.java | 29 ++ .../common/util/queue/HoodieMessageQueue.java | 53 +++ ...QueueProducer.java => HoodieProducer.java} | 8 +- ...r.java => IteratorBasedQueueConsumer.java} | 12 +- .../queue/IteratorBasedQueueProducer.java | 4 +- .../util/queue/WaitStrategyFactory.java | 54 +++ .../apache/hudi/table/format/FormatUtils.java | 6 +- .../RealtimeUnmergedRecordReader.java | 6 +- .../BoundInMemoryExecutorBenchmark.scala | 135 +++++++ pom.xml | 1 + 34 files changed, 1631 insertions(+), 178 deletions(-) create mode 100644 hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/util/QueueBasedExecutorFactory.java create mode 100644 hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorExecutionInSpark.java create mode 100644 hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java rename hudi-common/src/main/java/org/apache/hudi/common/util/queue/{BoundedInMemoryQueue.java => BoundedInMemoryQueueIterable.java} (94%) create mode 100644 hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorExecutor.java create mode 100644 hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorMessageQueue.java create mode 100644 hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorWaitStrategyType.java create mode 100644 hudi-common/src/main/java/org/apache/hudi/common/util/queue/ExecutorType.java create mode 100644 hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieConsumer.java create mode 100644 hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieExecutor.java create mode 100644 hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieExecutorBase.java create mode 100644 hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieIterableMessageQueue.java create mode 100644 hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieMessageQueue.java rename hudi-common/src/main/java/org/apache/hudi/common/util/queue/{BoundedInMemoryQueueProducer.java => HoodieProducer.java} (76%) rename hudi-common/src/main/java/org/apache/hudi/common/util/queue/{BoundedInMemoryQueueConsumer.java => IteratorBasedQueueConsumer.java} (81%) create mode 100644 hudi-common/src/main/java/org/apache/hudi/common/util/queue/WaitStrategyFactory.java create mode 100644 hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/BoundInMemoryExecutorBenchmark.scala diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 0c0966bc5f82f..514a4e38dc72c 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -49,6 +49,7 @@ import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.common.util.queue.ExecutorType; import org.apache.hudi.config.metrics.HoodieMetricsCloudWatchConfig; import org.apache.hudi.config.metrics.HoodieMetricsConfig; import org.apache.hudi.config.metrics.HoodieMetricsDatadogConfig; @@ -84,12 +85,14 @@ import java.io.InputStream; import java.util.Arrays; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Properties; import java.util.function.Supplier; import java.util.stream.Collectors; +import static org.apache.hudi.common.util.queue.ExecutorType.BOUNDED_IN_MEMORY; import static org.apache.hudi.config.HoodieCleanConfig.CLEANER_POLICY; /** @@ -132,6 +135,14 @@ public class HoodieWriteConfig extends HoodieConfig { .withDocumentation("Key generator class, that implements `org.apache.hudi.keygen.KeyGenerator` " + "extract a key out of incoming records."); + public static final ConfigProperty EXECUTOR_TYPE = ConfigProperty + .key("hoodie.write.executor.type") + .defaultValue(BOUNDED_IN_MEMORY.name()) + .withDocumentation("Set executor which orchestrates concurrent producers and consumers communicating through a message queue." + + "default value is BOUNDED_IN_MEMORY which use a bounded in-memory queue using LinkedBlockingQueue." + + "Also users could use DISRUPTOR, which use disruptor as a lock free message queue " + + "to gain better writing performance if lock was the bottleneck. Although DISRUPTOR_EXECUTOR is still an experimental feature."); + public static final ConfigProperty KEYGENERATOR_TYPE = ConfigProperty .key("hoodie.datasource.write.keygenerator.type") .defaultValue(KeyGeneratorType.SIMPLE.name()) @@ -233,6 +244,19 @@ public class HoodieWriteConfig extends HoodieConfig { .defaultValue(String.valueOf(4 * 1024 * 1024)) .withDocumentation("Size of in-memory buffer used for parallelizing network reads and lake storage writes."); + public static final ConfigProperty WRITE_DISRUPTOR_BUFFER_SIZE = ConfigProperty + .key("hoodie.write.executor.disruptor.buffer.size") + .defaultValue(String.valueOf(1024)) + .withDocumentation("The size of the Disruptor Executor ring buffer, must be power of 2"); + + public static final ConfigProperty WRITE_WAIT_STRATEGY = ConfigProperty + .key("hoodie.write.executor.disruptor.wait.strategy") + .defaultValue("BLOCKING_WAIT") + .withDocumentation("Strategy employed for making Disruptor Executor wait on a cursor. Other options are " + + "SLEEPING_WAIT, it attempts to be conservative with CPU usage by using a simple busy wait loop" + + "YIELDING_WAIT, it is designed for cases where there is the option to burn CPU cycles with the goal of improving latency" + + "BUSY_SPIN_WAIT, it can be used in low-latency systems, but puts the highest constraints on the deployment environment"); + public static final ConfigProperty COMBINE_BEFORE_INSERT = ConfigProperty .key("hoodie.combine.before.insert") .defaultValue("false") @@ -975,6 +999,10 @@ public String getKeyGeneratorClass() { return getString(KEYGENERATOR_CLASS_NAME); } + public ExecutorType getExecutorType() { + return ExecutorType.valueOf(getString(EXECUTOR_TYPE).toUpperCase(Locale.ROOT)); + } + public boolean isCDCEnabled() { return getBooleanOrDefault( HoodieTableConfig.CDC_ENABLED, HoodieTableConfig.CDC_ENABLED.defaultValue()); @@ -1040,6 +1068,14 @@ public int getWriteBufferLimitBytes() { return Integer.parseInt(getStringOrDefault(WRITE_BUFFER_LIMIT_BYTES_VALUE)); } + public Option getWriteExecutorWaitStrategy() { + return Option.of(getString(WRITE_WAIT_STRATEGY)); + } + + public Option getDisruptorWriteBufferSize() { + return Option.of(Integer.parseInt(getStringOrDefault(WRITE_DISRUPTOR_BUFFER_SIZE))); + } + public boolean shouldCombineBeforeInsert() { return getBoolean(COMBINE_BEFORE_INSERT); } @@ -2287,6 +2323,11 @@ public Builder withKeyGenerator(String keyGeneratorClass) { return this; } + public Builder withExecutorType(String executorClass) { + writeConfig.setValue(EXECUTOR_TYPE, executorClass); + return this; + } + public Builder withTimelineLayoutVersion(int version) { writeConfig.setValue(TIMELINE_LAYOUT_VERSION_NUM, String.valueOf(version)); return this; @@ -2333,6 +2374,16 @@ public Builder withWriteBufferLimitBytes(int writeBufferLimit) { return this; } + public Builder withWriteWaitStrategy(String waitStrategy) { + writeConfig.setValue(WRITE_WAIT_STRATEGY, String.valueOf(waitStrategy)); + return this; + } + + public Builder withWriteBufferSize(int size) { + writeConfig.setValue(WRITE_DISRUPTOR_BUFFER_SIZE, String.valueOf(size)); + return this; + } + public Builder combineInput(boolean onInsert, boolean onUpsert) { writeConfig.setValue(COMBINE_BEFORE_INSERT, String.valueOf(onInsert)); writeConfig.setValue(COMBINE_BEFORE_UPSERT, String.valueOf(onUpsert)); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java index 5e1f832b7f239..abb1122289453 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java @@ -22,7 +22,7 @@ import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer; +import org.apache.hudi.common.util.queue.IteratorBasedQueueConsumer; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.execution.HoodieLazyInsertIterable.HoodieInsertValueGenResult; import org.apache.hudi.io.HoodieWriteHandle; @@ -38,7 +38,7 @@ * Consumes stream of hoodie records from in-memory queue and writes to one or more create-handles. */ public class CopyOnWriteInsertHandler - extends BoundedInMemoryQueueConsumer, List> { + extends IteratorBasedQueueConsumer, List> { private HoodieWriteConfig config; private String instantTime; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapRecordConsumer.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapRecordConsumer.java index 8966a5d51c7cb..76968c6108d96 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapRecordConsumer.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapRecordConsumer.java @@ -20,7 +20,7 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer; +import org.apache.hudi.common.util.queue.IteratorBasedQueueConsumer; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.io.HoodieBootstrapHandle; @@ -29,7 +29,7 @@ /** * Consumer that dequeues records from queue and sends to Merge Handle for writing. */ -public class BootstrapRecordConsumer extends BoundedInMemoryQueueConsumer { +public class BootstrapRecordConsumer extends IteratorBasedQueueConsumer { private final HoodieBootstrapHandle bootstrapHandle; @@ -38,7 +38,7 @@ public BootstrapRecordConsumer(HoodieBootstrapHandle bootstrapHandle) { } @Override - protected void consumeOneRecord(HoodieRecord record) { + public void consumeOneRecord(HoodieRecord record) { try { bootstrapHandle.write(record, ((HoodieRecordPayload) record.getData()) .getInsertValue(bootstrapHandle.getWriterSchemaWithMetaFields())); @@ -48,7 +48,7 @@ protected void consumeOneRecord(HoodieRecord record) { } @Override - protected void finish() {} + public void finish() {} @Override protected Void getResult() { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java index 5ead348140aa3..bd1c01958bb61 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java @@ -22,7 +22,7 @@ import org.apache.hudi.client.utils.MergingIterator; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer; +import org.apache.hudi.common.util.queue.IteratorBasedQueueConsumer; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.io.HoodieMergeHandle; import org.apache.hudi.io.storage.HoodieFileReader; @@ -109,7 +109,7 @@ protected Iterator getMergingIterator(HoodieTable tab /** * Consumer that dequeues records from queue and sends to Merge Handle. */ - protected static class UpdateHandler extends BoundedInMemoryQueueConsumer { + protected static class UpdateHandler extends IteratorBasedQueueConsumer { private final HoodieMergeHandle upsertHandle; @@ -118,12 +118,12 @@ protected UpdateHandler(HoodieMergeHandle upsertHandle) { } @Override - protected void consumeOneRecord(GenericRecord record) { + public void consumeOneRecord(GenericRecord record) { upsertHandle.write(record); } @Override - protected void finish() {} + public void finish() {} @Override protected Void getResult() { diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/ExplicitWriteHandler.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/ExplicitWriteHandler.java index 46eff587575cc..9dab2170e09c1 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/ExplicitWriteHandler.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/ExplicitWriteHandler.java @@ -21,7 +21,7 @@ import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer; +import org.apache.hudi.common.util.queue.IteratorBasedQueueConsumer; import org.apache.hudi.io.HoodieWriteHandle; import java.util.ArrayList; @@ -31,7 +31,7 @@ * Consumes stream of hoodie records from in-memory queue and writes to one explicit create handle. */ public class ExplicitWriteHandler - extends BoundedInMemoryQueueConsumer, List> { + extends IteratorBasedQueueConsumer, List> { private final List statuses = new ArrayList<>(); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java index df5bd2d3f458c..d2555f95980b7 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java @@ -23,13 +23,14 @@ import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor; +import org.apache.hudi.common.util.queue.HoodieExecutor; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.io.WriteHandleFactory; import org.apache.hudi.table.HoodieTable; import org.apache.avro.Schema; +import org.apache.hudi.util.QueueBasedExecutorFactory; import java.util.Iterator; import java.util.List; @@ -77,16 +78,16 @@ public SparkLazyInsertIterable(Iterator> recordItr, @Override protected List computeNext() { // Executor service used for launching writer thread. - BoundedInMemoryExecutor, HoodieInsertValueGenResult, List> bufferedIteratorExecutor = - null; + HoodieExecutor> bufferedIteratorExecutor = null; try { Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema()); if (useWriterSchema) { schema = HoodieAvroUtils.addMetadataFields(schema); } - bufferedIteratorExecutor = - new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), inputItr, getInsertHandler(), - getTransformFunction(schema, hoodieConfig), hoodieTable.getPreExecuteRunnable()); + + bufferedIteratorExecutor = QueueBasedExecutorFactory.create(hoodieConfig, inputItr, getInsertHandler(), + getTransformFunction(schema, hoodieConfig), hoodieTable.getPreExecuteRunnable()); + final List result = bufferedIteratorExecutor.execute(); assert result != null && !result.isEmpty() && !bufferedIteratorExecutor.isRemaining(); return result; diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/util/QueueBasedExecutorFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/util/QueueBasedExecutorFactory.java new file mode 100644 index 0000000000000..ba8ddbd1ec1f1 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/util/QueueBasedExecutorFactory.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.util; + +import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor; +import org.apache.hudi.common.util.queue.DisruptorExecutor; +import org.apache.hudi.common.util.queue.ExecutorType; +import org.apache.hudi.common.util.queue.HoodieExecutor; +import org.apache.hudi.common.util.queue.IteratorBasedQueueConsumer; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieException; + +import java.util.Iterator; +import java.util.function.Function; + +public class QueueBasedExecutorFactory { + + /** + * Create a new hoodie executor instance on demand. + */ + public static HoodieExecutor create(HoodieWriteConfig hoodieConfig, Iterator inputItr, IteratorBasedQueueConsumer consumer, + Function transformFunction, Runnable preExecuteRunnable) { + ExecutorType executorType = hoodieConfig.getExecutorType(); + + switch (executorType) { + case BOUNDED_IN_MEMORY: + return new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), inputItr, consumer, + transformFunction, preExecuteRunnable); + case DISRUPTOR: + return new DisruptorExecutor<>(hoodieConfig.getDisruptorWriteBufferSize(), inputItr, consumer, + transformFunction, hoodieConfig.getWriteExecutorWaitStrategy(), preExecuteRunnable); + default: + throw new HoodieException("Unsupported Executor Type " + executorType); + } + } +} diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java index a714d60d0033a..040634da4c335 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java @@ -23,7 +23,7 @@ import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor; -import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer; +import org.apache.hudi.common.util.queue.IteratorBasedQueueConsumer; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.testutils.HoodieClientTestHarness; @@ -71,24 +71,21 @@ private Runnable getPreExecuteRunnable() { @Test public void testExecutor() { - final List hoodieRecords = dataGen.generateInserts(instantTime, 100); + final int recordNumber = 100; + final List hoodieRecords = dataGen.generateInserts(instantTime, recordNumber); HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class); when(hoodieWriteConfig.getWriteBufferLimitBytes()).thenReturn(1024); - BoundedInMemoryQueueConsumer, Integer> consumer = - new BoundedInMemoryQueueConsumer, Integer>() { + IteratorBasedQueueConsumer, Integer> consumer = + new IteratorBasedQueueConsumer, Integer>() { private int count = 0; @Override - protected void consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult record) { + public void consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult record) { count++; } - @Override - protected void finish() { - } - @Override protected Integer getResult() { return count; @@ -100,7 +97,7 @@ protected Integer getResult() { executor = new BoundedInMemoryExecutor(hoodieWriteConfig.getWriteBufferLimitBytes(), hoodieRecords.iterator(), consumer, getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA), getPreExecuteRunnable()); int result = executor.execute(); - // It should buffer and write 100 records + assertEquals(100, result); // There should be no remaining records in the buffer assertFalse(executor.isRemaining()); @@ -118,11 +115,11 @@ public void testInterruptExecutor() { HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class); when(hoodieWriteConfig.getWriteBufferLimitBytes()).thenReturn(1024); - BoundedInMemoryQueueConsumer, Integer> consumer = - new BoundedInMemoryQueueConsumer, Integer>() { + IteratorBasedQueueConsumer, Integer> consumer = + new IteratorBasedQueueConsumer, Integer>() { @Override - protected void consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult record) { + public void consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult record) { try { while (true) { Thread.sleep(1000); @@ -132,10 +129,6 @@ protected void consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenRes } } - @Override - protected void finish() { - } - @Override protected Integer getResult() { return 0; @@ -176,14 +169,10 @@ public GenericRecord next() { } }; - BoundedInMemoryQueueConsumer, Integer> consumer = - new BoundedInMemoryQueueConsumer, Integer>() { - @Override - protected void consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult record) { - } - + IteratorBasedQueueConsumer, Integer> consumer = + new IteratorBasedQueueConsumer, Integer>() { @Override - protected void finish() { + public void consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult record) { } @Override diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java index 4707a68072e9a..c36554bb64a7e 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java @@ -26,9 +26,9 @@ import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.SizeEstimator; -import org.apache.hudi.common.util.queue.BoundedInMemoryQueue; -import org.apache.hudi.common.util.queue.BoundedInMemoryQueueProducer; +import org.apache.hudi.common.util.queue.BoundedInMemoryQueueIterable; import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer; +import org.apache.hudi.common.util.queue.HoodieProducer; import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.testutils.HoodieClientTestHarness; @@ -83,8 +83,8 @@ public void tearDown() throws Exception { public void testRecordReading() throws Exception { final int numRecords = 128; final List hoodieRecords = dataGen.generateInserts(instantTime, numRecords); - final BoundedInMemoryQueue queue = - new BoundedInMemoryQueue(FileIOUtils.KB, getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA)); + final BoundedInMemoryQueueIterable queue = + new BoundedInMemoryQueueIterable(FileIOUtils.KB, getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA)); // Produce Future resFuture = executorService.submit(() -> { new IteratorBasedQueueProducer<>(hoodieRecords.iterator()).produce(queue); @@ -123,8 +123,8 @@ public void testCompositeProducerRecordReading() throws Exception { final int numProducers = 40; final List> recs = new ArrayList<>(); - final BoundedInMemoryQueue queue = - new BoundedInMemoryQueue(FileIOUtils.KB, getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA)); + final BoundedInMemoryQueueIterable queue = + new BoundedInMemoryQueueIterable(FileIOUtils.KB, getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA)); // Record Key to Map> keyToProducerAndIndexMap = new HashMap<>(); @@ -140,7 +140,7 @@ public void testCompositeProducerRecordReading() throws Exception { recs.add(pRecs); } - List> producers = new ArrayList<>(); + List> producers = new ArrayList<>(); for (int i = 0; i < recs.size(); i++) { final List r = recs.get(i); // Alternate between pull and push based iterators @@ -222,8 +222,8 @@ public void testMemoryLimitForBuffering() throws Exception { getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA).apply((HoodieAvroRecord) hoodieRecords.get(0)); final long objSize = sizeEstimator.sizeEstimate(payload); final long memoryLimitInBytes = recordLimit * objSize; - final BoundedInMemoryQueue queue = - new BoundedInMemoryQueue(memoryLimitInBytes, getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA)); + final BoundedInMemoryQueueIterable queue = + new BoundedInMemoryQueueIterable(memoryLimitInBytes, getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA)); // Produce executorService.submit(() -> { @@ -275,8 +275,8 @@ public void testException() throws Exception { // first let us throw exception from queueIterator reader and test that queueing thread // stops and throws // correct exception back. - BoundedInMemoryQueue>> queue1 = - new BoundedInMemoryQueue(memoryLimitInBytes, getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA)); + BoundedInMemoryQueueIterable>> queue1 = + new BoundedInMemoryQueueIterable(memoryLimitInBytes, getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA)); // Produce Future resFuture = executorService.submit(() -> { @@ -303,8 +303,8 @@ public void testException() throws Exception { final Iterator mockHoodieRecordsIterator = mock(Iterator.class); when(mockHoodieRecordsIterator.hasNext()).thenReturn(true); when(mockHoodieRecordsIterator.next()).thenThrow(expectedException); - BoundedInMemoryQueue>> queue2 = - new BoundedInMemoryQueue(memoryLimitInBytes, getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA)); + BoundedInMemoryQueueIterable>> queue2 = + new BoundedInMemoryQueueIterable(memoryLimitInBytes, getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA)); // Produce Future res = executorService.submit(() -> { diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorExecutionInSpark.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorExecutionInSpark.java new file mode 100644 index 0000000000000..2351f2bbed6df --- /dev/null +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorExecutionInSpark.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.execution; + +import static org.apache.hudi.execution.HoodieLazyInsertIterable.getTransformFunction; + +import org.apache.avro.generic.IndexedRecord; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.queue.IteratorBasedQueueConsumer; +import org.apache.hudi.common.util.queue.DisruptorExecutor; +import org.apache.hudi.common.util.queue.WaitStrategyFactory; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.testutils.HoodieClientTestHarness; +import org.apache.spark.TaskContext; +import org.apache.spark.TaskContext$; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.util.ArrayList; +import java.util.List; + +import scala.Tuple2; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestDisruptorExecutionInSpark extends HoodieClientTestHarness { + + private final String instantTime = HoodieActiveTimeline.createNewInstantTime(); + + @BeforeEach + public void setUp() throws Exception { + initTestDataGenerator(); + initExecutorServiceWithFixedThreadPool(2); + } + + @AfterEach + public void tearDown() throws Exception { + cleanupResources(); + } + + private Runnable getPreExecuteRunnable() { + final TaskContext taskContext = TaskContext.get(); + return () -> TaskContext$.MODULE$.setTaskContext(taskContext); + } + + @Test + public void testExecutor() { + + final List hoodieRecords = dataGen.generateInserts(instantTime, 128); + final List consumedRecords = new ArrayList<>(); + + HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class); + when(hoodieWriteConfig.getDisruptorWriteBufferSize()).thenReturn(Option.of(8)); + IteratorBasedQueueConsumer, Integer> consumer = + new IteratorBasedQueueConsumer, Integer>() { + + private int count = 0; + + @Override + public void consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult record) { + consumedRecords.add(record.record); + count++; + } + + @Override + protected Integer getResult() { + return count; + } + }; + DisruptorExecutor>, Integer> exec = null; + + try { + exec = new DisruptorExecutor(hoodieWriteConfig.getDisruptorWriteBufferSize(), hoodieRecords.iterator(), consumer, + getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA), Option.of(WaitStrategyFactory.DEFAULT_STRATEGY), getPreExecuteRunnable()); + int result = exec.execute(); + // It should buffer and write 100 records + assertEquals(128, result); + // There should be no remaining records in the buffer + assertFalse(exec.isRemaining()); + + // collect all records and assert that consumed records are identical to produced ones + // assert there's no tampering, and that the ordering is preserved + assertEquals(hoodieRecords, consumedRecords); + for (int i = 0; i < hoodieRecords.size(); i++) { + assertEquals(hoodieRecords.get(i), consumedRecords.get(i)); + } + + } finally { + if (exec != null) { + exec.shutdownNow(); + } + } + } + + @Test + @Timeout(value = 60) + public void testInterruptExecutor() { + final List hoodieRecords = dataGen.generateInserts(instantTime, 100); + + HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class); + when(hoodieWriteConfig.getDisruptorWriteBufferSize()).thenReturn(Option.of(1024)); + IteratorBasedQueueConsumer, Integer> consumer = + new IteratorBasedQueueConsumer, Integer>() { + + @Override + public void consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult record) { + try { + Thread.currentThread().wait(); + } catch (InterruptedException ie) { + // ignore here + } + } + + @Override + protected Integer getResult() { + return 0; + } + }; + + DisruptorExecutor>, Integer> + executor = new DisruptorExecutor(hoodieWriteConfig.getDisruptorWriteBufferSize(), hoodieRecords.iterator(), consumer, + getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA), Option.of(WaitStrategyFactory.DEFAULT_STRATEGY), getPreExecuteRunnable()); + + try { + Thread.currentThread().interrupt(); + assertThrows(HoodieException.class, executor::execute); + assertTrue(Thread.interrupted()); + } catch (Exception e) { + // ignore here + } + } +} diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java new file mode 100644 index 0000000000000..d296d56440031 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.execution; + +import static org.apache.hudi.execution.HoodieLazyInsertIterable.getTransformFunction; + +import org.apache.avro.generic.IndexedRecord; +import org.apache.hudi.common.model.HoodieAvroRecord; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.common.util.queue.DisruptorMessageQueue; +import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer; +import org.apache.hudi.common.util.queue.HoodieProducer; +import org.apache.hudi.common.util.queue.IteratorBasedQueueConsumer; +import org.apache.hudi.common.util.queue.DisruptorExecutor; +import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer; +import org.apache.hudi.common.util.queue.WaitStrategyFactory; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.testutils.HoodieClientTestHarness; +import org.apache.spark.TaskContext; +import org.apache.spark.TaskContext$; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.io.IOException; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import scala.Tuple2; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestDisruptorMessageQueue extends HoodieClientTestHarness { + + private final String instantTime = HoodieActiveTimeline.createNewInstantTime(); + + @BeforeEach + public void setUp() throws Exception { + initTestDataGenerator(); + initExecutorServiceWithFixedThreadPool(2); + } + + @AfterEach + public void tearDown() throws Exception { + cleanupResources(); + } + + private Runnable getPreExecuteRunnable() { + final TaskContext taskContext = TaskContext.get(); + return () -> TaskContext$.MODULE$.setTaskContext(taskContext); + } + + // Test to ensure that we are reading all records from queue iterator in the same order + // without any exceptions. + @SuppressWarnings("unchecked") + @Test + @Timeout(value = 60) + public void testRecordReading() throws Exception { + + final List hoodieRecords = dataGen.generateInserts(instantTime, 100); + ArrayList beforeRecord = new ArrayList<>(); + ArrayList beforeIndexedRecord = new ArrayList<>(); + ArrayList afterRecord = new ArrayList<>(); + ArrayList afterIndexedRecord = new ArrayList<>(); + + hoodieRecords.forEach(record -> { + final HoodieAvroRecord originalRecord = (HoodieAvroRecord) record; + beforeRecord.add(originalRecord); + try { + final Option originalInsertValue = + originalRecord.getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA); + beforeIndexedRecord.add(originalInsertValue.get()); + } catch (IOException e) { + // ignore exception here. + } + }); + + HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class); + when(hoodieWriteConfig.getDisruptorWriteBufferSize()).thenReturn(Option.of(16)); + IteratorBasedQueueConsumer, Integer> consumer = + new IteratorBasedQueueConsumer, Integer>() { + + private int count = 0; + + @Override + public void consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult record) { + count++; + afterRecord.add((HoodieAvroRecord) record.record); + try { + IndexedRecord indexedRecord = (IndexedRecord)((HoodieAvroRecord) record.record) + .getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA).get(); + afterIndexedRecord.add(indexedRecord); + } catch (IOException e) { + //ignore exception here. + } + } + + @Override + protected Integer getResult() { + return count; + } + }; + + DisruptorExecutor>, Integer> exec = null; + + try { + exec = new DisruptorExecutor(hoodieWriteConfig.getDisruptorWriteBufferSize(), hoodieRecords.iterator(), consumer, + getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA), Option.of(WaitStrategyFactory.DEFAULT_STRATEGY), getPreExecuteRunnable()); + int result = exec.execute(); + // It should buffer and write 100 records + assertEquals(100, result); + // There should be no remaining records in the buffer + assertFalse(exec.isRemaining()); + + assertEquals(beforeRecord, afterRecord); + assertEquals(beforeIndexedRecord, afterIndexedRecord); + + } finally { + if (exec != null) { + exec.shutdownNow(); + } + } + } + + /** + * Test to ensure that we are reading all records from queue iterator when we have multiple producers. + */ + @SuppressWarnings("unchecked") + @Test + @Timeout(value = 60) + public void testCompositeProducerRecordReading() throws Exception { + final int numRecords = 1000; + final int numProducers = 40; + final List> recs = new ArrayList<>(); + + final DisruptorMessageQueue queue = + new DisruptorMessageQueue(Option.of(1024), getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA), + Option.of("BLOCKING_WAIT"), numProducers, new Runnable() { + @Override + public void run() { + // do nothing. + } + }); + + // Record Key to + Map> keyToProducerAndIndexMap = new HashMap<>(); + + for (int i = 0; i < numProducers; i++) { + List pRecs = dataGen.generateInserts(instantTime, numRecords); + int j = 0; + for (HoodieRecord r : pRecs) { + assertFalse(keyToProducerAndIndexMap.containsKey(r.getRecordKey())); + keyToProducerAndIndexMap.put(r.getRecordKey(), Pair.of(i, j)); + j++; + } + recs.add(pRecs); + } + + List producers = new ArrayList<>(); + for (int i = 0; i < recs.size(); i++) { + final List r = recs.get(i); + // Alternate between pull and push based iterators + if (i % 2 == 0) { + HoodieProducer producer = new IteratorBasedQueueProducer<>(r.iterator()); + producers.add(producer); + } else { + HoodieProducer producer = new FunctionBasedQueueProducer<>((buf) -> { + Iterator itr = r.iterator(); + while (itr.hasNext()) { + try { + buf.insertRecord(itr.next()); + } catch (Exception e) { + throw new HoodieException(e); + } + } + return true; + }); + producers.add(producer); + } + } + + // Used to ensure that consumer sees the records generated by a single producer in FIFO order + Map lastSeenMap = + IntStream.range(0, numProducers).boxed().collect(Collectors.toMap(Function.identity(), x -> -1)); + Map countMap = + IntStream.range(0, numProducers).boxed().collect(Collectors.toMap(Function.identity(), x -> 0)); + + // setup consumer and start disruptor + IteratorBasedQueueConsumer, Integer> consumer = + new IteratorBasedQueueConsumer, Integer>() { + + @Override + public void consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult payload) { + // Read recs and ensure we have covered all producer recs. + final HoodieRecord rec = payload.record; + Pair producerPos = keyToProducerAndIndexMap.get(rec.getRecordKey()); + Integer lastSeenPos = lastSeenMap.get(producerPos.getLeft()); + countMap.put(producerPos.getLeft(), countMap.get(producerPos.getLeft()) + 1); + lastSeenMap.put(producerPos.getLeft(), lastSeenPos + 1); + // Ensure we are seeing the next record generated + assertEquals(lastSeenPos + 1, producerPos.getRight().intValue()); + } + + @Override + protected Integer getResult() { + return 0; + } + }; + + Method setHandlersFunc = queue.getClass().getDeclaredMethod("setHandlers", IteratorBasedQueueConsumer.class); + setHandlersFunc.setAccessible(true); + setHandlersFunc.invoke(queue, consumer); + + Method startFunc = queue.getClass().getDeclaredMethod("start"); + startFunc.setAccessible(true); + startFunc.invoke(queue); + + // start to produce records + CompletableFuture producerFuture = CompletableFuture.allOf(producers.stream().map(producer -> { + return CompletableFuture.supplyAsync(() -> { + try { + producer.produce(queue); + } catch (Throwable e) { + throw new HoodieException("Error producing records in disruptor executor", e); + } + return true; + }, executorService); + }).toArray(CompletableFuture[]::new)); + + producerFuture.get(); + + // wait for all the records consumed. + queue.close(); + + for (int i = 0; i < numProducers; i++) { + // Ensure we have seen all the records for each producers + assertEquals(Integer.valueOf(numRecords), countMap.get(i)); + } + } + + /** + * Test to ensure that one of the producers exception will stop current ingestion. + */ + @SuppressWarnings("unchecked") + @Test + @Timeout(value = 60) + public void testException() throws Exception { + final int numRecords = 1000; + final int numProducers = 40; + + final DisruptorMessageQueue queue = + new DisruptorMessageQueue(Option.of(1024), getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA), + Option.of("BLOCKING_WAIT"), numProducers, new Runnable() { + @Override + public void run() { + // do nothing. + } + }); + + List pRecs = dataGen.generateInserts(instantTime, numRecords); + + // create 2 producers + // producer1 : common producer + // producer2 : exception producer + List producers = new ArrayList<>(); + for (int i = 0; i < 2; i++) { + if (i % 2 == 0) { + producers.add(new IteratorBasedQueueProducer<>(pRecs.iterator())); + } else { + producers.add(new FunctionBasedQueueProducer<>((buf) -> { + throw new HoodieException("Exception when produce records!!!"); + })); + } + } + + + IteratorBasedQueueConsumer, Integer> consumer = + new IteratorBasedQueueConsumer, Integer>() { + + int count = 0; + @Override + public void consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult payload) { + // Read recs and ensure we have covered all producer recs. + final HoodieRecord rec = payload.record; + count++; + } + + @Override + protected Integer getResult() { + return count; + } + }; + + DisruptorExecutor>, Integer> exec = new DisruptorExecutor(Option.of(1024), + producers, Option.of(consumer), getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA), + Option.of(WaitStrategyFactory.DEFAULT_STRATEGY), getPreExecuteRunnable()); + + final Throwable thrown = assertThrows(HoodieException.class, exec::execute, + "exception is expected"); + assertTrue(thrown.getMessage().contains("Error producing records in disruptor executor")); + } +} diff --git a/hudi-common/pom.xml b/hudi-common/pom.xml index 87b8e5e0beea3..15becc3c169ff 100644 --- a/hudi-common/pom.xml +++ b/hudi-common/pom.xml @@ -299,5 +299,11 @@ joda-time test + + + com.lmax + disruptor + ${disruptor.version} + diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CustomizedThreadFactory.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CustomizedThreadFactory.java index 738be514b2cbf..a13f3a804f983 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/CustomizedThreadFactory.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CustomizedThreadFactory.java @@ -33,6 +33,8 @@ public class CustomizedThreadFactory implements ThreadFactory { private final String threadName; private final boolean daemon; + private Runnable preExecuteRunnable; + public CustomizedThreadFactory() { this("pool-" + POOL_NUM.getAndIncrement(), false); } @@ -41,6 +43,16 @@ public CustomizedThreadFactory(String threadNamePrefix) { this(threadNamePrefix, false); } + public CustomizedThreadFactory(String threadNamePrefix, Runnable preExecuteRunnable) { + this(threadNamePrefix, false, preExecuteRunnable); + } + + public CustomizedThreadFactory(String threadNamePrefix, boolean daemon, Runnable preExecuteRunnable) { + this.threadName = threadNamePrefix + "-thread-"; + this.daemon = daemon; + this.preExecuteRunnable = preExecuteRunnable; + } + public CustomizedThreadFactory(String threadNamePrefix, boolean daemon) { this.threadName = threadNamePrefix + "-thread-"; this.daemon = daemon; @@ -48,7 +60,15 @@ public CustomizedThreadFactory(String threadNamePrefix, boolean daemon) { @Override public Thread newThread(@NotNull Runnable r) { - Thread runThread = new Thread(r); + Thread runThread = preExecuteRunnable == null ? new Thread(r) : new Thread(new Runnable() { + + @Override + public void run() { + preExecuteRunnable.run(); + r.run(); + } + }); + runThread.setDaemon(daemon); runThread.setName(threadName + threadNum.getAndIncrement()); return runThread; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java index 03bd471b606f1..347bcdf77a6ac 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java @@ -18,7 +18,7 @@ package org.apache.hudi.common.util; -import org.apache.hudi.common.util.queue.BoundedInMemoryQueue; +import org.apache.hudi.common.util.queue.BoundedInMemoryQueueIterable; import org.apache.hudi.exception.HoodieException; import org.apache.parquet.hadoop.ParquetReader; @@ -27,7 +27,7 @@ /** * This class wraps a parquet reader and provides an iterator based api to read from a parquet file. This is used in - * {@link BoundedInMemoryQueue} + * {@link BoundedInMemoryQueueIterable} */ public class ParquetReaderIterator implements ClosableIterator { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java index 46ef5dc40caf8..ce5898c7c3101 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java @@ -18,119 +18,98 @@ package org.apache.hudi.common.util.queue; -import org.apache.hudi.common.util.CustomizedThreadFactory; import org.apache.hudi.common.util.DefaultSizeEstimator; import org.apache.hudi.common.util.Functions; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.SizeEstimator; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import java.io.IOException; import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorCompletionService; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; import java.util.function.Function; -import java.util.stream.Collectors; /** - * Executor which orchestrates concurrent producers and consumers communicating through a bounded in-memory queue. This + * Executor which orchestrates concurrent producers and consumers communicating through 'BoundedInMemoryQueue'. This * class takes as input the size limit, queue producer(s), consumer and transformer and exposes API to orchestrate * concurrent execution of these actors communicating through a central bounded queue */ -public class BoundedInMemoryExecutor { +public class BoundedInMemoryExecutor extends HoodieExecutorBase { private static final Logger LOG = LogManager.getLogger(BoundedInMemoryExecutor.class); - private static final long TERMINATE_WAITING_TIME_SECS = 60L; - // Executor service used for launching write thread. - private final ExecutorService producerExecutorService; - // Executor service used for launching read thread. - private final ExecutorService consumerExecutorService; - // Used for buffering records which is controlled by HoodieWriteConfig#WRITE_BUFFER_LIMIT_BYTES. - private final BoundedInMemoryQueue queue; - // Producers - private final List> producers; - // Consumer - private final Option> consumer; - // pre-execute function to implement environment specific behavior before executors (producers/consumer) run - private final Runnable preExecuteRunnable; + private final HoodieMessageQueue queue; public BoundedInMemoryExecutor(final long bufferLimitInBytes, final Iterator inputItr, - BoundedInMemoryQueueConsumer consumer, Function transformFunction, Runnable preExecuteRunnable) { + IteratorBasedQueueConsumer consumer, Function transformFunction, Runnable preExecuteRunnable) { this(bufferLimitInBytes, new IteratorBasedQueueProducer<>(inputItr), Option.of(consumer), transformFunction, preExecuteRunnable); } - public BoundedInMemoryExecutor(final long bufferLimitInBytes, BoundedInMemoryQueueProducer producer, - Option> consumer, final Function transformFunction) { + public BoundedInMemoryExecutor(final long bufferLimitInBytes, HoodieProducer producer, + Option> consumer, final Function transformFunction) { this(bufferLimitInBytes, producer, consumer, transformFunction, Functions.noop()); } - public BoundedInMemoryExecutor(final long bufferLimitInBytes, BoundedInMemoryQueueProducer producer, - Option> consumer, final Function transformFunction, Runnable preExecuteRunnable) { + public BoundedInMemoryExecutor(final long bufferLimitInBytes, HoodieProducer producer, + Option> consumer, final Function transformFunction, Runnable preExecuteRunnable) { this(bufferLimitInBytes, Collections.singletonList(producer), consumer, transformFunction, new DefaultSizeEstimator<>(), preExecuteRunnable); } - public BoundedInMemoryExecutor(final long bufferLimitInBytes, List> producers, - Option> consumer, final Function transformFunction, + public BoundedInMemoryExecutor(final long bufferLimitInBytes, List> producers, + Option> consumer, final Function transformFunction, final SizeEstimator sizeEstimator, Runnable preExecuteRunnable) { - this.producers = producers; - this.consumer = consumer; - this.preExecuteRunnable = preExecuteRunnable; - // Ensure fixed thread for each producer thread - this.producerExecutorService = Executors.newFixedThreadPool(producers.size(), new CustomizedThreadFactory("producer")); - // Ensure single thread for consumer - this.consumerExecutorService = Executors.newSingleThreadExecutor(new CustomizedThreadFactory("consumer")); - this.queue = new BoundedInMemoryQueue<>(bufferLimitInBytes, transformFunction, sizeEstimator); + super(producers, consumer, preExecuteRunnable); + this.queue = new BoundedInMemoryQueueIterable<>(bufferLimitInBytes, transformFunction, sizeEstimator); } /** - * Start all Producers. + * Start all producers at once. */ - public ExecutorCompletionService startProducers() { + @Override + public CompletableFuture startProducers() { // Latch to control when and which producer thread will close the queue final CountDownLatch latch = new CountDownLatch(producers.size()); - final ExecutorCompletionService completionService = - new ExecutorCompletionService(producerExecutorService); - producers.stream().map(producer -> { - return completionService.submit(() -> { + + return CompletableFuture.allOf(producers.stream().map(producer -> { + return CompletableFuture.supplyAsync(() -> { try { - preExecuteRunnable.run(); producer.produce(queue); } catch (Throwable e) { LOG.error("error producing records", e); queue.markAsFailed(e); - throw e; + throw new HoodieException("Error producing records in bounded in memory executor", e); } finally { synchronized (latch) { latch.countDown(); if (latch.getCount() == 0) { // Mark production as done so that consumer will be able to exit - queue.close(); + try { + queue.close(); + } catch (IOException e) { + throw new HoodieIOException("Catch Exception when closing BoundedInMemoryQueue.", e); + } } } } return true; - }); - }).collect(Collectors.toList()); - return completionService; + }, producerExecutorService); + }).toArray(CompletableFuture[]::new)); } /** * Start only consumer. */ - private Future startConsumer() { + @Override + protected CompletableFuture startConsumer() { return consumer.map(consumer -> { - return consumerExecutorService.submit(() -> { + return CompletableFuture.supplyAsync(() -> { LOG.info("starting consumer thread"); - preExecuteRunnable.run(); try { E result = consumer.consume(queue); LOG.info("Queue Consumption is done; notifying producer threads"); @@ -138,61 +117,41 @@ private Future startConsumer() { } catch (Exception e) { LOG.error("error consuming records", e); queue.markAsFailed(e); - throw e; + throw new HoodieException(e); } - }); + }, consumerExecutorService); }).orElse(CompletableFuture.completedFuture(null)); } - /** - * Main API to run both production and consumption. - */ - public E execute() { - try { - startProducers(); - Future future = startConsumer(); - // Wait for consumer to be done - return future.get(); - } catch (InterruptedException ie) { - shutdownNow(); - Thread.currentThread().interrupt(); - throw new HoodieException(ie); - } catch (Exception e) { - throw new HoodieException(e); - } + @Override + public boolean isRemaining() { + return getQueue().iterator().hasNext(); } - public boolean isRemaining() { - return queue.iterator().hasNext(); + @Override + protected void postAction() { + super.close(); } + @Override public void shutdownNow() { producerExecutorService.shutdownNow(); consumerExecutorService.shutdownNow(); // close queue to force producer stop - queue.close(); - } - - public boolean awaitTermination() { - // if current thread has been interrupted before awaitTermination was called, we still give - // executor a chance to proceeding. So clear the interrupt flag and reset it if needed before return. - boolean interruptedBefore = Thread.interrupted(); - boolean producerTerminated = false; - boolean consumerTerminated = false; try { - producerTerminated = producerExecutorService.awaitTermination(TERMINATE_WAITING_TIME_SECS, TimeUnit.SECONDS); - consumerTerminated = consumerExecutorService.awaitTermination(TERMINATE_WAITING_TIME_SECS, TimeUnit.SECONDS); - } catch (InterruptedException ie) { - // fail silently for any other interruption + queue.close(); + } catch (IOException e) { + throw new HoodieIOException("catch IOException while closing HoodieMessageQueue", e); } - // reset interrupt flag if needed - if (interruptedBefore) { - Thread.currentThread().interrupt(); - } - return producerTerminated && consumerTerminated; } - public BoundedInMemoryQueue getQueue() { - return queue; + @Override + public BoundedInMemoryQueueIterable getQueue() { + return (BoundedInMemoryQueueIterable)queue; + } + + @Override + protected void setup() { + // do nothing. } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueue.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueueIterable.java similarity index 94% rename from hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueue.java rename to hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueueIterable.java index dfe33b49ec0c7..47b8c81fc4600 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueue.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueueIterable.java @@ -49,7 +49,7 @@ * @param input payload data type * @param output payload data type */ -public class BoundedInMemoryQueue implements Iterable { +public class BoundedInMemoryQueueIterable extends HoodieIterableMessageQueue { /** Interval used for polling records in the queue. **/ public static final int RECORD_POLL_INTERVAL_SEC = 1; @@ -60,7 +60,7 @@ public class BoundedInMemoryQueue implements Iterable { /** Maximum records that will be cached. **/ private static final int RECORD_CACHING_LIMIT = 128 * 1024; - private static final Logger LOG = LogManager.getLogger(BoundedInMemoryQueue.class); + private static final Logger LOG = LogManager.getLogger(BoundedInMemoryQueueIterable.class); /** * It indicates number of records to cache. We will be using sampled record's average size to @@ -116,7 +116,7 @@ public class BoundedInMemoryQueue implements Iterable { * @param memoryLimit MemoryLimit in bytes * @param transformFunction Transformer Function to convert input payload type to stored payload type */ - public BoundedInMemoryQueue(final long memoryLimit, final Function transformFunction) { + public BoundedInMemoryQueueIterable(final long memoryLimit, final Function transformFunction) { this(memoryLimit, transformFunction, new DefaultSizeEstimator() {}); } @@ -127,15 +127,16 @@ public BoundedInMemoryQueue(final long memoryLimit, final Function transfo * @param transformFunction Transformer Function to convert input payload type to stored payload type * @param payloadSizeEstimator Payload Size Estimator */ - public BoundedInMemoryQueue(final long memoryLimit, final Function transformFunction, - final SizeEstimator payloadSizeEstimator) { + public BoundedInMemoryQueueIterable(final long memoryLimit, final Function transformFunction, + final SizeEstimator payloadSizeEstimator) { this.memoryLimit = memoryLimit; this.transformFunction = transformFunction; this.payloadSizeEstimator = payloadSizeEstimator; this.iterator = new QueueIterator(); } - public int size() { + @Override + public long size() { return this.queue.size(); } @@ -174,6 +175,7 @@ private void adjustBufferSizeIfNeeded(final O payload) throws InterruptedExcepti * * @param t Item to be queued */ + @Override public void insertRecord(I t) throws Exception { // If already closed, throw exception if (isWriteDone.get()) { @@ -203,7 +205,8 @@ private boolean expectMoreRecords() { * Reader interface but never exposed to outside world as this is a single consumer queue. Reading is done through a * singleton iterator for this queue. */ - private Option readNextRecord() { + @Override + public Option readNextRecord() { if (this.isReadDone.get()) { return Option.empty(); } @@ -237,6 +240,7 @@ private Option readNextRecord() { /** * Puts an empty entry to queue to denote termination. */ + @Override public void close() { // done queueing records notifying queue-reader. isWriteDone.set(true); @@ -252,6 +256,7 @@ private void throwExceptionIfFailed() { /** * API to allow producers and consumer to communicate termination due to failure. */ + @Override public void markAsFailed(Throwable e) { this.hasFailed.set(e); // release the permits so that if the queueing thread is waiting for permits then it will @@ -259,6 +264,11 @@ public void markAsFailed(Throwable e) { this.rateLimiter.release(RECORD_CACHING_LIMIT + 1); } + @Override + public boolean isEmpty() { + return this.queue.size() == 0; + } + @Override public Iterator iterator() { return iterator; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorExecutor.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorExecutor.java new file mode 100644 index 0000000000000..7ea5de07c0dca --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorExecutor.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.util.queue; + +import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieException; + +import org.apache.hudi.exception.HoodieIOException; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; + +/** + * Executor which orchestrates concurrent producers and consumers communicating through 'DisruptorMessageQueue'. This + * class takes as queue producer(s), consumer and transformer and exposes API to orchestrate + * concurrent execution of these actors communicating through disruptor + */ +public class DisruptorExecutor extends HoodieExecutorBase { + + private static final Logger LOG = LogManager.getLogger(DisruptorExecutor.class); + private final HoodieMessageQueue queue; + + public DisruptorExecutor(final Option bufferSize, final Iterator inputItr, + IteratorBasedQueueConsumer consumer, Function transformFunction, Option waitStrategy, Runnable preExecuteRunnable) { + this(bufferSize, new IteratorBasedQueueProducer<>(inputItr), Option.of(consumer), transformFunction, waitStrategy, preExecuteRunnable); + } + + public DisruptorExecutor(final Option bufferSize, HoodieProducer producer, + Option> consumer, final Function transformFunction, Option waitStrategy, Runnable preExecuteRunnable) { + this(bufferSize, Collections.singletonList(producer), consumer, transformFunction, waitStrategy, preExecuteRunnable); + } + + public DisruptorExecutor(final Option bufferSize, List> producers, + Option> consumer, final Function transformFunction, + final Option waitStrategy, Runnable preExecuteRunnable) { + super(producers, consumer, preExecuteRunnable); + this.queue = new DisruptorMessageQueue<>(bufferSize, transformFunction, waitStrategy, producers.size(), preExecuteRunnable); + } + + /** + * Start all Producers. + */ + @Override + public CompletableFuture startProducers() { + return CompletableFuture.allOf(producers.stream().map(producer -> { + return CompletableFuture.supplyAsync(() -> { + try { + producer.produce(queue); + } catch (Throwable e) { + LOG.error("error producing records", e); + throw new HoodieException("Error producing records in disruptor executor", e); + } + return true; + }, producerExecutorService); + }).toArray(CompletableFuture[]::new)); + } + + @Override + protected void setup() { + ((DisruptorMessageQueue)queue).setHandlers(consumer.get()); + ((DisruptorMessageQueue)queue).start(); + } + + @Override + protected void postAction() { + try { + super.close(); + queue.close(); + } catch (IOException e) { + throw new HoodieIOException("Catch IOException while closing DisruptorMessageQueue", e); + } + } + + @Override + protected CompletableFuture startConsumer() { + return producerFuture.thenApplyAsync(res -> { + try { + queue.close(); + consumer.get().finish(); + return consumer.get().getResult(); + } catch (IOException e) { + throw new HoodieIOException("Catch Exception when closing", e); + } + }, consumerExecutorService); + } + + @Override + public boolean isRemaining() { + return !queue.isEmpty(); + } + + @Override + public void shutdownNow() { + producerExecutorService.shutdownNow(); + consumerExecutorService.shutdownNow(); + try { + queue.close(); + } catch (IOException e) { + throw new HoodieIOException("Catch IOException while closing DisruptorMessageQueue"); + } + } + + @Override + public DisruptorMessageQueue getQueue() { + return (DisruptorMessageQueue)queue; + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorMessageQueue.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorMessageQueue.java new file mode 100644 index 0000000000000..eccd881af1431 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorMessageQueue.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.util.queue; + +import com.lmax.disruptor.EventFactory; +import com.lmax.disruptor.EventHandler; +import com.lmax.disruptor.EventTranslator; +import com.lmax.disruptor.RingBuffer; +import com.lmax.disruptor.WaitStrategy; +import com.lmax.disruptor.dsl.Disruptor; +import com.lmax.disruptor.dsl.ProducerType; +import org.apache.hudi.common.util.CustomizedThreadFactory; +import org.apache.hudi.common.util.Option; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Function; + +public class DisruptorMessageQueue implements HoodieMessageQueue { + + private static final Logger LOG = LogManager.getLogger(DisruptorMessageQueue.class); + + private final Disruptor queue; + private final Function transformFunction; + private final RingBuffer ringBuffer; + private final Lock closeLocker = new ReentrantLock(); + + private boolean isDisruptorClosed = false; + + public DisruptorMessageQueue(Option bufferSize, Function transformFunction, Option waitStrategyName, int totalProducers, Runnable preExecuteRunnable) { + WaitStrategy waitStrategy = WaitStrategyFactory.build(waitStrategyName); + CustomizedThreadFactory threadFactory = new CustomizedThreadFactory("disruptor", true, preExecuteRunnable); + + this.queue = new Disruptor<>(new HoodieDisruptorEventFactory(), bufferSize.get(), threadFactory, totalProducers > 1 ? ProducerType.MULTI : ProducerType.SINGLE, waitStrategy); + this.ringBuffer = queue.getRingBuffer(); + this.transformFunction = transformFunction; + } + + @Override + public long size() { + return ringBuffer.getBufferSize() - ringBuffer.remainingCapacity(); + } + + @Override + public void insertRecord(I value) throws Exception { + O applied = transformFunction.apply(value); + EventTranslator translator = (event, sequence) -> event.set(applied); + queue.getRingBuffer().publishEvent(translator); + } + + @Override + public Option readNextRecord() { + throw new UnsupportedOperationException("Should not call readNextRecord here. And let DisruptorMessageHandler to handle consuming logic"); + } + + @Override + public void markAsFailed(Throwable e) { + // do nothing. + } + + @Override + public boolean isEmpty() { + return ringBuffer.getBufferSize() == ringBuffer.remainingCapacity(); + } + + @Override + public void close() { + closeLocker.lock(); + if (!isDisruptorClosed) { + queue.shutdown(); + isDisruptorClosed = true; + } + closeLocker.unlock(); + } + + protected void setHandlers(IteratorBasedQueueConsumer consumer) { + queue.handleEventsWith(new EventHandler() { + + @Override + public void onEvent(HoodieDisruptorEvent event, long sequence, boolean endOfBatch) throws Exception { + consumer.consumeOneRecord(event.get()); + } + }); + } + + protected void start() { + queue.start(); + } + + /** + * HoodieDisruptorEventFactory is used to create/preallocate HoodieDisruptorEvent. + * + */ + class HoodieDisruptorEventFactory implements EventFactory { + + @Override + public HoodieDisruptorEvent newInstance() { + return new HoodieDisruptorEvent(); + } + } + + /** + * The unit of data passed from producer to consumer in disruptor world. + */ + class HoodieDisruptorEvent { + + private O value; + + public void set(O value) { + this.value = value; + } + + public O get() { + return this.value; + } + } +} + diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorWaitStrategyType.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorWaitStrategyType.java new file mode 100644 index 0000000000000..1a8e86835d0cf --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/DisruptorWaitStrategyType.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.util.queue; + +import org.apache.hudi.keygen.constant.KeyGeneratorType; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public enum DisruptorWaitStrategyType { + + /** + * The BlockingWaitStrategy is the slowest of the available wait strategies, but is the most conservative with the respect to CPU usage + * and will give the most consistent behaviour across the widest variety of deployment options. + */ + BLOCKING_WAIT, + + /** + * Like the `BlockingWaitStrategy` the `SleepingWaitStrategy` it attempts to be conservative with CPU usage by using a simple busy wait loop. + * The difference is that the `SleepingWaitStrategy` uses a call to `LockSupport.parkNanos(1)` in the middle of the loop. + * On a typical Linux system this will pause the thread for around 60µs. + */ + SLEEPING_WAIT, + + /** + * The `YieldingWaitStrategy` is one of two WaitStrategies that can be use in low-latency systems. + * It is designed for cases where there is the option to burn CPU cycles with the goal of improving latency. + * The `YieldingWaitStrategy` will busy spin, waiting for the sequence to increment to the appropriate value. + * Inside the body of the loop `Thread#yield()` will be called allowing other queued threads to run. + * This is the recommended wait strategy when you need very high performance, and the number of `EventHandler` threads is lower than the total number of logical cores, + * e.g. you have hyper-threading enabled. + */ + YIELDING_WAIT, + + /** + * The `BusySpinWaitStrategy` is the highest performing WaitStrategy. + * Like the `YieldingWaitStrategy`, it can be used in low-latency systems, but puts the highest constraints on the deployment environment. + */ + BUSY_SPIN_WAIT; + + public static List getNames() { + List names = new ArrayList<>(KeyGeneratorType.values().length); + Arrays.stream(KeyGeneratorType.values()) + .forEach(x -> names.add(x.name())); + return names; + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/ExecutorType.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/ExecutorType.java new file mode 100644 index 0000000000000..05ecb1746c28d --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/ExecutorType.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.util.queue; + +import org.apache.hudi.keygen.constant.KeyGeneratorType; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +/** + * Types of {@link org.apache.hudi.common.util.queue.HoodieExecutor}. + */ +public enum ExecutorType { + + /** + * Executor which orchestrates concurrent producers and consumers communicating through a bounded in-memory message queue using LinkedBlockingQueue. + */ + BOUNDED_IN_MEMORY, + + /** + * Executor which orchestrates concurrent producers and consumers communicating through disruptor as a lock free message queue + * to gain better writing performance. Although DisruptorExecutor is still an experimental feature. + */ + DISRUPTOR; + + public static List getNames() { + List names = new ArrayList<>(ExecutorType.values().length); + Arrays.stream(KeyGeneratorType.values()) + .forEach(x -> names.add(x.name())); + return names; + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/FunctionBasedQueueProducer.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/FunctionBasedQueueProducer.java index 549683754cbd8..df199158784bb 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/FunctionBasedQueueProducer.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/FunctionBasedQueueProducer.java @@ -28,18 +28,18 @@ * * @param Type of entry produced for queue */ -public class FunctionBasedQueueProducer implements BoundedInMemoryQueueProducer { +public class FunctionBasedQueueProducer implements HoodieProducer { private static final Logger LOG = LogManager.getLogger(FunctionBasedQueueProducer.class); - private final Function, Boolean> producerFunction; + private final Function, Boolean> producerFunction; - public FunctionBasedQueueProducer(Function, Boolean> producerFunction) { + public FunctionBasedQueueProducer(Function, Boolean> producerFunction) { this.producerFunction = producerFunction; } @Override - public void produce(BoundedInMemoryQueue queue) { + public void produce(HoodieMessageQueue queue) { LOG.info("starting function which will enqueue records"); producerFunction.apply(queue); LOG.info("finished function which will enqueue records"); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieConsumer.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieConsumer.java new file mode 100644 index 0000000000000..fb67fab6c74c7 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieConsumer.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.util.queue; + +/** + * HoodieConsumer is used to consume records/messages from hoodie inner message queue and write into DFS. + */ +public interface HoodieConsumer { + + /** + * Consume records from inner message queue. + */ + O consume(HoodieMessageQueue queue) throws Exception; +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieExecutor.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieExecutor.java new file mode 100644 index 0000000000000..7d51441edec29 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieExecutor.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.util.queue; + +import java.io.Closeable; + +/** + * HoodieExecutor which orchestrates concurrent producers and consumers communicating through a bounded in message queue. + */ +public interface HoodieExecutor extends Closeable { + + /** + * Main API to + * 1. Set up and run all the production + * 2. Set up and run all the consumption. + * 3. Shutdown and return the result. + */ + E execute(); + + boolean isRemaining(); + + /** + * Shutdown all the consumers and producers. + */ + void shutdownNow(); + + boolean awaitTermination(); +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieExecutorBase.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieExecutorBase.java new file mode 100644 index 0000000000000..8dc07e35e4901 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieExecutorBase.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.util.queue; + +import org.apache.hudi.common.util.CustomizedThreadFactory; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.exception.HoodieException; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +/** + * HoodieExecutorBase holds common elements producerExecutorService, consumerExecutorService, producers and a single consumer. + * Also HoodieExecutorBase control the lifecycle of producerExecutorService and consumerExecutorService. + */ +public abstract class HoodieExecutorBase implements HoodieExecutor { + + private static final Logger LOG = LogManager.getLogger(HoodieExecutorBase.class); + + private static final long TERMINATE_WAITING_TIME_SECS = 60L; + // Executor service used for launching write thread. + protected final ExecutorService producerExecutorService; + // Executor service used for launching read thread. + protected final ExecutorService consumerExecutorService; + // Producers + protected final List> producers; + // Consumer + protected final Option> consumer; + // pre-execute function to implement environment specific behavior before executors (producers/consumer) run + protected final Runnable preExecuteRunnable; + + CompletableFuture producerFuture; + + public HoodieExecutorBase(List> producers, Option> consumer, + Runnable preExecuteRunnable) { + this.producers = producers; + this.consumer = consumer; + this.preExecuteRunnable = preExecuteRunnable; + // Ensure fixed thread for each producer thread + this.producerExecutorService = Executors.newFixedThreadPool(producers.size(), new CustomizedThreadFactory("executor-queue-producer", preExecuteRunnable)); + // Ensure single thread for consumer + this.consumerExecutorService = Executors.newSingleThreadExecutor(new CustomizedThreadFactory("executor-queue-consumer", preExecuteRunnable)); + } + + /** + * Start all Producers. + */ + public abstract CompletableFuture startProducers(); + + /** + * Start consumer. + */ + protected abstract CompletableFuture startConsumer(); + + /** + * Closing/cleaning up the executor's resources after consuming finished. + */ + protected abstract void postAction(); + + /** + * get bounded in message queue. + */ + public abstract HoodieMessageQueue getQueue(); + + /** + * set all the resources for current HoodieExecutor before start to produce and consume records. + */ + protected abstract void setup(); + + @Override + public boolean awaitTermination() { + // if current thread has been interrupted before awaitTermination was called, we still give + // executor a chance to proceeding. So clear the interrupt flag and reset it if needed before return. + boolean interruptedBefore = Thread.interrupted(); + boolean producerTerminated = false; + boolean consumerTerminated = false; + try { + producerTerminated = producerExecutorService.awaitTermination(TERMINATE_WAITING_TIME_SECS, TimeUnit.SECONDS); + consumerTerminated = consumerExecutorService.awaitTermination(TERMINATE_WAITING_TIME_SECS, TimeUnit.SECONDS); + } catch (InterruptedException ie) { + // fail silently for any other interruption + } + // reset interrupt flag if needed + if (interruptedBefore) { + Thread.currentThread().interrupt(); + } + return producerTerminated && consumerTerminated; + } + + @Override + public void close() { + if (!producerExecutorService.isShutdown()) { + producerExecutorService.shutdown(); + } + if (!consumerExecutorService.isShutdown()) { + consumerExecutorService.shutdown(); + } + } + + /** + * Main API to run both production and consumption. + */ + @Override + public E execute() { + try { + ValidationUtils.checkState(this.consumer.isPresent()); + setup(); + producerFuture = startProducers(); + CompletableFuture future = startConsumer(); + return future.get(); + } catch (InterruptedException ie) { + shutdownNow(); + Thread.currentThread().interrupt(); + throw new HoodieException(ie); + } catch (Exception e) { + throw new HoodieException(e); + } finally { + postAction(); + } + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieIterableMessageQueue.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieIterableMessageQueue.java new file mode 100644 index 0000000000000..71ef39f2c1883 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieIterableMessageQueue.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.util.queue; + +import java.util.Iterator; + +/** + * IteratorBasedHoodieMessageQueue implements HoodieMessageQueue with Iterable + */ +public abstract class HoodieIterableMessageQueue implements HoodieMessageQueue, Iterable { + + public abstract Iterator iterator(); +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieMessageQueue.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieMessageQueue.java new file mode 100644 index 0000000000000..ae226f8adb0a3 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieMessageQueue.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.util.queue; + +import org.apache.hudi.common.util.Option; +import java.io.Closeable; + +/** + * HoodieMessageQueue holds an internal message queue, and control the behavior of + * 1. insert record into internal message queue. + * 2. get record from internal message queue. + * 3. close internal message queue. + */ +public interface HoodieMessageQueue extends Closeable { + + /** + * Returns the number of elements in this queue. + */ + long size(); + + /** + * Insert a record into inner message queue. + */ + void insertRecord(I t) throws Exception; + + /** + * Read records from inner message queue. + */ + Option readNextRecord(); + + /** + * API to allow producers and consumer to communicate termination due to failure. + */ + void markAsFailed(Throwable e); + + boolean isEmpty(); +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueueProducer.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieProducer.java similarity index 76% rename from hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueueProducer.java rename to hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieProducer.java index ecea9f2193c76..f56dd4cce9944 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueueProducer.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/HoodieProducer.java @@ -19,16 +19,16 @@ package org.apache.hudi.common.util.queue; /** - * Producer for {@link BoundedInMemoryQueue}. Memory Bounded Buffer supports multiple producers single consumer pattern. + * Producer for {@link HoodieMessageQueue}. Memory Bounded Buffer supports multiple producers single consumer pattern. * * @param Input type for buffer items produced */ -public interface BoundedInMemoryQueueProducer { +public interface HoodieProducer { /** - * API to enqueue entries to memory bounded queue. + * API to enqueue entries to bounded queue. * * @param queue In Memory bounded queue */ - void produce(BoundedInMemoryQueue queue) throws Exception; + void produce(HoodieMessageQueue queue) throws Exception; } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueueConsumer.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/IteratorBasedQueueConsumer.java similarity index 81% rename from hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueueConsumer.java rename to hudi-common/src/main/java/org/apache/hudi/common/util/queue/IteratorBasedQueueConsumer.java index c34842fbe3d01..713d6504645e0 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueueConsumer.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/IteratorBasedQueueConsumer.java @@ -23,15 +23,17 @@ /** * Consume entries from queue and execute callback function. */ -public abstract class BoundedInMemoryQueueConsumer { +public abstract class IteratorBasedQueueConsumer implements HoodieConsumer { /** * API to de-queue entries to memory bounded queue. * * @param queue In Memory bounded queue */ - public O consume(BoundedInMemoryQueue queue) throws Exception { - Iterator iterator = queue.iterator(); + @Override + public O consume(HoodieMessageQueue queue) throws Exception { + + Iterator iterator = ((HoodieIterableMessageQueue) queue).iterator(); while (iterator.hasNext()) { consumeOneRecord(iterator.next()); @@ -46,12 +48,12 @@ public O consume(BoundedInMemoryQueue queue) throws Exception { /** * Consumer One record. */ - protected abstract void consumeOneRecord(I record); + public abstract void consumeOneRecord(I record); /** * Notifies implementation that we have exhausted consuming records from queue. */ - protected abstract void finish(); + public void finish(){} /** * Return result of consuming records so far. diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/IteratorBasedQueueProducer.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/IteratorBasedQueueProducer.java index 3d11f38e5cf50..7904fd61ebc80 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/IteratorBasedQueueProducer.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/IteratorBasedQueueProducer.java @@ -28,7 +28,7 @@ * * @param Item type produced for the buffer. */ -public class IteratorBasedQueueProducer implements BoundedInMemoryQueueProducer { +public class IteratorBasedQueueProducer implements HoodieProducer { private static final Logger LOG = LogManager.getLogger(IteratorBasedQueueProducer.class); @@ -40,7 +40,7 @@ public IteratorBasedQueueProducer(Iterator inputIterator) { } @Override - public void produce(BoundedInMemoryQueue queue) throws Exception { + public void produce(HoodieMessageQueue queue) throws Exception { LOG.info("starting to buffer records"); while (inputIterator.hasNext()) { queue.insertRecord(inputIterator.next()); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/WaitStrategyFactory.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/WaitStrategyFactory.java new file mode 100644 index 0000000000000..8137d2a1360e9 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/WaitStrategyFactory.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.util.queue; + +import static org.apache.hudi.common.util.queue.DisruptorWaitStrategyType.BLOCKING_WAIT; + +import com.lmax.disruptor.BlockingWaitStrategy; +import com.lmax.disruptor.BusySpinWaitStrategy; +import com.lmax.disruptor.SleepingWaitStrategy; +import com.lmax.disruptor.WaitStrategy; +import com.lmax.disruptor.YieldingWaitStrategy; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieException; + +public class WaitStrategyFactory { + + public static final String DEFAULT_STRATEGY = BLOCKING_WAIT.name(); + + /** + * Build WaitStrategy for disruptor + */ + public static WaitStrategy build(Option name) { + DisruptorWaitStrategyType strategyType = name.isPresent() ? DisruptorWaitStrategyType.valueOf(name.get().toUpperCase()) : BLOCKING_WAIT; + + switch (strategyType) { + case BLOCKING_WAIT: + return new BlockingWaitStrategy(); + case SLEEPING_WAIT: + return new SleepingWaitStrategy(); + case YIELDING_WAIT: + return new YieldingWaitStrategy(); + case BUSY_SPIN_WAIT: + return new BusySpinWaitStrategy(); + default: + throw new HoodieException("Unsupported Executor Type " + name); + } + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java index f44ec67e5e2f0..2f70a10077fc0 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java @@ -28,7 +28,7 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.ExternalSpillableMap; import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor; -import org.apache.hudi.common.util.queue.BoundedInMemoryQueueProducer; +import org.apache.hudi.common.util.queue.HoodieProducer; import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.configuration.FlinkOptions; @@ -229,8 +229,8 @@ public Iterator> getRecordsIterator() { /** * Setup log and parquet reading in parallel. Both write to central buffer. */ - private List>> getParallelProducers() { - List>> producers = new ArrayList<>(); + private List>> getParallelProducers() { + List>> producers = new ArrayList<>(); producers.add(new FunctionBasedQueueProducer<>(buffer -> { scanner.scan(); return null; diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java index 84c808865072a..700a87cbb7284 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java @@ -24,8 +24,8 @@ import org.apache.hudi.common.util.Functions; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor; -import org.apache.hudi.common.util.queue.BoundedInMemoryQueueProducer; import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer; +import org.apache.hudi.common.util.queue.HoodieProducer; import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer; import org.apache.hudi.hadoop.RecordReaderValueIterator; import org.apache.hudi.hadoop.SafeParquetRecordReaderWrapper; @@ -104,8 +104,8 @@ public RealtimeUnmergedRecordReader(RealtimeSplit split, JobConf job, /** * Setup log and parquet reading in parallel. Both write to central buffer. */ - private List> getParallelProducers() { - List> producers = new ArrayList<>(); + private List> getParallelProducers() { + List> producers = new ArrayList<>(); producers.add(new FunctionBasedQueueProducer<>(buffer -> { logRecordScanner.scan(); return null; diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/BoundInMemoryExecutorBenchmark.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/BoundInMemoryExecutorBenchmark.scala new file mode 100644 index 0000000000000..b1d2517374c7e --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/BoundInMemoryExecutorBenchmark.scala @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.benchmark + +import org.apache.hadoop.fs.Path +import org.apache.hudi.HoodieSparkUtils +import org.apache.spark.SparkConf +import org.apache.spark.hudi.benchmark.{HoodieBenchmark, HoodieBenchmarkBase} +import org.apache.spark.sql.hudi.HoodieSparkSessionExtension +import org.apache.spark.sql.types._ +import org.apache.spark.sql.{DataFrame, RowFactory, SaveMode, SparkSession} + +import scala.util.Random + +object BoundInMemoryExecutorBenchmark extends HoodieBenchmarkBase { + + protected val spark: SparkSession = getSparkSession + + val recordNumber = 1000000 + + def getSparkSession: SparkSession = SparkSession.builder() + .master("local[*]") + .appName(this.getClass.getCanonicalName) + .withExtensions(new HoodieSparkSessionExtension) + .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + .config("spark.sql.session.timeZone", "CTT") + .config(sparkConf()) + .getOrCreate() + + def sparkConf(): SparkConf = { + val sparkConf = new SparkConf() + if (HoodieSparkUtils.gteqSpark3_2) { + sparkConf.set("spark.sql.catalog.spark_catalog", + "org.apache.spark.sql.hudi.catalog.HoodieCatalog") + } + sparkConf + } + + private def createDataFrame(number: Int): DataFrame = { + val schema = new StructType() + .add("c1", IntegerType) + .add("c2", StringType) + + val rdd = spark.sparkContext.parallelize(0 to number, 2).map { item => + val c1 = Integer.valueOf(item) + val c2 = s"abc" + RowFactory.create(c1, c2) + } + spark.createDataFrame(rdd, schema) + } + + /** + * OpenJDK 64-Bit Server VM 1.8.0_161-b14 on Linux 3.10.0-693.21.1.el7.x86_64 + * Intel(R) Xeon(R) Platinum 8259CL CPU @ 2.50GHz + * COW Ingestion: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative + * ------------------------------------------------------------------------------------------------------------------------ + * BoundInMemory Executor 5629 5765 192 0.2 5628.9 1.0X + * Disruptor Executor 2772 2862 127 0.4 2772.2 2.0X + * + */ + private def cowTableDisruptorExecutorBenchmark(tableName: String = "executorBenchmark"): Unit = { + val df = createDataFrame(recordNumber) + withTempDir {f => + val benchmark = new HoodieBenchmark("COW Ingestion", recordNumber) + benchmark.addCase("BoundInMemory Executor") { _ => + val finalTableName = tableName + Random.nextInt(10000) + df.write.format("hudi") + .mode(SaveMode.Overwrite) + .option("hoodie.datasource.write.recordkey.field", "c1") + .option("hoodie.datasource.write.partitionpath.field", "c2") + .option("hoodie.table.name", finalTableName) + .option("hoodie.metadata.enable", "false") + .option("hoodie.clean.automatic", "false") + .option("hoodie.bulkinsert.sort.mode", "NONE") + .option("hoodie.insert.shuffle.parallelism", "2") + .option("hoodie.datasource.write.operation", "bulk_insert") + .option("hoodie.datasource.write.row.writer.enable", "false") + .option("hoodie.bulkinsert.shuffle.parallelism", "1") + .option("hoodie.upsert.shuffle.parallelism", "2") + .option("hoodie.delete.shuffle.parallelism", "2") + .option("hoodie.populate.meta.fields", "false") + .option("hoodie.table.keygenerator.class", "org.apache.hudi.keygen.SimpleKeyGenerator") + .save(new Path(f.getCanonicalPath, finalTableName).toUri.toString) + } + + benchmark.addCase("Disruptor Executor") { _ => + val finalTableName = tableName + Random.nextInt(10000) + df.write.format("hudi") + .mode(SaveMode.Overwrite) + .option("hoodie.datasource.write.recordkey.field", "c1") + .option("hoodie.datasource.write.partitionpath.field", "c2") + .option("hoodie.table.name", finalTableName) + .option("hoodie.metadata.enable", "false") + .option("hoodie.clean.automatic", "false") + .option("hoodie.bulkinsert.sort.mode", "NONE") + .option("hoodie.insert.shuffle.parallelism", "2") + .option("hoodie.datasource.write.operation", "bulk_insert") + .option("hoodie.datasource.write.row.writer.enable", "false") + .option("hoodie.bulkinsert.shuffle.parallelism", "1") + .option("hoodie.upsert.shuffle.parallelism", "2") + .option("hoodie.delete.shuffle.parallelism", "2") + .option("hoodie.write.executor.type", "DISRUPTOR") + .option("hoodie.populate.meta.fields", "false") + .option("hoodie.table.keygenerator.class", "org.apache.hudi.keygen.SimpleKeyGenerator") + + .save(new Path(f.getCanonicalPath, finalTableName).toUri.toString) + } + benchmark.run() + } + } + + override def afterAll(): Unit = { + spark.stop() + } + + override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { + cowTableDisruptorExecutorBenchmark() + } +} diff --git a/pom.xml b/pom.xml index e97647b119adb..3b02c916f7989 100644 --- a/pom.xml +++ b/pom.xml @@ -196,6 +196,7 @@ org.apache.hudi. true 2.7.1 + 3.4.2 4.7 1.12.22 3.21.5