diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowRowToColumnarExec.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowRowToColumnarExec.scala
index a32ae1f69..87f7de1fe 100644
--- a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowRowToColumnarExec.scala
+++ b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowRowToColumnarExec.scala
@@ -166,8 +166,10 @@ case class ArrowRowToColumnarExec(child: SparkPlan) extends UnaryExecNode {
           val unsafeRow = row.asInstanceOf[UnsafeRow]
           val sizeInBytes = unsafeRow.getSizeInBytes
           if ((offset + sizeInBytes) > arrowBuf.capacity()) {
+            val tmpBuf = allocator.buffer(((offset + sizeInBytes) * 1.5).toLong)
+            tmpBuf.setBytes(0, arrowBuf, 0, offset)
             arrowBuf.close()
-            arrowBuf = allocator.buffer((arrowBuf.capacity() * 1.2).toLong)
+            arrowBuf = tmpBuf
           }
           Platform.copyMemory(unsafeRow.getBaseObject, unsafeRow.getBaseOffset,
             null, arrowBuf.memoryAddress() + offset, sizeInBytes)
@@ -192,7 +194,7 @@ case class ArrowRowToColumnarExec(child: SparkPlan) extends UnaryExecNode {
           processTime.set(NANOSECONDS.toMillis(elapse))
           last_cb
         } else {
-          logInfo("the buffer allocated failed and will fall back to non arrow optimization")
+          logInfo("not unsaferow, fallback to java based r2c")
           val vectors: Seq[WritableColumnVector] =
             ArrowWritableColumnVector.allocateColumns(numRows, schema)
           var rowCount = 0
diff --git a/native-sql-engine/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala b/native-sql-engine/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
index 6a4f3f626..b2dd9aac0 100644
--- a/native-sql-engine/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
+++ b/native-sql-engine/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
@@ -32,6 +32,12 @@ import org.apache.spark.sql.types._
 class SortSuite extends SparkPlanTest with SharedSparkSession {
   import testImplicits.newProductEncoder
   import testImplicits.localSeqToDatasetHolder
+
+  override protected def sparkConf: SparkConf = {
+    val conf = super.sparkConf
+    conf.set("spark.memory.offHeap.size", String.valueOf("5000m"))
+      .set("spark.sql.inMemoryColumnarStorage.batchSize", "100")
+  }

   test("basic sorting using ExternalSort") {