From e179f98dfe313b194bf270cb30e9b7cf06615ab0 Mon Sep 17 00:00:00 2001
From: zhixingheyi-tian
Date: Wed, 8 Jun 2022 16:47:44 +0800
Subject: [PATCH] [NSE-958] Fix SQLMetrics inaccuracy in JVM/Native R2C and
 CoalesceBatches (#959)

---
 .../intel/oap/sql/execution/RowToArrowColumnarExec.scala | 2 +-
 .../intel/oap/execution/ArrowCoalesceBatchesExec.scala   | 4 ++--
 .../com/intel/oap/execution/ArrowRowToColumnarExec.scala | 8 ++++----
 3 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/arrow-data-source/common/src/main/scala/com/intel/oap/sql/execution/RowToArrowColumnarExec.scala b/arrow-data-source/common/src/main/scala/com/intel/oap/sql/execution/RowToArrowColumnarExec.scala
index e5f06dc06..25897f29c 100644
--- a/arrow-data-source/common/src/main/scala/com/intel/oap/sql/execution/RowToArrowColumnarExec.scala
+++ b/arrow-data-source/common/src/main/scala/com/intel/oap/sql/execution/RowToArrowColumnarExec.scala
@@ -297,7 +297,7 @@ case class RowToArrowColumnarExec(child: SparkPlan) extends UnaryExecNode {
           rowCount += 1
         }
         vectors.foreach(v => v.asInstanceOf[ArrowWritableColumnVector].setValueCount(rowCount))
-        processTime.set(NANOSECONDS.toMillis(elapse))
+        processTime += NANOSECONDS.toMillis(elapse)
         numInputRows += rowCount
         numOutputBatches += 1
         last_cb = new ColumnarBatch(vectors.toArray, rowCount)
diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowCoalesceBatchesExec.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowCoalesceBatchesExec.scala
index dccc4a54b..7633bf34f 100644
--- a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowCoalesceBatchesExec.scala
+++ b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowCoalesceBatchesExec.scala
@@ -102,7 +102,6 @@ case class ArrowCoalesceBatchesExec(child: SparkPlan) extends UnaryExecNode {
           val arrBufSizes = new ArrayBuffer[Array[Long]]()
           val numrows = ListBuffer[Int]()
 
-          val beforeConcat = System.nanoTime
           while (hasNext && rowCount < recordsPerBatch) {
             val delta: ColumnarBatch = iter.next()
             delta.retain()
@@ -134,16 +133,17 @@ case class ArrowCoalesceBatchesExec(child: SparkPlan) extends UnaryExecNode {
           val schema = new Schema(expected_output_arrow_fields.asJava)
           val arrowSchema = ConverterUtils.getSchemaBytesBuf(schema)
 
+          val beforeConcat = System.nanoTime
           val serializedRecordBatch = jniWrapper.nativeCoalesceBatches(
             arrowSchema, rowCount, numrows.toArray, arrBufAddrs.toArray, arrBufSizes.toArray,
             SparkMemoryUtils.contextMemoryPool().getNativeInstanceId)
+          concatTime += System.nanoTime - beforeConcat
 
           val rb = UnsafeRecordBatchSerializer.deserializeUnsafe(SparkMemoryUtils.contextAllocator(), serializedRecordBatch)
           val ColVecArr = ConverterUtils.fromArrowRecordBatch(schema, rb)
           val outputNumRows = rb.getLength
           ConverterUtils.releaseArrowRecordBatch(rb)
           val bigColBatch = new ColumnarBatch(ColVecArr.map(v => v.asInstanceOf[ColumnVector]).toArray, rowCount)
-          concatTime += System.nanoTime - beforeConcat
 
           numOutputRows += rowCount
           numInputBatches += batchesToAppend.length
           numOutputBatches += 1
diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowRowToColumnarExec.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowRowToColumnarExec.scala
index 87f7de1fe..fe3000e4f 100644
--- a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowRowToColumnarExec.scala
+++ b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowRowToColumnarExec.scala
@@ -146,7 +146,6 @@ case class ArrowRowToColumnarExec(child: SparkPlan) extends UnaryExecNode {
         val rowLength = new ListBuffer[Long]()
         var rowCount = 0
         var offset = 0
-        val start = System.nanoTime()
 
         assert(firstRow.isInstanceOf[UnsafeRow])
         val unsafeRow = firstRow.asInstanceOf[UnsafeRow]
@@ -180,8 +179,10 @@ case class ArrowRowToColumnarExec(child: SparkPlan) extends UnaryExecNode {
           if (schemaBytes == null) {
             schemaBytes = ConverterUtils.getSchemaBytesBuf(arrowSchema)
           }
+          val start = System.nanoTime()
           val serializedRecordBatch = jniWrapper.nativeConvertRowToColumnar(schemaBytes, rowLength.toArray,
             arrowBuf.memoryAddress(), SparkMemoryUtils.contextMemoryPool().getNativeInstanceId)
+          elapse = System.nanoTime() - start
           numInputRows += rowCount
           numOutputBatches += 1
           val rb = UnsafeRecordBatchSerializer.deserializeUnsafe(allocator, serializedRecordBatch)
@@ -190,8 +191,7 @@ case class ArrowRowToColumnarExec(child: SparkPlan) extends UnaryExecNode {
          ConverterUtils.releaseArrowRecordBatch(rb)
           arrowBuf.close()
           last_cb = new ColumnarBatch(output.map(v => v.asInstanceOf[ColumnVector]).toArray, outputNumRows)
-          elapse = System.nanoTime() - start
-          processTime.set(NANOSECONDS.toMillis(elapse))
+          processTime += NANOSECONDS.toMillis(elapse)
           last_cb
         } else {
           logInfo("not unsaferow, fallback to java based r2c")
@@ -212,7 +212,7 @@ case class ArrowRowToColumnarExec(child: SparkPlan) extends UnaryExecNode {
             rowCount += 1
           }
           vectors.foreach(v => v.asInstanceOf[ArrowWritableColumnVector].setValueCount(rowCount))
-          processTime.set(NANOSECONDS.toMillis(elapse))
+          processTime += NANOSECONDS.toMillis(elapse)
          numInputRows += rowCount
           numOutputBatches += 1
           last_cb = new ColumnarBatch(vectors.toArray, rowCount)
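
Two things are going on in this patch. First, a Spark SQLMetric accumulates over
every batch a task processes, so a per-batch code path must add to it: the old
processTime.set(NANOSECONDS.toMillis(elapse)) overwrote the running total with the
cost of the most recent batch only, while processTime += ... sums all batches.
Second, the System.nanoTime bookkeeping is moved so the timer brackets only the
native JNI call, instead of also counting time spent pulling rows or batches from
the child operator. The sketch below illustrates the first point; it is a minimal,
self-contained approximation in plain Scala (no Spark dependency), where Metric is
a hypothetical stand-in for SQLMetric's set/add semantics and nativeCall is a
placeholder for the JNI calls in this patch, not code from this repository.

    import java.util.concurrent.TimeUnit.NANOSECONDS

    object MetricFixSketch {
      // Hypothetical stand-in for Spark's SQLMetric: set() overwrites the
      // running value, += accumulates it. Only the update semantics matter.
      final class Metric {
        private var current = 0L
        def set(v: Long): Unit = current = v
        def +=(v: Long): Unit = current += v
        def value: Long = current
      }

      // Placeholder for the JNI call whose cost the operator measures.
      private def nativeCall(): Unit = Thread.sleep(5)

      def main(args: Array[String]): Unit = {
        val before = new Metric // old behavior: set() per batch
        val after  = new Metric // patched behavior: += per batch
        for (_ <- 1 to 3) {     // three batches through the same operator
          // Start the clock just before the timed work, as the patch now
          // does for the native call.
          val start = System.nanoTime()
          nativeCall()
          val elapse = System.nanoTime() - start
          before.set(NANOSECONDS.toMillis(elapse)) // keeps only the last ~5 ms
          after += NANOSECONDS.toMillis(elapse)    // sums to ~15 ms total
        }
        println(s"set() per batch reports ${before.value} ms")
        println(s"+=  per batch reports ${after.value} ms")
      }
    }

Compiled and run, the first line prints roughly 5 and the second roughly 15; the
second number is the operator's true per-task cost, and the gap between the two is
the metric inaccuracy this patch closes in the R2C and coalesce operators.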