Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More accurate estimation for the result serialization time in RapidsShuffleThreadedWriterBase #11180

Merged
merged 6 commits into from
Jul 18, 2024
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,26 @@ abstract class RapidsShuffleThreadedWriterBase[K, V](
val diskBlockObjectWriters = new mutable.HashMap[Int, (Int, DiskBlockObjectWriter)]()

override def write(records: Iterator[Product2[K, V]]): Unit = {
/**
 * Iterator over the upstream `records` that accumulates the wall-clock time spent
 * inside the `hasNext` and `next()` calls of `records`. The running total is
 * readable through `iterateTimeNs`.
 */
class TimeTrackingIterator extends Iterator[Product2[K, V]] {
  // Total nanoseconds spent so far in records.hasNext and records.next().
  var iterateTimeNs: Long = 0L

  override def hasNext: Boolean = {
    val start = System.nanoTime()
    val ret = records.hasNext
    iterateTimeNs += System.nanoTime() - start
    ret
  }

  override def next(): Product2[K, V] = {
    val start = System.nanoTime()
    // next() advances the underlying iterator, so it is invoked with explicit
    // parentheses like any other side-effecting call.
    val ret = records.next()
    iterateTimeNs += System.nanoTime() - start
    ret
  }
}

val timeTrackingIterator = new TimeTrackingIterator

withResource(new NvtxRange("ThreadedWriter.write", NvtxColor.RED)) { _ =>
withResource(new NvtxRange("compute", NvtxColor.GREEN)) { _ =>
val mapOutputWriter = shuffleExecutorComponents.createMapOutputWriter(
Expand All @@ -283,7 +303,7 @@ abstract class RapidsShuffleThreadedWriterBase[K, V](
numPartitions)
try {
var openTimeNs = 0L
val partLengths = if (!records.hasNext) {
val partLengths = if (!timeTrackingIterator.hasNext) {
commitAllPartitions(mapOutputWriter, true /*empty checksum*/)
} else {
// per reduce partition id
Expand All @@ -305,15 +325,14 @@ abstract class RapidsShuffleThreadedWriterBase[K, V](

// we call write on every writer for every record in parallel
val writeFutures = new mutable.Queue[Future[Unit]]
val writeTimeStart: Long = System.nanoTime()
val recordWriteTime: AtomicLong = new AtomicLong(0L)
var computeTime: Long = 0L
var waitTimeOnLimiterNs: Long = 0L
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For future work, we may want to expose waitTimeOnLimiterNs as a metric; otherwise it's hard to tell that we are waiting on the limiter. Filed #11187

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. This will be useful!

var batchSizeComputeTimeNs: Long = 0L
val writeTimeStart: Long = System.nanoTime()
try {
while (records.hasNext) {
while (timeTrackingIterator.hasNext) {
// get the record
val computeStartTime = System.nanoTime()
val record = records.next()
computeTime += System.nanoTime() - computeStartTime
val record = timeTrackingIterator.next()
val key = record._1
val value = record._2
val reducePartitionId: Int = partitioner.getPartition(key)
Expand All @@ -326,14 +345,18 @@ abstract class RapidsShuffleThreadedWriterBase[K, V](
} else {
// we close batches actively in the `records` iterator as we get the next batch
// this makes sure it is kept alive while a task is able to handle it.
val sizeComputeStart = System.nanoTime()
val (cb, size) = value match {
case columnarBatch: ColumnarBatch =>
(SlicedGpuColumnVector.incRefCount(columnarBatch),
SlicedGpuColumnVector.getTotalHostMemoryUsed(columnarBatch))
case _ =>
(null, 0L)
}
val waitOnLimiterStart = System.nanoTime()
batchSizeComputeTimeNs += waitOnLimiterStart - sizeComputeStart
limiter.acquireOrBlock(size)
waitTimeOnLimiterNs += System.nanoTime() - waitOnLimiterStart
writeFutures += RapidsShuffleInternalManagerBase.queueWriteTask(slotNum, () => {
withResource(cb) { _ =>
try {
Expand Down Expand Up @@ -373,7 +396,8 @@ abstract class RapidsShuffleThreadedWriterBase[K, V](

// writeTimeNs approximates the time spent pushing bytes through the stream: the
// wall-clock time elapsed since writeTimeStart, minus the time spent pulling records
// from the upstream iterator, computing batch sizes, and blocking on the limiter
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment is out of date

Copy link
Collaborator Author

@jihoonson jihoonson Jul 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, fixed now.

val writeTimeNs = (System.nanoTime() - writeTimeStart) - computeTime
val writeTimeNs = (System.nanoTime() - writeTimeStart) -
timeTrackingIterator.iterateTimeNs - batchSizeComputeTimeNs - waitTimeOnLimiterNs

val combineTimeStart = System.nanoTime()
val pl = writePartitionedData(mapOutputWriter)
Expand Down