diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index af6478e56e4a..a352e86b96c7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -75,6 +75,7 @@
 import java.util.Map;
 import java.util.Deque;
 import java.util.LinkedList;
+import java.util.Set;
 import java.util.TimeZone;
 import java.util.stream.Collectors;
 
@@ -169,12 +170,7 @@ public static GenericRecord jsonBytesToAvro(byte[] bytes, Schema schema) throws
   }
 
   public static boolean isMetadataField(String fieldName) {
-    return HoodieRecord.COMMIT_TIME_METADATA_FIELD.equals(fieldName)
-        || HoodieRecord.COMMIT_SEQNO_METADATA_FIELD.equals(fieldName)
-        || HoodieRecord.RECORD_KEY_METADATA_FIELD.equals(fieldName)
-        || HoodieRecord.PARTITION_PATH_METADATA_FIELD.equals(fieldName)
-        || HoodieRecord.FILENAME_METADATA_FIELD.equals(fieldName)
-        || HoodieRecord.OPERATION_METADATA_FIELD.equals(fieldName);
+    return HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.contains(fieldName);
   }
 
   public static Schema createHoodieWriteSchema(Schema originalSchema) {
@@ -245,7 +241,7 @@ public static Schema removeMetadataFields(Schema schema) {
     return removeFields(schema, HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION);
   }
 
-  public static Schema removeFields(Schema schema, List<String> fieldsToRemove) {
+  public static Schema removeFields(Schema schema, Set<String> fieldsToRemove) {
     List<Schema.Field> filteredFields = schema.getFields()
         .stream()
         .filter(field -> !fieldsToRemove.contains(field.name()))
@@ -422,7 +418,7 @@ public static List<GenericRecord> rewriteRecords(List<GenericRecord> records, Sc
    * <p>
    * To better understand how it removes please check {@link #rewriteRecord(GenericRecord, Schema)}
    */
-  public static GenericRecord removeFields(GenericRecord record, List<String> fieldsToRemove) {
+  public static GenericRecord removeFields(GenericRecord record, Set<String> fieldsToRemove) {
     Schema newSchema = removeFields(record.getSchema(), fieldsToRemove);
     return rewriteRecord(record, newSchema);
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
index c7ef08a16264..c18d1f333bd3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
@@ -26,6 +26,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -48,8 +49,8 @@ public abstract class HoodieRecord implements Serializable {
 
   // Temporary to support the '_hoodie_operation' field, once we solve
   // the compatibility problem, it can be removed.
-  public static final List<String> HOODIE_META_COLUMNS_WITH_OPERATION =
-      CollectionUtils.createImmutableList(COMMIT_TIME_METADATA_FIELD, COMMIT_SEQNO_METADATA_FIELD,
+  public static final Set<String> HOODIE_META_COLUMNS_WITH_OPERATION =
+      CollectionUtils.createImmutableSet(COMMIT_TIME_METADATA_FIELD, COMMIT_SEQNO_METADATA_FIELD,
       RECORD_KEY_METADATA_FIELD, PARTITION_PATH_METADATA_FIELD, FILENAME_METADATA_FIELD,
       OPERATION_METADATA_FIELD);
 
diff --git a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
index ad77e13b4654..3371085d2130 100644
--- a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
@@ -34,7 +34,7 @@
 import java.nio.ByteBuffer;
 import java.sql.Date;
 import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -249,7 +249,7 @@ public void testRemoveFields() {
     rec.put("non_pii_col", "val1");
     rec.put("pii_col", "val2");
     rec.put("timestamp", 3.5);
-    GenericRecord rec1 = HoodieAvroUtils.removeFields(rec, Arrays.asList("pii_col"));
+    GenericRecord rec1 = HoodieAvroUtils.removeFields(rec, Collections.singleton("pii_col"));
     assertEquals("key1", rec1.get("_row_key"));
     assertEquals("val1", rec1.get("non_pii_col"));
     assertEquals(3.5, rec1.get("timestamp"));
@@ -262,7 +262,7 @@ public void testRemoveFields() {
         + "{\"name\": \"non_pii_col\", \"type\": \"string\"},"
         + "{\"name\": \"pii_col\", \"type\": \"string\"}]},";
     expectedSchema = new Schema.Parser().parse(schemaStr);
-    rec1 = HoodieAvroUtils.removeFields(rec, Arrays.asList(""));
+    rec1 = HoodieAvroUtils.removeFields(rec, Collections.singleton(""));
     assertEquals(expectedSchema, rec1.getSchema());
   }
 
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java
index 2d3bf4389cef..a057c02f2cca 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.table.catalog;
 
+import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
@@ -60,7 +61,7 @@ public static List<String> getFieldNames(List<FieldSchema> fieldSchemas) {
   public static org.apache.flink.table.api.Schema convertTableSchema(Table hiveTable) {
     List<FieldSchema> allCols = hiveTable.getSd().getCols().stream()
         // filter out the metadata columns
-        .filter(s -> !HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.containsKey(s.getName()))
+        .filter(s -> !HoodieAvroUtils.isMetadataField(s.getName()))
         .collect(Collectors.toList());
     // need to refactor the partition key field positions: they are not always in the last
     allCols.addAll(hiveTable.getPartitionKeys());
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 99be676f1413..b9ff4c0d1a3e 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -344,9 +344,9 @@ object HoodieSparkSqlWriter {
   }
 
   def generateSchemaWithoutPartitionColumns(partitionParam: String, schema: Schema): Schema = {
-    val fieldsToRemove = new java.util.ArrayList[String]()
+    val fieldsToRemove = new java.util.HashSet[String]()
     partitionParam.split(",").map(partitionField => partitionField.trim)
-      .filter(s => !s.isEmpty).map(field => fieldsToRemove.add(field))
+      .filter(s => s.nonEmpty).map(field => fieldsToRemove.add(field))
     HoodieAvroUtils.removeFields(schema, fieldsToRemove)
   }
 
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 89716047dbb1..3fbb8c0f8d48 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -487,7 +487,7 @@ private Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchFromSourc
     }
 
     boolean shouldCombine = cfg.filterDupes || cfg.operation.equals(WriteOperationType.UPSERT);
-    List<String> partitionColumns = getPartitionColumns(keyGenerator, props);
+    Set<String> partitionColumns = getPartitionColumns(keyGenerator, props);
     JavaRDD<GenericRecord> avroRDD = avroRDDOptional.get();
     JavaRDD<HoodieRecord> records = avroRDD.map(record -> {
       GenericRecord gr = isDropPartitionColumns() ? HoodieAvroUtils.removeFields(record, partitionColumns) : record;
@@ -952,14 +952,14 @@ private Boolean isDropPartitionColumns() {
   }
 
   /**
-   * Get the list of partition columns as a list of strings.
+   * Get the partition columns as a set of strings.
    *
    * @param keyGenerator KeyGenerator
    * @param props TypedProperties
-   * @return List of partition columns.
+   * @return Set of partition columns.
    */
-  private List<String> getPartitionColumns(KeyGenerator keyGenerator, TypedProperties props) {
+  private Set<String> getPartitionColumns(KeyGenerator keyGenerator, TypedProperties props) {
     String partitionColumns = SparkKeyGenUtils.getPartitionColumns(keyGenerator, props);
-    return Arrays.asList(partitionColumns.split(","));
+    return Arrays.stream(partitionColumns.split(",")).collect(Collectors.toSet());
  }
 }
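
Note (not part of the patch): the snippet below is a minimal sketch of how callers interact with the Set-based signatures introduced above. The class name RemoveFieldsExample, the inline Avro schema, and the println are made up for illustration; only the Hudi methods and constants shown in this diff are assumed to exist.

    import java.util.Collections;
    import java.util.Set;

    import org.apache.avro.Schema;

    import org.apache.hudi.avro.HoodieAvroUtils;
    import org.apache.hudi.common.model.HoodieRecord;

    public class RemoveFieldsExample {
      public static void main(String[] args) {
        // Metadata-field checks now delegate to the shared Set constant,
        // so this is a single contains() lookup instead of chained equals().
        boolean isMeta = HoodieAvroUtils.isMetadataField(HoodieRecord.COMMIT_TIME_METADATA_FIELD);

        // Callers pass a Set<String> instead of a List<String>; a singleton set
        // replaces the previous Arrays.asList(...) usage seen in the old tests.
        Set<String> toDrop = Collections.singleton("pii_col");
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"rec\",\"fields\":["
                + "{\"name\":\"_row_key\",\"type\":\"string\"},"
                + "{\"name\":\"pii_col\",\"type\":\"string\"}]}");
        Schema trimmed = HoodieAvroUtils.removeFields(schema, toDrop);

        // Stripping all metadata columns reuses the same immutable Set constant.
        Schema withoutMeta = HoodieAvroUtils.removeFields(schema, HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION);

        System.out.println(isMeta + " " + trimmed.getFields().size() + " " + withoutMeta.getFields().size());
      }
    }

The apparent motivation for the switch is that both isMetadataField and removeFields do membership tests (contains), which a Set answers in constant time and which a List only answers by scanning.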