Skip to content

Commit

Permalink
Fix negative rs. shuffle write time (#11548)
Browse files Browse the repository at this point in the history
* Fix negative rs. shuffle write time

Signed-off-by: Alessandro Bellina <abellina@nvidia.com>

* Stop double counting openTimeNs in shuffleWriteTimeMetric

---------

Signed-off-by: Alessandro Bellina <abellina@nvidia.com>
  • Loading branch information
abellina authored Oct 1, 2024
1 parent c74e2dd commit 5fae883
Showing 1 changed file with 4 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,8 @@ abstract class RapidsShuffleThreadedWriterBase[K, V](
private def write(records: TimeTrackingIterator): Unit = {
withResource(new NvtxRange("ThreadedWriter.write", NvtxColor.RED)) { _ =>
withResource(new NvtxRange("compute", NvtxColor.GREEN)) { _ =>
// Timestamp when the main processing begins
val processingStart: Long = System.nanoTime()
val mapOutputWriter = shuffleExecutorComponents.createMapOutputWriter(
shuffleId,
mapId,
Expand Down Expand Up @@ -342,8 +344,7 @@ abstract class RapidsShuffleThreadedWriterBase[K, V](
var waitTimeOnLimiterNs: Long = 0L
// Time spent computing ColumnarBatch sizes
var batchSizeComputeTimeNs: Long = 0L
// Timestamp when the main processing begins
val processingStart: Long = System.nanoTime()

try {
while (records.hasNext) {
// get the record
Expand Down Expand Up @@ -447,7 +448,7 @@ abstract class RapidsShuffleThreadedWriterBase[K, V](
serializationTimeMetric.foreach(_ += (serializationRatio * writeTimeNs).toLong)
// we add all three here because this metric is meant to show the time
// we are blocked on writes
shuffleWriteTimeMetric.foreach(_ += (openTimeNs + writeTimeNs + combineTimeNs))
shuffleWriteTimeMetric.foreach(_ += (writeTimeNs + combineTimeNs))
shuffleCombineTimeMetric.foreach(_ += combineTimeNs)
pl
}
Expand Down

0 comments on commit 5fae883

Please sign in to comment.