From 6c89bfb0c387d1d6b91621291d9612ccf742721a Mon Sep 17 00:00:00 2001 From: y00617041 Date: Sat, 17 Sep 2022 11:33:37 +0800 Subject: [PATCH] [HUDI-4093] If the value of the partition column is null or abnormal, the insert operation throws NPE. --- .../java/org/apache/hudi/avro/HoodieAvroUtils.java | 2 +- .../datasources/Spark3ParsePartitionUtil.scala | 11 +++++++++-- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java index af6478e56e4a..e7b796be99a1 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java @@ -610,7 +610,7 @@ public static Option getNullableValAsString(GenericRecord rec, String fi * @return field value either converted (for certain data types) or as it is. */ public static Object convertValueForSpecificDataTypes(Schema fieldSchema, Object fieldValue, boolean consistentLogicalTimestampEnabled) { - if (fieldSchema == null) { + if (fieldSchema == null || fieldValue == null) { return fieldValue; } diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParsePartitionUtil.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParsePartitionUtil.scala index ebe92a5a32a9..bdd629ad6f1a 100644 --- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParsePartitionUtil.scala +++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParsePartitionUtil.scala @@ -63,9 +63,16 @@ object Spark3ParsePartitionUtil extends SparkParsePartitionUtil { (dateFormatter, timestampFormatter) }) - val (partitionValues, _) = parsePartition(path, typeInference, basePaths, userSpecifiedDataTypes, - validatePartitionValues, tz.toZoneId, dateFormatter, timestampFormatter) + var partitionStr = path.toString + userSpecifiedDataTypes.keySet.foreach { name => + val dataType = userSpecifiedDataTypes.get(name).getOrElse("") + if (!dataType.isInstanceOf[StringType]) { + partitionStr = partitionStr.replace(s"$name=default", s"$name=__HIVE_DEFAULT_PARTITION__") + } + } + val (partitionValues, _) = parsePartition(new Path(partitionStr), typeInference, basePaths, userSpecifiedDataTypes, + validatePartitionValues, tz.toZoneId, dateFormatter, timestampFormatter) partitionValues.map { case PartitionValues(columnNames: Seq[String], typedValues: Seq[TypedPartValue]) => val rowValues = columnNames.zip(typedValues).map { case (columnName, typedValue) =>