From aa2cca99cf403c8f0504d47fd179578a7ae0df8b Mon Sep 17 00:00:00 2001
From: Yuan
Date: Sat, 9 Apr 2022 11:12:34 +0800
Subject: [PATCH] [NSE-746]Fix memory allocation in row to columnar (#834)

* Optimize row to column memory allocation

This patch improves the memory allocation in r2c by doing estimation
based on first row. Also check the capacity during the conversion and
increase the buffer size if not enough

Signed-off-by: Yuan Zhou

* fix leakage

Signed-off-by: Yuan Zhou

* fix test

Signed-off-by: Yuan Zhou
---
 .../com/intel/oap/execution/ArrowRowToColumnarExec.scala | 6 ++++--
 .../scala/org/apache/spark/sql/execution/SortSuite.scala | 6 ++++++
 2 files changed, 10 insertions(+), 2 deletions(-)

diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowRowToColumnarExec.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowRowToColumnarExec.scala
index a32ae1f69..87f7de1fe 100644
--- a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowRowToColumnarExec.scala
+++ b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowRowToColumnarExec.scala
@@ -166,8 +166,10 @@ case class ArrowRowToColumnarExec(child: SparkPlan) extends UnaryExecNode {
           val unsafeRow = row.asInstanceOf[UnsafeRow]
           val sizeInBytes = unsafeRow.getSizeInBytes
           if ((offset + sizeInBytes) > arrowBuf.capacity()) {
+            val tmpBuf = allocator.buffer(((offset + sizeInBytes) * 1.5).toLong)
+            tmpBuf.setBytes(0, arrowBuf, 0, offset)
             arrowBuf.close()
-            arrowBuf = allocator.buffer((arrowBuf.capacity() * 1.2).toLong)
+            arrowBuf = tmpBuf
           }
           Platform.copyMemory(unsafeRow.getBaseObject, unsafeRow.getBaseOffset,
             null, arrowBuf.memoryAddress() + offset, sizeInBytes)
@@ -192,7 +194,7 @@ case class ArrowRowToColumnarExec(child: SparkPlan) extends UnaryExecNode {
           processTime.set(NANOSECONDS.toMillis(elapse))
           last_cb
         } else {
-          logInfo("the buffer allocated failed and will fall back to non arrow optimization")
+          logInfo("not unsaferow, fallback to java based r2c")
           val vectors: Seq[WritableColumnVector] =
             ArrowWritableColumnVector.allocateColumns(numRows, schema)
           var rowCount = 0
diff --git a/native-sql-engine/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala b/native-sql-engine/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
index 6a4f3f626..b2dd9aac0 100644
--- a/native-sql-engine/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
+++ b/native-sql-engine/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
@@ -32,6 +32,12 @@ import org.apache.spark.sql.types._
 class SortSuite extends SparkPlanTest with SharedSparkSession {
   import testImplicits.newProductEncoder
   import testImplicits.localSeqToDatasetHolder
+
+  override protected def sparkConf: SparkConf = {
+    val conf = super.sparkConf
+    conf.set("spark.memory.offHeap.size", String.valueOf("5000m"))
+      .set("spark.sql.inMemoryColumnarStorage.batchSize", "100")
+  }

   test("basic sorting using ExternalSort") {