Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More accurate estimation for the result serialization time in RapidsShuffleThreadedWriterBase #11180

Merged
merged 6 commits into from
Jul 18, 2024
Merged
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,28 @@ abstract class RapidsShuffleThreadedWriterBase[K, V](
val diskBlockObjectWriters = new mutable.HashMap[Int, (Int, DiskBlockObjectWriter)]()

override def write(records: Iterator[Product2[K, V]]): Unit = {
// Iterating the `records` may involve some heavy computations.
// This iterator is used to track how much time we spend for such computations.
class TimeTrackingIterator extends Iterator[Product2[K, V]] {
  // Total time (in nanoseconds) spent inside the upstream `records` iterator,
  // i.e. the cost of producing batches in upstream execs. Read by the caller
  // to subtract upstream compute time from the measured write time.
  var iterateTimeNs: Long = 0L

  override def hasNext: Boolean = {
    val start = System.nanoTime()
    val ret = records.hasNext
    // `hasNext` on the upstream iterator may itself trigger heavy computation,
    // so it is included in the tracked time.
    iterateTimeNs += System.nanoTime() - start
    ret
  }

  override def next(): Product2[K, V] = {
    val start = System.nanoTime()
    // Keep `()` on the call: `next()` is side-effecting on the underlying iterator.
    val ret = records.next()
    iterateTimeNs += System.nanoTime() - start
    ret
  }
}

val timeTrackingIterator = new TimeTrackingIterator

withResource(new NvtxRange("ThreadedWriter.write", NvtxColor.RED)) { _ =>
withResource(new NvtxRange("compute", NvtxColor.GREEN)) { _ =>
val mapOutputWriter = shuffleExecutorComponents.createMapOutputWriter(
Expand All @@ -283,7 +305,7 @@ abstract class RapidsShuffleThreadedWriterBase[K, V](
numPartitions)
try {
var openTimeNs = 0L
val partLengths = if (!records.hasNext) {
val partLengths = if (!timeTrackingIterator.hasNext) {
commitAllPartitions(mapOutputWriter, true /*empty checksum*/)
} else {
// per reduce partition id
Expand All @@ -305,15 +327,18 @@ abstract class RapidsShuffleThreadedWriterBase[K, V](

// we call write on every writer for every record in parallel
val writeFutures = new mutable.Queue[Future[Unit]]
val writeTimeStart: Long = System.nanoTime()
// Accumulated record write time as if they were sequential
val recordWriteTime: AtomicLong = new AtomicLong(0L)
var computeTime: Long = 0L
// Time spent waiting on the limiter
var waitTimeOnLimiterNs: Long = 0L
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For future work, we may want to expose waitTimeOnLimiterNs as a metric. Otherwise it's hard to tell whether we are blocked waiting on the limiter. Filed #11187

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. This will be useful!

// Time spent computing ColumnarBatch sizes
var batchSizeComputeTimeNs: Long = 0L
// Timestamp when the main processing begins
val processingStart: Long = System.nanoTime()
try {
while (records.hasNext) {
while (timeTrackingIterator.hasNext) {
// get the record
val computeStartTime = System.nanoTime()
val record = records.next()
computeTime += System.nanoTime() - computeStartTime
val record = timeTrackingIterator.next()
val key = record._1
val value = record._2
val reducePartitionId: Int = partitioner.getPartition(key)
Expand All @@ -326,14 +351,18 @@ abstract class RapidsShuffleThreadedWriterBase[K, V](
} else {
// we close batches actively in the `records` iterator as we get the next batch
// this makes sure it is kept alive while a task is able to handle it.
val sizeComputeStart = System.nanoTime()
val (cb, size) = value match {
case columnarBatch: ColumnarBatch =>
(SlicedGpuColumnVector.incRefCount(columnarBatch),
SlicedGpuColumnVector.getTotalHostMemoryUsed(columnarBatch))
case _ =>
(null, 0L)
}
val waitOnLimiterStart = System.nanoTime()
batchSizeComputeTimeNs += waitOnLimiterStart - sizeComputeStart
limiter.acquireOrBlock(size)
waitTimeOnLimiterNs += System.nanoTime() - waitOnLimiterStart
writeFutures += RapidsShuffleInternalManagerBase.queueWriteTask(slotNum, () => {
withResource(cb) { _ =>
try {
Expand Down Expand Up @@ -371,9 +400,15 @@ abstract class RapidsShuffleThreadedWriterBase[K, V](
}
}

// writeTime is the amount of time it took to push bytes through the stream
// minus the amount of time it took to get the batch from the upstream execs
val writeTimeNs = (System.nanoTime() - writeTimeStart) - computeTime
// writeTimeNs is an approximation of the amount of time we spent in
// DiskBlockObjectWriter.write, which involves serializing records and writing them
// on disk. As we use multiple threads for writing, writeTimeNs is
// estimated by 'the total amount of time it took to finish processing the entire logic
// above' minus 'the amount of time it took to do anything expensive other than the
serialization and the write'. The latter involves computations in upstream execs,
// ColumnarBatch size estimation, and the time blocked on the limiter.
val writeTimeNs = (System.nanoTime() - processingStart) -
timeTrackingIterator.iterateTimeNs - batchSizeComputeTimeNs - waitTimeOnLimiterNs

val combineTimeStart = System.nanoTime()
val pl = writePartitionedData(mapOutputWriter)
Expand Down