Skip to content

Commit

Permalink
[HUDI-5338] Adjust coalesce behavior within NONE sort mode for bulk i…
Browse files Browse the repository at this point in the history
…nsert (apache#7396)

This PR adjusts NONE sort mode for bulk insert so that, by default, coalesce is not applied, matching the default parquet write behavior. The NONE sort mode still applies coalesce for clustering as the clustering operation relies on the bulk insert and the specified number of output Spark partitions to write a specific number of files.
  • Loading branch information
yihua authored and Alexey Kudinkin committed Dec 14, 2022
1 parent 73ad817 commit 65468d9
Show file tree
Hide file tree
Showing 8 changed files with 194 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,28 @@

import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
import org.apache.hudi.io.WriteHandleFactory;

import java.io.Serializable;

/**
* Repartition input records into at least expected number of output partitions. It should give below guarantees -
* Output partition will have records from only one hoodie partition. - Average records per output
* partitions should be almost equal to (#inputRecords / #outputPartitions) to avoid possible skews.
* Partitions the input records for bulk insert operation.
* <p>
* The actual implementation of {@link BulkInsertPartitioner} is determined by the bulk insert
* sort mode, {@link BulkInsertSortMode}, specified by
* {@code HoodieWriteConfig.BULK_INSERT_SORT_MODE} (`hoodie.bulkinsert.sort.mode`).
*/
public interface BulkInsertPartitioner<I> extends Serializable {

/**
* Repartitions the input records into at least expected number of output partitions.
* Partitions the input records based on the number of output partitions as a hint.
* <p>
* Note that, the number of output partitions may or may not be enforced, depending on the
* specific implementation.
*
* @param records Input Hoodie records
* @param outputPartitions Expected number of output partitions
* @param records Input Hoodie records.
* @param outputPartitions Expected number of output partitions as a hint.
* @return
*/
I repartitionRecords(I records, int outputPartitions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,9 +214,9 @@ private <I> BulkInsertPartitioner<I> getPartitioner(Map<String, String> strategy
}
}).orElse(isRowPartitioner
? BulkInsertInternalPartitionerWithRowsFactory.get(
getWriteConfig().getBulkInsertSortMode(), getHoodieTable().isPartitioned())
getWriteConfig().getBulkInsertSortMode(), getHoodieTable().isPartitioned(), true)
: BulkInsertInternalPartitionerFactory.get(
getWriteConfig().getBulkInsertSortMode(), getHoodieTable().isPartitioned()));
getWriteConfig().getBulkInsertSortMode(), getHoodieTable().isPartitioned(), true));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,24 @@
public abstract class BulkInsertInternalPartitionerFactory {

public static BulkInsertPartitioner get(HoodieTable table, HoodieWriteConfig config) {
return get(config.getBulkInsertSortMode(), table.isPartitioned());
return get(config.getBulkInsertSortMode(), table.isPartitioned(), false);
}

public static BulkInsertPartitioner get(
HoodieTable table, HoodieWriteConfig config, boolean enforceNumOutputPartitions) {
return get(config.getBulkInsertSortMode(), table.isPartitioned(), enforceNumOutputPartitions);
}

public static BulkInsertPartitioner get(BulkInsertSortMode sortMode, boolean isTablePartitioned) {
return get(sortMode, isTablePartitioned, false);
}

public static BulkInsertPartitioner get(BulkInsertSortMode sortMode,
boolean isTablePartitioned,
boolean enforceNumOutputPartitions) {
switch (sortMode) {
case NONE:
return new NonSortPartitioner();
return new NonSortPartitioner(enforceNumOutputPartitions);
case GLOBAL_SORT:
return new GlobalSortPartitioner();
case PARTITION_SORT:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,14 @@ public abstract class BulkInsertInternalPartitionerWithRowsFactory {

public static BulkInsertPartitioner<Dataset<Row>> get(BulkInsertSortMode sortMode,
boolean isTablePartitioned) {
return get(sortMode, isTablePartitioned, false);
}

public static BulkInsertPartitioner<Dataset<Row>> get(
BulkInsertSortMode sortMode, boolean isTablePartitioned, boolean enforceNumOutputPartitions) {
switch (sortMode) {
case NONE:
return new NonSortPartitionerWithRows();
return new NonSortPartitionerWithRows(enforceNumOutputPartitions);
case GLOBAL_SORT:
return new GlobalSortPartitionerWithRows();
case PARTITION_SORT:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,45 @@
import org.apache.spark.api.java.JavaRDD;

/**
* A built-in partitioner that only does coalesce for input records for bulk insert operation,
* corresponding to the {@code BulkInsertSortMode.NONE} mode.
* A built-in partitioner that avoids expensive sorting for the input records for bulk insert
* operation, by doing either of the following:
* <p>
* - If enforcing the outputSparkPartitions, only does coalesce for input records;
* <p>
* - Otherwise, returns input records as is.
* <p>
* Corresponds to the {@code BulkInsertSortMode.NONE} mode.
*
* @param <T> HoodieRecordPayload type
*/
public class NonSortPartitioner<T extends HoodieRecordPayload>
implements BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> {

private final boolean enforceNumOutputPartitions;

/**
* Default constructor without enforcing the number of output partitions.
*/
public NonSortPartitioner() {
this(false);
}

/**
* Constructor with `enforceNumOutputPartitions` config.
*
* @param enforceNumOutputPartitions Whether to enforce the number of output partitions.
*/
public NonSortPartitioner(boolean enforceNumOutputPartitions) {
this.enforceNumOutputPartitions = enforceNumOutputPartitions;
}

@Override
public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records,
int outputSparkPartitions) {
return records.coalesce(outputSparkPartitions);
if (enforceNumOutputPartitions) {
return records.coalesce(outputSparkPartitions);
}
return records;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,41 @@
import org.apache.spark.sql.Row;

/**
* A built-in partitioner that only does coalesce for input Rows for bulk insert operation,
* corresponding to the {@code BulkInsertSortMode.NONE} mode.
*
* A built-in partitioner that avoids expensive sorting for the input Rows for bulk insert
* operation, by doing either of the following:
* <p>
* - If enforcing the outputSparkPartitions, only does coalesce for input Rows;
* <p>
* - Otherwise, returns input Rows as is.
* <p>
* Corresponds to the {@code BulkInsertSortMode.NONE} mode.
*/
public class NonSortPartitionerWithRows implements BulkInsertPartitioner<Dataset<Row>> {

private final boolean enforceNumOutputPartitions;

/**
* Default constructor without enforcing the number of output partitions.
*/
public NonSortPartitionerWithRows() {
this(false);
}

/**
* Constructor with `enforceNumOutputPartitions` config.
*
* @param enforceNumOutputPartitions Whether to enforce the number of output partitions.
*/
public NonSortPartitionerWithRows(boolean enforceNumOutputPartitions) {
this.enforceNumOutputPartitions = enforceNumOutputPartitions;
}

@Override
public Dataset<Row> repartitionRecords(Dataset<Row> rows, int outputSparkPartitions) {
return rows.coalesce(outputSparkPartitions);
if (enforceNumOutputPartitions) {
return rows.coalesce(outputSparkPartitions);
}
return rows;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,21 @@ private static JavaRDD<HoodieRecord> generateTripleTestRecordsForBulkInsert(Java
}

private static Stream<Arguments> configParams() {
// Parameters:
// BulkInsertSortMode sortMode,
// boolean isTablePartitioned,
// boolean enforceNumOutputPartitions,
// boolean isGloballySorted,
// boolean isLocallySorted
Object[][] data = new Object[][] {
{BulkInsertSortMode.GLOBAL_SORT, true, true, true},
{BulkInsertSortMode.PARTITION_SORT, true, false, true},
{BulkInsertSortMode.PARTITION_PATH_REPARTITION, true, false, false},
{BulkInsertSortMode.PARTITION_PATH_REPARTITION, false, false, false},
{BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, true, false, false},
{BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, false, false, false},
{BulkInsertSortMode.NONE, true, false, false}
{BulkInsertSortMode.GLOBAL_SORT, true, true, true, true},
{BulkInsertSortMode.PARTITION_SORT, true, true, false, true},
{BulkInsertSortMode.PARTITION_PATH_REPARTITION, true, true, false, false},
{BulkInsertSortMode.PARTITION_PATH_REPARTITION, false, true, false, false},
{BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, true, true, false, false},
{BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, false, true, false, false},
{BulkInsertSortMode.NONE, true, true, false, false},
{BulkInsertSortMode.NONE, true, false, false, false}
};
return Stream.of(data).map(Arguments::of);
}
Expand All @@ -99,20 +106,33 @@ private void verifyRecordAscendingOrder(List<HoodieRecord<? extends HoodieRecord

private void testBulkInsertInternalPartitioner(BulkInsertPartitioner partitioner,
JavaRDD<HoodieRecord> records,
boolean isGloballySorted, boolean isLocallySorted,
boolean enforceNumOutputPartitions,
boolean isGloballySorted,
boolean isLocallySorted,
Map<String, Long> expectedPartitionNumRecords) {
testBulkInsertInternalPartitioner(partitioner, records, isGloballySorted, isLocallySorted, expectedPartitionNumRecords, Option.empty());
testBulkInsertInternalPartitioner(
partitioner,
records,
enforceNumOutputPartitions,
isGloballySorted,
isLocallySorted,
expectedPartitionNumRecords,
Option.empty());
}

private void testBulkInsertInternalPartitioner(BulkInsertPartitioner partitioner,
JavaRDD<HoodieRecord> records,
boolean isGloballySorted, boolean isLocallySorted,
boolean enforceNumOutputPartitions,
boolean isGloballySorted,
boolean isLocallySorted,
Map<String, Long> expectedPartitionNumRecords,
Option<Comparator<HoodieRecord<? extends HoodieRecordPayload>>> comparator) {
int numPartitions = 2;
JavaRDD<HoodieRecord<? extends HoodieRecordPayload>> actualRecords =
(JavaRDD<HoodieRecord<? extends HoodieRecordPayload>>) partitioner.repartitionRecords(records, numPartitions);
assertEquals(numPartitions, actualRecords.getNumPartitions());
assertEquals(
enforceNumOutputPartitions ? numPartitions : records.getNumPartitions(),
actualRecords.getNumPartitions());
List<HoodieRecord<? extends HoodieRecordPayload>> collectedActualRecords = actualRecords.collect();
if (isGloballySorted) {
// Verify global order
Expand All @@ -137,18 +157,31 @@ private void testBulkInsertInternalPartitioner(BulkInsertPartitioner partitioner
assertEquals(expectedPartitionNumRecords, actualPartitionNumRecords);
}

@ParameterizedTest(name = "[{index}] {0} isTablePartitioned={1}")
@ParameterizedTest(name = "[{index}] {0} isTablePartitioned={1} enforceNumOutputPartitions={2}")
@MethodSource("configParams")
public void testBulkInsertInternalPartitioner(BulkInsertSortMode sortMode,
boolean isTablePartitioned,
boolean enforceNumOutputPartitions,
boolean isGloballySorted,
boolean isLocallySorted) {
JavaRDD<HoodieRecord> records1 = generateTestRecordsForBulkInsert(jsc);
JavaRDD<HoodieRecord> records2 = generateTripleTestRecordsForBulkInsert(jsc);
testBulkInsertInternalPartitioner(BulkInsertInternalPartitionerFactory.get(sortMode, isTablePartitioned),
records1, isGloballySorted, isLocallySorted, generateExpectedPartitionNumRecords(records1));
testBulkInsertInternalPartitioner(BulkInsertInternalPartitionerFactory.get(sortMode, isTablePartitioned),
records2, isGloballySorted, isLocallySorted, generateExpectedPartitionNumRecords(records2));
testBulkInsertInternalPartitioner(
BulkInsertInternalPartitionerFactory.get(
sortMode, isTablePartitioned, enforceNumOutputPartitions),
records1,
enforceNumOutputPartitions,
isGloballySorted,
isLocallySorted,
generateExpectedPartitionNumRecords(records1));
testBulkInsertInternalPartitioner(
BulkInsertInternalPartitionerFactory.get(
sortMode, isTablePartitioned, enforceNumOutputPartitions),
records2,
enforceNumOutputPartitions,
isGloballySorted,
isLocallySorted,
generateExpectedPartitionNumRecords(records2));
}

@Test
Expand All @@ -160,9 +193,9 @@ public void testCustomColumnSortPartitioner() {
JavaRDD<HoodieRecord> records1 = generateTestRecordsForBulkInsert(jsc);
JavaRDD<HoodieRecord> records2 = generateTripleTestRecordsForBulkInsert(jsc);
testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(sortColumns, HoodieTestDataGenerator.AVRO_SCHEMA, false),
records1, true, true, generateExpectedPartitionNumRecords(records1), Option.of(columnComparator));
records1, true, true, true, generateExpectedPartitionNumRecords(records1), Option.of(columnComparator));
testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(sortColumns, HoodieTestDataGenerator.AVRO_SCHEMA, false),
records2, true, true, generateExpectedPartitionNumRecords(records2), Option.of(columnComparator));
records2, true, true, true, generateExpectedPartitionNumRecords(records2), Option.of(columnComparator));

HoodieWriteConfig config = HoodieWriteConfig
.newBuilder()
Expand All @@ -172,9 +205,9 @@ public void testCustomColumnSortPartitioner() {
.withUserDefinedBulkInsertPartitionerSortColumns(sortColumnString)
.build();
testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(config),
records1, true, true, generateExpectedPartitionNumRecords(records1), Option.of(columnComparator));
records1, true, true, true, generateExpectedPartitionNumRecords(records1), Option.of(columnComparator));
testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(config),
records2, true, true, generateExpectedPartitionNumRecords(records2), Option.of(columnComparator));
records2, true, true, true, generateExpectedPartitionNumRecords(records2), Option.of(columnComparator));

}

Expand Down
Loading

0 comments on commit 65468d9

Please sign in to comment.