Skip to content

Commit

Permalink
[HUDI-5342] Add new bulk insert sort modes repartitioning data by par…
Browse files Browse the repository at this point in the history
…tition path (apache#7402)

This PR adds two new bulk insert sort modes, PARTITION_PATH_REPARTITION and PARTITION_PATH_REPARTITION_AND_SORT, which does the following

For a physically partitioned table, repartition the input records based on the partition path, limiting the shuffle parallelism to specified outputSparkPartitions. For PARTITION_PATH_REPARTITION_AND_SORT, an additional step of sorting the records based on the partition path within each Spark partition is done.
For a physically non-partitioned table, simply does coalesce for the input rows with outputSparkPartitions.
New unit tests are added to verify the added functionality.
  • Loading branch information
yihua authored and Alexey Kudinkin committed Dec 14, 2022
1 parent 8c98c76 commit 533ceb4
Show file tree
Hide file tree
Showing 17 changed files with 444 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,17 @@ public class HoodieWriteConfig extends HoodieConfig {
+ "GLOBAL_SORT: this ensures best file sizes, with lowest memory overhead at cost of sorting. "
+ "PARTITION_SORT: Strikes a balance by only sorting within a partition, still keeping the memory overhead of writing "
+ "lowest and best effort file sizing. "
+ "PARTITION_PATH_REPARTITION: this ensures that the data for a single physical partition in the table is written "
+ "by the same Spark executor, best for input data evenly distributed across different partition paths. "
+ "This can cause imbalance among Spark executors if the input data is skewed, i.e., most records are intended for "
+ "a handful of partition paths among all. "
+ "PARTITION_PATH_REPARTITION_AND_SORT: this ensures that the data for a single physical partition in the table is written "
+ "by the same Spark executor, best for input data evenly distributed across different partition paths. "
+ "Compared to PARTITION_PATH_REPARTITION, this sort mode does an additional step of sorting the records "
+ "based on the partition path within a single Spark partition, given that data for multiple physical partitions "
+ "can be sent to the same Spark partition and executor. "
+ "This can cause imbalance among Spark executors if the input data is skewed, i.e., most records are intended for "
+ "a handful of partition paths among all. "
+ "NONE: No sorting. Fastest and matches `spark.write.parquet()` in terms of number of files, overheads");

public static final ConfigProperty<String> EMBEDDED_TIMELINE_SERVER_ENABLE = ConfigProperty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
* Bulk insert sort mode.
*/
public enum BulkInsertSortMode {
NONE,
GLOBAL_SORT,
PARTITION_SORT
NONE,
GLOBAL_SORT,
PARTITION_SORT,
PARTITION_PATH_REPARTITION,
PARTITION_PATH_REPARTITION_AND_SORT
}
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,13 @@ public HoodieTableMetaClient getMetaClient() {
return metaClient;
}

/**
* @return if the table is physically partitioned, based on the partition fields stored in the table config.
*/
public boolean isPartitioned() {
return getMetaClient().getTableConfig().isTablePartitioned();
}

public Configuration getHadoopConf() {
return metaClient.getHadoopConf();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ private <I> BulkInsertPartitioner<I> getPartitioner(Map<String, String> strategy
Option<String[]> orderByColumnsOpt =
Option.ofNullable(strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()))
.map(listStr -> listStr.split(","));

return orderByColumnsOpt.map(orderByColumns -> {
HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy = getWriteConfig().getLayoutOptimizationStrategy();
switch (layoutOptStrategy) {
Expand All @@ -203,17 +203,20 @@ private <I> BulkInsertPartitioner<I> getPartitioner(Map<String, String> strategy
return isRowPartitioner
? new RowSpatialCurveSortPartitioner(getWriteConfig())
: new RDDSpatialCurveSortPartitioner((HoodieSparkEngineContext) getEngineContext(), orderByColumns, layoutOptStrategy,
getWriteConfig().getLayoutOptimizationCurveBuildMethod(), HoodieAvroUtils.addMetadataFields(schema));
getWriteConfig().getLayoutOptimizationCurveBuildMethod(), HoodieAvroUtils.addMetadataFields(schema));
case LINEAR:
return isRowPartitioner
? new RowCustomColumnsSortPartitioner(orderByColumns)
: new RDDCustomColumnsSortPartitioner(orderByColumns, HoodieAvroUtils.addMetadataFields(schema),
getWriteConfig().isConsistentLogicalTimestampEnabled());
getWriteConfig().isConsistentLogicalTimestampEnabled());
default:
throw new UnsupportedOperationException(String.format("Layout optimization strategy '%s' is not supported", layoutOptStrategy));
}
}).orElse(isRowPartitioner ? BulkInsertInternalPartitionerWithRowsFactory.get(getWriteConfig().getBulkInsertSortMode()) :
BulkInsertInternalPartitionerFactory.get(getWriteConfig().getBulkInsertSortMode()));
}).orElse(isRowPartitioner
? BulkInsertInternalPartitionerWithRowsFactory.get(
getWriteConfig().getBulkInsertSortMode(), getHoodieTable().isPartitioned())
: BulkInsertInternalPartitionerFactory.get(
getWriteConfig().getBulkInsertSortMode(), getHoodieTable().isPartitioned()));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,33 @@

