Skip to content

Commit

Permalink
[HUDI-5206] RowColumnReader should not return null value for certain …
Browse files Browse the repository at this point in the history
…null child columns (#7194)
  • Loading branch information
SteNicholas authored Nov 15, 2022
1 parent 5ac605c commit 3b8df4b
Show file tree
Hide file tree
Showing 8 changed files with 85 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1337,6 +1337,32 @@ void testParquetComplexNestedRowTypes(String operation) {
assertRowsEquals(result, expected);
}

@ParameterizedTest
@ValueSource(strings = {"insert", "upsert", "bulk_insert"})
void testParquetNullChildColumnsRowTypes(String operation) {
TableEnvironment tableEnv = batchTableEnv;

String hoodieTableDDL = sql("t1")
.field("f_int int")
.field("f_row row(f_row_f0 int, f_row_f1 varchar(10))")
.pkField("f_int")
.noPartition()
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
.option(FlinkOptions.OPERATION, operation)
.end();
tableEnv.executeSql(hoodieTableDDL);

execInsertSql(tableEnv, TestSQL.NULL_CHILD_COLUMNS_ROW_TYPE_INSERT_T1);

List<Row> result = CollectionUtil.iterableToList(
() -> tableEnv.sqlQuery("select * from t1").execute().collect());
final String expected = "["
+ "+I[1, +I[null, abc1]], "
+ "+I[2, +I[2, null]], "
+ "+I[3, null]]";
assertRowsEquals(result, expected);
}

@ParameterizedTest
@ValueSource(strings = {"insert", "upsert", "bulk_insert"})
void testBuiltinFunctionWithCatalog(String operation) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ private TestSQL() {
+ "(2, array['abc2', 'def2'], array[2, 2], map['abc2', 1, 'def2', 3], row(array['abc2', 'def2'], row(2, 'abc2'))),\n"
+ "(3, array['abc3', 'def3'], array[3, 3], map['abc3', 1, 'def3', 3], row(array['abc3', 'def3'], row(3, 'abc3')))";

public static final String NULL_CHILD_COLUMNS_ROW_TYPE_INSERT_T1 = "insert into t1 values\n"
+ "(1, row(cast(null as int), 'abc1')),\n"
+ "(2, row(2, cast(null as varchar))),\n"
+ "(3, row(cast(null as int), cast(null as varchar)))";

public static final String INSERT_DATE_PARTITION_T1 = "insert into t1 values\n"
+ "('id1','Danny',23,DATE '1970-01-01'),\n"
+ "('id2','Stephen',33,DATE '1970-01-01'),\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,12 @@ public ColumnarRowData getRow(int i) {
columnarRowData.setRowId(i);
return columnarRowData;
}

@Override
public void reset() {
super.reset();
for (WritableColumnVector vector : vectors) {
vector.reset();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,20 @@ public RowColumnReader(List<ColumnReader> fieldReaders) {
public void readToVector(int readNumber, WritableColumnVector vector) throws IOException {
HeapRowColumnVector rowColumnVector = (HeapRowColumnVector) vector;
WritableColumnVector[] vectors = rowColumnVector.vectors;
// row vector null array
boolean[] isNulls = new boolean[readNumber];
for (int i = 0; i < vectors.length; i++) {
fieldReaders.get(i).readToVector(readNumber, vectors[i]);

for (int j = 0; j < readNumber; j++) {
boolean isNull = (i == 0)
? vectors[i].isNullAt(j)
: rowColumnVector.isNullAt(j) && vectors[i].isNullAt(j);
if (isNull) {
if (i == 0) {
isNulls[j] = vectors[i].isNullAt(j);
} else {
isNulls[j] = isNulls[j] && vectors[i].isNullAt(j);
}
if (i == vectors.length - 1 && isNulls[j]) {
// rowColumnVector[j] is null only when all fields[j] of rowColumnVector[j] is
// null
rowColumnVector.setNullAt(j);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,12 @@ public ColumnarRowData getRow(int i) {
columnarRowData.setRowId(i);
return columnarRowData;
}

@Override
public void reset() {
super.reset();
for (WritableColumnVector vector : vectors) {
vector.reset();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,20 @@ public RowColumnReader(List<ColumnReader> fieldReaders) {
public void readToVector(int readNumber, WritableColumnVector vector) throws IOException {
HeapRowColumnVector rowColumnVector = (HeapRowColumnVector) vector;
WritableColumnVector[] vectors = rowColumnVector.vectors;
// row vector null array
boolean[] isNulls = new boolean[readNumber];
for (int i = 0; i < vectors.length; i++) {
fieldReaders.get(i).readToVector(readNumber, vectors[i]);

for (int j = 0; j < readNumber; j++) {
boolean isNull = (i == 0)
? vectors[i].isNullAt(j)
: rowColumnVector.isNullAt(j) && vectors[i].isNullAt(j);
if (isNull) {
if (i == 0) {
isNulls[j] = vectors[i].isNullAt(j);
} else {
isNulls[j] = isNulls[j] && vectors[i].isNullAt(j);
}
if (i == vectors.length - 1 && isNulls[j]) {
// rowColumnVector[j] is null only when all fields[j] of rowColumnVector[j] is
// null
rowColumnVector.setNullAt(j);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,12 @@ public ColumnarRowData getRow(int i) {
columnarRowData.setRowId(i);
return columnarRowData;
}

@Override
public void reset() {
super.reset();
for (WritableColumnVector vector : vectors) {
vector.reset();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,20 @@ public RowColumnReader(List<ColumnReader> fieldReaders) {
public void readToVector(int readNumber, WritableColumnVector vector) throws IOException {
HeapRowColumnVector rowColumnVector = (HeapRowColumnVector) vector;
WritableColumnVector[] vectors = rowColumnVector.vectors;
// row vector null array
boolean[] isNulls = new boolean[readNumber];
for (int i = 0; i < vectors.length; i++) {
fieldReaders.get(i).readToVector(readNumber, vectors[i]);

for (int j = 0; j < readNumber; j++) {
boolean isNull = (i == 0)
? vectors[i].isNullAt(j)
: rowColumnVector.isNullAt(j) && vectors[i].isNullAt(j);
if (isNull) {
if (i == 0) {
isNulls[j] = vectors[i].isNullAt(j);
} else {
isNulls[j] = isNulls[j] && vectors[i].isNullAt(j);
}
if (i == vectors.length - 1 && isNulls[j]) {
// rowColumnVector[j] is null only when all fields[j] of rowColumnVector[j] is
// null
rowColumnVector.setNullAt(j);
}
}
Expand Down

0 comments on commit 3b8df4b

Please sign in to comment.