Skip to content

Commit

Permalink
[HUDI-5206] RowColumnReader should not return null value for certain null child columns
Browse files Browse the repository at this point in the history
  • Loading branch information
SteNicholas committed Nov 14, 2022
1 parent 5ac605c commit 31bab1d
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1303,8 +1303,8 @@ void testParquetComplexTypes(String operation) {
() -> tableEnv.sqlQuery("select * from t1").execute().collect());
final String expected = "["
+ "+I[1, [abc1, def1], {abc1=1, def1=3}, +I[1, abc1]], "
+ "+I[2, [abc2, def2], {def2=3, abc2=1}, +I[2, abc2]], "
+ "+I[3, [abc3, def3], {def3=3, abc3=1}, +I[3, abc3]]]";
+ "+I[2, [abc2, def2], {def2=3, abc2=1}, +I[null, abc2]], "
+ "+I[3, [abc3, def3], {def3=3, abc3=1}, +I[3, null]]]";
assertRowsEquals(result, expected);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ private TestSQL() {

public static final String COMPLEX_TYPE_INSERT_T1 = "insert into t1 values\n"
+ "(1, array['abc1', 'def1'], map['abc1', 1, 'def1', 3], row(1, 'abc1')),\n"
+ "(2, array['abc2', 'def2'], map['abc2', 1, 'def2', 3], row(2, 'abc2')),\n"
+ "(3, array['abc3', 'def3'], map['abc3', 1, 'def3', 3], row(3, 'abc3'))";
+ "(2, array['abc2', 'def2'], map['abc2', 1, 'def2', 3], row(cast(null as int), 'abc2')),\n"
+ "(3, array['abc3', 'def3'], map['abc3', 1, 'def3', 3], row(3, cast(null as varchar)))";

public static final String COMPLEX_NESTED_ROW_TYPE_INSERT_T1 = "insert into t1 values\n"
+ "(1, array['abc1', 'def1'], array[1, 1], map['abc1', 1, 'def1', 3], row(array['abc1', 'def1'], row(1, 'abc1'))),\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,12 @@ public ColumnarRowData getRow(int i) {
columnarRowData.setRowId(i);
return columnarRowData;
}

/**
 * Resets this vector and every child vector it holds.
 *
 * <p>Delegates to the superclass {@code reset()} first, then calls {@code reset()} on each
 * element of {@code vectors}.
 */
@Override
public void reset() {
super.reset();
// NOTE(review): presumably this clears per-batch state (such as null flags) on the child
// field vectors so it does not carry over into the next read — confirm against callers.
for (WritableColumnVector vector : vectors) {
vector.reset();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,20 @@ public RowColumnReader(List<ColumnReader> fieldReaders) {
public void readToVector(int readNumber, WritableColumnVector vector) throws IOException {
HeapRowColumnVector rowColumnVector = (HeapRowColumnVector) vector;
WritableColumnVector[] vectors = rowColumnVector.vectors;
// row vector null array
boolean[] isNulls = new boolean[readNumber];
for (int i = 0; i < vectors.length; i++) {
fieldReaders.get(i).readToVector(readNumber, vectors[i]);

for (int j = 0; j < readNumber; j++) {
boolean isNull = (i == 0)
? vectors[i].isNullAt(j)
: rowColumnVector.isNullAt(j) && vectors[i].isNullAt(j);
if (isNull) {
if (i == 0) {
isNulls[j] = vectors[i].isNullAt(j);
} else {
isNulls[j] = isNulls[j] && vectors[i].isNullAt(j);
}
if (i == vectors.length - 1 && isNulls[j]) {
// rowColumnVector[j] is null only when all fields[j] of rowColumnVector[j] are
// null
rowColumnVector.setNullAt(j);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,12 @@ public ColumnarRowData getRow(int i) {
columnarRowData.setRowId(i);
return columnarRowData;
}

/**
 * Resets this vector and every child vector it holds.
 *
 * <p>Delegates to the superclass {@code reset()} first, then calls {@code reset()} on each
 * element of {@code vectors}.
 */
@Override
public void reset() {
super.reset();
// NOTE(review): presumably this clears per-batch state (such as null flags) on the child
// field vectors so it does not carry over into the next read — confirm against callers.
for (WritableColumnVector vector : vectors) {
vector.reset();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,20 @@ public RowColumnReader(List<ColumnReader> fieldReaders) {
public void readToVector(int readNumber, WritableColumnVector vector) throws IOException {
HeapRowColumnVector rowColumnVector = (HeapRowColumnVector) vector;
WritableColumnVector[] vectors = rowColumnVector.vectors;
// row vector null array
boolean[] isNulls = new boolean[readNumber];
for (int i = 0; i < vectors.length; i++) {
fieldReaders.get(i).readToVector(readNumber, vectors[i]);

for (int j = 0; j < readNumber; j++) {
boolean isNull = (i == 0)
? vectors[i].isNullAt(j)
: rowColumnVector.isNullAt(j) && vectors[i].isNullAt(j);
if (isNull) {
if (i == 0) {
isNulls[j] = vectors[i].isNullAt(j);
} else {
isNulls[j] = isNulls[j] && vectors[i].isNullAt(j);
}
if (i == vectors.length - 1 && isNulls[j]) {
// rowColumnVector[j] is null only when all fields[j] of rowColumnVector[j] are
// null
rowColumnVector.setNullAt(j);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,12 @@ public ColumnarRowData getRow(int i) {
columnarRowData.setRowId(i);
return columnarRowData;
}

/**
 * Resets this vector and every child vector it holds.
 *
 * <p>Delegates to the superclass {@code reset()} first, then calls {@code reset()} on each
 * element of {@code vectors}.
 */
@Override
public void reset() {
super.reset();
// NOTE(review): presumably this clears per-batch state (such as null flags) on the child
// field vectors so it does not carry over into the next read — confirm against callers.
for (WritableColumnVector vector : vectors) {
vector.reset();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,20 @@ public RowColumnReader(List<ColumnReader> fieldReaders) {
public void readToVector(int readNumber, WritableColumnVector vector) throws IOException {
HeapRowColumnVector rowColumnVector = (HeapRowColumnVector) vector;
WritableColumnVector[] vectors = rowColumnVector.vectors;
// row vector null array
boolean[] isNulls = new boolean[readNumber];
for (int i = 0; i < vectors.length; i++) {
fieldReaders.get(i).readToVector(readNumber, vectors[i]);

for (int j = 0; j < readNumber; j++) {
boolean isNull = (i == 0)
? vectors[i].isNullAt(j)
: rowColumnVector.isNullAt(j) && vectors[i].isNullAt(j);
if (isNull) {
if (i == 0) {
isNulls[j] = vectors[i].isNullAt(j);
} else {
isNulls[j] = isNulls[j] && vectors[i].isNullAt(j);
}
if (i == vectors.length - 1 && isNulls[j]) {
// rowColumnVector[j] is null only when all fields[j] of rowColumnVector[j] are
// null
rowColumnVector.setNullAt(j);
}
}
Expand Down

0 comments on commit 31bab1d

Please sign in to comment.