Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

Commit

Permalink
[NSE-746]Fix memory allocation in row to columnar (#834) (#836)
Browse files Browse the repository at this point in the history
* Optimize row to column memory allocation

This patch improves the memory allocation in r2c by doing
estimation based on first row. Also check the capacity during
the conversion and increase the buffer size if it is not enough

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

* fix leakage

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

* fix test

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>
  • Loading branch information
zhouyuan authored Apr 9, 2022
1 parent 1730f17 commit 12b4acb
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,10 @@ case class ArrowRowToColumnarExec(child: SparkPlan) extends UnaryExecNode {
val unsafeRow = row.asInstanceOf[UnsafeRow]
val sizeInBytes = unsafeRow.getSizeInBytes
if ((offset + sizeInBytes) > arrowBuf.capacity()) {
val tmpBuf = allocator.buffer(((offset + sizeInBytes) * 1.5).toLong)
tmpBuf.setBytes(0, arrowBuf, 0, offset)
arrowBuf.close()
arrowBuf = allocator.buffer((arrowBuf.capacity() * 1.2).toLong)
arrowBuf = tmpBuf
}
Platform.copyMemory(unsafeRow.getBaseObject, unsafeRow.getBaseOffset,
null, arrowBuf.memoryAddress() + offset, sizeInBytes)
Expand All @@ -192,7 +194,7 @@ case class ArrowRowToColumnarExec(child: SparkPlan) extends UnaryExecNode {
processTime.set(NANOSECONDS.toMillis(elapse))
last_cb
} else {
logInfo("the buffer allocated failed and will fall back to non arrow optimization")
logInfo("not unsaferow, fallback to java based r2c")
val vectors: Seq[WritableColumnVector] =
ArrowWritableColumnVector.allocateColumns(numRows, schema)
var rowCount = 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ import org.apache.spark.sql.types._
class SortSuite extends SparkPlanTest with SharedSparkSession {
import testImplicits.newProductEncoder
import testImplicits.localSeqToDatasetHolder

/**
 * Spark configuration used by this suite.
 *
 * Starts from the configuration supplied by the parent suite and overrides
 * two settings: `spark.memory.offHeap.size` is set to "5000m" and
 * `spark.sql.inMemoryColumnarStorage.batchSize` is set to "100".
 *
 * @return the parent configuration with the two overrides applied
 */
override protected def sparkConf: SparkConf =
  // The size is already a String literal, so it is passed to `set` directly.
  super.sparkConf
    .set("spark.memory.offHeap.size", "5000m")
    .set("spark.sql.inMemoryColumnarStorage.batchSize", "100")

test("basic sorting using ExternalSort") {

Expand Down

0 comments on commit 12b4acb

Please sign in to comment.