diff --git a/native-sql-engine/core/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleDependency.scala b/native-sql-engine/core/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleDependency.scala
index 734c32249..c4f3d7cee 100644
--- a/native-sql-engine/core/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleDependency.scala
+++ b/native-sql-engine/core/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleDependency.scala
@@ -44,6 +44,7 @@ import scala.reflect.ClassTag
  * @param bytesSpilled for shuffle spill size tracking
  * @param computePidTime partition id computation time metric
  * @param splitTime native split time metric
+ * @param prepareTime native split prepare time metric
  */
 class ColumnarShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
     @transient private val _rdd: RDD[_ <: Product2[K, V]],
@@ -60,7 +61,8 @@ class ColumnarShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
     val computePidTime: SQLMetric,
     val splitTime: SQLMetric,
     val spillTime: SQLMetric,
-    val compressTime: SQLMetric)
+    val compressTime: SQLMetric,
+    val prepareTime: SQLMetric)
     extends ShuffleDependency[K, V, C](
       _rdd,
       partitioner,
diff --git a/native-sql-engine/core/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala b/native-sql-engine/core/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
index d3497c2fd..de72369fb 100644
--- a/native-sql-engine/core/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
+++ b/native-sql-engine/core/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
@@ -124,6 +124,7 @@ class ColumnarShuffleWriter[K, V](
       if (cb.numRows == 0 || cb.numCols == 0) {
         logInfo(s"Skip ColumnarBatch of ${cb.numRows} rows, ${cb.numCols} cols")
       } else {
+        val startTimeForPrepare = System.nanoTime()
         val bufAddrs = new ListBuffer[Long]()
         val bufSizes = new ListBuffer[Long]()
         val recordBatch = ConverterUtils.createArrowRecordBatch(cb)
@@ -164,6 +165,7 @@ class ColumnarShuffleWriter[K, V](
           }
         }
         firstRecordBatch = false
+        dep.prepareTime.add(System.nanoTime() - startTimeForPrepare)
         jniWrapper.split(nativeSplitter, cb.numRows, bufAddrs.toArray, bufSizes.toArray,
           firstRecordBatch)
         dep.splitTime.add(System.nanoTime() - startTime)
diff --git a/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala b/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala
index d36f50530..26ac26822 100644
--- a/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala
+++ b/native-sql-engine/core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala
@@ -77,6 +77,7 @@ case class ColumnarShuffleExchangeExec(
     "splitTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "totaltime_split"),
     "spillTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "shuffle spill time"),
     "compressTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "totaltime_compress"),
+    "prepareTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "totaltime_prepare"),
     "avgReadBatchNumRows" -> SQLMetrics
       .createAverageMetric(sparkContext, "avg read batch num rows"),
     "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
@@ -140,7 +141,8 @@ case class ColumnarShuffleExchangeExec(
       longMetric("computePidTime"),
       longMetric("splitTime"),
       longMetric("spillTime"),
-      longMetric("compressTime"))
+      longMetric("compressTime"),
+      longMetric("prepareTime"))
   }

   var cachedShuffleRDD: ShuffledColumnarBatchRDD = _
@@ -187,6 +189,7 @@ class ColumnarShuffleExchangeAdaptor(
     "splitTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "totaltime_split"),
     "spillTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "shuffle spill time"),
     "compressTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "totaltime_compress"),
+    "prepareTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "totaltime_prepare"),
     "avgReadBatchNumRows" -> SQLMetrics
       .createAverageMetric(sparkContext, "avg read batch num rows"),
     "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
@@ -236,7 +239,8 @@ class ColumnarShuffleExchangeAdaptor(
       longMetric("computePidTime"),
       longMetric("splitTime"),
       longMetric("spillTime"),
-      longMetric("compressTime"))
+      longMetric("compressTime"),
+      longMetric("prepareTime"))
   }

   var cachedShuffleRDD: ShuffledColumnarBatchRDD = _
@@ -301,7 +305,8 @@ object ColumnarShuffleExchangeExec extends Logging {
       computePidTime: SQLMetric,
       splitTime: SQLMetric,
       spillTime: SQLMetric,
-      compressTime: SQLMetric): ShuffleDependency[Int, ColumnarBatch, ColumnarBatch] = {
+      compressTime: SQLMetric,
+      prepareTime: SQLMetric): ShuffleDependency[Int, ColumnarBatch, ColumnarBatch] = {
     val arrowFields = outputAttributes.map(attr => ConverterUtils.createArrowField(attr))
     def serializeSchema(fields: Seq[Field]): Array[Byte] = {
       val schema = new Schema(fields.asJava)
@@ -447,7 +452,8 @@ object ColumnarShuffleExchangeExec extends Logging {
       computePidTime = computePidTime,
       splitTime = splitTime,
       spillTime = spillTime,
-      compressTime = compressTime)
+      compressTime = compressTime,
+      prepareTime = prepareTime)
     dependency
   }
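
Note: for readers unfamiliar with Spark's SQLMetric plumbing, below is a minimal, self-contained sketch of the timing pattern this patch follows: create a nano-timing metric via SQLMetrics.createNanoTimingMetric (the same factory call the patch uses for "prepareTime"), bracket the measured work with System.nanoTime(), and feed the delta into the metric with add(). It is not part of the patch; the object name PrepareTimeSketch, the prepareBatch helper, and the local SparkSession setup are hypothetical stand-ins for the real driver/executor wiring.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

object PrepareTimeSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("prepare-time-sketch")
      .getOrCreate()

    // Same factory call the patch uses:
    // "prepareTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "totaltime_prepare")
    val prepareTime: SQLMetric =
      SQLMetrics.createNanoTimingMetric(spark.sparkContext, "totaltime_prepare")

    // Bracket the measured section and accumulate the elapsed nanoseconds,
    // mirroring dep.prepareTime.add(System.nanoTime() - startTimeForPrepare)
    // in ColumnarShuffleWriter.write().
    val startTimeForPrepare = System.nanoTime()
    prepareBatch() // hypothetical stand-in for the Arrow buffer collection
    prepareTime.add(System.nanoTime() - startTimeForPrepare)

    println(s"accumulated prepare time (ns): ${prepareTime.value}")
    spark.stop()
  }

  // Hypothetical placeholder for the record-batch preparation being timed.
  private def prepareBatch(): Unit = Thread.sleep(1)
}

In the patch itself the metric is created once on the driver in ColumnarShuffleExchangeExec, threaded through ColumnarShuffleDependency to the executors, and updated per batch in ColumnarShuffleWriter; since SQLMetric is an accumulator, Spark merges the executor-side updates back to the driver so "totaltime_prepare" shows up in the SQL UI alongside the existing split, spill, and compress timings.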