
Commit 5f7d7b6
[NSE-746] Fix memory allocation in row to columnar (#757)
* Fix memory allocation in row to columnar

This patch removes the hard-coded 128 MB buffer allocation in the row-to-columnar (r2c) conversion and estimates the buffer size from the data instead (see the sketch after the changed-files summary below).

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

* fix buf release

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

* increase hashmap size

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>
zhouyuan authored Mar 10, 2022
1 parent 34ea83e commit 5f7d7b6
Showing 2 changed files with 13 additions and 3 deletions.
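
The Scala diff below replaces a fixed 128 MB allocation with an estimate taken from the first row of each batch, growing the buffer by 20% on overflow. A minimal, self-contained sketch of that sizing strategy (Buf and allocate are hypothetical stand-ins for illustration, not the real ArrowBuf/BufferAllocator API):

// Sketch of the sizing strategy this patch introduces; `Buf` and `allocate`
// are hypothetical stand-ins, not the real ArrowBuf/BufferAllocator API.
object BufferSizingSketch {
  final case class Buf(capacity: Long)
  def allocate(bytes: Long): Buf = Buf(bytes)

  // Size the initial buffer from the first row instead of a hard-coded 128 MB,
  // with 20% headroom for rows larger than the first one.
  def initialBuffer(firstRowBytes: Long, numRows: Int): Buf =
    allocate((firstRowBytes * numRows * 1.2).toLong)

  // Grow capacity by 20% when the next row would overflow the buffer.
  def ensureCapacity(buf: Buf, offset: Long, rowBytes: Long): Buf =
    if (offset + rowBytes > buf.capacity) allocate((buf.capacity * 1.2).toLong)
    else buf

  def main(args: Array[String]): Unit = {
    val buf = initialBuffer(firstRowBytes = 64, numRows = 4096)
    println(buf.capacity) // 314572 bytes, versus the old fixed 134217728
  }
}

With 4096 rows of 64 bytes each, the estimate comes to roughly 0.3 MB per batch instead of a fixed 128 MB.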
@@ -102,12 +102,14 @@ class ArrowRowToColumnarExec(child: SparkPlan) extends RowToColumnarExec(child =
       // Allocate large buffer to store the numRows rows
       val bufferSize = 134217728 // 128M; could estimate the buffer size based on the data type
       val allocator = SparkMemoryUtils.contextAllocator()
-      val arrowBuf: ArrowBuf = allocator.buffer(bufferSize)
+      var arrowBuf: ArrowBuf = null
       override def hasNext: Boolean = {
         rowIterator.hasNext
       }
       TaskContext.get().addTaskCompletionListener[Unit] { _ =>
-        arrowBuf.close()
+        if (arrowBuf != null) {
+          arrowBuf.close()
+        }
       }
       override def next(): ColumnarBatch = {
         var isUnsafeRow = true
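
With the buffer now allocated lazily inside next(), a task may complete without ever allocating it, so the completion listener needs the null guard added above. A compact sketch of this guarded-cleanup pattern, using a stand-in Closeable rather than the real ArrowBuf:

import java.io.Closeable

// Guarded-cleanup sketch: the resource starts as null and is only allocated
// on first use, so the cleanup hook must tolerate the never-allocated case.
// This mirrors the null check added to the task-completion listener above.
final class LazyResourceHolder[T <: Closeable](make: () => T) {
  private var resource: T = _

  def get(): T = {
    if (resource == null) resource = make() // allocate on first use
    resource
  }

  def closeIfOpen(): Unit = {
    if (resource != null) { // skip tasks that never allocated the resource
      resource.close()
    }
  }
}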
@@ -130,6 +132,9 @@ class ArrowRowToColumnarExec(child: SparkPlan) extends RowToColumnarExec(child =
           assert(firstRow.isInstanceOf[UnsafeRow])
           val unsafeRow = firstRow.asInstanceOf[UnsafeRow]
           val sizeInBytes = unsafeRow.getSizeInBytes
+          // allocate buffer based on 1st row
+          val estimatedBufSize = sizeInBytes * numRows * 1.2
+          arrowBuf = allocator.buffer(estimatedBufSize.toLong)
           Platform.copyMemory(unsafeRow.getBaseObject, unsafeRow.getBaseOffset,
             null, arrowBuf.memoryAddress() + offset, sizeInBytes)
           offset += sizeInBytes
@@ -141,6 +146,10 @@ class ArrowRowToColumnarExec(child: SparkPlan) extends RowToColumnarExec(child =
           assert(row.isInstanceOf[UnsafeRow])
           val unsafeRow = row.asInstanceOf[UnsafeRow]
           val sizeInBytes = unsafeRow.getSizeInBytes
+          if ((offset + sizeInBytes) > arrowBuf.capacity()) {
+            arrowBuf.close()
+            arrowBuf = allocator.buffer((arrowBuf.capacity() * 1.2).toLong)
+          }
           Platform.copyMemory(unsafeRow.getBaseObject, unsafeRow.getBaseOffset,
             null, arrowBuf.memoryAddress() + offset, sizeInBytes)
           offset += sizeInBytes
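
One subtlety in the growth branch above, as committed: the replacement buffer is sized at 1.2x the old capacity, and the bytes already written are not copied over. A grow-and-copy variant that preserves written data and loops until the incoming row fits might look like this (an illustrative sketch with stand-in types, not the committed code):

// Grow-and-copy sketch (stand-in types): grows capacity until the incoming
// row fits and copies already-written bytes into the new buffer. The diff
// above reallocates without copying; this variant is only illustrative.
final case class Buf(data: Array[Byte]) { def capacity: Long = data.length.toLong }

def growToFit(buf: Buf, offset: Long, rowBytes: Long): Buf = {
  var newCap = buf.capacity
  while (offset + rowBytes > newCap) newCap = (newCap * 1.2).toLong + 1
  val grown = Buf(new Array[Byte](newCap.toInt))
  System.arraycopy(buf.data, 0, grown.data, 0, offset.toInt) // keep written bytes
  grown
}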
@@ -158,6 +167,7 @@ class ArrowRowToColumnarExec(child: SparkPlan) extends RowToColumnarExec(child =
         val output = ConverterUtils.fromArrowRecordBatch(arrowSchema, rb)
         val outputNumRows = rb.getLength
         ConverterUtils.releaseArrowRecordBatch(rb)
+        arrowBuf.close()
         last_cb = new ColumnarBatch(output.map(v => v.asInstanceOf[ColumnVector]).toArray, outputNumRows)
         elapse = System.nanoTime() - start
         processTime.set(NANOSECONDS.toMillis(elapse))
@@ -172,7 +172,7 @@ class HashRelationKernel::Impl {
     }
     long tmp_capacity = init_key_capacity;
     if (key_size_ != -1) {
-      tmp_capacity *= 12;
+      tmp_capacity *= 16;
     } else {
       tmp_capacity *= 128;
     }
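
This second change raises the initial hash-map reservation for fixed-width keys from 12x to 16x the initial key capacity. Restated as a sketch (in Scala for consistency with the rest of this page; the names mirror the C++ variables above):

// Restatement of the C++ sizing rule above (names mirror the C++ variables).
// keySize == -1 marks variable-width keys, which get a much larger reservation.
def hashMapCapacity(initKeyCapacity: Long, keySize: Long): Long =
  if (keySize != -1) initKeyCapacity * 16 // fixed-width keys: raised from 12
  else initKeyCapacity * 128              // variable-width keys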