package org.apache.hudi.execution.bulkinsert;

import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieTable;

/**
* A factory to generate built-in partitioner to repartition input records into at least
* expected number of output spark partitions for bulk insert operation.
*/
public abstract class BulkInsertInternalPartitionerFactory {

public static BulkInsertPartitioner get(BulkInsertSortMode sortMode) {
public static BulkInsertPartitioner get(HoodieTable table, HoodieWriteConfig config) {
return get(config.getBulkInsertSortMode(), table.isPartitioned());
}

public static BulkInsertPartitioner get(BulkInsertSortMode sortMode, boolean isTablePartitioned) {
switch (sortMode) {
case NONE:
return new NonSortPartitioner();
case GLOBAL_SORT:
return new GlobalSortPartitioner();
case PARTITION_SORT:
return new RDDPartitionSortPartitioner();
case PARTITION_PATH_REPARTITION:
return new PartitionPathRepartitionPartitioner(isTablePartitioned);
case PARTITION_PATH_REPARTITION_AND_SORT:
return new PartitionPathRepartitionAndSortPartitioner(isTablePartitioned);
default:
throw new HoodieException("The bulk insert sort mode \"" + sortMode.name() + "\" is not supported.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,19 @@
*/
public abstract class BulkInsertInternalPartitionerWithRowsFactory {

public static BulkInsertPartitioner<Dataset<Row>> get(BulkInsertSortMode sortMode) {
public static BulkInsertPartitioner<Dataset<Row>> get(BulkInsertSortMode sortMode,
boolean isTablePartitioned) {
switch (sortMode) {
case NONE:
return new NonSortPartitionerWithRows();
case GLOBAL_SORT:
return new GlobalSortPartitionerWithRows();
case PARTITION_SORT:
return new PartitionSortPartitionerWithRows();
case PARTITION_PATH_REPARTITION:
return new PartitionPathRepartitionPartitionerWithRows(isTablePartitioned);
case PARTITION_PATH_REPARTITION_AND_SORT:
return new PartitionPathRepartitionAndSortPartitionerWithRows(isTablePartitioned);
default:
throw new UnsupportedOperationException("The bulk insert sort mode \"" + sortMode.name() + "\" is not supported.");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.execution.bulkinsert;

import org.apache.hudi.common.function.SerializableFunctionUnchecked;

import org.apache.spark.Partitioner;

import java.io.Serializable;
import java.util.Objects;

/**
* A Spark RDD partitioner implementation that determines the Spark partition
* based on the table partition path, generating targeted number of Spark partitions.
*/
public class PartitionPathRDDPartitioner extends Partitioner implements Serializable {
private final SerializableFunctionUnchecked<Object, String> partitionPathExtractor;
private final int numPartitions;

PartitionPathRDDPartitioner(SerializableFunctionUnchecked<Object, String> partitionPathExtractor, int numPartitions) {
this.partitionPathExtractor = partitionPathExtractor;
this.numPartitions = numPartitions;
}

@Override
public int numPartitions() {
return numPartitions;
}

@SuppressWarnings("unchecked")
@Override
public int getPartition(Object o) {
return Math.abs(Objects.hash(partitionPathExtractor.apply(o))) % numPartitions;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.execution.bulkinsert;

import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.table.BulkInsertPartitioner;

import org.apache.spark.api.java.JavaRDD;

import scala.Tuple2;

/**
* A built-in partitioner that does the following for input records for bulk insert operation
* <p>
* - For physically partitioned table, repartition the input records based on the partition path,
* and sort records within Spark partitions, limiting the shuffle parallelism to specified
* `outputSparkPartitions`
* <p>
* - For physically non-partitioned table, simply does coalesce for the input records with
* `outputSparkPartitions`
* <p>
* Corresponding to the {@code BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT} mode.
*
* @param <T> HoodieRecordPayload type
*/
public class PartitionPathRepartitionAndSortPartitioner<T extends HoodieRecordPayload>
implements BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> {

private final boolean isTablePartitioned;

public PartitionPathRepartitionAndSortPartitioner(boolean isTablePartitioned) {
this.isTablePartitioned = isTablePartitioned;
}

@Override
public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records,
int outputSparkPartitions) {
if (isTablePartitioned) {
PartitionPathRDDPartitioner partitioner = new PartitionPathRDDPartitioner(
(partitionPath) -> (String) partitionPath, outputSparkPartitions);
return records
.mapToPair(record -> new Tuple2<>(record.getPartitionPath(), record))
.repartitionAndSortWithinPartitions(partitioner)
.values();
}
return records.coalesce(outputSparkPartitions);
}

@Override
public boolean arePartitionRecordsSorted() {
return isTablePartitioned;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.execution.bulkinsert;

import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.table.BulkInsertPartitioner;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

/**
* A built-in partitioner that does the following for input rows for bulk insert operation
* <p>
* - For physically partitioned table, repartition the input rows based on the partition path,
* and sort rows within Spark partitions, limiting the shuffle parallelism to specified
* `outputSparkPartitions`
* <p>
* - For physically non-partitioned table, simply does coalesce for the input rows with
* `outputSparkPartitions`
* <p>
* Corresponding to the {@code BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT} mode.
*/
public class PartitionPathRepartitionAndSortPartitionerWithRows implements BulkInsertPartitioner<Dataset<Row>> {

private final boolean isTablePartitioned;

public PartitionPathRepartitionAndSortPartitionerWithRows(boolean isTablePartitioned) {
this.isTablePartitioned = isTablePartitioned;
}

@Override
public Dataset<Row> repartitionRecords(Dataset<Row> rows, int outputSparkPartitions) {
if (isTablePartitioned) {
return rows.repartition(outputSparkPartitions, new Column(HoodieRecord.PARTITION_PATH_METADATA_FIELD))
.sortWithinPartitions(new Column(HoodieRecord.PARTITION_PATH_METADATA_FIELD));
}
return rows.coalesce(outputSparkPartitions);
}

@Override
public boolean arePartitionRecordsSorted() {
return isTablePartitioned;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.execution.bulkinsert;

import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.table.BulkInsertPartitioner;

import org.apache.spark.api.java.JavaRDD;

import scala.Tuple2;

/**
* A built-in partitioner that does the following for input records for bulk insert operation
* <p>
* - For physically partitioned table, repartition the input records based on the partition path,
* limiting the shuffle parallelism to specified `outputSparkPartitions`
* <p>
* - For physically non-partitioned table, simply does coalesce for the input records with
* `outputSparkPartitions`
* <p>
* Corresponding to the {@code BulkInsertSortMode.PARTITION_PATH_REPARTITION} mode.
*
* @param <T> HoodieRecordPayload type
*/
public class PartitionPathRepartitionPartitioner<T extends HoodieRecordPayload>
implements BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> {

private final boolean isTablePartitioned;

public PartitionPathRepartitionPartitioner(boolean isTablePartitioned) {
this.isTablePartitioned = isTablePartitioned;
}

@Override
public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records,
int outputSparkPartitions) {
if (isTablePartitioned) {
PartitionPathRDDPartitioner partitioner = new PartitionPathRDDPartitioner(
(partitionPath) -> (String) partitionPath, outputSparkPartitions);
return records
.mapToPair(record -> new Tuple2<>(record.getPartitionPath(), record))
.partitionBy(partitioner)
.values();
}
return records.coalesce(outputSparkPartitions);
}

@Override
public boolean arePartitionRecordsSorted() {
return false;
}
}
Loading

0 comments on commit 533ceb4

Please sign in to comment.