diff --git a/arrow-data-source/common/src/main/java/com/intel/oap/vectorized/ArrowWritableColumnVector.java b/arrow-data-source/common/src/main/java/com/intel/oap/vectorized/ArrowWritableColumnVector.java
index a3553f985..107a4343d 100644
--- a/arrow-data-source/common/src/main/java/com/intel/oap/vectorized/ArrowWritableColumnVector.java
+++ b/arrow-data-source/common/src/main/java/com/intel/oap/vectorized/ArrowWritableColumnVector.java
@@ -287,7 +287,11 @@ private ArrowVectorWriter createVectorWriter(ValueVector vector) {
       return new TimestampMicroWriter((TimeStampVector) vector);
     } else if (vector instanceof MapVector) {
       MapVector mapVector = (MapVector) vector;
-      return new MapWriter(mapVector);
+      final StructVector structVector = (StructVector) mapVector.getDataVector();
+      final FieldVector keyChild = structVector.getChild(MapVector.KEY_NAME);
+      final FieldVector valueChild = structVector.getChild(MapVector.VALUE_NAME);
+      return new MapWriter(mapVector, createVectorWriter(keyChild),
+          createVectorWriter(valueChild));
     } else if (vector instanceof ListVector) {
       ListVector listVector = (ListVector) vector;
       ArrowVectorWriter elementVector = createVectorWriter(listVector.getDataVector());
@@ -1949,8 +1953,36 @@ void setNotNull(int rowId) {
     }
 
   private static class MapWriter extends ArrowVectorWriter {
-    MapWriter(ValueVector vector) {
+    private final MapVector mapVector;
+    private final ArrowVectorWriter keyWriter;
+    private final ArrowVectorWriter valueWriter;
+
+    MapWriter(MapVector vector, ArrowVectorWriter keyWriter, ArrowVectorWriter valueWriter) {
       super(vector);
+      this.mapVector = vector;
+      this.keyWriter = keyWriter;
+      this.valueWriter = valueWriter;
+    }
+
+    public ArrowVectorWriter getKeyWriter() {
+      return keyWriter;
+    }
+
+    public ArrowVectorWriter getValueWriter() {
+      return valueWriter;
+    }
+
+    @Override
+    void setArray(int rowId, int offset, int length) {
+      int index = rowId * ListVector.OFFSET_WIDTH;
+      mapVector.getOffsetBuffer().setInt(index, offset);
+      mapVector.getOffsetBuffer().setInt(index + ListVector.OFFSET_WIDTH, offset + length);
+      mapVector.setNotNull(rowId);
+    }
+
+    @Override
+    final void setNull(int rowId) {
+      mapVector.setNull(rowId);
     }
   }
 }
diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowRowToColumnarExec.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowRowToColumnarExec.scala
index 27d124f1c..085856e26 100644
--- a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowRowToColumnarExec.scala
+++ b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowRowToColumnarExec.scala
@@ -63,7 +63,7 @@ class ArrowRowToColumnarExec(child: SparkPlan) extends RowToColumnarExec(child =
       case d: DecimalType =>
       case d: TimestampType =>
       case d: BinaryType =>
-      case d: ArrayType =>
+      case d: ArrayType => ConverterUtils.checkIfTypeSupported(d.elementType)
       case _ =>
         throw new UnsupportedOperationException(s"${field.dataType} " +
           s"is not supported in ArrowRowToColumnarExec.")
diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ConverterUtils.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ConverterUtils.scala
index 7ad56f582..c380e8659 100644
--- a/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ConverterUtils.scala
+++ b/native-sql-engine/core/src/main/scala/com/intel/oap/expression/ConverterUtils.scala
@@ -56,6 +56,7 @@
 import java.io.{InputStream, OutputStream}
 import java.util
 import java.util.concurrent.TimeUnit.SECONDS
+import org.apache.arrow.vector.complex.MapVector
 import org.apache.arrow.vector.types.TimeUnit
 import org.apache.arrow.vector.types.pojo.ArrowType
 import org.apache.arrow.vector.types.pojo.ArrowType.ArrowTypeID
@@ -522,6 +523,7 @@ object ConverterUtils extends Logging {
       for ( structField <- d.fields ) {
         checkIfTypeSupported(structField.dataType)
       }
+    case d: MapType =>
     case d: BooleanType =>
     case d: ByteType =>
     case d: ShortType =>
@@ -537,26 +539,35 @@
       throw new UnsupportedOperationException(s"Unsupported data type: $dt")
   }
 
-  def createArrowField(name: String, dt: DataType): Field = dt match {
+  def createArrowField(name: String, dt: DataType, nullable: Boolean = true): Field = dt match {
     case at: ArrayType =>
       new Field(
         name,
-        FieldType.nullable(ArrowType.List.INSTANCE),
+        new FieldType(nullable, ArrowType.List.INSTANCE, null),
         Lists.newArrayList(createArrowField(s"${name}_${dt}", at.elementType)))
-    case mt: MapType =>
-      throw new UnsupportedOperationException(s"${dt} is not supported yet")
     case st: StructType =>
       val fieldlist = new util.ArrayList[Field]
       var structField = null
       for ( structField <- st.fields ) {
-        fieldlist.add(createArrowField(structField.name, structField.dataType))
+        fieldlist.add(createArrowField(structField.name, structField.dataType, structField.nullable))
       }
       new Field(
         name,
-        FieldType.nullable(ArrowType.Struct.INSTANCE),
+        new FieldType(nullable, ArrowType.Struct.INSTANCE, null),
         fieldlist)
+    case mt: MapType =>
+      // Note: the Map type's entries struct cannot be nullable, and its key field cannot be nullable.
+      new Field(
+        name,
+        new FieldType(nullable, new ArrowType.Map(false), null),
+        Lists.newArrayList(createArrowField(MapVector.DATA_VECTOR_NAME,
+          new StructType()
+            .add(MapVector.KEY_NAME, mt.keyType, false)
+            .add(MapVector.VALUE_NAME, mt.valueType, mt.valueContainsNull),
+          nullable = false
+        )))
     case _ =>
-      Field.nullable(name, CodeGeneration.getResultType(dt))
+      new Field(name, new FieldType(nullable, CodeGeneration.getResultType(dt), null), null)
   }
 
   def createArrowField(attr: Attribute): Field =
diff --git a/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala b/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala
index d02dd0262..3b4ed52de 100644
--- a/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala
+++ b/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala
@@ -86,13 +86,30 @@ case class ColumnarShuffleExchangeExec(
     // check input datatype
     for (attr <- child.output) {
       try {
-        ConverterUtils.checkIfNestTypeSupported(attr.dataType)
+        ConverterUtils.createArrowField(attr)
       } catch {
         case e: UnsupportedOperationException =>
           throw new UnsupportedOperationException(
             s"${attr.dataType} is not supported in ColumnarShuffledExchangeExec.")
       }
     }
+
+    // Check partitioning keys
+    outputPartitioning match {
+      case HashPartitioning(exprs, n) =>
+        exprs.zipWithIndex.foreach {
+          case (expr, i) =>
+            val attr = ConverterUtils.getAttrFromExpr(expr)
+            try {
+              ConverterUtils.checkIfTypeSupported(attr.dataType)
+            } catch {
+              case e: UnsupportedOperationException =>
+                throw new UnsupportedOperationException(
+                  s"${attr.dataType} is not supported in ColumnarShuffledExchangeExec Partitioning.")
+            }
+        }
+      case _ =>
+    }
   }
 
   val serializer: Serializer = new ArrowColumnarBatchSerializer(
diff --git a/native-sql-engine/core/src/test/scala/org/apache/spark/shuffle/ColumnarShuffleSQLSuite.scala b/native-sql-engine/core/src/test/scala/org/apache/spark/shuffle/ColumnarShuffleSQLSuite.scala
index cf9c77f38..2cc87a375 100644
--- a/native-sql-engine/core/src/test/scala/org/apache/spark/shuffle/ColumnarShuffleSQLSuite.scala
+++ b/native-sql-engine/core/src/test/scala/org/apache/spark/shuffle/ColumnarShuffleSQLSuite.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package com.intel.oap.misc
+package org.apache.spark.shuffle
 
 import java.nio.file.Files
 
@@ -23,6 +23,8 @@
 import com.intel.oap.tpc.util.TPCRunner
 import org.apache.log4j.{Level, LogManager}
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.execution.ColumnarShuffleExchangeExec
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.functions.{col, expr}
 import org.apache.spark.sql.test.SharedSparkSession
@@ -58,12 +60,9 @@ class ComplexTypeSuite extends QueryTest with SharedSparkSession {
       .set("spark.sql.autoBroadcastJoinThreshold", "-1")
       .set("spark.oap.sql.columnar.sortmergejoin.lazyread", "true")
       .set("spark.oap.sql.columnar.autorelease", "false")
-      .set("spark.sql.adaptive.enabled", "true")
       .set("spark.sql.shuffle.partitions", "50")
       .set("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "5")
       .set("spark.oap.sql.columnar.shuffledhashjoin.buildsizelimit", "200m")
-      .set("spark.oap.sql.columnar.rowtocolumnar", "false")
-      .set("spark.oap.sql.columnar.columnartorow", "false")
 
     return conf
   }
@@ -76,7 +75,17 @@ class ComplexTypeSuite extends QueryTest with SharedSparkSession {
     lPath = lfile.getAbsolutePath
     spark.range(2).select(col("id"), expr("1").as("kind"),
       expr("array(1, 2)").as("arr_field"),
-      expr("struct(1, 2)").as("struct_field"))
+      expr("array(array(1, 2), array(3, 4))").as("arr_arr_field"),
+      expr("array(struct(1, 2), struct(1, 2))").as("arr_struct_field"),
+      expr("array(map(1, 2), map(3,4))").as("arr_map_field"),
+      expr("struct(1, 2)").as("struct_field"),
+      expr("struct(1, struct(1, 2))").as("struct_struct_field"),
+      expr("struct(1, array(1, 2))").as("struct_array_field"),
+      expr("map(1, 2)").as("map_field"),
+      expr("map(1, map(3,4))").as("map_map_field"),
+      expr("map(1, array(1, 2))").as("map_arr_field"),
+      expr("map(struct(1, 2), 2)").as("map_struct_field"))
+      .coalesce(1)
       .write
       .format("parquet")
       .mode("overwrite")
@@ -88,6 +97,7 @@ class ComplexTypeSuite extends QueryTest with SharedSparkSession {
     spark.range(2).select(col("id"), expr("id % 2").as("kind"),
       expr("array(1, 2)").as("arr_field"),
       expr("struct(1, 2)").as("struct_field"))
+      .coalesce(1)
       .write
       .format("parquet")
       .mode("overwrite")
@@ -101,16 +111,97 @@ class ComplexTypeSuite extends QueryTest with SharedSparkSession {
     val df = spark.sql("SELECT ltab.arr_field FROM ltab, rtab WHERE ltab.kind = rtab.kind")
     df.explain(true)
     df.show()
-    assert(df.count() == 2)
+    assert(df.queryExecution.executedPlan.find(_.isInstanceOf[ColumnarShuffleExchangeExec]).isDefined)
+    assert(df.count == 2)
+  }
+
+  test("Test Nest Array in Shuffle split") {
+    val df = spark.sql("SELECT ltab.arr_arr_field FROM ltab, rtab WHERE ltab.kind = rtab.kind")
+    df.explain(true)
+    df.show()
+    assert(df.queryExecution.executedPlan.find(_.isInstanceOf[ColumnarShuffleExchangeExec]).isDefined)
+    assert(df.count == 2)
+  }
+
+  test("Test Array_Struct in Shuffle split") {
+    val df = spark.sql("SELECT ltab.arr_struct_field FROM ltab, rtab WHERE ltab.kind = rtab.kind")
+    df.explain(true)
+    df.show()
+    assert(df.queryExecution.executedPlan.find(_.isInstanceOf[ColumnarShuffleExchangeExec]).isDefined)
+    assert(df.count == 2)
+  }
+
+  test("Test Array_Map in Shuffle split") {
+    val df = spark.sql("SELECT ltab.arr_map_field FROM ltab, rtab WHERE ltab.kind = rtab.kind")
+    df.explain(true)
+    df.show()
+    assert(df.queryExecution.executedPlan.find(_.isInstanceOf[ColumnarShuffleExchangeExec]).isDefined)
+    assert(df.count == 2)
   }
 
   test("Test Struct in Shuffle stage") {
     val df = spark.sql("SELECT ltab.struct_field FROM ltab, rtab WHERE ltab.kind = rtab.kind")
     df.explain(true)
     df.show()
+    assert(df.queryExecution.executedPlan.find(_.isInstanceOf[ColumnarShuffleExchangeExec]).isDefined)
+    assert(df.count() == 2)
+  }
+
+  test("Test Nest Struct in Shuffle stage") {
+    val df = spark.sql("SELECT ltab.struct_struct_field FROM ltab, rtab WHERE ltab.kind = rtab.kind")
+    df.explain(true)
+    df.show()
+    assert(df.queryExecution.executedPlan.find(_.isInstanceOf[ColumnarShuffleExchangeExec]).isDefined)
     assert(df.count() == 2)
   }
 
+  test("Test Struct_Array in Shuffle stage") {
+    val df = spark.sql("SELECT ltab.struct_array_field FROM ltab, rtab WHERE ltab.kind = rtab.kind")
+    df.explain(true)
+    df.show()
+    assert(df.queryExecution.executedPlan.find(_.isInstanceOf[ColumnarShuffleExchangeExec]).isDefined)
+    assert(df.count() == 2)
+  }
+
+  test("Test Map in Shuffle stage") {
+    val df = spark.sql("SELECT ltab.map_field FROM ltab, rtab WHERE ltab.kind = rtab.kind")
+    df.explain(true)
+    df.show()
+    assert(df.queryExecution.executedPlan.find(_.isInstanceOf[ColumnarShuffleExchangeExec]).isDefined)
+    assert(df.count() == 2)
+  }
+
+  test("Test Nest Map in Shuffle stage") {
+    val df = spark.sql("SELECT ltab.map_map_field FROM ltab, rtab WHERE ltab.kind = rtab.kind")
+    df.explain(true)
+    df.show()
+    assert(df.queryExecution.executedPlan.find(_.isInstanceOf[ColumnarShuffleExchangeExec]).isDefined)
+    assert(df.count() == 2)
+  }
+
+  test("Test Map_Array in Shuffle stage") {
+    val df = spark.sql("SELECT ltab.map_arr_field FROM ltab, rtab WHERE ltab.kind = rtab.kind")
+    df.explain(true)
+    df.show()
+    assert(df.queryExecution.executedPlan.find(_.isInstanceOf[ColumnarShuffleExchangeExec]).isDefined)
+    assert(df.count() == 2)
+  }
+
+  test("Test Map_Struct in Shuffle stage") {
+    val df = spark.sql("SELECT ltab.map_struct_field FROM ltab, rtab WHERE ltab.kind = rtab.kind")
+    df.explain(true)
+    df.show()
+    assert(df.queryExecution.executedPlan.find(_.isInstanceOf[ColumnarShuffleExchangeExec]).isDefined)
+    assert(df.count() == 2)
+  }
+
+  test("Test Fall back with complex type in Partitioning keys") {
+    val df = spark.sql("SELECT ltab.arr_field FROM ltab, rtab WHERE ltab.arr_field = rtab.arr_field")
+    df.explain(true)
+    df.show()
+    assert(df.queryExecution.executedPlan.find(_.isInstanceOf[ShuffleExchangeExec]).isDefined)
+  }
+
   override def afterAll(): Unit = {
     super.afterAll()
   }
diff --git a/native-sql-engine/cpp/src/jni/jni_common.h b/native-sql-engine/cpp/src/jni/jni_common.h
index 187d01b9e..43b6efee2 100644
--- a/native-sql-engine/cpp/src/jni/jni_common.h
+++ b/native-sql-engine/cpp/src/jni/jni_common.h
@@ -157,6 +157,12 @@ arrow::Status AppendBuffers(std::shared_ptr<arrow::Array> column,
         RETURN_NOT_OK(AppendBuffers(struct_array->field(i), buffers));
       }
     } break;
+    case arrow::Type::MAP: {
+      auto map_array = std::dynamic_pointer_cast<arrow::MapArray>(column);
+      (*buffers).push_back(map_array->null_bitmap());
+      (*buffers).push_back(map_array->value_offsets());
+      RETURN_NOT_OK(AppendBuffers(map_array->values(), buffers));
+    } break;
     default: {
       for (auto& buffer : column->data()->buffers) {
         (*buffers).push_back(buffer);
@@ -187,24 +193,36 @@ arrow::Status MakeArrayData(std::shared_ptr<arrow::DataType> type, int num_rows,
   switch (type->id()) {
     case arrow::Type::LIST:
     case arrow::Type::LARGE_LIST: {
-      auto offset_data_type = GetOffsetDataType(type);
+      int64_t null_count = arrow::kUnknownNullCount;
+      std::vector<std::shared_ptr<arrow::Buffer>> buffers;
+      if (*buf_idx_ptr >= in_bufs_len) {
+        return arrow::Status::Invalid("insufficient number of in_buf_addrs");
+      }
+      if (in_bufs[*buf_idx_ptr]->size() == 0) {
+        null_count = 0;
+      }
+      buffers.push_back(in_bufs[*buf_idx_ptr]);
+      *buf_idx_ptr += 1;
+
+      auto offsetbits = arrow::offset_bit_width(type->id());
+      if (*buf_idx_ptr >= in_bufs_len) {
+        return arrow::Status::Invalid("insufficient number of in_buf_addrs");
+      }
+      buffers.push_back(in_bufs[*buf_idx_ptr]);
+      auto offsets_size = in_bufs[*buf_idx_ptr]->size();
+      *buf_idx_ptr += 1;
+      num_rows = offsets_size * 8 / offsetbits - 1;
+
       auto list_type = std::dynamic_pointer_cast<arrow::ListType>(type);
       auto child_type = list_type->value_type();
-      std::shared_ptr<arrow::ArrayData> child_array_data, offset_array_data;
-      // create offset array
-      // Chendi: For some reason, for ListArray::FromArrays will remove last row from
-      // offset array, refer to array_nested.cc CleanListOffsets function
-      FIXOffsetBuffer(&in_bufs[*buf_idx_ptr], num_rows);
-      RETURN_NOT_OK(MakeArrayData(offset_data_type, num_rows + 1, in_bufs, in_bufs_len,
-                                  &offset_array_data, buf_idx_ptr));
-      auto offset_array = arrow::MakeArray(offset_array_data);
-      // create child data array
+      ArrayDataVector list_child_data_vec;
+      std::shared_ptr<arrow::ArrayData> list_child_data;
+      // create child ArrayData
       RETURN_NOT_OK(MakeArrayData(child_type, -1, in_bufs, in_bufs_len,
-                                  &child_array_data, buf_idx_ptr));
-      auto child_array = arrow::MakeArray(child_array_data);
-      auto list_array =
-          arrow::ListArray::FromArrays(*offset_array, *child_array).ValueOrDie();
-      *arr_data = list_array->data();
+                                  &list_child_data, buf_idx_ptr));
+      list_child_data_vec.push_back(list_child_data);
+      *arr_data = arrow::ArrayData::Make(type, num_rows, std::move(buffers),
+                                         list_child_data_vec, null_count);
     } break;
     case arrow::Type::STRUCT: {
       int64_t null_count = arrow::kUnknownNullCount;
@@ -226,9 +244,62 @@ arrow::Status MakeArrayData(std::shared_ptr<arrow::DataType> type, int num_rows,
                                     &struct_child_data, buf_idx_ptr));
         struct_child_data_vec.push_back(struct_child_data);
       }
+      // For Struct recursion (multiple levels) in a nested array, num_rows cannot be
+      // calculated from offsets.
+      if (num_rows == -1) {
+        num_rows = struct_child_data_vec.at(0)->length;
+      }
       *arr_data = arrow::ArrayData::Make(type, num_rows, std::move(buffers),
                                          struct_child_data_vec, null_count);
     } break;
+    case arrow::Type::MAP: {
+      int64_t null_count = arrow::kUnknownNullCount;
+      std::vector<std::shared_ptr<arrow::Buffer>> buffers;
+      if (*buf_idx_ptr >= in_bufs_len) {
+        return arrow::Status::Invalid("insufficient number of in_buf_addrs");
+      }
+      if (in_bufs[*buf_idx_ptr]->size() == 0) {
+        null_count = 0;
+      }
+      buffers.push_back(in_bufs[*buf_idx_ptr]);
+      *buf_idx_ptr += 1;
+
+      auto offsetbits = arrow::offset_bit_width(type->id());
+      if (*buf_idx_ptr >= in_bufs_len) {
+        return arrow::Status::Invalid("insufficient number of in_buf_addrs");
+      }
+      buffers.push_back(in_bufs[*buf_idx_ptr]);
+      auto offsets_size = in_bufs[*buf_idx_ptr]->size();
+      *buf_idx_ptr += 1;
+      num_rows = offsets_size * 8 / offsetbits - 1;
+
+      auto map_type = std::dynamic_pointer_cast<arrow::MapType>(type);
+      auto child_type = map_type->value_type();
+      ArrayDataVector map_child_data_vec;
+      std::shared_ptr<arrow::ArrayData> map_child_data;
+      // create child ArrayData
+      RETURN_NOT_OK(MakeArrayData(child_type, -1, in_bufs, in_bufs_len, &map_child_data,
+                                  buf_idx_ptr));
+      map_child_data_vec.push_back(map_child_data);
+      // specific handling because of the different schema between spark and native
+      if (map_child_data->null_count == arrow::kUnknownNullCount) {
+        map_child_data->buffers.at(0) = nullptr;
+        map_child_data->null_count = 0;
+      }
+      // validate child data for map
+      if (map_child_data->child_data.size() != 2) {
+        return Status::Invalid("Map array child array should have two fields");
+      }
+      if (map_child_data->child_data[0]->null_count == arrow::kUnknownNullCount) {
+        int key_null_count = map_child_data->child_data[0]->GetNullCount();
+        if (key_null_count != 0) {
+          return Status::Invalid("Map array keys array should have no nulls");
+        }
+      }
+
+      *arr_data = arrow::ArrayData::Make(type, num_rows, std::move(buffers),
+                                         map_child_data_vec, null_count);
+    } break;
     default:
       return arrow::Status::NotImplemented("MakeArrayData for type ", type->ToString(),
                                            " is not supported yet.");
diff --git a/native-sql-engine/cpp/src/tests/jniutils_test.cc b/native-sql-engine/cpp/src/tests/jniutils_test.cc
index 4c02e7bda..0d29a1a9e 100644
--- a/native-sql-engine/cpp/src/tests/jniutils_test.cc
+++ b/native-sql-engine/cpp/src/tests/jniutils_test.cc
@@ -130,5 +130,90 @@ TEST_F(JniUtilsTest, TestMakeRecordBatchBuild_Int_Struct) {
   ASSERT_TRUE(rb->Equals(*input_batch_arr.get()));
 }
 
+TEST_F(JniUtilsTest, TestMakeRecordBatchBuild_map) {
+  auto f_map = field("f_map", map(int32(), int32()));
+
+  auto rb_schema = arrow::schema({f_map});
+
+  const std::vector<std::string> input_data_arr = {R"([[[1, 2]], [[1, 2]]])"};
+
+  std::shared_ptr<arrow::RecordBatch> input_batch_arr;
+  std::shared_ptr<arrow::RecordBatch> res_batch_arr;
+  MakeInputBatch(input_data_arr, rb_schema, &input_batch_arr);
+
+  auto num_rows = input_batch_arr->num_rows();
+  std::vector<std::shared_ptr<arrow::Buffer>> buffers;
+  std::vector<int64_t> in_buf_addrs;
+  std::vector<int64_t> in_buf_sizes;
+  for (int i = 0; i < rb_schema->num_fields(); ++i) {
+    ASSERT_NOT_OK(AppendBuffers(input_batch_arr->column(i), &buffers));
+  }
+
+  for (auto buffer : buffers) {
+    if (buffer == nullptr) {
+      in_buf_addrs.push_back(0);
+      in_buf_sizes.push_back(0);
+    } else {
+      in_buf_addrs.push_back((int64_t)buffer->data());
+      in_buf_sizes.push_back((int64_t)buffer->size());
+    }
+  }
+
+  auto status = MakeRecordBatch(rb_schema, num_rows, &in_buf_addrs[0], &in_buf_sizes[0],
+                                buffers.size(), &res_batch_arr);
+
+  const auto& rb = res_batch_arr;
+  ASSERT_EQ(rb->num_columns(), rb_schema->num_fields());
+  for (auto j = 0; j < rb->num_columns(); ++j) {
+    ASSERT_EQ(rb->column(j)->length(), rb->num_rows());
+  }
+
+  ASSERT_TRUE(rb->Equals(*input_batch_arr.get()));
+}
+
+TEST_F(JniUtilsTest, TestMakeRecordBatchBuild_list_map) {
+  auto f_arr_int32 = field("f_int32", arrow::list(arrow::list(arrow::int32())));
+  auto f_arr_list_map = field("f_list_map", list(map(utf8(), utf8())));
+
+  auto rb_schema = arrow::schema({f_arr_int32, f_arr_list_map});
+
+  const std::vector<std::string> input_data_arr = {
+      R"([[[1, 2, 3]], [[9, 8], [null]], [[3, 1], [0]], [[1, 9, null]]])",
+      R"([[[["key1", "val_aa1"]]], [[["key1", "val_bb1"]], [["key2", "val_bb2"]]], [[["key1", "val_cc1"]]], [[["key1", "val_dd1"]]]])"};
+
+  std::shared_ptr<arrow::RecordBatch> input_batch_arr;
+  std::shared_ptr<arrow::RecordBatch> res_batch_arr;
+  MakeInputBatch(input_data_arr, rb_schema, &input_batch_arr);
+
+  auto num_rows = input_batch_arr->num_rows();
+  std::vector<std::shared_ptr<arrow::Buffer>> buffers;
+  std::vector<int64_t> in_buf_addrs;
+  std::vector<int64_t> in_buf_sizes;
+  for (int i = 0; i < rb_schema->num_fields(); ++i) {
+    ASSERT_NOT_OK(AppendBuffers(input_batch_arr->column(i), &buffers));
+  }
+
+  for (auto buffer : buffers) {
+    if (buffer == nullptr) {
+      in_buf_addrs.push_back(0);
+      in_buf_sizes.push_back(0);
+    } else {
+      in_buf_addrs.push_back((int64_t)buffer->data());
+      in_buf_sizes.push_back((int64_t)buffer->size());
+    }
+  }
+
+  auto status = MakeRecordBatch(rb_schema, num_rows, &in_buf_addrs[0], &in_buf_sizes[0],
+                                buffers.size(), &res_batch_arr);
+
+  const auto& rb = res_batch_arr;
+  ASSERT_EQ(rb->num_columns(), rb_schema->num_fields());
+  for (auto j = 0; j < rb->num_columns(); ++j) {
+    ASSERT_EQ(rb->column(j)->length(), rb->num_rows());
+  }
+
+  ASSERT_TRUE(rb->Equals(*input_batch_arr.get()));
+}
+
 }  // namespace jniutils
 }  // namespace sparkcolumnarplugin
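
For reference, the following is a minimal, self-contained sketch (not part of the patch) of the Arrow field layout that the new ConverterUtils.createArrowField is expected to produce for a Spark map<int, int> column. It mirrors the note in the Scala hunk above: the entries struct is non-nullable and its key child is non-nullable, while the value child follows valueContainsNull. Only the Arrow Java classes already referenced in the diff are used; the object name MapFieldLayoutSketch and the field name "map_field" are illustrative.

import java.util.Arrays

import org.apache.arrow.vector.complex.MapVector
import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType}

object MapFieldLayoutSketch {
  def main(args: Array[String]): Unit = {
    // key child: non-nullable int32 (map keys are never null)
    val key = new Field(MapVector.KEY_NAME,
      new FieldType(false, new ArrowType.Int(32, true), null), null)
    // value child: nullable int32 (mirrors MapType.valueContainsNull = true)
    val value = new Field(MapVector.VALUE_NAME,
      new FieldType(true, new ArrowType.Int(32, true), null), null)
    // entries struct: non-nullable struct<key, value>
    val entries = new Field(MapVector.DATA_VECTOR_NAME,
      new FieldType(false, ArrowType.Struct.INSTANCE, null), Arrays.asList(key, value))
    // top-level map field: Map(keysSorted = false), nullable like the Spark column
    val mapField = new Field("map_field",
      new FieldType(true, new ArrowType.Map(false), null), Arrays.asList(entries))
    // prints the nested layout: map -> non-nullable entries struct -> non-null key / nullable value
    println(mapField)
  }
}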