From 5fae8839b46e7c7669406cfa4ccbacde6ba9662f Mon Sep 17 00:00:00 2001 From: Alessandro Bellina Date: Tue, 1 Oct 2024 08:32:04 -0500 Subject: [PATCH] Fix negative rs. shuffle write time (#11548) * Fix negative rs. shuffle write time Signed-off-by: Alessandro Bellina * Stop double counting openTimeNs in shuffleWriteTimeMetric --------- Signed-off-by: Alessandro Bellina --- .../sql/rapids/RapidsShuffleInternalManagerBase.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/RapidsShuffleInternalManagerBase.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/RapidsShuffleInternalManagerBase.scala index 3c3bf8ce3dc..da54735aaf4 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/RapidsShuffleInternalManagerBase.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/RapidsShuffleInternalManagerBase.scala @@ -308,6 +308,8 @@ abstract class RapidsShuffleThreadedWriterBase[K, V]( private def write(records: TimeTrackingIterator): Unit = { withResource(new NvtxRange("ThreadedWriter.write", NvtxColor.RED)) { _ => withResource(new NvtxRange("compute", NvtxColor.GREEN)) { _ => + // Timestamp when the main processing begins + val processingStart: Long = System.nanoTime() val mapOutputWriter = shuffleExecutorComponents.createMapOutputWriter( shuffleId, mapId, @@ -342,8 +344,7 @@ abstract class RapidsShuffleThreadedWriterBase[K, V]( var waitTimeOnLimiterNs: Long = 0L // Time spent computing ColumnarBatch sizes var batchSizeComputeTimeNs: Long = 0L - // Timestamp when the main processing begins - val processingStart: Long = System.nanoTime() + try { while (records.hasNext) { // get the record @@ -447,7 +448,7 @@ abstract class RapidsShuffleThreadedWriterBase[K, V]( serializationTimeMetric.foreach(_ += (serializationRatio * writeTimeNs).toLong) // we add all three here because this metric is meant to show the time // we are blocked on writes - shuffleWriteTimeMetric.foreach(_ += (openTimeNs + writeTimeNs + combineTimeNs)) + shuffleWriteTimeMetric.foreach(_ += (writeTimeNs + combineTimeNs)) shuffleCombineTimeMetric.foreach(_ += combineTimeNs) pl }