Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-5338] Adjust coalesce behavior within NONE sort mode for bulk insert #7396

Merged
merged 8 commits into from
Dec 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,28 @@

import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
import org.apache.hudi.io.WriteHandleFactory;

import java.io.Serializable;

/**
* Repartition input records into at least expected number of output partitions. It should give below guarantees -
* Output partition will have records from only one hoodie partition. - Average records per output
* partitions should be almost equal to (#inputRecords / #outputPartitions) to avoid possible skews.
* Partitions the input records for bulk insert operation.
* <p>
* The actual implementation of {@link BulkInsertPartitioner} is determined by the bulk insert
* sort mode, {@link BulkInsertSortMode}, specified by
* {@code HoodieWriteConfig.BULK_INSERT_SORT_MODE} (`hoodie.bulkinsert.sort.mode`).
*/
public interface BulkInsertPartitioner<I> extends Serializable {

/**
* Repartitions the input records into at least expected number of output partitions.
* Partitions the input records based on the number of output partitions as a hint.
* <p>
* Note that, the number of output partitions may or may not be enforced, depending on the
* specific implementation.
*
* @param records Input Hoodie records
* @param outputPartitions Expected number of output partitions
* @param records Input Hoodie records.
* @param outputPartitions Expected number of output partitions as a hint.
* @return
*/
I repartitionRecords(I records, int outputPartitions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,9 +207,9 @@ private <I> BulkInsertPartitioner<I> getPartitioner(Map<String, String> strategy
}
}).orElse(isRowPartitioner
? BulkInsertInternalPartitionerWithRowsFactory.get(
getWriteConfig().getBulkInsertSortMode(), getHoodieTable().isPartitioned())
getWriteConfig().getBulkInsertSortMode(), getHoodieTable().isPartitioned(), true)
: BulkInsertInternalPartitionerFactory.get(
getWriteConfig().getBulkInsertSortMode(), getHoodieTable().isPartitioned()));
getWriteConfig().getBulkInsertSortMode(), getHoodieTable().isPartitioned(), true));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,28 @@
public abstract class BulkInsertInternalPartitionerFactory {

public static BulkInsertPartitioner get(HoodieTable table, HoodieWriteConfig config) {
return get(table, config, false);
}

public static BulkInsertPartitioner get(
HoodieTable table, HoodieWriteConfig config, boolean enforceNumOutputPartitions) {
if (config.getIndexType().equals(HoodieIndex.IndexType.BUCKET)
&& config.getBucketIndexEngineType().equals(HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING)) {
return new RDDConsistentBucketPartitioner(table);
}
return get(config.getBulkInsertSortMode(), table.isPartitioned());
return get(config.getBulkInsertSortMode(), table.isPartitioned(), enforceNumOutputPartitions);
}

public static BulkInsertPartitioner get(BulkInsertSortMode sortMode, boolean isTablePartitioned) {
return get(sortMode, isTablePartitioned, false);
}

public static BulkInsertPartitioner get(BulkInsertSortMode sortMode,
boolean isTablePartitioned,
boolean enforceNumOutputPartitions) {
switch (sortMode) {
case NONE:
return new NonSortPartitioner();
return new NonSortPartitioner(enforceNumOutputPartitions);
case GLOBAL_SORT:
return new GlobalSortPartitioner();
case PARTITION_SORT:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,14 @@ public abstract class BulkInsertInternalPartitionerWithRowsFactory {

public static BulkInsertPartitioner<Dataset<Row>> get(BulkInsertSortMode sortMode,
boolean isTablePartitioned) {
return get(sortMode, isTablePartitioned, false);
}

public static BulkInsertPartitioner<Dataset<Row>> get(
BulkInsertSortMode sortMode, boolean isTablePartitioned, boolean enforceNumOutputPartitions) {
switch (sortMode) {
case NONE:
return new NonSortPartitionerWithRows();
return new NonSortPartitionerWithRows(enforceNumOutputPartitions);
case GLOBAL_SORT:
return new GlobalSortPartitionerWithRows();
case PARTITION_SORT:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,45 @@
import org.apache.spark.api.java.JavaRDD;

/**
* A built-in partitioner that only does coalesce for input records for bulk insert operation,
* corresponding to the {@code BulkInsertSortMode.NONE} mode.
* A built-in partitioner that avoids expensive sorting for the input records for bulk insert
* operation, by doing either of the following:
* <p>
* - If enforcing the outputSparkPartitions, only does coalesce for input records;
* <p>
* - Otherwise, returns input records as is.
* <p>
* Corresponds to the {@code BulkInsertSortMode.NONE} mode.
*
* @param <T> HoodieRecordPayload type
*/
public class NonSortPartitioner<T extends HoodieRecordPayload>
implements BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> {

private final boolean enforceNumOutputPartitions;

/**
* Default constructor without enforcing the number of output partitions.
*/
public NonSortPartitioner() {
this(false);
}

/**
* Constructor with `enforceNumOutputPartitions` config.
*
* @param enforceNumOutputPartitions Whether to enforce the number of output partitions.
*/
public NonSortPartitioner(boolean enforceNumOutputPartitions) {
this.enforceNumOutputPartitions = enforceNumOutputPartitions;
}

@Override
public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records,
int outputSparkPartitions) {
yihua marked this conversation as resolved.
Show resolved Hide resolved
return records.coalesce(outputSparkPartitions);
if (enforceNumOutputPartitions) {
return records.coalesce(outputSparkPartitions);
}
return records;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,41 @@
import org.apache.spark.sql.Row;

/**
* A built-in partitioner that only does coalesce for input Rows for bulk insert operation,
* corresponding to the {@code BulkInsertSortMode.NONE} mode.
*
* A built-in partitioner that avoids expensive sorting for the input Rows for bulk insert
* operation, by doing either of the following:
* <p>
* - If enforcing the outputSparkPartitions, only does coalesce for input Rows;
* <p>
* - Otherwise, returns input Rows as is.
* <p>
* Corresponds to the {@code BulkInsertSortMode.NONE} mode.
*/
public class NonSortPartitionerWithRows implements BulkInsertPartitioner<Dataset<Row>> {

private final boolean enforceNumOutputPartitions;

/**
* Default constructor without enforcing the number of output partitions.
*/
public NonSortPartitionerWithRows() {
this(false);
}

/**
* Constructor with `enforceNumOutputPartitions` config.
*
* @param enforceNumOutputPartitions Whether to enforce the number of output partitions.
*/
public NonSortPartitionerWithRows(boolean enforceNumOutputPartitions) {
this.enforceNumOutputPartitions = enforceNumOutputPartitions;
}

@Override
public Dataset<Row> repartitionRecords(Dataset<Row> rows, int outputSparkPartitions) {
return rows.coalesce(outputSparkPartitions);
if (enforceNumOutputPartitions) {
return rows.coalesce(outputSparkPartitions);
}
return rows;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,21 @@ private static JavaRDD<HoodieRecord> generateTripleTestRecordsForBulkInsert(Java
}

private static Stream<Arguments> configParams() {
// Parameters:
// BulkInsertSortMode sortMode,
// boolean isTablePartitioned,
// boolean enforceNumOutputPartitions,
// boolean isGloballySorted,
// boolean isLocallySorted
Object[][] data = new Object[][] {
{BulkInsertSortMode.GLOBAL_SORT, true, true, true},
{BulkInsertSortMode.PARTITION_SORT, true, false, true},
{BulkInsertSortMode.PARTITION_PATH_REPARTITION, true, false, false},
{BulkInsertSortMode.PARTITION_PATH_REPARTITION, false, false, false},
{BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, true, false, false},
{BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, false, false, false},
{BulkInsertSortMode.NONE, true, false, false}
{BulkInsertSortMode.GLOBAL_SORT, true, true, true, true},
{BulkInsertSortMode.PARTITION_SORT, true, true, false, true},
{BulkInsertSortMode.PARTITION_PATH_REPARTITION, true, true, false, false},
{BulkInsertSortMode.PARTITION_PATH_REPARTITION, false, true, false, false},
{BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, true, true, false, false},
{BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, false, true, false, false},
{BulkInsertSortMode.NONE, true, true, false, false},
{BulkInsertSortMode.NONE, true, false, false, false}
};
return Stream.of(data).map(Arguments::of);
}
Expand All @@ -99,20 +106,33 @@ private void verifyRecordAscendingOrder(List<HoodieRecord<? extends HoodieRecord

private void testBulkInsertInternalPartitioner(BulkInsertPartitioner partitioner,
JavaRDD<HoodieRecord> records,
boolean isGloballySorted, boolean isLocallySorted,
boolean enforceNumOutputPartitions,
boolean isGloballySorted,
boolean isLocallySorted,
Map<String, Long> expectedPartitionNumRecords) {
testBulkInsertInternalPartitioner(partitioner, records, isGloballySorted, isLocallySorted, expectedPartitionNumRecords, Option.empty());
testBulkInsertInternalPartitioner(
partitioner,
records,
enforceNumOutputPartitions,
isGloballySorted,
isLocallySorted,
expectedPartitionNumRecords,
Option.empty());
}

private void testBulkInsertInternalPartitioner(BulkInsertPartitioner partitioner,
JavaRDD<HoodieRecord> records,
boolean isGloballySorted, boolean isLocallySorted,
boolean enforceNumOutputPartitions,
boolean isGloballySorted,
boolean isLocallySorted,
Map<String, Long> expectedPartitionNumRecords,
Option<Comparator<HoodieRecord<? extends HoodieRecordPayload>>> comparator) {
int numPartitions = 2;
JavaRDD<HoodieRecord<? extends HoodieRecordPayload>> actualRecords =
(JavaRDD<HoodieRecord<? extends HoodieRecordPayload>>) partitioner.repartitionRecords(records, numPartitions);
assertEquals(numPartitions, actualRecords.getNumPartitions());
assertEquals(
enforceNumOutputPartitions ? numPartitions : records.getNumPartitions(),
actualRecords.getNumPartitions());
List<HoodieRecord<? extends HoodieRecordPayload>> collectedActualRecords = actualRecords.collect();
if (isGloballySorted) {
// Verify global order
Expand All @@ -137,18 +157,31 @@ private void testBulkInsertInternalPartitioner(BulkInsertPartitioner partitioner
assertEquals(expectedPartitionNumRecords, actualPartitionNumRecords);
}

@ParameterizedTest(name = "[{index}] {0} isTablePartitioned={1}")
@ParameterizedTest(name = "[{index}] {0} isTablePartitioned={1} enforceNumOutputPartitions={2}")
@MethodSource("configParams")
public void testBulkInsertInternalPartitioner(BulkInsertSortMode sortMode,
boolean isTablePartitioned,
boolean enforceNumOutputPartitions,
boolean isGloballySorted,
boolean isLocallySorted) {
JavaRDD<HoodieRecord> records1 = generateTestRecordsForBulkInsert(jsc);
JavaRDD<HoodieRecord> records2 = generateTripleTestRecordsForBulkInsert(jsc);
testBulkInsertInternalPartitioner(BulkInsertInternalPartitionerFactory.get(sortMode, isTablePartitioned),
records1, isGloballySorted, isLocallySorted, generateExpectedPartitionNumRecords(records1));
testBulkInsertInternalPartitioner(BulkInsertInternalPartitionerFactory.get(sortMode, isTablePartitioned),
records2, isGloballySorted, isLocallySorted, generateExpectedPartitionNumRecords(records2));
testBulkInsertInternalPartitioner(
BulkInsertInternalPartitionerFactory.get(
sortMode, isTablePartitioned, enforceNumOutputPartitions),
records1,
enforceNumOutputPartitions,
isGloballySorted,
isLocallySorted,
generateExpectedPartitionNumRecords(records1));
testBulkInsertInternalPartitioner(
BulkInsertInternalPartitionerFactory.get(
sortMode, isTablePartitioned, enforceNumOutputPartitions),
records2,
enforceNumOutputPartitions,
isGloballySorted,
isLocallySorted,
generateExpectedPartitionNumRecords(records2));
}

@Test
Expand All @@ -160,9 +193,9 @@ public void testCustomColumnSortPartitioner() {
JavaRDD<HoodieRecord> records1 = generateTestRecordsForBulkInsert(jsc);
JavaRDD<HoodieRecord> records2 = generateTripleTestRecordsForBulkInsert(jsc);
testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(sortColumns, HoodieTestDataGenerator.AVRO_SCHEMA, false),
records1, true, true, generateExpectedPartitionNumRecords(records1), Option.of(columnComparator));
records1, true, true, true, generateExpectedPartitionNumRecords(records1), Option.of(columnComparator));
testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(sortColumns, HoodieTestDataGenerator.AVRO_SCHEMA, false),
records2, true, true, generateExpectedPartitionNumRecords(records2), Option.of(columnComparator));
records2, true, true, true, generateExpectedPartitionNumRecords(records2), Option.of(columnComparator));

HoodieWriteConfig config = HoodieWriteConfig
.newBuilder()
Expand All @@ -172,9 +205,9 @@ public void testCustomColumnSortPartitioner() {
.withUserDefinedBulkInsertPartitionerSortColumns(sortColumnString)
.build();
testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(config),
records1, true, true, generateExpectedPartitionNumRecords(records1), Option.of(columnComparator));
records1, true, true, true, generateExpectedPartitionNumRecords(records1), Option.of(columnComparator));
testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(config),
records2, true, true, generateExpectedPartitionNumRecords(records2), Option.of(columnComparator));
records2, true, true, true, generateExpectedPartitionNumRecords(records2), Option.of(columnComparator));

}

Expand Down
Loading