diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java index 89360c247403d..844e9f4f0f8ac 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java @@ -20,22 +20,28 @@ import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode; import org.apache.hudi.io.WriteHandleFactory; import java.io.Serializable; /** - * Repartition input records into at least expected number of output partitions. It should give below guarantees - - * Output partition will have records from only one hoodie partition. - Average records per output - * partitions should be almost equal to (#inputRecords / #outputPartitions) to avoid possible skews. + * Partitions the input records for bulk insert operation. + *

+ * The actual implementation of {@link BulkInsertPartitioner} is determined by the bulk insert + * sort mode, {@link BulkInsertSortMode}, specified by + * {@code HoodieWriteConfig.BULK_INSERT_SORT_MODE} (`hoodie.bulkinsert.sort.mode`). */ public interface BulkInsertPartitioner extends Serializable { /** - * Repartitions the input records into at least expected number of output partitions. + * Partitions the input records based on the number of output partitions as a hint. + *

+ * Note that, the number of output partitions may or may not be enforced, depending on the + * specific implementation. * - * @param records Input Hoodie records - * @param outputPartitions Expected number of output partitions + * @param records Input Hoodie records. + * @param outputPartitions Expected number of output partitions as a hint. * @return */ I repartitionRecords(I records, int outputPartitions); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java index eea362fe9b86c..b2308bd20a5df 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java @@ -214,9 +214,9 @@ private BulkInsertPartitioner getPartitioner(Map strategy } }).orElse(isRowPartitioner ? BulkInsertInternalPartitionerWithRowsFactory.get( - getWriteConfig().getBulkInsertSortMode(), getHoodieTable().isPartitioned()) + getWriteConfig().getBulkInsertSortMode(), getHoodieTable().isPartitioned(), true) : BulkInsertInternalPartitionerFactory.get( - getWriteConfig().getBulkInsertSortMode(), getHoodieTable().isPartitioned())); + getWriteConfig().getBulkInsertSortMode(), getHoodieTable().isPartitioned(), true)); } /** diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerFactory.java index 84bd79e3a22a2..900d2729f105b 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerFactory.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerFactory.java @@ -30,13 +30,24 @@ public abstract class BulkInsertInternalPartitionerFactory { public static BulkInsertPartitioner get(HoodieTable table, HoodieWriteConfig config) { - return get(config.getBulkInsertSortMode(), table.isPartitioned()); + return get(config.getBulkInsertSortMode(), table.isPartitioned(), false); + } + + public static BulkInsertPartitioner get( + HoodieTable table, HoodieWriteConfig config, boolean enforceNumOutputPartitions) { + return get(config.getBulkInsertSortMode(), table.isPartitioned(), enforceNumOutputPartitions); } public static BulkInsertPartitioner get(BulkInsertSortMode sortMode, boolean isTablePartitioned) { + return get(sortMode, isTablePartitioned, false); + } + + public static BulkInsertPartitioner get(BulkInsertSortMode sortMode, + boolean isTablePartitioned, + boolean enforceNumOutputPartitions) { switch (sortMode) { case NONE: - return new NonSortPartitioner(); + return new NonSortPartitioner(enforceNumOutputPartitions); case GLOBAL_SORT: return new GlobalSortPartitioner(); case PARTITION_SORT: diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerWithRowsFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerWithRowsFactory.java index 720c03948d45b..218eae0dc94ca 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerWithRowsFactory.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerWithRowsFactory.java @@ -31,9 +31,14 @@ public abstract class BulkInsertInternalPartitionerWithRowsFactory { public static BulkInsertPartitioner> get(BulkInsertSortMode sortMode, boolean isTablePartitioned) { + return get(sortMode, isTablePartitioned, false); + } + + public static BulkInsertPartitioner> get( + BulkInsertSortMode sortMode, boolean isTablePartitioned, boolean enforceNumOutputPartitions) { switch (sortMode) { case NONE: - return new NonSortPartitionerWithRows(); + return new NonSortPartitionerWithRows(enforceNumOutputPartitions); case GLOBAL_SORT: return new GlobalSortPartitionerWithRows(); case PARTITION_SORT: diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/NonSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/NonSortPartitioner.java index 19c90ecb1a012..67cd599731c13 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/NonSortPartitioner.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/NonSortPartitioner.java @@ -25,18 +25,45 @@ import org.apache.spark.api.java.JavaRDD; /** - * A built-in partitioner that only does coalesce for input records for bulk insert operation, - * corresponding to the {@code BulkInsertSortMode.NONE} mode. + * A built-in partitioner that avoids expensive sorting for the input records for bulk insert + * operation, by doing either of the following: + *

+ * - If enforcing the outputSparkPartitions, only does coalesce for input records; + *

+ * - Otherwise, returns input records as is. + *

+ * Corresponds to the {@code BulkInsertSortMode.NONE} mode. * * @param HoodieRecordPayload type */ public class NonSortPartitioner implements BulkInsertPartitioner>> { + private final boolean enforceNumOutputPartitions; + + /** + * Default constructor without enforcing the number of output partitions. + */ + public NonSortPartitioner() { + this(false); + } + + /** + * Constructor with `enforceNumOutputPartitions` config. + * + * @param enforceNumOutputPartitions Whether to enforce the number of output partitions. + */ + public NonSortPartitioner(boolean enforceNumOutputPartitions) { + this.enforceNumOutputPartitions = enforceNumOutputPartitions; + } + @Override public JavaRDD> repartitionRecords(JavaRDD> records, int outputSparkPartitions) { - return records.coalesce(outputSparkPartitions); + if (enforceNumOutputPartitions) { + return records.coalesce(outputSparkPartitions); + } + return records; } @Override diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/NonSortPartitionerWithRows.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/NonSortPartitionerWithRows.java index e1c34a8f84062..10ec275064ffe 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/NonSortPartitionerWithRows.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/NonSortPartitionerWithRows.java @@ -24,15 +24,41 @@ import org.apache.spark.sql.Row; /** - * A built-in partitioner that only does coalesce for input Rows for bulk insert operation, - * corresponding to the {@code BulkInsertSortMode.NONE} mode. - * + * A built-in partitioner that avoids expensive sorting for the input Rows for bulk insert + * operation, by doing either of the following: + *

+ * - If enforcing the outputSparkPartitions, only does coalesce for input Rows; + *

+ * - Otherwise, returns input Rows as is. + *

+ * Corresponds to the {@code BulkInsertSortMode.NONE} mode. */ public class NonSortPartitionerWithRows implements BulkInsertPartitioner> { + private final boolean enforceNumOutputPartitions; + + /** + * Default constructor without enforcing the number of output partitions. + */ + public NonSortPartitionerWithRows() { + this(false); + } + + /** + * Constructor with `enforceNumOutputPartitions` config. + * + * @param enforceNumOutputPartitions Whether to enforce the number of output partitions. + */ + public NonSortPartitionerWithRows(boolean enforceNumOutputPartitions) { + this.enforceNumOutputPartitions = enforceNumOutputPartitions; + } + @Override public Dataset repartitionRecords(Dataset rows, int outputSparkPartitions) { - return rows.coalesce(outputSparkPartitions); + if (enforceNumOutputPartitions) { + return rows.coalesce(outputSparkPartitions); + } + return rows; } @Override diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java index d04059e59f9e4..7bc64b5445763 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java @@ -78,14 +78,21 @@ private static JavaRDD generateTripleTestRecordsForBulkInsert(Java } private static Stream configParams() { + // Parameters: + // BulkInsertSortMode sortMode, + // boolean isTablePartitioned, + // boolean enforceNumOutputPartitions, + // boolean isGloballySorted, + // boolean isLocallySorted Object[][] data = new Object[][] { - {BulkInsertSortMode.GLOBAL_SORT, true, true, true}, - {BulkInsertSortMode.PARTITION_SORT, true, false, true}, - {BulkInsertSortMode.PARTITION_PATH_REPARTITION, true, false, false}, - {BulkInsertSortMode.PARTITION_PATH_REPARTITION, false, false, false}, - {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, true, false, false}, - {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, false, false, false}, - {BulkInsertSortMode.NONE, true, false, false} + {BulkInsertSortMode.GLOBAL_SORT, true, true, true, true}, + {BulkInsertSortMode.PARTITION_SORT, true, true, false, true}, + {BulkInsertSortMode.PARTITION_PATH_REPARTITION, true, true, false, false}, + {BulkInsertSortMode.PARTITION_PATH_REPARTITION, false, true, false, false}, + {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, true, true, false, false}, + {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, false, true, false, false}, + {BulkInsertSortMode.NONE, true, true, false, false}, + {BulkInsertSortMode.NONE, true, false, false, false} }; return Stream.of(data).map(Arguments::of); } @@ -99,20 +106,33 @@ private void verifyRecordAscendingOrder(List records, - boolean isGloballySorted, boolean isLocallySorted, + boolean enforceNumOutputPartitions, + boolean isGloballySorted, + boolean isLocallySorted, Map expectedPartitionNumRecords) { - testBulkInsertInternalPartitioner(partitioner, records, isGloballySorted, isLocallySorted, expectedPartitionNumRecords, Option.empty()); + testBulkInsertInternalPartitioner( + partitioner, + records, + enforceNumOutputPartitions, + isGloballySorted, + isLocallySorted, + expectedPartitionNumRecords, + Option.empty()); } private void testBulkInsertInternalPartitioner(BulkInsertPartitioner partitioner, JavaRDD records, - boolean isGloballySorted, boolean isLocallySorted, + boolean enforceNumOutputPartitions, + boolean isGloballySorted, + boolean isLocallySorted, Map expectedPartitionNumRecords, Option>> comparator) { int numPartitions = 2; JavaRDD> actualRecords = (JavaRDD>) partitioner.repartitionRecords(records, numPartitions); - assertEquals(numPartitions, actualRecords.getNumPartitions()); + assertEquals( + enforceNumOutputPartitions ? numPartitions : records.getNumPartitions(), + actualRecords.getNumPartitions()); List> collectedActualRecords = actualRecords.collect(); if (isGloballySorted) { // Verify global order @@ -137,18 +157,31 @@ private void testBulkInsertInternalPartitioner(BulkInsertPartitioner partitioner assertEquals(expectedPartitionNumRecords, actualPartitionNumRecords); } - @ParameterizedTest(name = "[{index}] {0} isTablePartitioned={1}") + @ParameterizedTest(name = "[{index}] {0} isTablePartitioned={1} enforceNumOutputPartitions={2}") @MethodSource("configParams") public void testBulkInsertInternalPartitioner(BulkInsertSortMode sortMode, boolean isTablePartitioned, + boolean enforceNumOutputPartitions, boolean isGloballySorted, boolean isLocallySorted) { JavaRDD records1 = generateTestRecordsForBulkInsert(jsc); JavaRDD records2 = generateTripleTestRecordsForBulkInsert(jsc); - testBulkInsertInternalPartitioner(BulkInsertInternalPartitionerFactory.get(sortMode, isTablePartitioned), - records1, isGloballySorted, isLocallySorted, generateExpectedPartitionNumRecords(records1)); - testBulkInsertInternalPartitioner(BulkInsertInternalPartitionerFactory.get(sortMode, isTablePartitioned), - records2, isGloballySorted, isLocallySorted, generateExpectedPartitionNumRecords(records2)); + testBulkInsertInternalPartitioner( + BulkInsertInternalPartitionerFactory.get( + sortMode, isTablePartitioned, enforceNumOutputPartitions), + records1, + enforceNumOutputPartitions, + isGloballySorted, + isLocallySorted, + generateExpectedPartitionNumRecords(records1)); + testBulkInsertInternalPartitioner( + BulkInsertInternalPartitionerFactory.get( + sortMode, isTablePartitioned, enforceNumOutputPartitions), + records2, + enforceNumOutputPartitions, + isGloballySorted, + isLocallySorted, + generateExpectedPartitionNumRecords(records2)); } @Test @@ -160,9 +193,9 @@ public void testCustomColumnSortPartitioner() { JavaRDD records1 = generateTestRecordsForBulkInsert(jsc); JavaRDD records2 = generateTripleTestRecordsForBulkInsert(jsc); testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(sortColumns, HoodieTestDataGenerator.AVRO_SCHEMA, false), - records1, true, true, generateExpectedPartitionNumRecords(records1), Option.of(columnComparator)); + records1, true, true, true, generateExpectedPartitionNumRecords(records1), Option.of(columnComparator)); testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(sortColumns, HoodieTestDataGenerator.AVRO_SCHEMA, false), - records2, true, true, generateExpectedPartitionNumRecords(records2), Option.of(columnComparator)); + records2, true, true, true, generateExpectedPartitionNumRecords(records2), Option.of(columnComparator)); HoodieWriteConfig config = HoodieWriteConfig .newBuilder() @@ -172,9 +205,9 @@ public void testCustomColumnSortPartitioner() { .withUserDefinedBulkInsertPartitionerSortColumns(sortColumnString) .build(); testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(config), - records1, true, true, generateExpectedPartitionNumRecords(records1), Option.of(columnComparator)); + records1, true, true, true, generateExpectedPartitionNumRecords(records1), Option.of(columnComparator)); testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(config), - records2, true, true, generateExpectedPartitionNumRecords(records2), Option.of(columnComparator)); + records2, true, true, true, generateExpectedPartitionNumRecords(records2), Option.of(columnComparator)); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitionerForRows.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitionerForRows.java index 6223c8bb9eb5b..de827f7a450ce 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitionerForRows.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitionerForRows.java @@ -66,32 +66,53 @@ public void tearDown() throws Exception { } private static Stream configParams() { + // Parameters: + // BulkInsertSortMode sortMode, + // boolean isTablePartitioned, + // boolean enforceNumOutputPartitions, + // boolean isGloballySorted, + // boolean isLocallySorted Object[][] data = new Object[][] { - {BulkInsertSortMode.GLOBAL_SORT, true, true, true}, - {BulkInsertSortMode.PARTITION_SORT, true, false, true}, - {BulkInsertSortMode.PARTITION_PATH_REPARTITION, true, false, false}, - {BulkInsertSortMode.PARTITION_PATH_REPARTITION, false, false, false}, - {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, true, false, false}, - {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, false, false, false}, - - {BulkInsertSortMode.NONE, true, false, false} + {BulkInsertSortMode.GLOBAL_SORT, true, true, true, true}, + {BulkInsertSortMode.PARTITION_SORT, true, true, false, true}, + {BulkInsertSortMode.PARTITION_PATH_REPARTITION, true, true, false, false}, + {BulkInsertSortMode.PARTITION_PATH_REPARTITION, false, true, false, false}, + {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, true, true, false, false}, + {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, false, true, false, false}, + {BulkInsertSortMode.NONE, true, true, false, false}, + {BulkInsertSortMode.NONE, true, false, false, false} }; return Stream.of(data).map(Arguments::of); } - @ParameterizedTest(name = "[{index}] {0} isTablePartitioned={1}") + @ParameterizedTest(name = "[{index}] {0} isTablePartitioned={1} enforceNumOutputPartitions={2}") @MethodSource("configParams") public void testBulkInsertInternalPartitioner(BulkInsertSortMode sortMode, boolean isTablePartitioned, + boolean enforceNumOutputPartitions, boolean isGloballySorted, boolean isLocallySorted) throws Exception { Dataset records1 = generateTestRecords(); Dataset records2 = generateTestRecords(); - testBulkInsertInternalPartitioner(BulkInsertInternalPartitionerWithRowsFactory.get(sortMode, isTablePartitioned), - records1, isGloballySorted, isLocallySorted, generateExpectedPartitionNumRecords(records1), Option.empty()); - testBulkInsertInternalPartitioner(BulkInsertInternalPartitionerWithRowsFactory.get(sortMode, isTablePartitioned), - records2, isGloballySorted, isLocallySorted, generateExpectedPartitionNumRecords(records2), Option.empty()); + testBulkInsertInternalPartitioner( + BulkInsertInternalPartitionerWithRowsFactory.get( + sortMode, isTablePartitioned, enforceNumOutputPartitions), + records1, + enforceNumOutputPartitions, + isGloballySorted, + isLocallySorted, + generateExpectedPartitionNumRecords(records1), + Option.empty()); + testBulkInsertInternalPartitioner( + BulkInsertInternalPartitionerWithRowsFactory.get( + sortMode, isTablePartitioned, enforceNumOutputPartitions), + records2, + enforceNumOutputPartitions, + isGloballySorted, + isLocallySorted, + generateExpectedPartitionNumRecords(records2), + Option.empty()); } @Test @@ -103,9 +124,9 @@ public void testCustomColumnSortPartitionerWithRows() { Comparator comparator = getCustomColumnComparator(sortColumns); testBulkInsertInternalPartitioner(new RowCustomColumnsSortPartitioner(sortColumns), - records1, false, true, generateExpectedPartitionNumRecords(records1), Option.of(comparator)); + records1, true, false, true, generateExpectedPartitionNumRecords(records1), Option.of(comparator)); testBulkInsertInternalPartitioner(new RowCustomColumnsSortPartitioner(sortColumns), - records2, false, true, generateExpectedPartitionNumRecords(records2), Option.of(comparator)); + records2, true, false, true, generateExpectedPartitionNumRecords(records2), Option.of(comparator)); HoodieWriteConfig config = HoodieWriteConfig .newBuilder() @@ -114,18 +135,24 @@ public void testCustomColumnSortPartitionerWithRows() { .withUserDefinedBulkInsertPartitionerSortColumns(sortColumnString) .build(); testBulkInsertInternalPartitioner(new RowCustomColumnsSortPartitioner(config), - records1, false, true, generateExpectedPartitionNumRecords(records1), Option.of(comparator)); + records1, true, false, true, generateExpectedPartitionNumRecords(records1), Option.of(comparator)); testBulkInsertInternalPartitioner(new RowCustomColumnsSortPartitioner(config), - records2, false, true, generateExpectedPartitionNumRecords(records2), Option.of(comparator)); + records2, true, false, true, generateExpectedPartitionNumRecords(records2), Option.of(comparator)); } private void testBulkInsertInternalPartitioner(BulkInsertPartitioner partitioner, - Dataset rows, - boolean isGloballySorted, boolean isLocallySorted, - Map expectedPartitionNumRecords, - Option> comparator) { + Dataset rows, + boolean enforceNumOutputPartitions, + boolean isGloballySorted, + boolean isLocallySorted, + Map expectedPartitionNumRecords, + Option> comparator) { int numPartitions = 2; Dataset actualRecords = (Dataset) partitioner.repartitionRecords(rows, numPartitions); + assertEquals( + enforceNumOutputPartitions ? numPartitions : rows.rdd().getNumPartitions(), + actualRecords.rdd().getNumPartitions()); + List collectedActualRecords = actualRecords.collectAsList(); if (isGloballySorted) { // Verify global order