Skip to content

Commit

Permalink
[HUDI-5191] Fix compatibility with avro 1.10 (apache#7175)
Browse files Browse the repository at this point in the history
[HUDI-5191] Fix avro compatibility with spark3.2+
  • Loading branch information
Zouxxyy authored and Alexey Kudinkin committed Dec 14, 2022
1 parent a7d8850 commit 060b163
Show file tree
Hide file tree
Showing 7 changed files with 145 additions and 42 deletions.
8 changes: 8 additions & 0 deletions .github/workflows/bot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,14 @@ jobs:
run: |
HUDI_VERSION=$(mvn help:evaluate -Dexpression=project.version -q -DforceStdout)
./packaging/bundle-validation/ci_run.sh $HUDI_VERSION
- name: Common Test
env:
SCALA_PROFILE: ${{ matrix.scalaProfile }}
SPARK_PROFILE: ${{ matrix.sparkProfile }}
FLINK_PROFILE: ${{ matrix.flinkProfile }}
if: ${{ !endsWith(env.SPARK_PROFILE, '2.4') }} # skip test spark 2.4 as it's covered by Azure CI
run:
mvn test -Punit-tests -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -D"$FLINK_PROFILE" '-Dtest=Test*' -pl hudi-common $MVN_ARGS
- name: Spark SQL Test
env:
SCALA_PROFILE: ${{ matrix.scalaProfile }}
Expand Down
87 changes: 65 additions & 22 deletions hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@
import org.apache.avro.io.JsonEncoder;
import org.apache.avro.specific.SpecificRecordBase;

import org.apache.hadoop.util.VersionUtil;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
Expand Down Expand Up @@ -89,6 +91,7 @@
*/
public class HoodieAvroUtils {

public static final String AVRO_VERSION = Schema.class.getPackage().getImplementationVersion();
private static final ThreadLocal<BinaryEncoder> BINARY_ENCODER = ThreadLocal.withInitial(() -> null);
private static final ThreadLocal<BinaryDecoder> BINARY_DECODER = ThreadLocal.withInitial(() -> null);

Expand Down Expand Up @@ -478,6 +481,32 @@ public static String getRootLevelFieldName(String fieldName) {
return fieldName.split("\\.")[0];
}

/**
* Obtain value of the provided key, which is consistent with avro before 1.10
*/
public static Object getFieldVal(GenericRecord record, String key) {
return getFieldVal(record, key, true);
}

/**
* Obtain value of the provided key, when set returnNullIfNotFound false,
* it is consistent with avro after 1.10
*/
public static Object getFieldVal(GenericRecord record, String key, boolean returnNullIfNotFound) {
if (record.getSchema().getField(key) == null) {
if (returnNullIfNotFound) {
return null;
} else {
// Since avro 1.10, arvo will throw AvroRuntimeException("Not a valid schema field: " + key)
// rather than return null like the previous version if record doesn't contain this key.
// Here we simulate this behavior.
throw new AvroRuntimeException("Not a valid schema field: " + key);
}
} else {
return record.get(key);
}
}

/**
* Obtain value of the provided field as string, denoted by dot notation. e.g: a.b.c
*/
Expand All @@ -492,44 +521,50 @@ public static String getNestedFieldValAsString(GenericRecord record, String fiel
public static Object getNestedFieldVal(GenericRecord record, String fieldName, boolean returnNullIfNotFound, boolean consistentLogicalTimestampEnabled) {
String[] parts = fieldName.split("\\.");
GenericRecord valueNode = record;
int i = 0;
try {
for (; i < parts.length; i++) {
String part = parts[i];
Object val = valueNode.get(part);
if (val == null) {
break;

for (int i = 0; i < parts.length; i++) {
String part = parts[i];
Object val;
try {
val = HoodieAvroUtils.getFieldVal(valueNode, part, returnNullIfNotFound);
} catch (AvroRuntimeException e) {
if (returnNullIfNotFound) {
return null;
} else {
throw new HoodieException(
fieldName + "(Part -" + parts[i] + ") field not found in record. Acceptable fields were :"
+ valueNode.getSchema().getFields().stream().map(Field::name).collect(Collectors.toList()));
}
}

if (i == parts.length - 1) {
// return, if last part of name
if (i == parts.length - 1) {
if (val == null) {
return null;
} else {
Schema fieldSchema = valueNode.getSchema().getField(part).schema();
return convertValueForSpecificDataTypes(fieldSchema, val, consistentLogicalTimestampEnabled);
} else {
// VC: Need a test here
if (!(val instanceof GenericRecord)) {
}
} else {
if (!(val instanceof GenericRecord)) {
if (returnNullIfNotFound) {
return null;
} else {
throw new HoodieException("Cannot find a record at part value :" + part);
}
} else {
valueNode = (GenericRecord) val;
}
}
} catch (AvroRuntimeException e) {
// Since avro 1.10, arvo will throw AvroRuntimeException("Not a valid schema field: " + key)
// rather than return null like the previous version if if record doesn't contain this key.
// So when returnNullIfNotFound is true, catch this exception.
if (!returnNullIfNotFound) {
throw e;
}
}

// This can only be reached if the length of parts is 0
if (returnNullIfNotFound) {
return null;
} else if (valueNode.getSchema().getField(parts[i]) == null) {
} else {
throw new HoodieException(
fieldName + "(Part -" + parts[i] + ") field not found in record. Acceptable fields were :"
fieldName + " field not found in record. Acceptable fields were :"
+ valueNode.getSchema().getFields().stream().map(Field::name).collect(Collectors.toList()));
} else {
throw new HoodieException("The value of " + parts[i] + " can not be null");
}
}

Expand Down Expand Up @@ -1033,4 +1068,12 @@ public GenericRecord next() {
public static GenericRecord rewriteRecordDeep(GenericRecord oldRecord, Schema newSchema) {
return rewriteRecordWithNewSchema(oldRecord, newSchema, Collections.EMPTY_MAP);
}

public static boolean gteqAvro1_9() {
return VersionUtil.compareVersions(AVRO_VERSION, "1.9") >= 0;
}

public static boolean gteqAvro1_10() {
return VersionUtil.compareVersions(AVRO_VERSION, "1.10") >= 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hudi.common.model.debezium;

import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.util.Option;

Expand Down Expand Up @@ -76,7 +77,7 @@ private Option<IndexedRecord> handleDeleteOperation(IndexedRecord insertRecord)
boolean delete = false;
if (insertRecord instanceof GenericRecord) {
GenericRecord record = (GenericRecord) insertRecord;
Object value = record.get(DebeziumConstants.FLATTENED_OP_COL_NAME);
Object value = HoodieAvroUtils.getFieldVal(record, DebeziumConstants.FLATTENED_OP_COL_NAME);
delete = value != null && value.toString().equalsIgnoreCase(DebeziumConstants.DELETE_OP);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@

import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.SchemaCompatibilityException;

import org.apache.avro.AvroRuntimeException;
import org.apache.avro.JsonProperties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Assertions;

Expand Down Expand Up @@ -253,7 +256,12 @@ public void testRemoveFields() {
assertEquals("key1", rec1.get("_row_key"));
assertEquals("val1", rec1.get("non_pii_col"));
assertEquals(3.5, rec1.get("timestamp"));
assertNull(rec1.get("pii_col"));
if (HoodieAvroUtils.gteqAvro1_10()) {
GenericRecord finalRec1 = rec1;
assertThrows(AvroRuntimeException.class, () -> finalRec1.get("pii_col"));
} else {
assertNull(rec1.get("pii_col"));
}
assertEquals(expectedSchema, rec1.getSchema());

// non-partitioned table test with empty list of fields.
Expand Down Expand Up @@ -287,19 +295,58 @@ public void testGetNestedFieldVal() {
assertNull(rowKeyNotExist);

// Field does not exist
try {
HoodieAvroUtils.getNestedFieldVal(rec, "fake_key", false, false);
} catch (Exception e) {
assertEquals("fake_key(Part -fake_key) field not found in record. Acceptable fields were :[timestamp, _row_key, non_pii_col, pii_col]",
e.getMessage());
}
assertEquals("fake_key(Part -fake_key) field not found in record. Acceptable fields were :[timestamp, _row_key, non_pii_col, pii_col]",
assertThrows(HoodieException.class, () ->
HoodieAvroUtils.getNestedFieldVal(rec, "fake_key", false, false)).getMessage());

// Field exist while value not
try {
HoodieAvroUtils.getNestedFieldVal(rec, "timestamp", false, false);
} catch (Exception e) {
assertEquals("The value of timestamp can not be null", e.getMessage());
}
// Field exists while value not
assertNull(HoodieAvroUtils.getNestedFieldVal(rec, "timestamp", false, false));
}

@Test
public void testGetNestedFieldValWithNestedField() {
Schema nestedSchema = new Schema.Parser().parse(SCHEMA_WITH_NESTED_FIELD);
GenericRecord rec = new GenericData.Record(nestedSchema);

// test get .
assertEquals(". field not found in record. Acceptable fields were :[firstname, lastname, student]",
assertThrows(HoodieException.class, () ->
HoodieAvroUtils.getNestedFieldVal(rec, ".", false, false)).getMessage());

// test get fake_key
assertEquals("fake_key(Part -fake_key) field not found in record. Acceptable fields were :[firstname, lastname, student]",
assertThrows(HoodieException.class, () ->
HoodieAvroUtils.getNestedFieldVal(rec, "fake_key", false, false)).getMessage());

// test get student(null)
assertNull(HoodieAvroUtils.getNestedFieldVal(rec, "student", false, false));

// test get student
GenericRecord studentRecord = new GenericData.Record(rec.getSchema().getField("student").schema());
studentRecord.put("firstname", "person");
rec.put("student", studentRecord);
assertEquals(studentRecord, HoodieAvroUtils.getNestedFieldVal(rec, "student", false, false));

// test get student.fake_key
assertEquals("student.fake_key(Part -fake_key) field not found in record. Acceptable fields were :[firstname, lastname]",
assertThrows(HoodieException.class, () ->
HoodieAvroUtils.getNestedFieldVal(rec, "student.fake_key", false, false)).getMessage());

// test get student.firstname
assertEquals("person", HoodieAvroUtils.getNestedFieldVal(rec, "student.firstname", false, false));

// test get student.lastname(null)
assertNull(HoodieAvroUtils.getNestedFieldVal(rec, "student.lastname", false, false));

// test get student.firstname.fake_key
assertEquals("Cannot find a record at part value :firstname",
assertThrows(HoodieException.class, () ->
HoodieAvroUtils.getNestedFieldVal(rec, "student.firstname.fake_key", false, false)).getMessage());

// test get student.lastname(null).fake_key
assertEquals("Cannot find a record at part value :lastname",
assertThrows(HoodieException.class, () ->
HoodieAvroUtils.getNestedFieldVal(rec, "student.lastname.fake_key", false, false)).getMessage());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2005,7 +2005,7 @@ public void testDataBlockFormatAppendAndReadWithProjectedSchema(
new HashMap<HoodieLogBlockType, Integer>() {{
put(HoodieLogBlockType.AVRO_DATA_BLOCK, 0); // not supported
put(HoodieLogBlockType.HFILE_DATA_BLOCK, 0); // not supported
put(HoodieLogBlockType.PARQUET_DATA_BLOCK, 2605);
put(HoodieLogBlockType.PARQUET_DATA_BLOCK, HoodieAvroUtils.gteqAvro1_9() ? 2593 : 2605);
}};

List<IndexedRecord> recordsRead = getRecords(dataBlockRead);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,10 +181,10 @@ public void testDeletedRecord() throws IOException {
@Test
public void testNullColumn() throws IOException {
Schema avroSchema = Schema.createRecord(Arrays.asList(
new Schema.Field("id", Schema.createUnion(Schema.create(Schema.Type.STRING), Schema.create(Schema.Type.NULL)), "", JsonProperties.NULL_VALUE),
new Schema.Field("name", Schema.createUnion(Schema.create(Schema.Type.STRING), Schema.create(Schema.Type.NULL)), "", JsonProperties.NULL_VALUE),
new Schema.Field("age", Schema.createUnion(Schema.create(Schema.Type.STRING), Schema.create(Schema.Type.NULL)), "", JsonProperties.NULL_VALUE),
new Schema.Field("job", Schema.createUnion(Schema.create(Schema.Type.STRING), Schema.create(Schema.Type.NULL)), "", JsonProperties.NULL_VALUE)
new Schema.Field("id", Schema.createUnion(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING)), "", JsonProperties.NULL_VALUE),
new Schema.Field("name", Schema.createUnion(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING)), "", JsonProperties.NULL_VALUE),
new Schema.Field("age", Schema.createUnion(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING)), "", JsonProperties.NULL_VALUE),
new Schema.Field("job", Schema.createUnion(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING)), "", JsonProperties.NULL_VALUE)
));
GenericRecord record1 = new GenericData.Record(avroSchema);
record1.put("id", "1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.hudi.common.util;

import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieRecord;

import org.apache.avro.Schema;
Expand Down Expand Up @@ -73,7 +74,10 @@ public void testGetObjectSize() {
assertEquals(32, getObjectSize(emptyClass));
assertEquals(40, getObjectSize(stringClass));
assertEquals(40, getObjectSize(payloadClass));
assertEquals(1240, getObjectSize(Schema.create(Schema.Type.STRING)));
// Since avro 1.9, Schema use ConcurrentHashMap instead of LinkedHashMap to
// implement props, which will change the size of the object.
assertEquals(HoodieAvroUtils.gteqAvro1_9() ? 1320 : 1240,
getObjectSize(Schema.create(Schema.Type.STRING)));
assertEquals(104, getObjectSize(person));
}

Expand Down

0 comments on commit 060b163

Please sign in to comment.