[HUDI-5191] Fix compatibility with avro 1.10 #7175

Merged · 5 commits · Nov 12, 2022
8 changes: 8 additions & 0 deletions .github/workflows/bot.yml
@@ -73,6 +73,14 @@ jobs:
run: |
HUDI_VERSION=$(mvn help:evaluate -Dexpression=project.version -q -DforceStdout)
./packaging/bundle-validation/ci_run.sh $HUDI_VERSION
- name: Common Test
Contributor

@Zouxxyy what specifically are we looking to test here? We need to be careful about expanding the scope here

Contributor

From the command, it seems to test the hudi-common module specifically.

Contributor Author

@alexeykudinkin Because the patch fixes the Avro compatibility of the hudi-common test cases under Spark 3, a test run of the hudi-common module was added to the bot

Contributor

@Zouxxyy my point being -- hudi-common components (or tests) should not depend on a particular Spark version. If something, like in this case, relies on a particular Spark version (or another dependency that Spark brings in), we should move it from hudi-common to hudi-spark

Member

there is a deeper issue with this - hudi-common is tightly coupled with avro models, which vary with the spark profiles. Currently the hudi-common jar won't be compatible across all engine profiles: e.g., if built with spark3.3 (avro 1.11), it won't work with spark 2 (avro 1.8) or flink (avro 1.10). This needs to be decoupled first.

env:
SCALA_PROFILE: ${{ matrix.scalaProfile }}
SPARK_PROFILE: ${{ matrix.sparkProfile }}
FLINK_PROFILE: ${{ matrix.flinkProfile }}
if: ${{ !endsWith(env.SPARK_PROFILE, '2.4') }} # skip test spark 2.4 as it's covered by Azure CI
run:
mvn test -Punit-tests -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -D"$FLINK_PROFILE" '-Dtest=Test*' -pl hudi-common $MVN_ARGS
- name: Spark SQL Test
env:
SCALA_PROFILE: ${{ matrix.scalaProfile }}
87 changes: 65 additions & 22 deletions hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -54,6 +54,8 @@
import org.apache.avro.io.JsonEncoder;
import org.apache.avro.specific.SpecificRecordBase;

import org.apache.hadoop.util.VersionUtil;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -89,6 +91,7 @@
*/
public class HoodieAvroUtils {

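// Runtime Avro version, read from the Implementation-Version entry in the avro jar's
// manifest (e.g. "1.8.2" or "1.10.2"); may be null if classes are not loaded from a jar.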
public static final String AVRO_VERSION = Schema.class.getPackage().getImplementationVersion();
private static final ThreadLocal<BinaryEncoder> BINARY_ENCODER = ThreadLocal.withInitial(() -> null);
private static final ThreadLocal<BinaryDecoder> BINARY_DECODER = ThreadLocal.withInitial(() -> null);

@@ -478,6 +481,32 @@ public static String getRootLevelFieldName(String fieldName) {
return fieldName.split("\\.")[0];
}

  /**
   * Obtains the value of the provided key; the behavior is consistent with Avro before 1.10
   * (a missing field yields null).
   */
  public static Object getFieldVal(GenericRecord record, String key) {
    return getFieldVal(record, key, true);
  }

  /**
   * Obtains the value of the provided key. When returnNullIfNotFound is set to false,
   * the behavior is consistent with Avro 1.10 and later (a missing field throws).
   */
  public static Object getFieldVal(GenericRecord record, String key, boolean returnNullIfNotFound) {
    if (record.getSchema().getField(key) == null) {
      if (returnNullIfNotFound) {
        return null;
      } else {
        // Since avro 1.10, avro will throw AvroRuntimeException("Not a valid schema field: " + key)
        // rather than return null like previous versions when the record doesn't contain this key.
        // Here we simulate this behavior.
        throw new AvroRuntimeException("Not a valid schema field: " + key);
      }
    } else {
      return record.get(key);
    }
  }
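For context, a minimal sketch of the two behaviors (the User schema below is illustrative, not part of this PR):

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.avro.HoodieAvroUtils;

Schema schema = SchemaBuilder.record("User").fields().optionalString("name").endRecord();
GenericRecord user = new GenericData.Record(schema);
user.put("name", "alice");

HoodieAvroUtils.getFieldVal(user, "name");           // "alice"
HoodieAvroUtils.getFieldVal(user, "missing");        // null (pre-1.10 semantics)
HoodieAvroUtils.getFieldVal(user, "missing", false); // throws AvroRuntimeException (1.10+ semantics)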

  /**
   * Obtains the value of the provided field as a string; nested fields are denoted
   * by dot notation, e.g. a.b.c
   */
@@ -492,44 +521,50 @@ public static String getNestedFieldValAsString(GenericRecord record, String fiel
  public static Object getNestedFieldVal(GenericRecord record, String fieldName, boolean returnNullIfNotFound, boolean consistentLogicalTimestampEnabled) {
    String[] parts = fieldName.split("\\.");
    GenericRecord valueNode = record;
-    int i = 0;
-    try {
-      for (; i < parts.length; i++) {
-        String part = parts[i];
-        Object val = valueNode.get(part);
-        if (val == null) {
-          break;
-        }
-
-        // return, if last part of name
-        if (i == parts.length - 1) {
-          Schema fieldSchema = valueNode.getSchema().getField(part).schema();
-          return convertValueForSpecificDataTypes(fieldSchema, val, consistentLogicalTimestampEnabled);
-        } else {
-          // VC: Need a test here
-          if (!(val instanceof GenericRecord)) {
-            throw new HoodieException("Cannot find a record at part value :" + part);
-          }
-          valueNode = (GenericRecord) val;
-        }
-      }
-    } catch (AvroRuntimeException e) {
-      // Since avro 1.10, avro will throw AvroRuntimeException("Not a valid schema field: " + key)
-      // rather than return null like the previous version if record doesn't contain this key.
-      // So when returnNullIfNotFound is true, catch this exception.
-      if (!returnNullIfNotFound) {
-        throw e;
-      }
-    }
+
+    for (int i = 0; i < parts.length; i++) {
+      String part = parts[i];
+      Object val;
+      try {
+        val = HoodieAvroUtils.getFieldVal(valueNode, part, returnNullIfNotFound);
+      } catch (AvroRuntimeException e) {
+        if (returnNullIfNotFound) {
+          return null;
+        } else {
+          throw new HoodieException(
+              fieldName + "(Part -" + parts[i] + ") field not found in record. Acceptable fields were :"
+                  + valueNode.getSchema().getFields().stream().map(Field::name).collect(Collectors.toList()));
+        }
+      }
+
+      if (i == parts.length - 1) {
+        // return, if last part of name
+        if (val == null) {
+          return null;
+        } else {
+          Schema fieldSchema = valueNode.getSchema().getField(part).schema();
+          return convertValueForSpecificDataTypes(fieldSchema, val, consistentLogicalTimestampEnabled);
+        }
+      } else {
+        if (!(val instanceof GenericRecord)) {
+          if (returnNullIfNotFound) {
+            return null;
+          } else {
+            throw new HoodieException("Cannot find a record at part value :" + part);
+          }
+        } else {
+          valueNode = (GenericRecord) val;
+        }
+      }
+    }
+
+    // This can only be reached if the length of parts is 0
    if (returnNullIfNotFound) {
      return null;
-    } else if (valueNode.getSchema().getField(parts[i]) == null) {
+    } else {
      throw new HoodieException(
-          fieldName + "(Part -" + parts[i] + ") field not found in record. Acceptable fields were :"
+          fieldName + " field not found in record. Acceptable fields were :"
              + valueNode.getSchema().getFields().stream().map(Field::name).collect(Collectors.toList()));
-    } else {
-      throw new HoodieException("The value of " + parts[i] + " can not be null");
    }
  }

@@ -1033,4 +1068,12 @@ public GenericRecord next() {
public static GenericRecord rewriteRecordDeep(GenericRecord oldRecord, Schema newSchema) {
return rewriteRecordWithNewSchema(oldRecord, newSchema, Collections.EMPTY_MAP);
}

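// Gates for behavior and test expectations that changed across Avro releases;
// they compare the runtime Avro version against a release line.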
public static boolean gteqAvro1_9() {
return VersionUtil.compareVersions(AVRO_VERSION, "1.9") >= 0;
}

public static boolean gteqAvro1_10() {
return VersionUtil.compareVersions(AVRO_VERSION, "1.10") >= 0;
}
}
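One subtlety these gates rely on: Hadoop's VersionUtil compares version components numerically rather than lexicographically, so "1.10" correctly sorts above "1.9", where a plain string comparison would misorder them. A quick sanity sketch:

VersionUtil.compareVersions("1.10.2", "1.9"); // > 0: component-wise numeric comparison
"1.10.2".compareTo("1.9");                    // < 0: lexicographic comparison gets it wrong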
@@ -18,6 +18,7 @@

package org.apache.hudi.common.model.debezium;

import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.util.Option;

@@ -76,7 +77,7 @@ private Option<IndexedRecord> handleDeleteOperation(IndexedRecord insertRecord)
boolean delete = false;
if (insertRecord instanceof GenericRecord) {
GenericRecord record = (GenericRecord) insertRecord;
-      Object value = record.get(DebeziumConstants.FLATTENED_OP_COL_NAME);
+      Object value = HoodieAvroUtils.getFieldVal(record, DebeziumConstants.FLATTENED_OP_COL_NAME);
delete = value != null && value.toString().equalsIgnoreCase(DebeziumConstants.DELETE_OP);
}

@@ -20,12 +20,15 @@

import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.SchemaCompatibilityException;

import org.apache.avro.AvroRuntimeException;
import org.apache.avro.JsonProperties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Assertions;

@@ -253,7 +256,12 @@ public void testRemoveFields() {
assertEquals("key1", rec1.get("_row_key"));
assertEquals("val1", rec1.get("non_pii_col"));
assertEquals(3.5, rec1.get("timestamp"));
-    assertNull(rec1.get("pii_col"));
+    if (HoodieAvroUtils.gteqAvro1_10()) {
+      GenericRecord finalRec1 = rec1;
+      assertThrows(AvroRuntimeException.class, () -> finalRec1.get("pii_col"));
+    } else {
+      assertNull(rec1.get("pii_col"));
+    }
assertEquals(expectedSchema, rec1.getSchema());

// non-partitioned table test with empty list of fields.
@@ -287,19 +295,58 @@ public void testGetNestedFieldVal() {
assertNull(rowKeyNotExist);

// Field does not exist
-    try {
-      HoodieAvroUtils.getNestedFieldVal(rec, "fake_key", false, false);
-    } catch (Exception e) {
-      assertEquals("fake_key(Part -fake_key) field not found in record. Acceptable fields were :[timestamp, _row_key, non_pii_col, pii_col]",
-          e.getMessage());
-    }
+    assertEquals("fake_key(Part -fake_key) field not found in record. Acceptable fields were :[timestamp, _row_key, non_pii_col, pii_col]",
+        assertThrows(HoodieException.class, () ->
+            HoodieAvroUtils.getNestedFieldVal(rec, "fake_key", false, false)).getMessage());

-    // Field exist while value not
-    try {
-      HoodieAvroUtils.getNestedFieldVal(rec, "timestamp", false, false);
-    } catch (Exception e) {
-      assertEquals("The value of timestamp can not be null", e.getMessage());
-    }
+    // Field exists while value not
+    assertNull(HoodieAvroUtils.getNestedFieldVal(rec, "timestamp", false, false));
}

@Test
public void testGetNestedFieldValWithNestedField() {
Schema nestedSchema = new Schema.Parser().parse(SCHEMA_WITH_NESTED_FIELD);
GenericRecord rec = new GenericData.Record(nestedSchema);

// test get .
assertEquals(". field not found in record. Acceptable fields were :[firstname, lastname, student]",
assertThrows(HoodieException.class, () ->
HoodieAvroUtils.getNestedFieldVal(rec, ".", false, false)).getMessage());

// test get fake_key
assertEquals("fake_key(Part -fake_key) field not found in record. Acceptable fields were :[firstname, lastname, student]",
assertThrows(HoodieException.class, () ->
HoodieAvroUtils.getNestedFieldVal(rec, "fake_key", false, false)).getMessage());

// test get student(null)
assertNull(HoodieAvroUtils.getNestedFieldVal(rec, "student", false, false));

// test get student
GenericRecord studentRecord = new GenericData.Record(rec.getSchema().getField("student").schema());
studentRecord.put("firstname", "person");
rec.put("student", studentRecord);
assertEquals(studentRecord, HoodieAvroUtils.getNestedFieldVal(rec, "student", false, false));

// test get student.fake_key
assertEquals("student.fake_key(Part -fake_key) field not found in record. Acceptable fields were :[firstname, lastname]",
assertThrows(HoodieException.class, () ->
HoodieAvroUtils.getNestedFieldVal(rec, "student.fake_key", false, false)).getMessage());

// test get student.firstname
assertEquals("person", HoodieAvroUtils.getNestedFieldVal(rec, "student.firstname", false, false));

// test get student.lastname(null)
assertNull(HoodieAvroUtils.getNestedFieldVal(rec, "student.lastname", false, false));

// test get student.firstname.fake_key
assertEquals("Cannot find a record at part value :firstname",
assertThrows(HoodieException.class, () ->
HoodieAvroUtils.getNestedFieldVal(rec, "student.firstname.fake_key", false, false)).getMessage());

// test get student.lastname(null).fake_key
assertEquals("Cannot find a record at part value :lastname",
assertThrows(HoodieException.class, () ->
HoodieAvroUtils.getNestedFieldVal(rec, "student.lastname.fake_key", false, false)).getMessage());
}

@Test
@@ -2347,7 +2347,7 @@ public void testDataBlockFormatAppendAndReadWithProjectedSchema(
new HashMap<HoodieLogBlockType, Integer>() {{
put(HoodieLogBlockType.AVRO_DATA_BLOCK, 0); // not supported
put(HoodieLogBlockType.HFILE_DATA_BLOCK, 0); // not supported
-      put(HoodieLogBlockType.PARQUET_DATA_BLOCK, 2605);
+      put(HoodieLogBlockType.PARQUET_DATA_BLOCK, HoodieAvroUtils.gteqAvro1_9() ? 2593 : 2605);
}};

List<IndexedRecord> recordsRead = getRecords(dataBlockRead);
@@ -181,10 +181,10 @@ public void testDeletedRecord() throws IOException {
@Test
public void testNullColumn() throws IOException {
Schema avroSchema = Schema.createRecord(Arrays.asList(
new Schema.Field("id", Schema.createUnion(Schema.create(Schema.Type.STRING), Schema.create(Schema.Type.NULL)), "", JsonProperties.NULL_VALUE),
new Schema.Field("name", Schema.createUnion(Schema.create(Schema.Type.STRING), Schema.create(Schema.Type.NULL)), "", JsonProperties.NULL_VALUE),
new Schema.Field("age", Schema.createUnion(Schema.create(Schema.Type.STRING), Schema.create(Schema.Type.NULL)), "", JsonProperties.NULL_VALUE),
new Schema.Field("job", Schema.createUnion(Schema.create(Schema.Type.STRING), Schema.create(Schema.Type.NULL)), "", JsonProperties.NULL_VALUE)
new Schema.Field("id", Schema.createUnion(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING)), "", JsonProperties.NULL_VALUE),
Contributor

Why do we need to change this?

Contributor Author

Since avro 1.10, the defaultValue is validated when the Schema is initialized. Here are the spec's rules for unions:

Unions, as mentioned above, are represented using JSON arrays. For example, ["null", "string"] declares a schema which may be either a null or string.
(Note that when a default value is specified for a record field whose type is a union, the type of the default value must match the first element of the union. Thus, for unions containing “null”, the “null” is usually listed first, since the default value of such unions is typically null.)
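A minimal sketch of that validation (the exception type and message reflect my reading of newer Avro and may vary by version):

// Rejected: the null default does not match the first union branch ("string")
new Schema.Field("id",
    Schema.createUnion(Schema.create(Schema.Type.STRING), Schema.create(Schema.Type.NULL)),
    "", JsonProperties.NULL_VALUE);  // throws AvroTypeException: Invalid default for field id

// Accepted: "null" listed first, so a null default is valid
new Schema.Field("id",
    Schema.createUnion(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING)),
    "", JsonProperties.NULL_VALUE);

The _hoodie_is_deleted change in TestPartialUpdateAvroPayload below follows the same rule: its default false must match the first union branch, which is why the ["null", "boolean"] union there collapses to a plain "boolean".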

new Schema.Field("name", Schema.createUnion(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING)), "", JsonProperties.NULL_VALUE),
new Schema.Field("age", Schema.createUnion(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING)), "", JsonProperties.NULL_VALUE),
new Schema.Field("job", Schema.createUnion(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING)), "", JsonProperties.NULL_VALUE)
));
GenericRecord record1 = new GenericData.Record(avroSchema);
record1.put("id", "1");
@@ -55,7 +55,7 @@ public class TestPartialUpdateAvroPayload {
+ " {\"name\": \"id\", \"type\": [\"null\", \"string\"]},\n"
+ " {\"name\": \"partition\", \"type\": [\"null\", \"string\"]},\n"
+ " {\"name\": \"ts\", \"type\": [\"null\", \"long\"]},\n"
+ " {\"name\": \"_hoodie_is_deleted\", \"type\": [\"null\", \"boolean\"], \"default\":false},\n"
+ " {\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false},\n"
+ " {\"name\": \"city\", \"type\": [\"null\", \"string\"]},\n"
+ " {\"name\": \"child\", \"type\": [\"null\", {\"type\": \"array\", \"items\": \"string\"}]}\n"
+ " ]\n"
@@ -19,6 +19,7 @@

package org.apache.hudi.common.util;

import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieRecord;

import org.apache.avro.Schema;
@@ -73,7 +74,10 @@ public void testGetObjectSize() {
assertEquals(32, getObjectSize(emptyClass));
assertEquals(40, getObjectSize(stringClass));
assertEquals(40, getObjectSize(payloadClass));
-    assertEquals(1240, getObjectSize(Schema.create(Schema.Type.STRING)));
+    // Since avro 1.9, Schema uses a ConcurrentHashMap instead of a LinkedHashMap to
+    // implement props, which changes the measured size of the object.
+    assertEquals(HoodieAvroUtils.gteqAvro1_9() ? 1320 : 1240,
+        getObjectSize(Schema.create(Schema.Type.STRING)));
assertEquals(104, getObjectSize(person));
}
