From a3c4fb0b91d195f9ac729520443c05e1fb7bf352 Mon Sep 17 00:00:00 2001 From: Yuan Zhou Date: Fri, 8 Apr 2022 17:57:25 +0800 Subject: [PATCH 1/3] Optimize row to column memory allocation This patch improves the memory allocation in r2c by doing estimation based on first row. Also check the capacity during the conversion and increase the buffer size if not enough Signed-off-by: Yuan Zhou --- .../com/intel/oap/execution/ArrowRowToColumnarExec.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowRowToColumnarExec.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowRowToColumnarExec.scala index a32ae1f69..ed76b082a 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowRowToColumnarExec.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowRowToColumnarExec.scala @@ -166,8 +166,10 @@ case class ArrowRowToColumnarExec(child: SparkPlan) extends UnaryExecNode { val unsafeRow = row.asInstanceOf[UnsafeRow] val sizeInBytes = unsafeRow.getSizeInBytes if ((offset + sizeInBytes) > arrowBuf.capacity()) { + val tmpBuf = allocator.buffer(((offset + sizeInBytes) * 1.5).toLong) + tmpBuf.setBytes(0, arrowBuf, 0, offset) arrowBuf.close() - arrowBuf = allocator.buffer((arrowBuf.capacity() * 1.2).toLong) + arrowBuf = tmpBuf } Platform.copyMemory(unsafeRow.getBaseObject, unsafeRow.getBaseOffset, null, arrowBuf.memoryAddress() + offset, sizeInBytes) @@ -186,13 +188,12 @@ case class ArrowRowToColumnarExec(child: SparkPlan) extends UnaryExecNode { val output = ConverterUtils.fromArrowRecordBatch(arrowSchema, rb) val outputNumRows = rb.getLength ConverterUtils.releaseArrowRecordBatch(rb) - arrowBuf.close() last_cb = new ColumnarBatch(output.map(v => v.asInstanceOf[ColumnVector]).toArray, outputNumRows) elapse = System.nanoTime() - start processTime.set(NANOSECONDS.toMillis(elapse)) last_cb } else { - logInfo("the buffer 
allocated failed and will fall back to non arrow optimization") + logInfo("not unsaferow, fallback to java based r2c") val vectors: Seq[WritableColumnVector] = ArrowWritableColumnVector.allocateColumns(numRows, schema) var rowCount = 0 From a8e5ea9371f5f52049b4b44ba5424c49489e5588 Mon Sep 17 00:00:00 2001 From: Yuan Zhou Date: Fri, 8 Apr 2022 18:20:42 +0800 Subject: [PATCH 2/3] fix leakage Signed-off-by: Yuan Zhou --- .../scala/com/intel/oap/execution/ArrowRowToColumnarExec.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowRowToColumnarExec.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowRowToColumnarExec.scala index ed76b082a..87f7de1fe 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowRowToColumnarExec.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowRowToColumnarExec.scala @@ -188,6 +188,7 @@ case class ArrowRowToColumnarExec(child: SparkPlan) extends UnaryExecNode { val output = ConverterUtils.fromArrowRecordBatch(arrowSchema, rb) val outputNumRows = rb.getLength ConverterUtils.releaseArrowRecordBatch(rb) + arrowBuf.close() last_cb = new ColumnarBatch(output.map(v => v.asInstanceOf[ColumnVector]).toArray, outputNumRows) elapse = System.nanoTime() - start processTime.set(NANOSECONDS.toMillis(elapse)) From 117411fc6adf0391040870ed6c502a324b9ef4fc Mon Sep 17 00:00:00 2001 From: Yuan Zhou Date: Fri, 8 Apr 2022 22:06:52 +0800 Subject: [PATCH 3/3] fix test Signed-off-by: Yuan Zhou --- .../test/scala/org/apache/spark/sql/execution/SortSuite.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/native-sql-engine/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala b/native-sql-engine/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala index 1c70335c6..b10e6eaac 100644 --- a/native-sql-engine/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala +++ 
b/native-sql-engine/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala @@ -37,6 +37,7 @@ class SortSuite extends SparkPlanTest with SharedSparkSession { override protected def sparkConf: SparkConf = { val conf = super.sparkConf conf.set("spark.memory.offHeap.size", String.valueOf("5000m")) + .set("spark.sql.inMemoryColumnarStorage.batchSize", "100") } test("basic sorting using ExternalSort") {