[NSE-958] Fix SQLMetrics inaccuracy in JVM/Native R2C and CoalesceBatches (#959)

zhixingheyi-tian authored Jun 8, 2022
1 parent c14d994 commit e179f98
Showing 3 changed files with 7 additions and 7 deletions.

@@ -297,7 +297,7 @@ case class RowToArrowColumnarExec(child: SparkPlan) extends UnaryExecNode {
 rowCount += 1
 }
 vectors.foreach(v => v.asInstanceOf[ArrowWritableColumnVector].setValueCount(rowCount))
-processTime.set(NANOSECONDS.toMillis(elapse))
+processTime += NANOSECONDS.toMillis(elapse)
 numInputRows += rowCount
 numOutputBatches += 1
 last_cb = new ColumnarBatch(vectors.toArray, rowCount)
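
The change above is the core of the JVM-side fix: next() runs once per output batch, so processTime.set(...) overwrites the metric on every batch and the task ends up reporting only the last batch's conversion time, while processTime += ... accumulates the time across all batches in the task. A minimal, self-contained sketch of the difference, using a hypothetical MetricStandIn class rather than Spark's real SQLMetric:

// A minimal sketch, NOT Spark's SQLMetric API: a stand-in accumulator that
// contrasts overwriting with set() against accumulating with +=.
class MetricStandIn {
  var value: Long = 0L
  def set(v: Long): Unit = value = v   // keeps only the last observation
  def +=(v: Long): Unit = value += v   // sums observations across batches
}

object MetricAccumulationSketch {
  def main(args: Array[String]): Unit = {
    val perBatchMillis = Seq(5L, 7L, 3L) // hypothetical elapsed time of three batches

    val overwritten = new MetricStandIn
    val accumulated = new MetricStandIn
    perBatchMillis.foreach { ms =>
      overwritten.set(ms)  // old behaviour: ends up reporting 3 ms (last batch only)
      accumulated += ms    // fixed behaviour: reports 15 ms (whole task)
    }
    println(s"set() -> ${overwritten.value} ms, += -> ${accumulated.value} ms")
  }
}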

@@ -102,7 +102,6 @@ case class ArrowCoalesceBatchesExec(child: SparkPlan) extends UnaryExecNode {
 val arrBufSizes = new ArrayBuffer[Array[Long]]()
 val numrows = ListBuffer[Int]()

-val beforeConcat = System.nanoTime
 while (hasNext && rowCount < recordsPerBatch) {
 val delta: ColumnarBatch = iter.next()
 delta.retain()
@@ -134,16 +133,17 @@ case class ArrowCoalesceBatchesExec(child: SparkPlan) extends UnaryExecNode {
 val schema = new Schema(expected_output_arrow_fields.asJava)
 val arrowSchema = ConverterUtils.getSchemaBytesBuf(schema)

+val beforeConcat = System.nanoTime
 val serializedRecordBatch = jniWrapper.nativeCoalesceBatches(
 arrowSchema, rowCount, numrows.toArray, arrBufAddrs.toArray, arrBufSizes.toArray,
 SparkMemoryUtils.contextMemoryPool().getNativeInstanceId)
+concatTime += System.nanoTime - beforeConcat
 val rb = UnsafeRecordBatchSerializer.deserializeUnsafe(SparkMemoryUtils.contextAllocator(), serializedRecordBatch)
 val ColVecArr = ConverterUtils.fromArrowRecordBatch(schema, rb)
 val outputNumRows = rb.getLength
 ConverterUtils.releaseArrowRecordBatch(rb)
 val bigColBatch = new ColumnarBatch(ColVecArr.map(v => v.asInstanceOf[ColumnVector]).toArray, rowCount)

-concatTime += System.nanoTime - beforeConcat
 numOutputRows += rowCount
 numInputBatches += batchesToAppend.length
 numOutputBatches += 1
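
ArrowCoalesceBatchesExec gets a second, related fix: the beforeConcat timestamp moves from before the input-draining while loop to immediately before the jniWrapper.nativeCoalesceBatches call, and concatTime is updated right after that call returns, so the metric covers only the native concatenation rather than the JVM-side buffer collection as well. A rough sketch of this timer-scoping pattern, with hypothetical collectBuffersOnJvm and doNativeWork placeholders standing in for the real code paths:

import java.util.concurrent.TimeUnit.NANOSECONDS

// Sketch of scoping a timer to just the measured call; the helpers below only
// simulate work and stand in for the real JVM-side loop and native JNI call.
object TimingScopeSketch {
  private var concatTimeNanos = 0L                           // stand-in for the concatTime metric

  private def collectBuffersOnJvm(): Unit = Thread.sleep(2)  // setup work: must NOT be timed
  private def doNativeWork(): Unit = Thread.sleep(1)         // the call we want to time

  private def coalesceOneBatch(): Unit = {
    collectBuffersOnJvm()
    val beforeConcat = System.nanoTime                       // start the clock right before the call
    doNativeWork()
    concatTimeNanos += System.nanoTime - beforeConcat        // accumulate only the native portion
  }

  def main(args: Array[String]): Unit = {
    (1 to 3).foreach(_ => coalesceOneBatch())
    println(s"native concat time: ${NANOSECONDS.toMillis(concatTimeNanos)} ms")
  }
}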

@@ -146,7 +146,6 @@ case class ArrowRowToColumnarExec(child: SparkPlan) extends UnaryExecNode {
 val rowLength = new ListBuffer[Long]()
 var rowCount = 0
 var offset = 0
-val start = System.nanoTime()

 assert(firstRow.isInstanceOf[UnsafeRow])
 val unsafeRow = firstRow.asInstanceOf[UnsafeRow]
@@ -180,8 +179,10 @@ case class ArrowRowToColumnarExec(child: SparkPlan) extends UnaryExecNode {
 if (schemaBytes == null) {
 schemaBytes = ConverterUtils.getSchemaBytesBuf(arrowSchema)
 }
+val start = System.nanoTime()
 val serializedRecordBatch = jniWrapper.nativeConvertRowToColumnar(schemaBytes, rowLength.toArray,
 arrowBuf.memoryAddress(), SparkMemoryUtils.contextMemoryPool().getNativeInstanceId)
+elapse = System.nanoTime() - start
 numInputRows += rowCount
 numOutputBatches += 1
 val rb = UnsafeRecordBatchSerializer.deserializeUnsafe(allocator, serializedRecordBatch)
@@ -190,8 +191,7 @@ case class ArrowRowToColumnarExec(child: SparkPlan) extends UnaryExecNode {
 ConverterUtils.releaseArrowRecordBatch(rb)
 arrowBuf.close()
 last_cb = new ColumnarBatch(output.map(v => v.asInstanceOf[ColumnVector]).toArray, outputNumRows)
-elapse = System.nanoTime() - start
-processTime.set(NANOSECONDS.toMillis(elapse))
+processTime += NANOSECONDS.toMillis(elapse)
 last_cb
 } else {
 logInfo("not unsaferow, fallback to java based r2c")
@@ -212,7 +212,7 @@ case class ArrowRowToColumnarExec(child: SparkPlan) extends UnaryExecNode {
 rowCount += 1
 }
 vectors.foreach(v => v.asInstanceOf[ArrowWritableColumnVector].setValueCount(rowCount))
-processTime.set(NANOSECONDS.toMillis(elapse))
+processTime += NANOSECONDS.toMillis(elapse)
 numInputRows += rowCount
 numOutputBatches += 1
 last_cb = new ColumnarBatch(vectors.toArray, rowCount)
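
ArrowRowToColumnarExec combines both corrections: on the native path the start timestamp is taken just before jniWrapper.nativeConvertRowToColumnar and elapse is computed right after it returns, and on both the native and the Java fallback path the result is added to processTime with += instead of set(). With that, processTime reports the total conversion time of the task, and only the conversion work itself, which is the SQLMetrics inaccuracy named in the commit title.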