[HUDI-4040] Bulk insert Support CustomColumnsSortPartitioner with Row (apache#5502)

* Along the lines of RDDCustomColumnsSortPartitioner but for Row
RexAn authored and Clcanny committed May 28, 2022
1 parent e922416 commit c652bac
Showing 3 changed files with 112 additions and 8 deletions.
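
For orientation, here is a minimal sketch of how the new Row-based sort partitioner is expected to be enabled through the write config, mirroring the new test below; the table path and sort columns are placeholder values, not part of this commit:

import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.execution.bulkinsert.RowCustomColumnsSortPartitioner;

// Sketch only: bulk insert is then expected to sort rows within each output partition
// by _hoodie_partition_path followed by the configured columns.
HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
    .withPath("/tmp/hudi_table")  // placeholder base path
    .withUserDefinedBulkInsertPartitionerClass(RowCustomColumnsSortPartitioner.class.getName())
    .withUserDefinedBulkInsertPartitionerSortColumns("rider,driver")  // placeholder columns
    .build();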
RDDCustomColumnsSortPartitioner.java (modified)
@@ -29,6 +29,8 @@
 import org.apache.avro.Schema;
 import org.apache.spark.api.java.JavaRDD;
 
+import java.util.Arrays;
+
 /**
  * A partitioner that does sorting based on specified column values for each RDD partition.
  *
@@ -78,6 +80,7 @@ public boolean arePartitionRecordsSorted() {
   }
 
   private String[] getSortColumnName(HoodieWriteConfig config) {
-    return config.getUserDefinedBulkInsertPartitionerSortColumns().split(",");
+    return Arrays.stream(config.getUserDefinedBulkInsertPartitionerSortColumns().split(","))
+        .map(String::trim).toArray(String[]::new);
   }
 }
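
The trim added above makes the sort-column config tolerant of whitespace around the commas. A quick illustrative sketch (the column names are hypothetical):

// "rider, driver" now resolves to {"rider", "driver"} instead of {"rider", " driver"}
String[] cols = Arrays.stream("rider, driver".split(","))
    .map(String::trim)
    .toArray(String[]::new);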
RowCustomColumnsSortPartitioner.java (new file)
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution.bulkinsert;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+import java.util.Arrays;
+
+/**
+ * A partitioner that does sorting based on specified column values for each Spark partition.
+ */
+public class RowCustomColumnsSortPartitioner implements BulkInsertPartitioner<Dataset<Row>> {
+
+  private final String[] sortColumnNames;
+
+  public RowCustomColumnsSortPartitioner(HoodieWriteConfig config) {
+    this.sortColumnNames = getSortColumnName(config);
+  }
+
+  public RowCustomColumnsSortPartitioner(String[] columnNames) {
+    this.sortColumnNames = columnNames;
+  }
+
+  @Override
+  public Dataset<Row> repartitionRecords(Dataset<Row> records, int outputSparkPartitions) {
+    final String[] sortColumns = this.sortColumnNames;
+    return records.coalesce(outputSparkPartitions)
+        .sortWithinPartitions(HoodieRecord.PARTITION_PATH_METADATA_FIELD, sortColumns);
+  }
+
+  @Override
+  public boolean arePartitionRecordsSorted() {
+    return true;
+  }
+
+  private String[] getSortColumnName(HoodieWriteConfig config) {
+    return Arrays.stream(config.getUserDefinedBulkInsertPartitionerSortColumns().split(","))
+        .map(String::trim).toArray(String[]::new);
+  }
+}
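
As a usage sketch of the new class (the Dataset and column names below are hypothetical, not from this commit): rows are first coalesced to the requested number of Spark partitions and then sorted within each partition, with _hoodie_partition_path as the leading sort key followed by the configured columns.

// Sketch: sort a Dataset<Row> within each of 2 output partitions.
BulkInsertPartitioner<Dataset<Row>> partitioner =
    new RowCustomColumnsSortPartitioner(new String[] {"rider", "driver"});
Dataset<Row> sorted = partitioner.repartitionRecords(inputRows, 2);  // inputRows: a Dataset<Row> of Hudi records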
TestBulkInsertInternalPartitionerForRows.java (modified)
@@ -20,6 +20,8 @@
 
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.hudi.testutils.HoodieClientTestHarness;
 import org.apache.hudi.testutils.SparkDatasetTestUtils;
@@ -29,6 +31,7 @@
 import org.apache.spark.sql.Row;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
@@ -48,6 +51,8 @@
  */
 public class TestBulkInsertInternalPartitionerForRows extends HoodieClientTestHarness {
 
+  private static final Comparator<Row> KEY_COMPARATOR =
+      Comparator.comparing(o -> (o.getAs(HoodieRecord.PARTITION_PATH_METADATA_FIELD) + "+" + o.getAs(HoodieRecord.RECORD_KEY_METADATA_FIELD)));
   @BeforeEach
   public void setUp() throws Exception {
     initSparkContexts("TestBulkInsertInternalPartitionerForRows");
@@ -77,29 +82,55 @@ public void testBulkInsertInternalPartitioner(BulkInsertSortMode sortMode,
     Dataset<Row> records1 = generateTestRecords();
     Dataset<Row> records2 = generateTestRecords();
     testBulkInsertInternalPartitioner(BulkInsertInternalPartitionerWithRowsFactory.get(sortMode),
-        records1, isGloballySorted, isLocallySorted, generateExpectedPartitionNumRecords(records1));
+        records1, isGloballySorted, isLocallySorted, generateExpectedPartitionNumRecords(records1), Option.empty());
     testBulkInsertInternalPartitioner(BulkInsertInternalPartitionerWithRowsFactory.get(sortMode),
-        records2, isGloballySorted, isLocallySorted, generateExpectedPartitionNumRecords(records2));
+        records2, isGloballySorted, isLocallySorted, generateExpectedPartitionNumRecords(records2), Option.empty());
   }
 
+  @Test
+  public void testCustomColumnSortPartitionerWithRows() {
+    Dataset<Row> records1 = generateTestRecords();
+    Dataset<Row> records2 = generateTestRecords();
+    String sortColumnString = records1.columns()[5];
+    String[] sortColumns = sortColumnString.split(",");
+    Comparator<Row> comparator = getCustomColumnComparator(sortColumns);
+
+    testBulkInsertInternalPartitioner(new RowCustomColumnsSortPartitioner(sortColumns),
+        records1, false, true, generateExpectedPartitionNumRecords(records1), Option.of(comparator));
+    testBulkInsertInternalPartitioner(new RowCustomColumnsSortPartitioner(sortColumns),
+        records2, false, true, generateExpectedPartitionNumRecords(records2), Option.of(comparator));
+
+    HoodieWriteConfig config = HoodieWriteConfig
+        .newBuilder()
+        .withPath("/")
+        .withUserDefinedBulkInsertPartitionerClass(RowCustomColumnsSortPartitioner.class.getName())
+        .withUserDefinedBulkInsertPartitionerSortColumns(sortColumnString)
+        .build();
+    testBulkInsertInternalPartitioner(new RowCustomColumnsSortPartitioner(config),
+        records1, false, true, generateExpectedPartitionNumRecords(records1), Option.of(comparator));
+    testBulkInsertInternalPartitioner(new RowCustomColumnsSortPartitioner(config),
+        records2, false, true, generateExpectedPartitionNumRecords(records2), Option.of(comparator));
+  }
+
   private void testBulkInsertInternalPartitioner(BulkInsertPartitioner partitioner,
                                                  Dataset<Row> rows,
                                                  boolean isGloballySorted, boolean isLocallySorted,
-                                                 Map<String, Long> expectedPartitionNumRecords) {
+                                                 Map<String, Long> expectedPartitionNumRecords,
+                                                 Option<Comparator<Row>> comparator) {
     int numPartitions = 2;
     Dataset<Row> actualRecords = (Dataset<Row>) partitioner.repartitionRecords(rows, numPartitions);
     List<Row> collectedActualRecords = actualRecords.collectAsList();
     if (isGloballySorted) {
       // Verify global order
-      verifyRowsAscendingOrder(collectedActualRecords);
+      verifyRowsAscendingOrder(collectedActualRecords, comparator);
     } else if (isLocallySorted) {
       // Verify local order
       actualRecords.mapPartitions((MapPartitionsFunction<Row, Object>) input -> {
         List<Row> partitionRows = new ArrayList<>();
         while (input.hasNext()) {
           partitionRows.add(input.next());
         }
-        verifyRowsAscendingOrder(partitionRows);
+        verifyRowsAscendingOrder(partitionRows, comparator);
         return Collections.emptyList().iterator();
       }, SparkDatasetTestUtils.ENCODER);
     }
@@ -130,10 +161,20 @@ public Dataset<Row> generateTestRecords() {
     return rowsPart1.union(rowsPart2);
   }
 
-  private void verifyRowsAscendingOrder(List<Row> records) {
+  private void verifyRowsAscendingOrder(List<Row> records, Option<Comparator<Row>> comparator) {
     List<Row> expectedRecords = new ArrayList<>(records);
-    Collections.sort(expectedRecords, Comparator.comparing(o -> (o.getAs(HoodieRecord.PARTITION_PATH_METADATA_FIELD) + "+" + o.getAs(HoodieRecord.RECORD_KEY_METADATA_FIELD))));
+    Collections.sort(expectedRecords, comparator.orElse(KEY_COMPARATOR));
     assertEquals(expectedRecords, records);
   }
 
+  private Comparator<Row> getCustomColumnComparator(String[] sortColumns) {
+    Comparator<Row> comparator = Comparator.comparing(row -> {
+      StringBuilder sb = new StringBuilder();
+      for (String col : sortColumns) {
+        sb.append(row.getAs(col).toString());
+      }
+      return sb.toString();
+    });
+    return comparator;
+  }
 }
