From e647096286d241f1a490800f2f225df203dc72aa Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Fri, 9 Dec 2022 21:41:48 -0800 Subject: [PATCH] [HUDI-5338] Adjust coalesce behavior within NONE sort mode for bulk insert (#7396) This PR adjusts NONE sort mode for bulk insert so that, by default, coalesce is not applied, matching the default parquet write behavior. The NONE sort mode still applies coalesce for clustering as the clustering operation relies on the bulk insert and the specified number of output Spark partitions to write a specific number of files. --- .../hudi/table/BulkInsertPartitioner.java | 18 +++-- .../MultipleSparkJobExecutionStrategy.java | 4 +- .../BulkInsertInternalPartitionerFactory.java | 15 +++- ...ertInternalPartitionerWithRowsFactory.java | 7 +- .../bulkinsert/NonSortPartitioner.java | 33 ++++++++- .../NonSortPartitionerWithRows.java | 34 ++++++++- .../TestBulkInsertInternalPartitioner.java | 73 ++++++++++++++----- ...tBulkInsertInternalPartitionerForRows.java | 69 ++++++++++++------ 8 files changed, 194 insertions(+), 59 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java index 89360c247403d..844e9f4f0f8ac 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java @@ -20,22 +20,28 @@ import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode; import org.apache.hudi.io.WriteHandleFactory; import java.io.Serializable; /** - * Repartition input records into at least expected number of output partitions. It should give below guarantees - - * Output partition will have records from only one hoodie partition. - Average records per output - * partitions should be almost equal to (#inputRecords / #outputPartitions) to avoid possible skews. + * Partitions the input records for bulk insert operation. + *

+ * The actual implementation of {@link BulkInsertPartitioner} is determined by the bulk insert + * sort mode, {@link BulkInsertSortMode}, specified by + * {@code HoodieWriteConfig.BULK_INSERT_SORT_MODE} (`hoodie.bulkinsert.sort.mode`). */ public interface BulkInsertPartitioner extends Serializable { /** - * Repartitions the input records into at least expected number of output partitions. + * Partitions the input records based on the number of output partitions as a hint. + *

+ * Note that, the number of output partitions may or may not be enforced, depending on the + * specific implementation. * - * @param records Input Hoodie records - * @param outputPartitions Expected number of output partitions + * @param records Input Hoodie records. + * @param outputPartitions Expected number of output partitions as a hint. * @return */ I repartitionRecords(I records, int outputPartitions); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java index eea362fe9b86c..b2308bd20a5df 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java @@ -214,9 +214,9 @@ private BulkInsertPartitioner getPartitioner(Map strategy } }).orElse(isRowPartitioner ? BulkInsertInternalPartitionerWithRowsFactory.get( - getWriteConfig().getBulkInsertSortMode(), getHoodieTable().isPartitioned()) + getWriteConfig().getBulkInsertSortMode(), getHoodieTable().isPartitioned(), true) : BulkInsertInternalPartitionerFactory.get( - getWriteConfig().getBulkInsertSortMode(), getHoodieTable().isPartitioned())); + getWriteConfig().getBulkInsertSortMode(), getHoodieTable().isPartitioned(), true)); } /** diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerFactory.java index 84bd79e3a22a2..900d2729f105b 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerFactory.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerFactory.java @@ -30,13 +30,24 @@ public abstract class BulkInsertInternalPartitionerFactory { public static BulkInsertPartitioner get(HoodieTable table, HoodieWriteConfig config) { - return get(config.getBulkInsertSortMode(), table.isPartitioned()); + return get(config.getBulkInsertSortMode(), table.isPartitioned(), false); + } + + public static BulkInsertPartitioner get( + HoodieTable table, HoodieWriteConfig config, boolean enforceNumOutputPartitions) { + return get(config.getBulkInsertSortMode(), table.isPartitioned(), enforceNumOutputPartitions); } public static BulkInsertPartitioner get(BulkInsertSortMode sortMode, boolean isTablePartitioned) { + return get(sortMode, isTablePartitioned, false); + } + + public static BulkInsertPartitioner get(BulkInsertSortMode sortMode, + boolean isTablePartitioned, + boolean enforceNumOutputPartitions) { switch (sortMode) { case NONE: - return new NonSortPartitioner(); + return new NonSortPartitioner(enforceNumOutputPartitions); case GLOBAL_SORT: return new GlobalSortPartitioner(); case PARTITION_SORT: diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerWithRowsFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerWithRowsFactory.java index 720c03948d45b..218eae0dc94ca 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerWithRowsFactory.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerWithRowsFactory.java @@ -31,9 +31,14 @@ public abstract class BulkInsertInternalPartitionerWithRowsFactory { public static BulkInsertPartitioner> get(BulkInsertSortMode sortMode, boolean isTablePartitioned) { + return get(sortMode, isTablePartitioned, false); + } + + public static BulkInsertPartitioner> get( + BulkInsertSortMode sortMode, boolean isTablePartitioned, boolean enforceNumOutputPartitions) { switch (sortMode) { case NONE: - return new NonSortPartitionerWithRows(); + return new NonSortPartitionerWithRows(enforceNumOutputPartitions); case GLOBAL_SORT: return new GlobalSortPartitionerWithRows(); case PARTITION_SORT: diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/NonSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/NonSortPartitioner.java index 19c90ecb1a012..67cd599731c13 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/NonSortPartitioner.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/NonSortPartitioner.java @@ -25,18 +25,45 @@ import org.apache.spark.api.java.JavaRDD; /** - * A built-in partitioner that only does coalesce for input records for bulk insert operation, - * corresponding to the {@code BulkInsertSortMode.NONE} mode. + * A built-in partitioner that avoids expensive sorting for the input records for bulk insert + * operation, by doing either of the following: + *

+ * - If enforcing the outputSparkPartitions, only does coalesce for input records; + *

+ * - Otherwise, returns input records as is. + *

+ * Corresponds to the {@code BulkInsertSortMode.NONE} mode. * * @param HoodieRecordPayload type */ public class NonSortPartitioner implements BulkInsertPartitioner>> { + private final boolean enforceNumOutputPartitions; + + /** + * Default constructor without enforcing the number of output partitions. + */ + public NonSortPartitioner() { + this(false); + } + + /** + * Constructor with `enforceNumOutputPartitions` config. + * + * @param enforceNumOutputPartitions Whether to enforce the number of output partitions. + */ + public NonSortPartitioner(boolean enforceNumOutputPartitions) { + this.enforceNumOutputPartitions = enforceNumOutputPartitions; + } + @Override public JavaRDD> repartitionRecords(JavaRDD> records, int outputSparkPartitions) { - return records.coalesce(outputSparkPartitions); + if (enforceNumOutputPartitions) { + return records.coalesce(outputSparkPartitions); + } + return records; } @Override diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/NonSortPartitionerWithRows.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/NonSortPartitionerWithRows.java index e1c34a8f84062..10ec275064ffe 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/NonSortPartitionerWithRows.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/NonSortPartitionerWithRows.java @@ -24,15 +24,41 @@ import org.apache.spark.sql.Row; /** - * A built-in partitioner that only does coalesce for input Rows for bulk insert operation, - * corresponding to the {@code BulkInsertSortMode.NONE} mode. - * + * A built-in partitioner that avoids expensive sorting for the input Rows for bulk insert + * operation, by doing either of the following: + *

+ * - If enforcing the outputSparkPartitions, only does coalesce for input Rows; + *

+ * - Otherwise, returns input Rows as is. + *

+ * Corresponds to the {@code BulkInsertSortMode.NONE} mode. */ public class NonSortPartitionerWithRows implements BulkInsertPartitioner> { + private final boolean enforceNumOutputPartitions; + + /** + * Default constructor without enforcing the number of output partitions. + */ + public NonSortPartitionerWithRows() { + this(false); + } + + /** + * Constructor with `enforceNumOutputPartitions` config. + * + * @param enforceNumOutputPartitions Whether to enforce the number of output partitions. + */ + public NonSortPartitionerWithRows(boolean enforceNumOutputPartitions) { + this.enforceNumOutputPartitions = enforceNumOutputPartitions; + } + @Override public Dataset repartitionRecords(Dataset rows, int outputSparkPartitions) { - return rows.coalesce(outputSparkPartitions); + if (enforceNumOutputPartitions) { + return rows.coalesce(outputSparkPartitions); + } + return rows; } @Override diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java index d04059e59f9e4..7bc64b5445763 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java @@ -78,14 +78,21 @@ private static JavaRDD generateTripleTestRecordsForBulkInsert(Java } private static Stream configParams() { + // Parameters: + // BulkInsertSortMode sortMode, + // boolean isTablePartitioned, + // boolean enforceNumOutputPartitions, + // boolean isGloballySorted, + // boolean isLocallySorted Object[][] data = new Object[][] { - {BulkInsertSortMode.GLOBAL_SORT, true, true, true}, - {BulkInsertSortMode.PARTITION_SORT, true, false, true}, - {BulkInsertSortMode.PARTITION_PATH_REPARTITION, true, false, false}, - {BulkInsertSortMode.PARTITION_PATH_REPARTITION, false, false, false}, - {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, true, false, false}, - {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, false, false, false}, - {BulkInsertSortMode.NONE, true, false, false} + {BulkInsertSortMode.GLOBAL_SORT, true, true, true, true}, + {BulkInsertSortMode.PARTITION_SORT, true, true, false, true}, + {BulkInsertSortMode.PARTITION_PATH_REPARTITION, true, true, false, false}, + {BulkInsertSortMode.PARTITION_PATH_REPARTITION, false, true, false, false}, + {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, true, true, false, false}, + {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, false, true, false, false}, + {BulkInsertSortMode.NONE, true, true, false, false}, + {BulkInsertSortMode.NONE, true, false, false, false} }; return Stream.of(data).map(Arguments::of); } @@ -99,20 +106,33 @@ private void verifyRecordAscendingOrder(List records, - boolean isGloballySorted, boolean isLocallySorted, + boolean enforceNumOutputPartitions, + boolean isGloballySorted, + boolean isLocallySorted, Map expectedPartitionNumRecords) { - testBulkInsertInternalPartitioner(partitioner, records, isGloballySorted, isLocallySorted, expectedPartitionNumRecords, Option.empty()); + testBulkInsertInternalPartitioner( + partitioner, + records, + enforceNumOutputPartitions, + isGloballySorted, + isLocallySorted, + expectedPartitionNumRecords, + Option.empty()); } private void testBulkInsertInternalPartitioner(BulkInsertPartitioner partitioner, JavaRDD records, - boolean isGloballySorted, boolean isLocallySorted, + boolean enforceNumOutputPartitions, + boolean isGloballySorted, + boolean isLocallySorted, Map expectedPartitionNumRecords, Option>> comparator) { int numPartitions = 2; JavaRDD> actualRecords = (JavaRDD>) partitioner.repartitionRecords(records, numPartitions); - assertEquals(numPartitions, actualRecords.getNumPartitions()); + assertEquals( + enforceNumOutputPartitions ? numPartitions : records.getNumPartitions(), + actualRecords.getNumPartitions()); List> collectedActualRecords = actualRecords.collect(); if (isGloballySorted) { // Verify global order @@ -137,18 +157,31 @@ private void testBulkInsertInternalPartitioner(BulkInsertPartitioner partitioner assertEquals(expectedPartitionNumRecords, actualPartitionNumRecords); } - @ParameterizedTest(name = "[{index}] {0} isTablePartitioned={1}") + @ParameterizedTest(name = "[{index}] {0} isTablePartitioned={1} enforceNumOutputPartitions={2}") @MethodSource("configParams") public void testBulkInsertInternalPartitioner(BulkInsertSortMode sortMode, boolean isTablePartitioned, + boolean enforceNumOutputPartitions, boolean isGloballySorted, boolean isLocallySorted) { JavaRDD records1 = generateTestRecordsForBulkInsert(jsc); JavaRDD records2 = generateTripleTestRecordsForBulkInsert(jsc); - testBulkInsertInternalPartitioner(BulkInsertInternalPartitionerFactory.get(sortMode, isTablePartitioned), - records1, isGloballySorted, isLocallySorted, generateExpectedPartitionNumRecords(records1)); - testBulkInsertInternalPartitioner(BulkInsertInternalPartitionerFactory.get(sortMode, isTablePartitioned), - records2, isGloballySorted, isLocallySorted, generateExpectedPartitionNumRecords(records2)); + testBulkInsertInternalPartitioner( + BulkInsertInternalPartitionerFactory.get( + sortMode, isTablePartitioned, enforceNumOutputPartitions), + records1, + enforceNumOutputPartitions, + isGloballySorted, + isLocallySorted, + generateExpectedPartitionNumRecords(records1)); + testBulkInsertInternalPartitioner( + BulkInsertInternalPartitionerFactory.get( + sortMode, isTablePartitioned, enforceNumOutputPartitions), + records2, + enforceNumOutputPartitions, + isGloballySorted, + isLocallySorted, + generateExpectedPartitionNumRecords(records2)); } @Test @@ -160,9 +193,9 @@ public void testCustomColumnSortPartitioner() { JavaRDD records1 = generateTestRecordsForBulkInsert(jsc); JavaRDD records2 = generateTripleTestRecordsForBulkInsert(jsc); testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(sortColumns, HoodieTestDataGenerator.AVRO_SCHEMA, false), - records1, true, true, generateExpectedPartitionNumRecords(records1), Option.of(columnComparator)); + records1, true, true, true, generateExpectedPartitionNumRecords(records1), Option.of(columnComparator)); testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(sortColumns, HoodieTestDataGenerator.AVRO_SCHEMA, false), - records2, true, true, generateExpectedPartitionNumRecords(records2), Option.of(columnComparator)); + records2, true, true, true, generateExpectedPartitionNumRecords(records2), Option.of(columnComparator)); HoodieWriteConfig config = HoodieWriteConfig .newBuilder() @@ -172,9 +205,9 @@ public void testCustomColumnSortPartitioner() { .withUserDefinedBulkInsertPartitionerSortColumns(sortColumnString) .build(); testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(config), - records1, true, true, generateExpectedPartitionNumRecords(records1), Option.of(columnComparator)); + records1, true, true, true, generateExpectedPartitionNumRecords(records1), Option.of(columnComparator)); testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(config), - records2, true, true, generateExpectedPartitionNumRecords(records2), Option.of(columnComparator)); + records2, true, true, true, generateExpectedPartitionNumRecords(records2), Option.of(columnComparator)); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitionerForRows.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitionerForRows.java index 6223c8bb9eb5b..de827f7a450ce 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitionerForRows.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitionerForRows.java @@ -66,32 +66,53 @@ public void tearDown() throws Exception { } private static Stream configParams() { + // Parameters: + // BulkInsertSortMode sortMode, + // boolean isTablePartitioned, + // boolean enforceNumOutputPartitions, + // boolean isGloballySorted, + // boolean isLocallySorted Object[][] data = new Object[][] { - {BulkInsertSortMode.GLOBAL_SORT, true, true, true}, - {BulkInsertSortMode.PARTITION_SORT, true, false, true}, - {BulkInsertSortMode.PARTITION_PATH_REPARTITION, true, false, false}, - {BulkInsertSortMode.PARTITION_PATH_REPARTITION, false, false, false}, - {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, true, false, false}, - {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, false, false, false}, - - {BulkInsertSortMode.NONE, true, false, false} + {BulkInsertSortMode.GLOBAL_SORT, true, true, true, true}, + {BulkInsertSortMode.PARTITION_SORT, true, true, false, true}, + {BulkInsertSortMode.PARTITION_PATH_REPARTITION, true, true, false, false}, + {BulkInsertSortMode.PARTITION_PATH_REPARTITION, false, true, false, false}, + {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, true, true, false, false}, + {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, false, true, false, false}, + {BulkInsertSortMode.NONE, true, true, false, false}, + {BulkInsertSortMode.NONE, true, false, false, false} }; return Stream.of(data).map(Arguments::of); } - @ParameterizedTest(name = "[{index}] {0} isTablePartitioned={1}") + @ParameterizedTest(name = "[{index}] {0} isTablePartitioned={1} enforceNumOutputPartitions={2}") @MethodSource("configParams") public void testBulkInsertInternalPartitioner(BulkInsertSortMode sortMode, boolean isTablePartitioned, + boolean enforceNumOutputPartitions, boolean isGloballySorted, boolean isLocallySorted) throws Exception { Dataset records1 = generateTestRecords(); Dataset records2 = generateTestRecords(); - testBulkInsertInternalPartitioner(BulkInsertInternalPartitionerWithRowsFactory.get(sortMode, isTablePartitioned), - records1, isGloballySorted, isLocallySorted, generateExpectedPartitionNumRecords(records1), Option.empty()); - testBulkInsertInternalPartitioner(BulkInsertInternalPartitionerWithRowsFactory.get(sortMode, isTablePartitioned), - records2, isGloballySorted, isLocallySorted, generateExpectedPartitionNumRecords(records2), Option.empty()); + testBulkInsertInternalPartitioner( + BulkInsertInternalPartitionerWithRowsFactory.get( + sortMode, isTablePartitioned, enforceNumOutputPartitions), + records1, + enforceNumOutputPartitions, + isGloballySorted, + isLocallySorted, + generateExpectedPartitionNumRecords(records1), + Option.empty()); + testBulkInsertInternalPartitioner( + BulkInsertInternalPartitionerWithRowsFactory.get( + sortMode, isTablePartitioned, enforceNumOutputPartitions), + records2, + enforceNumOutputPartitions, + isGloballySorted, + isLocallySorted, + generateExpectedPartitionNumRecords(records2), + Option.empty()); } @Test @@ -103,9 +124,9 @@ public void testCustomColumnSortPartitionerWithRows() { Comparator comparator = getCustomColumnComparator(sortColumns); testBulkInsertInternalPartitioner(new RowCustomColumnsSortPartitioner(sortColumns), - records1, false, true, generateExpectedPartitionNumRecords(records1), Option.of(comparator)); + records1, true, false, true, generateExpectedPartitionNumRecords(records1), Option.of(comparator)); testBulkInsertInternalPartitioner(new RowCustomColumnsSortPartitioner(sortColumns), - records2, false, true, generateExpectedPartitionNumRecords(records2), Option.of(comparator)); + records2, true, false, true, generateExpectedPartitionNumRecords(records2), Option.of(comparator)); HoodieWriteConfig config = HoodieWriteConfig .newBuilder() @@ -114,18 +135,24 @@ public void testCustomColumnSortPartitionerWithRows() { .withUserDefinedBulkInsertPartitionerSortColumns(sortColumnString) .build(); testBulkInsertInternalPartitioner(new RowCustomColumnsSortPartitioner(config), - records1, false, true, generateExpectedPartitionNumRecords(records1), Option.of(comparator)); + records1, true, false, true, generateExpectedPartitionNumRecords(records1), Option.of(comparator)); testBulkInsertInternalPartitioner(new RowCustomColumnsSortPartitioner(config), - records2, false, true, generateExpectedPartitionNumRecords(records2), Option.of(comparator)); + records2, true, false, true, generateExpectedPartitionNumRecords(records2), Option.of(comparator)); } private void testBulkInsertInternalPartitioner(BulkInsertPartitioner partitioner, - Dataset rows, - boolean isGloballySorted, boolean isLocallySorted, - Map expectedPartitionNumRecords, - Option> comparator) { + Dataset rows, + boolean enforceNumOutputPartitions, + boolean isGloballySorted, + boolean isLocallySorted, + Map expectedPartitionNumRecords, + Option> comparator) { int numPartitions = 2; Dataset actualRecords = (Dataset) partitioner.repartitionRecords(rows, numPartitions); + assertEquals( + enforceNumOutputPartitions ? numPartitions : rows.rdd().getNumPartitions(), + actualRecords.rdd().getNumPartitions()); + List collectedActualRecords = actualRecords.collectAsList(); if (isGloballySorted) { // Verify global order