From 462566a54aa549d31cbb2f90a831b6a9da0d3db1 Mon Sep 17 00:00:00 2001 From: Zouxxyy Date: Sat, 12 Nov 2022 23:25:44 +0800 Subject: [PATCH] [HUDI-5191] Fix compatibility with avro 1.10 (#7175) [HUDI-5191] Fix avro compatibility with spark3.2+ --- .github/workflows/bot.yml | 8 ++ .../org/apache/hudi/avro/HoodieAvroUtils.java | 87 ++++++++++++++----- .../debezium/AbstractDebeziumAvroPayload.java | 3 +- .../apache/hudi/avro/TestHoodieAvroUtils.java | 73 +++++++++++++--- .../functional/TestHoodieLogFormat.java | 2 +- ...writeNonDefaultsWithLatestAvroPayload.java | 8 +- .../model/TestPartialUpdateAvroPayload.java | 2 +- .../common/util/TestObjectSizeCalculator.java | 6 +- 8 files changed, 146 insertions(+), 43 deletions(-) diff --git a/.github/workflows/bot.yml b/.github/workflows/bot.yml index 0a61fa2544bac..fca33bb700a8f 100644 --- a/.github/workflows/bot.yml +++ b/.github/workflows/bot.yml @@ -73,6 +73,14 @@ jobs: run: | HUDI_VERSION=$(mvn help:evaluate -Dexpression=project.version -q -DforceStdout) ./packaging/bundle-validation/ci_run.sh $HUDI_VERSION + - name: Common Test + env: + SCALA_PROFILE: ${{ matrix.scalaProfile }} + SPARK_PROFILE: ${{ matrix.sparkProfile }} + FLINK_PROFILE: ${{ matrix.flinkProfile }} + if: ${{ !endsWith(env.SPARK_PROFILE, '2.4') }} # skip test spark 2.4 as it's covered by Azure CI + run: + mvn test -Punit-tests -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -D"$FLINK_PROFILE" '-Dtest=Test*' -pl hudi-common $MVN_ARGS - name: Spark SQL Test env: SCALA_PROFILE: ${{ matrix.scalaProfile }} diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java index 5288f7fa0cf63..2f226b2d46098 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java @@ -54,6 +54,8 @@ import org.apache.avro.io.JsonEncoder; import org.apache.avro.specific.SpecificRecordBase; +import 
org.apache.hadoop.util.VersionUtil; + import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -89,6 +91,7 @@ */ public class HoodieAvroUtils { + public static final String AVRO_VERSION = Schema.class.getPackage().getImplementationVersion(); private static final ThreadLocal BINARY_ENCODER = ThreadLocal.withInitial(() -> null); private static final ThreadLocal BINARY_DECODER = ThreadLocal.withInitial(() -> null); @@ -478,6 +481,32 @@ public static String getRootLevelFieldName(String fieldName) { return fieldName.split("\\.")[0]; } + /** + * Obtain value of the provided key, which is consistent with avro before 1.10 + */ + public static Object getFieldVal(GenericRecord record, String key) { + return getFieldVal(record, key, true); + } + + /** + * Obtain value of the provided key, when set returnNullIfNotFound false, + * it is consistent with avro after 1.10 + */ + public static Object getFieldVal(GenericRecord record, String key, boolean returnNullIfNotFound) { + if (record.getSchema().getField(key) == null) { + if (returnNullIfNotFound) { + return null; + } else { + // Since avro 1.10, avro will throw AvroRuntimeException("Not a valid schema field: " + key) + // rather than return null like the previous version if record doesn't contain this key. + // Here we simulate this behavior. + throw new AvroRuntimeException("Not a valid schema field: " + key); + } + } else { + return record.get(key); + } + } + + /** + * Obtain value of the provided field as string, denoted by dot notation.
e.g: a.b.c */ @@ -492,44 +521,50 @@ public static String getNestedFieldValAsString(GenericRecord record, String fiel public static Object getNestedFieldVal(GenericRecord record, String fieldName, boolean returnNullIfNotFound, boolean consistentLogicalTimestampEnabled) { String[] parts = fieldName.split("\\."); GenericRecord valueNode = record; - int i = 0; - try { - for (; i < parts.length; i++) { - String part = parts[i]; - Object val = valueNode.get(part); - if (val == null) { - break; + + for (int i = 0; i < parts.length; i++) { + String part = parts[i]; + Object val; + try { + val = HoodieAvroUtils.getFieldVal(valueNode, part, returnNullIfNotFound); + } catch (AvroRuntimeException e) { + if (returnNullIfNotFound) { + return null; + } else { + throw new HoodieException( + fieldName + "(Part -" + parts[i] + ") field not found in record. Acceptable fields were :" + + valueNode.getSchema().getFields().stream().map(Field::name).collect(Collectors.toList())); } + } + if (i == parts.length - 1) { // return, if last part of name - if (i == parts.length - 1) { + if (val == null) { + return null; + } else { Schema fieldSchema = valueNode.getSchema().getField(part).schema(); return convertValueForSpecificDataTypes(fieldSchema, val, consistentLogicalTimestampEnabled); - } else { - // VC: Need a test here - if (!(val instanceof GenericRecord)) { + } + } else { + if (!(val instanceof GenericRecord)) { + if (returnNullIfNotFound) { + return null; + } else { throw new HoodieException("Cannot find a record at part value :" + part); } + } else { valueNode = (GenericRecord) val; } } - } catch (AvroRuntimeException e) { - // Since avro 1.10, arvo will throw AvroRuntimeException("Not a valid schema field: " + key) - // rather than return null like the previous version if if record doesn't contain this key. - // So when returnNullIfNotFound is true, catch this exception. 
- if (!returnNullIfNotFound) { - throw e; - } } + // This can only be reached if the length of parts is 0 if (returnNullIfNotFound) { return null; - } else if (valueNode.getSchema().getField(parts[i]) == null) { + } else { throw new HoodieException( - fieldName + "(Part -" + parts[i] + ") field not found in record. Acceptable fields were :" + fieldName + " field not found in record. Acceptable fields were :" + valueNode.getSchema().getFields().stream().map(Field::name).collect(Collectors.toList())); - } else { - throw new HoodieException("The value of " + parts[i] + " can not be null"); } } @@ -1033,4 +1068,12 @@ public GenericRecord next() { public static GenericRecord rewriteRecordDeep(GenericRecord oldRecord, Schema newSchema) { return rewriteRecordWithNewSchema(oldRecord, newSchema, Collections.EMPTY_MAP); } + + public static boolean gteqAvro1_9() { + return VersionUtil.compareVersions(AVRO_VERSION, "1.9") >= 0; + } + + public static boolean gteqAvro1_10() { + return VersionUtil.compareVersions(AVRO_VERSION, "1.10") >= 0; + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/AbstractDebeziumAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/AbstractDebeziumAvroPayload.java index 33f1d9f0025b2..9082d572a4bdb 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/AbstractDebeziumAvroPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/AbstractDebeziumAvroPayload.java @@ -18,6 +18,7 @@ package org.apache.hudi.common.model.debezium; +import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; import org.apache.hudi.common.util.Option; @@ -76,7 +77,7 @@ private Option handleDeleteOperation(IndexedRecord insertRecord) boolean delete = false; if (insertRecord instanceof GenericRecord) { GenericRecord record = (GenericRecord) insertRecord; - Object value = 
record.get(DebeziumConstants.FLATTENED_OP_COL_NAME); + Object value = HoodieAvroUtils.getFieldVal(record, DebeziumConstants.FLATTENED_OP_COL_NAME); delete = value != null && value.toString().equalsIgnoreCase(DebeziumConstants.DELETE_OP); } diff --git a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java index 3371085d2130f..483c49b1f50bc 100644 --- a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java @@ -20,12 +20,15 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.testutils.SchemaTestUtil; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.SchemaCompatibilityException; +import org.apache.avro.AvroRuntimeException; import org.apache.avro.JsonProperties; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; + import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Assertions; @@ -253,7 +256,12 @@ public void testRemoveFields() { assertEquals("key1", rec1.get("_row_key")); assertEquals("val1", rec1.get("non_pii_col")); assertEquals(3.5, rec1.get("timestamp")); - assertNull(rec1.get("pii_col")); + if (HoodieAvroUtils.gteqAvro1_10()) { + GenericRecord finalRec1 = rec1; + assertThrows(AvroRuntimeException.class, () -> finalRec1.get("pii_col")); + } else { + assertNull(rec1.get("pii_col")); + } assertEquals(expectedSchema, rec1.getSchema()); // non-partitioned table test with empty list of fields. @@ -287,19 +295,58 @@ public void testGetNestedFieldVal() { assertNull(rowKeyNotExist); // Field does not exist - try { - HoodieAvroUtils.getNestedFieldVal(rec, "fake_key", false, false); - } catch (Exception e) { - assertEquals("fake_key(Part -fake_key) field not found in record. 
Acceptable fields were :[timestamp, _row_key, non_pii_col, pii_col]", - e.getMessage()); - } + assertEquals("fake_key(Part -fake_key) field not found in record. Acceptable fields were :[timestamp, _row_key, non_pii_col, pii_col]", + assertThrows(HoodieException.class, () -> + HoodieAvroUtils.getNestedFieldVal(rec, "fake_key", false, false)).getMessage()); - // Field exist while value not - try { - HoodieAvroUtils.getNestedFieldVal(rec, "timestamp", false, false); - } catch (Exception e) { - assertEquals("The value of timestamp can not be null", e.getMessage()); - } + // Field exists while value not + assertNull(HoodieAvroUtils.getNestedFieldVal(rec, "timestamp", false, false)); + } + + @Test + public void testGetNestedFieldValWithNestedField() { + Schema nestedSchema = new Schema.Parser().parse(SCHEMA_WITH_NESTED_FIELD); + GenericRecord rec = new GenericData.Record(nestedSchema); + + // test get . + assertEquals(". field not found in record. Acceptable fields were :[firstname, lastname, student]", + assertThrows(HoodieException.class, () -> + HoodieAvroUtils.getNestedFieldVal(rec, ".", false, false)).getMessage()); + + // test get fake_key + assertEquals("fake_key(Part -fake_key) field not found in record. Acceptable fields were :[firstname, lastname, student]", + assertThrows(HoodieException.class, () -> + HoodieAvroUtils.getNestedFieldVal(rec, "fake_key", false, false)).getMessage()); + + // test get student(null) + assertNull(HoodieAvroUtils.getNestedFieldVal(rec, "student", false, false)); + + // test get student + GenericRecord studentRecord = new GenericData.Record(rec.getSchema().getField("student").schema()); + studentRecord.put("firstname", "person"); + rec.put("student", studentRecord); + assertEquals(studentRecord, HoodieAvroUtils.getNestedFieldVal(rec, "student", false, false)); + + // test get student.fake_key + assertEquals("student.fake_key(Part -fake_key) field not found in record. 
Acceptable fields were :[firstname, lastname]", + assertThrows(HoodieException.class, () -> + HoodieAvroUtils.getNestedFieldVal(rec, "student.fake_key", false, false)).getMessage()); + + // test get student.firstname + assertEquals("person", HoodieAvroUtils.getNestedFieldVal(rec, "student.firstname", false, false)); + + // test get student.lastname(null) + assertNull(HoodieAvroUtils.getNestedFieldVal(rec, "student.lastname", false, false)); + + // test get student.firstname.fake_key + assertEquals("Cannot find a record at part value :firstname", + assertThrows(HoodieException.class, () -> + HoodieAvroUtils.getNestedFieldVal(rec, "student.firstname.fake_key", false, false)).getMessage()); + + // test get student.lastname(null).fake_key + assertEquals("Cannot find a record at part value :lastname", + assertThrows(HoodieException.class, () -> + HoodieAvroUtils.getNestedFieldVal(rec, "student.lastname.fake_key", false, false)).getMessage()); } @Test diff --git a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java index 1c58727b39952..f16eb52d3686f 100755 --- a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java @@ -2347,7 +2347,7 @@ public void testDataBlockFormatAppendAndReadWithProjectedSchema( new HashMap() {{ put(HoodieLogBlockType.AVRO_DATA_BLOCK, 0); // not supported put(HoodieLogBlockType.HFILE_DATA_BLOCK, 0); // not supported - put(HoodieLogBlockType.PARQUET_DATA_BLOCK, 2605); + put(HoodieLogBlockType.PARQUET_DATA_BLOCK, HoodieAvroUtils.gteqAvro1_9() ? 
2593 : 2605); }}; List recordsRead = getRecords(dataBlockRead); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteNonDefaultsWithLatestAvroPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteNonDefaultsWithLatestAvroPayload.java index 0807b41f610c9..4b7e4bda0b36c 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteNonDefaultsWithLatestAvroPayload.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteNonDefaultsWithLatestAvroPayload.java @@ -181,10 +181,10 @@ public void testDeletedRecord() throws IOException { @Test public void testNullColumn() throws IOException { Schema avroSchema = Schema.createRecord(Arrays.asList( - new Schema.Field("id", Schema.createUnion(Schema.create(Schema.Type.STRING), Schema.create(Schema.Type.NULL)), "", JsonProperties.NULL_VALUE), - new Schema.Field("name", Schema.createUnion(Schema.create(Schema.Type.STRING), Schema.create(Schema.Type.NULL)), "", JsonProperties.NULL_VALUE), - new Schema.Field("age", Schema.createUnion(Schema.create(Schema.Type.STRING), Schema.create(Schema.Type.NULL)), "", JsonProperties.NULL_VALUE), - new Schema.Field("job", Schema.createUnion(Schema.create(Schema.Type.STRING), Schema.create(Schema.Type.NULL)), "", JsonProperties.NULL_VALUE) + new Schema.Field("id", Schema.createUnion(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING)), "", JsonProperties.NULL_VALUE), + new Schema.Field("name", Schema.createUnion(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING)), "", JsonProperties.NULL_VALUE), + new Schema.Field("age", Schema.createUnion(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING)), "", JsonProperties.NULL_VALUE), + new Schema.Field("job", Schema.createUnion(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING)), "", JsonProperties.NULL_VALUE) )); GenericRecord record1 = new GenericData.Record(avroSchema); record1.put("id", "1"); 
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java index 1736bc35faff6..6431b63899f2f 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java @@ -57,7 +57,7 @@ public class TestPartialUpdateAvroPayload { + " {\"name\": \"id\", \"type\": [\"null\", \"string\"]},\n" + " {\"name\": \"partition\", \"type\": [\"null\", \"string\"]},\n" + " {\"name\": \"ts\", \"type\": [\"null\", \"long\"]},\n" - + " {\"name\": \"_hoodie_is_deleted\", \"type\": [\"null\", \"boolean\"], \"default\":false},\n" + + " {\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false},\n" + " {\"name\": \"city\", \"type\": [\"null\", \"string\"]},\n" + " {\"name\": \"child\", \"type\": [\"null\", {\"type\": \"array\", \"items\": \"string\"}]}\n" + " ]\n" diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestObjectSizeCalculator.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestObjectSizeCalculator.java index 712f4b85f84a5..625e30198404a 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestObjectSizeCalculator.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestObjectSizeCalculator.java @@ -19,6 +19,7 @@ package org.apache.hudi.common.util; +import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.model.HoodieRecord; import org.apache.avro.Schema; @@ -73,7 +74,10 @@ public void testGetObjectSize() { assertEquals(32, getObjectSize(emptyClass)); assertEquals(40, getObjectSize(stringClass)); assertEquals(40, getObjectSize(payloadClass)); - assertEquals(1240, getObjectSize(Schema.create(Schema.Type.STRING))); + // Since avro 1.9, Schema use ConcurrentHashMap instead of LinkedHashMap to + // implement props, which will change 
the size of the object. + assertEquals(HoodieAvroUtils.gteqAvro1_9() ? 1320 : 1240, + getObjectSize(Schema.create(Schema.Type.STRING))); assertEquals(104, getObjectSize(person)); }