Skip to content

Commit

Permalink
[HUDI-4865] Optimize HoodieAvroUtils#isMetadataField to use O(1) comp…
Browse files Browse the repository at this point in the history
…lexity (#6702)
  • Loading branch information
danny0405 authored and yuzhaojing committed Sep 22, 2022
1 parent 9b886d1 commit 6572693
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 22 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.util

import org.apache.hudi.common.config.TypedProperties
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import org.apache.hudi.keygen.{BaseKeyGenerator, CustomAvroKeyGenerator, CustomKeyGenerator, KeyGenerator}
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory

import java.util.Properties
import scala.collection.JavaConverters._

object SparkKeyGenUtils {

/**
* @param properties config properties
* @return partition columns
*/
def getPartitionColumns(properties: Properties): String = {
val props = new TypedProperties(properties)
val keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props)
getPartitionColumns(keyGenerator, props)
}

/**
* @param keyGen key generator
* @return partition columns
*/
def getPartitionColumns(keyGen: KeyGenerator, typedProperties: TypedProperties): String = {
keyGen match {
// For CustomKeyGenerator and CustomAvroKeyGenerator, the partition path filed format
// is: "field_name: field_type", we extract the field_name from the partition path field.
case c: BaseKeyGenerator
if c.isInstanceOf[CustomKeyGenerator] || c.isInstanceOf[CustomAvroKeyGenerator] =>
c.getPartitionPathFields.asScala.map(pathField =>
pathField.split(CustomAvroKeyGenerator.SPLIT_REGEX)
.headOption.getOrElse(s"Illegal partition path field format: '$pathField' for ${c.getClass.getSimpleName}"))
.mkString(",")

case b: BaseKeyGenerator => b.getPartitionPathFields.asScala.mkString(",")
case _ => typedProperties.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import java.util.Map;
import java.util.Deque;
import java.util.LinkedList;
import java.util.Set;
import java.util.TimeZone;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -169,12 +170,7 @@ public static GenericRecord jsonBytesToAvro(byte[] bytes, Schema schema) throws
}

public static boolean isMetadataField(String fieldName) {
return HoodieRecord.COMMIT_TIME_METADATA_FIELD.equals(fieldName)
|| HoodieRecord.COMMIT_SEQNO_METADATA_FIELD.equals(fieldName)
|| HoodieRecord.RECORD_KEY_METADATA_FIELD.equals(fieldName)
|| HoodieRecord.PARTITION_PATH_METADATA_FIELD.equals(fieldName)
|| HoodieRecord.FILENAME_METADATA_FIELD.equals(fieldName)
|| HoodieRecord.OPERATION_METADATA_FIELD.equals(fieldName);
return HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.contains(fieldName);
}

public static Schema createHoodieWriteSchema(Schema originalSchema) {
Expand Down Expand Up @@ -245,7 +241,7 @@ public static Schema removeMetadataFields(Schema schema) {
return removeFields(schema, HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION);
}

public static Schema removeFields(Schema schema, List<String> fieldsToRemove) {
public static Schema removeFields(Schema schema, Set<String> fieldsToRemove) {
List<Schema.Field> filteredFields = schema.getFields()
.stream()
.filter(field -> !fieldsToRemove.contains(field.name()))
Expand Down Expand Up @@ -422,7 +418,7 @@ public static List<GenericRecord> rewriteRecords(List<GenericRecord> records, Sc
* <p>
* To better understand how it removes please check {@link #rewriteRecord(GenericRecord, Schema)}
*/
public static GenericRecord removeFields(GenericRecord record, List<String> fieldsToRemove) {
public static GenericRecord removeFields(GenericRecord record, Set<String> fieldsToRemove) {
Schema newSchema = removeFields(record.getSchema(), fieldsToRemove);
return rewriteRecord(record, newSchema);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

Expand All @@ -48,8 +49,8 @@ public abstract class HoodieRecord<T> implements Serializable {

// Temporary to support the '_hoodie_operation' field, once we solve
// the compatibility problem, it can be removed.
public static final List<String> HOODIE_META_COLUMNS_WITH_OPERATION =
CollectionUtils.createImmutableList(COMMIT_TIME_METADATA_FIELD, COMMIT_SEQNO_METADATA_FIELD,
public static final Set<String> HOODIE_META_COLUMNS_WITH_OPERATION =
CollectionUtils.createImmutableSet(COMMIT_TIME_METADATA_FIELD, COMMIT_SEQNO_METADATA_FIELD,
RECORD_KEY_METADATA_FIELD, PARTITION_PATH_METADATA_FIELD, FILENAME_METADATA_FIELD,
OPERATION_METADATA_FIELD);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import java.nio.ByteBuffer;
import java.sql.Date;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -249,7 +249,7 @@ public void testRemoveFields() {
rec.put("non_pii_col", "val1");
rec.put("pii_col", "val2");
rec.put("timestamp", 3.5);
GenericRecord rec1 = HoodieAvroUtils.removeFields(rec, Arrays.asList("pii_col"));
GenericRecord rec1 = HoodieAvroUtils.removeFields(rec, Collections.singleton("pii_col"));
assertEquals("key1", rec1.get("_row_key"));
assertEquals("val1", rec1.get("non_pii_col"));
assertEquals(3.5, rec1.get("timestamp"));
Expand All @@ -262,7 +262,7 @@ public void testRemoveFields() {
+ "{\"name\": \"non_pii_col\", \"type\": \"string\"},"
+ "{\"name\": \"pii_col\", \"type\": \"string\"}]},";
expectedSchema = new Schema.Parser().parse(schemaStr);
rec1 = HoodieAvroUtils.removeFields(rec, Arrays.asList(""));
rec1 = HoodieAvroUtils.removeFields(rec, Collections.singleton(""));
assertEquals(expectedSchema, rec1.getSchema());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hudi.table.catalog;

import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
Expand Down Expand Up @@ -60,7 +61,7 @@ public static List<String> getFieldNames(List<FieldSchema> fieldSchemas) {
public static org.apache.flink.table.api.Schema convertTableSchema(Table hiveTable) {
List<FieldSchema> allCols = hiveTable.getSd().getCols().stream()
// filter out the metadata columns
.filter(s -> !HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.containsKey(s.getName()))
.filter(s -> !HoodieAvroUtils.isMetadataField(s.getName()))
.collect(Collectors.toList());
// need to refactor the partition key field positions: they are not always in the last
allCols.addAll(hiveTable.getPartitionKeys());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,9 +343,9 @@ object HoodieSparkSqlWriter {
}

def generateSchemaWithoutPartitionColumns(partitionParam: String, schema: Schema): Schema = {
val fieldsToRemove = new java.util.ArrayList[String]()
val fieldsToRemove = new java.util.HashSet[String]()
partitionParam.split(",").map(partitionField => partitionField.trim)
.filter(s => !s.isEmpty).map(field => fieldsToRemove.add(field))
.filter(s => s.nonEmpty).map(field => fieldsToRemove.add(field))
HoodieAvroUtils.removeFields(schema, fieldsToRemove)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
import org.apache.hudi.metrics.HoodieMetrics;
import org.apache.hudi.sync.common.util.SyncUtilHelpers;
import org.apache.hudi.util.SparkKeyGenUtils;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallback;
import org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallbackConfig;
Expand Down Expand Up @@ -485,7 +486,7 @@ private Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchFromSourc
}

boolean shouldCombine = cfg.filterDupes || cfg.operation.equals(WriteOperationType.UPSERT);
List<String> partitionColumns = getPartitionColumns(keyGenerator, props);
Set<String> partitionColumns = getPartitionColumns(keyGenerator, props);
JavaRDD<GenericRecord> avroRDD = avroRDDOptional.get();
JavaRDD<HoodieRecord> records = avroRDD.map(record -> {
GenericRecord gr = isDropPartitionColumns() ? HoodieAvroUtils.removeFields(record, partitionColumns) : record;
Expand Down Expand Up @@ -946,14 +947,14 @@ private Boolean isDropPartitionColumns() {
}

/**
* Get the list of partition columns as a list of strings.
* Get the partition columns as a set of strings.
*
* @param keyGenerator KeyGenerator
* @param props TypedProperties
* @return List of partition columns.
* @return Set of partition columns.
*/
private List<String> getPartitionColumns(KeyGenerator keyGenerator, TypedProperties props) {
String partitionColumns = HoodieSparkUtils.getPartitionColumns(keyGenerator, props);
return Arrays.asList(partitionColumns.split(","));
private Set<String> getPartitionColumns(KeyGenerator keyGenerator, TypedProperties props) {
String partitionColumns = SparkKeyGenUtils.getPartitionColumns(keyGenerator, props);
return Arrays.stream(partitionColumns.split(",")).collect(Collectors.toSet());
}
}

0 comments on commit 6572693

Please sign in to comment.