[HUDI-4865] Optimize HoodieAvroUtils#isMetadataField to use O(1) complexity (apache#6702)
danny0405 authored and fengjian committed Apr 5, 2023
1 parent 1c9b534 commit b7ae1b2
Showing 6 changed files with 19 additions and 21 deletions.
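The gist of the change: the old isMetadataField walked a chain of String.equals comparisons, one per metadata column, while the new version delegates to an immutable Set, so each check is a single expected-O(1) hash lookup and adding metadata columns no longer adds comparisons. Below is a minimal, self-contained sketch of the idea using plain JDK collections; the class name and the inlined column-name literals are illustrative stand-ins, since Hudi keeps the real constants in HoodieRecord and builds the set with CollectionUtils.createImmutableSet.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public class MetaFieldLookupSketch {

  // Illustrative stand-ins for the metadata column names kept in HoodieRecord.
  private static final Set<String> META_COLUMNS = Collections.unmodifiableSet(new HashSet<>(
      Arrays.asList("_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key",
          "_hoodie_partition_path", "_hoodie_file_name", "_hoodie_operation")));

  // Before: one String.equals per metadata column, i.e. O(k) in the column count.
  static boolean isMetadataFieldChained(String fieldName) {
    return "_hoodie_commit_time".equals(fieldName)
        || "_hoodie_commit_seqno".equals(fieldName)
        || "_hoodie_record_key".equals(fieldName)
        || "_hoodie_partition_path".equals(fieldName)
        || "_hoodie_file_name".equals(fieldName)
        || "_hoodie_operation".equals(fieldName);
  }

  // After: a single hash lookup, expected O(1) no matter how many columns exist.
  static boolean isMetadataFieldViaSet(String fieldName) {
    return META_COLUMNS.contains(fieldName);
  }

  public static void main(String[] args) {
    System.out.println(isMetadataFieldChained("_hoodie_record_key")); // true
    System.out.println(isMetadataFieldViaSet("_hoodie_record_key"));  // true
    System.out.println(isMetadataFieldViaSet("non_pii_col"));         // false
  }
}
```

With only six entries the practical speedup is small, but the Set form also lets removeFields, the Flink catalog filter, and the partition-column handling share one collection type, which is what the rest of the diff switches over.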
@@ -75,6 +75,7 @@
import java.util.Map;
import java.util.Deque;
import java.util.LinkedList;
+import java.util.Set;
import java.util.TimeZone;
import java.util.stream.Collectors;

@@ -169,12 +170,7 @@ public static GenericRecord jsonBytesToAvro(byte[] bytes, Schema schema) throws
}

public static boolean isMetadataField(String fieldName) {
-return HoodieRecord.COMMIT_TIME_METADATA_FIELD.equals(fieldName)
-|| HoodieRecord.COMMIT_SEQNO_METADATA_FIELD.equals(fieldName)
-|| HoodieRecord.RECORD_KEY_METADATA_FIELD.equals(fieldName)
-|| HoodieRecord.PARTITION_PATH_METADATA_FIELD.equals(fieldName)
-|| HoodieRecord.FILENAME_METADATA_FIELD.equals(fieldName)
-|| HoodieRecord.OPERATION_METADATA_FIELD.equals(fieldName);
+return HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.contains(fieldName);
}

public static Schema createHoodieWriteSchema(Schema originalSchema) {
@@ -245,7 +241,7 @@ public static Schema removeMetadataFields(Schema schema) {
return removeFields(schema, HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION);
}

-public static Schema removeFields(Schema schema, List<String> fieldsToRemove) {
+public static Schema removeFields(Schema schema, Set<String> fieldsToRemove) {
List<Schema.Field> filteredFields = schema.getFields()
.stream()
.filter(field -> !fieldsToRemove.contains(field.name()))
@@ -422,7 +418,7 @@ public static List<GenericRecord> rewriteRecords(List<GenericRecord> records, Sc
* <p>
* To better understand how it removes please check {@link #rewriteRecord(GenericRecord, Schema)}
*/
-public static GenericRecord removeFields(GenericRecord record, List<String> fieldsToRemove) {
+public static GenericRecord removeFields(GenericRecord record, Set<String> fieldsToRemove) {
Schema newSchema = removeFields(record.getSchema(), fieldsToRemove);
return rewriteRecord(record, newSchema);
}
@@ -26,6 +26,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

@@ -48,8 +49,8 @@ public abstract class HoodieRecord<T> implements Serializable {

// Temporary to support the '_hoodie_operation' field, once we solve
// the compatibility problem, it can be removed.
-public static final List<String> HOODIE_META_COLUMNS_WITH_OPERATION =
-CollectionUtils.createImmutableList(COMMIT_TIME_METADATA_FIELD, COMMIT_SEQNO_METADATA_FIELD,
+public static final Set<String> HOODIE_META_COLUMNS_WITH_OPERATION =
+CollectionUtils.createImmutableSet(COMMIT_TIME_METADATA_FIELD, COMMIT_SEQNO_METADATA_FIELD,
RECORD_KEY_METADATA_FIELD, PARTITION_PATH_METADATA_FIELD, FILENAME_METADATA_FIELD,
OPERATION_METADATA_FIELD);

@@ -34,7 +34,7 @@
import java.nio.ByteBuffer;
import java.sql.Date;
import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -249,7 +249,7 @@ public void testRemoveFields() {
rec.put("non_pii_col", "val1");
rec.put("pii_col", "val2");
rec.put("timestamp", 3.5);
-GenericRecord rec1 = HoodieAvroUtils.removeFields(rec, Arrays.asList("pii_col"));
+GenericRecord rec1 = HoodieAvroUtils.removeFields(rec, Collections.singleton("pii_col"));
assertEquals("key1", rec1.get("_row_key"));
assertEquals("val1", rec1.get("non_pii_col"));
assertEquals(3.5, rec1.get("timestamp"));
@@ -262,7 +262,7 @@ public void testRemoveFields() {
+ "{\"name\": \"non_pii_col\", \"type\": \"string\"},"
+ "{\"name\": \"pii_col\", \"type\": \"string\"}]},";
expectedSchema = new Schema.Parser().parse(schemaStr);
-rec1 = HoodieAvroUtils.removeFields(rec, Arrays.asList(""));
+rec1 = HoodieAvroUtils.removeFields(rec, Collections.singleton(""));
assertEquals(expectedSchema, rec1.getSchema());
}

@@ -18,6 +18,7 @@

package org.apache.hudi.table.catalog;

+import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
@@ -60,7 +61,7 @@ public static List<String> getFieldNames(List<FieldSchema> fieldSchemas) {
public static org.apache.flink.table.api.Schema convertTableSchema(Table hiveTable) {
List<FieldSchema> allCols = hiveTable.getSd().getCols().stream()
// filter out the metadata columns
-.filter(s -> !HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.containsKey(s.getName()))
+.filter(s -> !HoodieAvroUtils.isMetadataField(s.getName()))
.collect(Collectors.toList());
// need to refactor the partition key field positions: they are not always in the last
allCols.addAll(hiveTable.getPartitionKeys());
@@ -344,9 +344,9 @@ object HoodieSparkSqlWriter {
}

def generateSchemaWithoutPartitionColumns(partitionParam: String, schema: Schema): Schema = {
-val fieldsToRemove = new java.util.ArrayList[String]()
+val fieldsToRemove = new java.util.HashSet[String]()
partitionParam.split(",").map(partitionField => partitionField.trim)
-.filter(s => !s.isEmpty).map(field => fieldsToRemove.add(field))
+.filter(s => s.nonEmpty).map(field => fieldsToRemove.add(field))
HoodieAvroUtils.removeFields(schema, fieldsToRemove)
}

@@ -487,7 +487,7 @@ private Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchFromSourc
}

boolean shouldCombine = cfg.filterDupes || cfg.operation.equals(WriteOperationType.UPSERT);
-List<String> partitionColumns = getPartitionColumns(keyGenerator, props);
+Set<String> partitionColumns = getPartitionColumns(keyGenerator, props);
JavaRDD<GenericRecord> avroRDD = avroRDDOptional.get();
JavaRDD<HoodieRecord> records = avroRDD.map(record -> {
GenericRecord gr = isDropPartitionColumns() ? HoodieAvroUtils.removeFields(record, partitionColumns) : record;
@@ -952,14 +952,14 @@ private Boolean isDropPartitionColumns() {
}

/**
-* Get the list of partition columns as a list of strings.
+* Get the partition columns as a set of strings.
*
* @param keyGenerator KeyGenerator
* @param props TypedProperties
-* @return List of partition columns.
+* @return Set of partition columns.
*/
-private List<String> getPartitionColumns(KeyGenerator keyGenerator, TypedProperties props) {
+private Set<String> getPartitionColumns(KeyGenerator keyGenerator, TypedProperties props) {
String partitionColumns = SparkKeyGenUtils.getPartitionColumns(keyGenerator, props);
-return Arrays.asList(partitionColumns.split(","));
+return Arrays.stream(partitionColumns.split(",")).collect(Collectors.toSet());
}
}
