Skip to content

Commit

Permalink
[HUDI-4706] Fix InternalSchemaChangeApplier#applyAddChange error to a…
Browse files Browse the repository at this point in the history
…dd nest type (apache#6486)

InternalSchemaChangeApplier#applyAddChange forget to remove parent name when calling ColumnAddChange#addColumns
  • Loading branch information
wzx140 authored and fengjian committed Apr 5, 2023
1 parent ea82cf8 commit 024b7fe
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ public InternalSchema applyAddChange(
TableChange.ColumnPositionChange.ColumnPositionType positionType) {
TableChanges.ColumnAddChange add = TableChanges.ColumnAddChange.get(latestSchema);
String parentName = TableChangesHelper.getParentName(colName);
add.addColumns(parentName, colName, colType, doc);
String leafName = TableChangesHelper.getLeafName(colName);
add.addColumns(parentName, leafName, colType, doc);
if (positionType != null) {
switch (positionType) {
case NO_OPERATION:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,9 @@ public static String getParentName(String fullColName) {
int offset = fullColName.lastIndexOf(".");
return offset > 0 ? fullColName.substring(0, offset) : "";
}

public static String getLeafName(String fullColName) {
int offset = fullColName.lastIndexOf(".");
return offset > 0 ? fullColName.substring(offset + 1) : fullColName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@

import org.apache.hudi.internal.schema.HoodieSchemaException;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.internal.schema.Type;
import org.apache.hudi.internal.schema.Types;

import org.apache.hudi.internal.schema.Types.StringType;
import org.apache.hudi.internal.schema.action.TableChange.ColumnPositionChange.ColumnPositionType;
import org.apache.hudi.internal.schema.utils.SchemaChangeUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Assertions;
Expand Down Expand Up @@ -225,5 +228,88 @@ public void testNestUpdate() {
);
Assertions.assertEquals(newSchema.getRecord(), checkSchema.getRecord());
}

@Test
public void testChangeApplier() {
// We add test here to verify the logic of applyAddChange and applyReOrderColPositionChange
InternalSchema oldSchema = new InternalSchema(Types.Field.get(0, false, "id", Types.IntType.get()),
Types.Field.get(1, true, "data", Types.StringType.get()),
Types.Field.get(2, true, "preferences",
Types.RecordType.get(Types.Field.get(7, false, "feature1",
Types.BooleanType.get()), Types.Field.get(8, true, "feature2", Types.BooleanType.get()))),
Types.Field.get(3, false, "locations", Types.MapType.get(9, 10, Types.StringType.get(),
Types.RecordType.get(Types.Field.get(11, false, "lat", Types.FloatType.get()), Types.Field.get(12, false, "long", Types.FloatType.get())), false)),
Types.Field.get(4, true, "points", Types.ArrayType.get(13, true,
Types.RecordType.get(Types.Field.get(14, false, "x", Types.LongType.get()), Types.Field.get(15, false, "y", Types.LongType.get())))),
Types.Field.get(5, false,"doubles", Types.ArrayType.get(16, false, Types.DoubleType.get())),
Types.Field.get(6, true, "properties", Types.MapType.get(17, 18, Types.StringType.get(), Types.StringType.get()))
);

// add c1 first
InternalSchema newSchema = addOperationForSchemaChangeApplier(oldSchema, "c1", StringType.get(), "add c1 first",
"id", ColumnPositionType.BEFORE);
//add preferences.cx before preferences.feature2
newSchema = addOperationForSchemaChangeApplier(newSchema, "preferences.cx", Types.BooleanType.get(), "add preferences.cx before preferences.feature2",
"preferences.feature2", ColumnPositionType.BEFORE);
// check repeated add.
InternalSchema currSchema = newSchema;
Assertions.assertThrows(HoodieSchemaException.class, () -> addOperationForSchemaChangeApplier(currSchema, "preferences.cx", Types.BooleanType.get(),
"add preferences.cx before preferences.feature2"));
// add locations.value.lax before locations.value.long
newSchema = addOperationForSchemaChangeApplier(newSchema, "locations.value.lax", Types.BooleanType.get(), "add locations.value.lax before locations.value.long");
newSchema = reOrderOperationForSchemaChangeApplier(newSchema, "locations.value.lax", "locations.value.long", ColumnPositionType.BEFORE);
//
// add points.element.z after points.element.y
newSchema = addOperationForSchemaChangeApplier(newSchema, "points.element.z", Types.BooleanType.get(), "add points.element.z after points.element.y", "points.element.y", ColumnPositionType.AFTER);
InternalSchema checkedSchema = new InternalSchema(
Types.Field.get(19, true, "c1", Types.StringType.get(), "add c1 first"),
Types.Field.get(0, false, "id", Types.IntType.get()),
Types.Field.get(1, true, "data", Types.StringType.get()),
Types.Field.get(2, true, "preferences",
Types.RecordType.get(Types.Field.get(7, false, "feature1", Types.BooleanType.get()),
Types.Field.get(20, true, "cx", Types.BooleanType.get(), "add preferences.cx before preferences.feature2"),
Types.Field.get(8, true, "feature2", Types.BooleanType.get()))),
Types.Field.get(3, false, "locations", Types.MapType.get(9, 10, Types.StringType.get(),
Types.RecordType.get(Types.Field.get(11, false, "lat", Types.FloatType.get()),
Types.Field.get(21, true, "lax", Types.BooleanType.get(), "add locations.value.lax before locations.value.long"),
Types.Field.get(12, false, "long", Types.FloatType.get())), false)),
Types.Field.get(4, true, "points", Types.ArrayType.get(13, true,
Types.RecordType.get(Types.Field.get(14, false, "x", Types.LongType.get()),
Types.Field.get(15, false, "y", Types.LongType.get()),
Types.Field.get(22, true, "z", Types.BooleanType.get(), "add points.element.z after points.element.y")))),
Types.Field.get(5, false,"doubles", Types.ArrayType.get(16, false, Types.DoubleType.get())),
Types.Field.get(6, true, "properties", Types.MapType.get(17, 18, Types.StringType.get(), Types.StringType.get()))
);
Assertions.assertEquals(newSchema.getRecord(), checkedSchema.getRecord());
}

private static InternalSchema addOperationForSchemaChangeApplier(
InternalSchema schema,
String colName,
Type colType,
String doc,
String position,
TableChange.ColumnPositionChange.ColumnPositionType positionType) {
InternalSchemaChangeApplier applier = new InternalSchemaChangeApplier(schema);
return applier.applyAddChange(colName, colType, doc, position, positionType);
}

private static InternalSchema reOrderOperationForSchemaChangeApplier(
InternalSchema schema,
String colName,
String position,
TableChange.ColumnPositionChange.ColumnPositionType positionType) {
InternalSchemaChangeApplier applier = new InternalSchemaChangeApplier(schema);
return applier.applyReOrderColPositionChange(colName, position, positionType);
}

private static InternalSchema addOperationForSchemaChangeApplier(
InternalSchema schema,
String colName,
Type colType,
String doc) {
return addOperationForSchemaChangeApplier(schema, colName, colType, doc, "",
ColumnPositionType.NO_OPERATION);
}
}

0 comments on commit 024b7fe

Please sign in to comment.