More accurate estimation for the result serialization time in RapidsShuffleThreadedWriterBase (#11180)

* Exclude the processing time in records.hasNext from the serialization time estimation

Signed-off-by: Jihoon Son <ghoonson@gmail.com>

* Exclude the wait time on limiter

* Exclude batch size computing time as well

* Fix outdated comment; add more comments

* Add a function that takes a TimeTrackingIterator

* Make internal members private

---------

Signed-off-by: Jihoon Son <ghoonson@gmail.com>
jihoonson committed Jul 18, 2024
1 parent 7e899a0 commit f8439b4
Showing 1 changed file with 52 additions and 8 deletions.
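The change boils down to one pattern: measure, inside the iterator itself, how long upstream stages take to produce each record, so that time can be subtracted from the writer's elapsed time. Below is a minimal, generic sketch of that pattern, simplified to Iterator[T]; names like TimedIterator are illustrative, not from the patch.

// Generic sketch of the time-tracking pattern (not the patch's exact code):
// wrap an iterator and accumulate the nanoseconds spent in hasNext/next so a
// caller can subtract "upstream" time from its own elapsed time.
class TimedIterator[T](delegate: Iterator[T]) extends Iterator[T] {
  private var elapsedNs: Long = 0L

  private def timed[A](body: => A): A = {
    val start = System.nanoTime()
    try body finally elapsedNs += System.nanoTime() - start
  }

  override def hasNext: Boolean = timed(delegate.hasNext)
  override def next(): T = timed(delegate.next())

  def iterateTimeNs: Long = elapsedNs
}

object TimedIteratorDemo extends App {
  // An upstream iterator whose next() is expensive (simulated with sleep).
  val slow = Iterator.tabulate(5) { i => Thread.sleep(10); i }
  val it = new TimedIterator(slow)

  val start = System.nanoTime()
  val sum = it.sum // consuming the iterator drives hasNext/next
  val totalMs = (System.nanoTime() - start) / 1e6

  // The consumer's own cost is (roughly) total elapsed minus upstream time.
  println(s"sum=$sum totalMs=$totalMs upstreamMs=${it.iterateTimeNs / 1e6}")
}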
@@ -272,9 +272,40 @@ abstract class RapidsShuffleThreadedWriterBase[K, V](
   */
  private var stopping = false

  val diskBlockObjectWriters = new mutable.HashMap[Int, (Int, DiskBlockObjectWriter)]()
  private val diskBlockObjectWriters = new mutable.HashMap[Int, (Int, DiskBlockObjectWriter)]()

  /**
   * Simple wrapper that tracks the time spent iterating the given iterator.
   */
  private class TimeTrackingIterator(delegate: Iterator[Product2[K, V]])
      extends Iterator[Product2[K, V]] {

    private var iterateTimeNs: Long = 0L

    override def hasNext: Boolean = {
      val start = System.nanoTime()
      val ret = delegate.hasNext
      iterateTimeNs += System.nanoTime() - start
      ret
    }

    override def next(): Product2[K, V] = {
      val start = System.nanoTime()
      val ret = delegate.next()
      iterateTimeNs += System.nanoTime() - start
      ret
    }

    def getIterateTimeNs: Long = iterateTimeNs
  }

  override def write(records: Iterator[Product2[K, V]]): Unit = {
    // Iterating the `records` may involve some heavy computations.
    // TimeTrackingIterator is used to track how much time we spend on such computations.
    write(new TimeTrackingIterator(records))
  }

  private def write(records: TimeTrackingIterator): Unit = {
    withResource(new NvtxRange("ThreadedWriter.write", NvtxColor.RED)) { _ =>
      withResource(new NvtxRange("compute", NvtxColor.GREEN)) { _ =>
        val mapOutputWriter = shuffleExecutorComponents.createMapOutputWriter(
@@ -305,15 +336,18 @@ abstract class RapidsShuffleThreadedWriterBase[K, V](

        // we call write on every writer for every record in parallel
        val writeFutures = new mutable.Queue[Future[Unit]]
        val writeTimeStart: Long = System.nanoTime()
        // Accumulated record write time as if they were sequential
        val recordWriteTime: AtomicLong = new AtomicLong(0L)
        var computeTime: Long = 0L
        // Time spent waiting on the limiter
        var waitTimeOnLimiterNs: Long = 0L
        // Time spent computing ColumnarBatch sizes
        var batchSizeComputeTimeNs: Long = 0L
        // Timestamp when the main processing begins
        val processingStart: Long = System.nanoTime()
        try {
          while (records.hasNext) {
            // get the record
            val computeStartTime = System.nanoTime()
            val record = records.next()
            computeTime += System.nanoTime() - computeStartTime
            val key = record._1
            val value = record._2
            val reducePartitionId: Int = partitioner.getPartition(key)
@@ -326,14 +360,18 @@ abstract class RapidsShuffleThreadedWriterBase[K, V](
            } else {
              // we close batches actively in the `records` iterator as we get the next batch;
              // this makes sure each batch is kept alive until a task is able to handle it.
              val sizeComputeStart = System.nanoTime()
              val (cb, size) = value match {
                case columnarBatch: ColumnarBatch =>
                  (SlicedGpuColumnVector.incRefCount(columnarBatch),
                    SlicedGpuColumnVector.getTotalHostMemoryUsed(columnarBatch))
                case _ =>
                  (null, 0L)
              }
              val waitOnLimiterStart = System.nanoTime()
              batchSizeComputeTimeNs += waitOnLimiterStart - sizeComputeStart
              limiter.acquireOrBlock(size)
              waitTimeOnLimiterNs += System.nanoTime() - waitOnLimiterStart
              writeFutures += RapidsShuffleInternalManagerBase.queueWriteTask(slotNum, () => {
                withResource(cb) { _ =>
                  try {
@@ -371,9 +409,15 @@ abstract class RapidsShuffleThreadedWriterBase[K, V](
          }
        }

        // writeTime is the amount of time it took to push bytes through the stream
        // minus the amount of time it took to get the batch from the upstream execs
        val writeTimeNs = (System.nanoTime() - writeTimeStart) - computeTime
        // writeTimeNs approximates the time spent in DiskBlockObjectWriter.write,
        // which serializes records and writes them to disk. Since multiple threads
        // are used for writing, writeTimeNs is estimated as 'the total amount of
        // time taken to finish the entire processing above' minus 'the time spent
        // on anything expensive other than the serialization and the write itself'.
        // The latter covers computations in upstream execs, ColumnarBatch size
        // estimation, and time blocked on the limiter.
        val writeTimeNs = (System.nanoTime() - processingStart) -
          records.getIterateTimeNs - batchSizeComputeTimeNs - waitTimeOnLimiterNs

        val combineTimeStart = System.nanoTime()
        val pl = writePartitionedData(mapOutputWriter)
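As a concrete check on the subtraction in the new writeTimeNs comment, with invented numbers: if the write loop takes 10 s end to end, of which 3 s is spent pulling records from upstream, 0.2 s computing batch sizes, and 1.8 s blocked on the limiter, the serialization-and-write estimate is 10 - 3 - 0.2 - 1.8 = 5 s. A runnable sketch of that arithmetic (all values are assumptions, not measurements from the patch):

object WriteTimeEstimateDemo extends App {
  // Illustrative numbers only: the estimate is whatever elapsed time remains
  // once the tracked non-write work is subtracted out.
  val totalNs                = 10000000000L // elapsed for the whole write loop
  val iterateTimeNs          = 3000000000L  // time inside records.hasNext/next
  val batchSizeComputeTimeNs = 200000000L   // ColumnarBatch size estimation
  val waitTimeOnLimiterNs    = 1800000000L  // blocked in limiter.acquireOrBlock

  val writeTimeNs =
    totalNs - iterateTimeNs - batchSizeComputeTimeNs - waitTimeOnLimiterNs
  println(s"estimated serialization+write: ${writeTimeNs / 1e9} s") // 5.0 s
}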
