diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java index 865a717e08a67..364aeafd2c1b7 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java @@ -1303,8 +1303,8 @@ void testParquetComplexTypes(String operation) { () -> tableEnv.sqlQuery("select * from t1").execute().collect()); final String expected = "[" + "+I[1, [abc1, def1], {abc1=1, def1=3}, +I[1, abc1]], " - + "+I[2, [abc2, def2], {def2=3, abc2=1}, +I[2, abc2]], " - + "+I[3, [abc3, def3], {def3=3, abc3=1}, +I[3, abc3]]]"; + + "+I[2, [abc2, def2], {def2=3, abc2=1}, +I[2, null]], " + + "+I[3, [abc3, def3], {def3=3, abc3=1}, null]]"; assertRowsEquals(result, expected); } diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java index b109fee0fff2a..a4a088016b1e1 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java @@ -54,8 +54,8 @@ private TestSQL() { public static final String COMPLEX_TYPE_INSERT_T1 = "insert into t1 values\n" + "(1, array['abc1', 'def1'], map['abc1', 1, 'def1', 3], row(1, 'abc1')),\n" - + "(2, array['abc2', 'def2'], map['abc2', 1, 'def2', 3], row(2, 'abc2')),\n" - + "(3, array['abc3', 'def3'], map['abc3', 1, 'def3', 3], row(3, 'abc3'))"; + + "(2, array['abc2', 'def2'], map['abc2', 1, 'def2', 3], row(2, cast(null as varchar))),\n" + + "(3, array['abc3', 'def3'], map['abc3', 1, 'def3', 3], row(cast(null as int), cast(null as varchar)))"; public static final String COMPLEX_NESTED_ROW_TYPE_INSERT_T1 = "insert into t1 values\n" + "(1, array['abc1', 'def1'], array[1, 1], map['abc1', 1, 'def1', 3], row(array['abc1', 'def1'], row(1, 'abc1'))),\n" diff --git a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java index 132b48f139c22..03da9205d313e 100644 --- a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java +++ b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java @@ -44,4 +44,12 @@ public ColumnarRowData getRow(int i) { columnarRowData.setRowId(i); return columnarRowData; } + + @Override + public void reset() { + super.reset(); + for (WritableColumnVector vector : vectors) { + vector.reset(); + } + } } diff --git a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/RowColumnReader.java b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/RowColumnReader.java index 39ebb90ee6c1a..524c00f402d47 100644 --- a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/RowColumnReader.java +++ b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/RowColumnReader.java @@ -41,14 +41,20 @@ public RowColumnReader(List fieldReaders) { public void readToVector(int readNumber, WritableColumnVector vector) throws IOException { HeapRowColumnVector rowColumnVector = (HeapRowColumnVector) vector; WritableColumnVector[] vectors = rowColumnVector.vectors; + // row vector null array + boolean[] isNulls = new boolean[readNumber]; for (int i = 0; i < vectors.length; i++) { fieldReaders.get(i).readToVector(readNumber, vectors[i]); for (int j = 0; j < readNumber; j++) { - boolean isNull = (i == 0) - ? vectors[i].isNullAt(j) - : rowColumnVector.isNullAt(j) && vectors[i].isNullAt(j); - if (isNull) { + if (i == 0) { + isNulls[j] = vectors[i].isNullAt(j); + } else { + isNulls[j] = isNulls[j] && vectors[i].isNullAt(j); + } + if (i == vectors.length - 1 && isNulls[j]) { + // rowColumnVector[j] is null only when all fields[j] of rowColumnVector[j] is + // null rowColumnVector.setNullAt(j); } } diff --git a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java index 0193e6cbb1d22..53a1eee68cd2a 100644 --- a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java +++ b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java @@ -43,4 +43,12 @@ public ColumnarRowData getRow(int i) { columnarRowData.setRowId(i); return columnarRowData; } + + @Override + public void reset() { + super.reset(); + for (WritableColumnVector vector : vectors) { + vector.reset(); + } + } } diff --git a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/RowColumnReader.java b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/RowColumnReader.java index 39ebb90ee6c1a..524c00f402d47 100644 --- a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/RowColumnReader.java +++ b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/RowColumnReader.java @@ -41,14 +41,20 @@ public RowColumnReader(List fieldReaders) { public void readToVector(int readNumber, WritableColumnVector vector) throws IOException { HeapRowColumnVector rowColumnVector = (HeapRowColumnVector) vector; WritableColumnVector[] vectors = rowColumnVector.vectors; + // row vector null array + boolean[] isNulls = new boolean[readNumber]; for (int i = 0; i < vectors.length; i++) { fieldReaders.get(i).readToVector(readNumber, vectors[i]); for (int j = 0; j < readNumber; j++) { - boolean isNull = (i == 0) - ? vectors[i].isNullAt(j) - : rowColumnVector.isNullAt(j) && vectors[i].isNullAt(j); - if (isNull) { + if (i == 0) { + isNulls[j] = vectors[i].isNullAt(j); + } else { + isNulls[j] = isNulls[j] && vectors[i].isNullAt(j); + } + if (i == vectors.length - 1 && isNulls[j]) { + // rowColumnVector[j] is null only when all fields[j] of rowColumnVector[j] is + // null rowColumnVector.setNullAt(j); } } diff --git a/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java index 8d4031251d5a2..ae194e4e6ab05 100644 --- a/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java +++ b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java @@ -43,4 +43,12 @@ public ColumnarRowData getRow(int i) { columnarRowData.setRowId(i); return columnarRowData; } + + @Override + public void reset() { + super.reset(); + for (WritableColumnVector vector : vectors) { + vector.reset(); + } + } } diff --git a/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/RowColumnReader.java b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/RowColumnReader.java index 8d6a8cc52dbec..79b50487f13c1 100644 --- a/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/RowColumnReader.java +++ b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/RowColumnReader.java @@ -41,14 +41,20 @@ public RowColumnReader(List fieldReaders) { public void readToVector(int readNumber, WritableColumnVector vector) throws IOException { HeapRowColumnVector rowColumnVector = (HeapRowColumnVector) vector; WritableColumnVector[] vectors = rowColumnVector.vectors; + // row vector null array + boolean[] isNulls = new boolean[readNumber]; for (int i = 0; i < vectors.length; i++) { fieldReaders.get(i).readToVector(readNumber, vectors[i]); for (int j = 0; j < readNumber; j++) { - boolean isNull = (i == 0) - ? vectors[i].isNullAt(j) - : rowColumnVector.isNullAt(j) && vectors[i].isNullAt(j); - if (isNull) { + if (i == 0) { + isNulls[j] = vectors[i].isNullAt(j); + } else { + isNulls[j] = isNulls[j] && vectors[i].isNullAt(j); + } + if (i == vectors.length - 1 && isNulls[j]) { + // rowColumnVector[j] is null only when all fields[j] of rowColumnVector[j] is + // null rowColumnVector.setNullAt(j); } }