Skip to content

Commit

Permalink
[HUDI-5291] Fixing NPE in MOR column stats accounting (apache#7349)
Browse files Browse the repository at this point in the history
This is addressing NPE while handling column stats w/in the HoodieAppendHandle
  • Loading branch information
alexeykudinkin authored and Alexey Kudinkin committed Dec 14, 2022
1 parent e56631f commit aecfb40
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,19 @@ public static Schema resolveUnionSchema(Schema schema, String fieldSchemaFullNam
return nonNullType;
}

/**
* Returns true in case provided {@link Schema} is nullable (ie accepting null values),
* returns false otherwise
*/
public static boolean isNullable(Schema schema) {
if (schema.getType() != Schema.Type.UNION) {
return false;
}

List<Schema> innerTypes = schema.getTypes();
return innerTypes.size() > 1 && innerTypes.stream().anyMatch(it -> it.getType() == Schema.Type.NULL);
}

/**
* Resolves typical Avro's nullable schema definition: {@code Union(Schema.Type.NULL, <NonNullType>)},
* decomposing union and returning the target non-null type
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,10 @@

import static org.apache.avro.Schema.Type.UNION;
import static org.apache.hudi.avro.AvroSchemaUtils.createNullableSchema;
import static org.apache.hudi.avro.AvroSchemaUtils.isNullable;
import static org.apache.hudi.avro.AvroSchemaUtils.resolveNullableSchema;
import static org.apache.hudi.avro.AvroSchemaUtils.resolveUnionSchema;
import static org.apache.hudi.common.util.ValidationUtils.checkState;

/**
* Helper class to do common stuff across Avro.
Expand Down Expand Up @@ -644,19 +646,17 @@ public static Option<String> getNullableValAsString(GenericRecord rec, String fi
* @param fieldValue avro field value
* @return field value either converted (for certain data types) or as it is.
*/
public static Object convertValueForSpecificDataTypes(Schema fieldSchema, Object fieldValue, boolean consistentLogicalTimestampEnabled) {
public static Object convertValueForSpecificDataTypes(Schema fieldSchema,
Object fieldValue,
boolean consistentLogicalTimestampEnabled) {
if (fieldSchema == null) {
return fieldValue;
} else if (fieldValue == null) {
checkState(isNullable(fieldSchema));
return null;
}

if (fieldSchema.getType() == Schema.Type.UNION) {
for (Schema schema : fieldSchema.getTypes()) {
if (schema.getType() != Schema.Type.NULL) {
return convertValueForAvroLogicalTypes(schema, fieldValue, consistentLogicalTimestampEnabled);
}
}
}
return convertValueForAvroLogicalTypes(fieldSchema, fieldValue, consistentLogicalTimestampEnabled);
return convertValueForAvroLogicalTypes(resolveNullableSchema(fieldSchema), fieldValue, consistentLogicalTimestampEnabled);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@
{"c1_maxValue":932,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":" 932sdc","c2_minValue":" 0sdc","c2_nullCount":0,"c3_maxValue":994.355,"c3_minValue":19.000,"c3_nullCount":0,"c4_maxValue":"2021-11-19T20:40:55.549-08:00","c4_minValue":"2021-11-19T20:40:55.339-08:00","c4_nullCount":0,"c5_maxValue":94,"c5_minValue":1,"c5_nullCount":0,"c6_maxValue":"2020-09-09","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"xw==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue":9,"c8_nullCount":0,"valueCount":8}
{"c1_maxValue":932,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":" 987sdc","c2_minValue":" 0sdc","c2_nullCount":0,"c3_maxValue":994.355,"c3_minValue":19.000,"c3_nullCount":0,"c4_maxValue":"2021-11-19T20:40:55.549-08:00","c4_minValue":"2021-11-18T23:34:44.180-08:00","c4_nullCount":0,"c5_maxValue":94,"c5_minValue":1,"c5_nullCount":0,"c6_maxValue":"2020-10-10","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"xw==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue":9,"c8_nullCount":0,"valueCount":8}
{"c1_maxValue":943,"c1_minValue":89,"c1_nullCount":0,"c2_maxValue":" 943sdc","c2_minValue":" 200sdc","c2_nullCount":0,"c3_maxValue":854.690,"c3_minValue":100.556,"c3_nullCount":0,"c4_maxValue":"2021-11-19T20:40:55.549-08:00","c4_minValue":"2021-11-19T20:40:55.508-08:00","c4_nullCount":0,"c5_maxValue":95,"c5_minValue":10,"c5_nullCount":0,"c6_maxValue":"2020-10-10","c6_minValue":"2020-01-10","c6_nullCount":0,"c7_maxValue":"yA==","c7_minValue":"LA==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue":9,"c8_nullCount":0,"valueCount":10}
{"c1_maxValue":943,"c1_minValue":89,"c1_nullCount":0,"c2_maxValue":" 984sdc","c2_minValue":" 200sdc","c2_nullCount":0,"c3_maxValue":977.328,"c3_minValue":64.768,"c3_nullCount":0,"c4_maxValue":"2021-11-19T20:40:55.549-08:00","c4_minValue":"2021-11-18T23:34:44.181-08:00","c4_nullCount":0,"c5_maxValue":95,"c5_minValue":10,"c5_nullCount":0,"c6_maxValue":"2020-10-21","c6_minValue":"2020-01-10","c6_nullCount":0,"c7_maxValue":"yA==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue":9,"c8_nullCount":0,"valueCount":10}
{"c1_maxValue":943,"c1_minValue":89,"c1_nullCount":0,"c2_maxValue":" 984sdc","c2_minValue":" 200sdc","c2_nullCount":0,"c3_maxValue":977.328,"c3_minValue":64.768,"c3_nullCount":1,"c4_maxValue":"2021-11-19T20:40:55.549-08:00","c4_minValue":"2021-11-18T23:34:44.181-08:00","c4_nullCount":0,"c5_maxValue":95,"c5_minValue":10,"c5_nullCount":0,"c6_maxValue":"2020-10-21","c6_minValue":"2020-01-10","c6_nullCount":0,"c7_maxValue":"yA==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue":9,"c8_nullCount":0,"valueCount":10}
{"c1_maxValue":959,"c1_minValue":74,"c1_nullCount":0,"c2_maxValue":" 959sdc","c2_minValue":" 181sdc","c2_nullCount":0,"c3_maxValue":980.213,"c3_minValue":38.740,"c3_nullCount":0,"c4_maxValue":"2021-11-19T20:40:55.550-08:00","c4_minValue":"2021-11-19T20:40:55.507-08:00","c4_nullCount":0,"c5_maxValue":97,"c5_minValue":9,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-23","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"Kw==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue":9,"c8_nullCount":0,"valueCount":13}
{"c1_maxValue":959,"c1_minValue":74,"c1_nullCount":0,"c2_maxValue":" 989sdc","c2_minValue":" 181sdc","c2_nullCount":0,"c3_maxValue":980.213,"c3_minValue":38.740,"c3_nullCount":0,"c4_maxValue":"2021-11-19T20:40:55.550-08:00","c4_minValue":"2021-11-18T23:34:44.179-08:00","c4_nullCount":0,"c5_maxValue":97,"c5_minValue":9,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-02-25","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"LA==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue":9,"c8_nullCount":0,"valueCount":13}
{"c1_maxValue":959,"c1_minValue":74,"c1_nullCount":0,"c2_maxValue":" 989sdc","c2_minValue":" 181sdc","c2_nullCount":0,"c3_maxValue":980.213,"c3_minValue":38.740,"c3_nullCount":0,"c4_maxValue":"2021-11-19T20:40:55.550-08:00","c4_minValue":"2021-11-18T23:34:44.179-08:00","c4_nullCount":0,"c5_maxValue":97,"c5_minValue":9,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-02-25","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"LA==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue":9,"c8_nullCount":0,"valueCount":13}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{"c1_maxValue":101,"c1_minValue":101,"c1_nullCount":0,"c2_maxValue":" 999sdc","c2_minValue":" 999sdc","c2_nullCount":0,"c3_maxValue":10.329,"c3_minValue":10.329,"c3_nullCount":0,"c4_maxValue":"2021-11-18T23:34:44.179-08:00","c4_minValue":"2021-11-18T23:34:44.179-08:00","c4_nullCount":0,"c5_maxValue":99,"c5_minValue":99,"c5_nullCount":0,"c6_maxValue":"2020-03-28","c6_minValue":"2020-03-28","c6_nullCount":0,"c7_maxValue":"SA==","c7_minValue":"SA==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue":9,"c8_nullCount":0,"valueCount":1}
{"c1_maxValue":562,"c1_minValue":323,"c1_nullCount":0,"c2_maxValue":" 984sdc","c2_minValue":" 980sdc","c2_nullCount":0,"c3_maxValue":977.328,"c3_minValue":64.768,"c3_nullCount":0,"c4_maxValue":"2021-11-18T23:34:44.201-08:00","c4_minValue":"2021-11-18T23:34:44.181-08:00","c4_nullCount":0,"c5_maxValue":78,"c5_minValue":34,"c5_nullCount":0,"c6_maxValue":"2020-10-21","c6_minValue":"2020-01-15","c6_nullCount":0,"c7_maxValue":"SA==","c7_minValue":"qw==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue":9,"c8_nullCount":0,"valueCount":4}
{"c1_maxValue":562,"c1_minValue":323,"c1_nullCount":0,"c2_maxValue":" 984sdc","c2_minValue":" 980sdc","c2_nullCount":0,"c3_maxValue":977.328,"c3_minValue":64.768,"c3_nullCount":1,"c4_maxValue":"2021-11-18T23:34:44.201-08:00","c4_minValue":"2021-11-18T23:34:44.181-08:00","c4_nullCount":0,"c5_maxValue":78,"c5_minValue":34,"c5_nullCount":0,"c6_maxValue":"2020-10-21","c6_minValue":"2020-01-15","c6_nullCount":0,"c7_maxValue":"SA==","c7_minValue":"qw==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue":9,"c8_nullCount":0,"valueCount":4}
{"c1_maxValue":568,"c1_minValue":8,"c1_nullCount":0,"c2_maxValue":" 8sdc","c2_minValue":" 111sdc","c2_nullCount":0,"c3_maxValue":979.272,"c3_minValue":82.111,"c3_nullCount":0,"c4_maxValue":"2021-11-18T23:34:44.193-08:00","c4_minValue":"2021-11-18T23:34:44.159-08:00","c4_nullCount":0,"c5_maxValue":58,"c5_minValue":2,"c5_nullCount":0,"c6_maxValue":"2020-11-08","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"9g==","c7_minValue":"Ag==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue":9,"c8_nullCount":0,"valueCount":15}
{"c1_maxValue":619,"c1_minValue":619,"c1_nullCount":0,"c2_maxValue":" 985sdc","c2_minValue":" 985sdc","c2_nullCount":0,"c3_maxValue":230.320,"c3_minValue":230.320,"c3_nullCount":0,"c4_maxValue":"2021-11-18T23:34:44.180-08:00","c4_minValue":"2021-11-18T23:34:44.180-08:00","c4_nullCount":0,"c5_maxValue":33,"c5_minValue":33,"c5_nullCount":0,"c6_maxValue":"2020-02-13","c6_minValue":"2020-02-13","c6_nullCount":0,"c7_maxValue":"QA==","c7_minValue":"QA==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue":9,"c8_nullCount":0,"valueCount":1}
{"c1_maxValue":633,"c1_minValue":624,"c1_nullCount":0,"c2_maxValue":" 987sdc","c2_minValue":" 986sdc","c2_nullCount":0,"c3_maxValue":580.317,"c3_minValue":375.308,"c3_nullCount":0,"c4_maxValue":"2021-11-18T23:34:44.180-08:00","c4_minValue":"2021-11-18T23:34:44.180-08:00","c4_nullCount":0,"c5_maxValue":33,"c5_minValue":32,"c5_nullCount":0,"c6_maxValue":"2020-10-10","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"PQ==","c7_minValue":"NA==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue":9,"c8_nullCount":0,"valueCount":2}
Expand All @@ -10,4 +10,4 @@
{"c1_maxValue":770,"c1_minValue":129,"c1_nullCount":0,"c2_maxValue":" 770sdc","c2_minValue":" 129sdc","c2_nullCount":0,"c3_maxValue":977.328,"c3_minValue":153.431,"c3_nullCount":0,"c4_maxValue":"2021-11-18T23:34:44.201-08:00","c4_minValue":"2021-11-18T23:34:44.169-08:00","c4_nullCount":0,"c5_maxValue":78,"c5_minValue":14,"c5_nullCount":0,"c6_maxValue":"2020-10-21","c6_minValue":"2020-01-15","c6_nullCount":0,"c7_maxValue":"rw==","c7_minValue":"Ag==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue":9,"c8_nullCount":0,"valueCount":6}
{"c1_maxValue":932,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":" 932sdc","c2_minValue":" 0sdc","c2_nullCount":0,"c3_maxValue":994.355,"c3_minValue":19.000,"c3_nullCount":0,"c4_maxValue":"2021-11-19T20:40:55.549-08:00","c4_minValue":"2021-11-19T20:40:55.339-08:00","c4_nullCount":0,"c5_maxValue":94,"c5_minValue":1,"c5_nullCount":0,"c6_maxValue":"2020-09-09","c6_minValue":"2020-01-01","c6_nullCount":0,"c7_maxValue":"xw==","c7_minValue":"AA==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue":9,"c8_nullCount":0,"valueCount":8}
{"c1_maxValue":943,"c1_minValue":89,"c1_nullCount":0,"c2_maxValue":" 943sdc","c2_minValue":" 200sdc","c2_nullCount":0,"c3_maxValue":854.690,"c3_minValue":100.556,"c3_nullCount":0,"c4_maxValue":"2021-11-19T20:40:55.549-08:00","c4_minValue":"2021-11-19T20:40:55.508-08:00","c4_nullCount":0,"c5_maxValue":95,"c5_minValue":10,"c5_nullCount":0,"c6_maxValue":"2020-10-10","c6_minValue":"2020-01-10","c6_nullCount":0,"c7_maxValue":"yA==","c7_minValue":"LA==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue":9,"c8_nullCount":0,"valueCount":10}
{"c1_maxValue":959,"c1_minValue":74,"c1_nullCount":0,"c2_maxValue":" 959sdc","c2_minValue":" 181sdc","c2_nullCount":0,"c3_maxValue":980.213,"c3_minValue":38.740,"c3_nullCount":0,"c4_maxValue":"2021-11-19T20:40:55.550-08:00","c4_minValue":"2021-11-19T20:40:55.507-08:00","c4_nullCount":0,"c5_maxValue":97,"c5_minValue":9,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-23","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"Kw==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue":9,"c8_nullCount":0,"valueCount":13}
{"c1_maxValue":959,"c1_minValue":74,"c1_nullCount":0,"c2_maxValue":" 959sdc","c2_minValue":" 181sdc","c2_nullCount":0,"c3_maxValue":980.213,"c3_minValue":38.740,"c3_nullCount":0,"c4_maxValue":"2021-11-19T20:40:55.550-08:00","c4_minValue":"2021-11-19T20:40:55.507-08:00","c4_nullCount":0,"c5_maxValue":97,"c5_minValue":9,"c5_nullCount":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-23","c6_nullCount":0,"c7_maxValue":"1Q==","c7_minValue":"Kw==","c7_nullCount":0,"c8_maxValue":9,"c8_minValue":9,"c8_nullCount":0,"valueCount":13}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{"c1":323,"c2":" 980sdc","c3":335.770,"c4":"2021-11-18T23:34:44.201-08:00","c5":78,"c6":"2020-01-15","c7":"Ag==","c8":9}
{"c1":323,"c2":" 980sdc","c3":null,"c4":"2021-11-18T23:34:44.201-08:00","c5":78,"c6":"2020-01-15","c7":"Ag==","c8":9}
{"c1":326,"c2":" 981sdc","c3":64.768,"c4":"2021-11-18T23:34:44.201-08:00","c5":78,"c6":"2020-10-13","c7":"AA==","c8":9}
{"c1":555,"c2":" 982sdc","c3":153.431,"c4":"2021-11-18T23:34:44.186-08:00","c5":44,"c6":"2020-03-12","c7":"rw==","c8":9}
{"c1":556,"c2":" 983sdc","c3":246.427,"c4":"2021-11-18T23:34:44.186-08:00","c5":44,"c6":"2020-10-08","c7":"qw==","c8":9}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,14 @@ class TestColumnStatsIndex extends HoodieClientTestBase {
operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
saveMode = SaveMode.Append)

// NOTE: MOR and COW have different fixtures since MOR is bearing delta-log files (holding
// deferred updates), diverging from COW
val expectedColStatsSourcePath = if (testCase.tableType == HoodieTableType.COPY_ON_WRITE) {
"index/colstats/cow-updated2-column-stats-index-table.json"
} else {
"index/colstats/mor-updated2-column-stats-index-table.json"
}

doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts,
dataSourcePath = "index/colstats/update-input-table-json",
expectedColStatsSourcePath = expectedColStatsSourcePath,
Expand Down Expand Up @@ -332,15 +335,14 @@ class TestColumnStatsIndex extends HoodieClientTestBase {

metaClient = HoodieTableMetaClient.reload(metaClient)

// Only parquet files are supported for the validation against the generated column stats,
// constructing the column stats from parquet data files using Spark SQL and comparing that
// with column stats index. This means that the following operations are support for such
// validation: (1) COW: all operations; (2) MOR: insert only.
val validateColumnStatsAgainstDataFiles =
(testCase.tableType == HoodieTableType.COPY_ON_WRITE
|| operation.equals(DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL))
// Currently, routine manually validating the column stats (by actually reading every column of every file)
// only supports parquet files. Therefore we skip such validation when delta-log files are present, and only
// validate in following cases: (1) COW: all operations; (2) MOR: insert only.
val shouldValidateColumnStatsManually = testCase.tableType == HoodieTableType.COPY_ON_WRITE ||
operation.equals(DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)

validateColumnStatsIndex(
testCase, metadataOpts, expectedColStatsSourcePath, validateColumnStatsAgainstDataFiles)
testCase, metadataOpts, expectedColStatsSourcePath, shouldValidateColumnStatsManually)
}

private def buildColumnStatsTableManually(tablePath: String,
Expand Down Expand Up @@ -392,7 +394,7 @@ class TestColumnStatsIndex extends HoodieClientTestBase {
private def validateColumnStatsIndex(testCase: ColumnStatsTestCase,
metadataOpts: Map[String, String],
expectedColStatsSourcePath: String,
validateColumnStatsAgainstDataFiles: Boolean): Unit = {
validateColumnStatsManually: Boolean): Unit = {
val metadataConfig = HoodieMetadataConfig.newBuilder()
.fromProperties(toProperties(metadataOpts))
.build()
Expand All @@ -416,11 +418,11 @@ class TestColumnStatsIndex extends HoodieClientTestBase {
assertEquals(asJson(sort(expectedColStatsIndexTableDf, validationSortColumns)),
asJson(sort(transposedColStatsDF.drop("fileName"), validationSortColumns)))

if (validateColumnStatsAgainstDataFiles) {
if (validateColumnStatsManually) {
// TODO(HUDI-4557): support validation of column stats of avro log files
// Collect Column Stats manually (reading individual Parquet files)
val manualColStatsTableDF =
buildColumnStatsTableManually(basePath, sourceTableSchema.fieldNames, sourceTableSchema.fieldNames, expectedColStatsSchema)
buildColumnStatsTableManually(basePath, sourceTableSchema.fieldNames, sourceTableSchema.fieldNames, expectedColStatsSchema)

assertEquals(asJson(sort(manualColStatsTableDF, validationSortColumns)),
asJson(sort(transposedColStatsDF, validationSortColumns)))
Expand Down

0 comments on commit aecfb40

Please sign in to comment.