More accurate estimation for the result serialization time in RapidsShuffleThreadedWriterBase #11180
Changes from 5 commits: 6cf48ab, 0552fa8, f39353b, fc299dc, 3b076cf, 5c8d107
@@ -274,7 +274,36 @@ abstract class RapidsShuffleThreadedWriterBase[K, V](

   val diskBlockObjectWriters = new mutable.HashMap[Int, (Int, DiskBlockObjectWriter)]()

+  /**
+   * Simple wrapper that tracks the time spent iterating the given iterator.
+   */
+  class TimeTrackingIterator(delegate: Iterator[Product2[K, V]]) extends Iterator[Product2[K, V]] {
+    private var iterateTimeNs: Long = 0L
+
+    override def hasNext: Boolean = {
+      val start = System.nanoTime()
+      val ret = delegate.hasNext
+      iterateTimeNs += System.nanoTime() - start
+      ret
+    }
+
+    override def next(): Product2[K, V] = {
+      val start = System.nanoTime()
+      val ret = delegate.next
+      iterateTimeNs += System.nanoTime() - start
+      ret
+    }
+
+    def getIterateTimeNs: Long = iterateTimeNs
+  }
+
   override def write(records: Iterator[Product2[K, V]]): Unit = {
+    // Iterating the `records` may involve some heavy computations.
+    // TimeTrackingIterator is used to track how much time we spend for such computations.
+    write(new TimeTrackingIterator(records))
+  }
+
+  def write(records: TimeTrackingIterator): Unit = {
     withResource(new NvtxRange("ThreadedWriter.write", NvtxColor.RED)) { _ =>
       withResource(new NvtxRange("compute", NvtxColor.GREEN)) { _ =>
         val mapOutputWriter = shuffleExecutorComponents.createMapOutputWriter(
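For context while reading the rest of the diff: the wrapper only measures the time spent inside the wrapped iterator's hasNext/next calls, i.e. the time upstream stages take to produce each record. Below is a small, self-contained sketch of the same pattern; the names TimedIterator and getElapsedNs are illustrative only and are not part of this PR.

class TimedIterator[A](delegate: Iterator[A]) extends Iterator[A] {
  private var elapsedNs = 0L
  // Times any call into the delegate, even if it throws.
  private def timed[B](body: => B): B = {
    val start = System.nanoTime()
    try body finally elapsedNs += System.nanoTime() - start
  }
  override def hasNext: Boolean = timed(delegate.hasNext)
  override def next(): A = timed(delegate.next())
  def getElapsedNs: Long = elapsedNs
}

object TimedIteratorExample {
  def main(args: Array[String]): Unit = {
    // A deliberately slow source, standing in for heavy upstream computation.
    val slowSource = Iterator.tabulate(5) { i => Thread.sleep(20); i }
    val timed = new TimedIterator(slowSource)
    println(s"sum=${timed.sum}, time spent pulling records=${timed.getElapsedNs / 1e6} ms")
  }
}

Once the iterator has been fully consumed, the accumulated time is exactly the quantity the new writeTimeNs estimate subtracts out further down in the diff.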
@@ -305,15 +334,18 @@ abstract class RapidsShuffleThreadedWriterBase[K, V](
         // we call write on every writer for every record in parallel
         val writeFutures = new mutable.Queue[Future[Unit]]
-        val writeTimeStart: Long = System.nanoTime()
         // Accumulated record write time as if they were sequential
         val recordWriteTime: AtomicLong = new AtomicLong(0L)
-        var computeTime: Long = 0L
+        // Time spent waiting on the limiter
+        var waitTimeOnLimiterNs: Long = 0L
Inline review thread on `waitTimeOnLimiterNs`:
  Comment: for future work, we may want to expose
  Reply: Thanks. This will be useful!
+        // Time spent computing ColumnarBatch sizes
+        var batchSizeComputeTimeNs: Long = 0L
+        // Timestamp when the main processing begins
+        val processingStart: Long = System.nanoTime()
         try {
           while (records.hasNext) {
             // get the record
-            val computeStartTime = System.nanoTime()
             val record = records.next()
-            computeTime += System.nanoTime() - computeStartTime
             val key = record._1
             val value = record._2
             val reducePartitionId: Int = partitioner.getPartition(key)
@@ -326,14 +358,18 @@ abstract class RapidsShuffleThreadedWriterBase[K, V](
             } else {
               // we close batches actively in the `records` iterator as we get the next batch
               // this makes sure it is kept alive while a task is able to handle it.
+              val sizeComputeStart = System.nanoTime()
               val (cb, size) = value match {
                 case columnarBatch: ColumnarBatch =>
                   (SlicedGpuColumnVector.incRefCount(columnarBatch),
                     SlicedGpuColumnVector.getTotalHostMemoryUsed(columnarBatch))
                 case _ =>
                   (null, 0L)
               }
+              val waitOnLimiterStart = System.nanoTime()
+              batchSizeComputeTimeNs += waitOnLimiterStart - sizeComputeStart
               limiter.acquireOrBlock(size)
+              waitTimeOnLimiterNs += System.nanoTime() - waitOnLimiterStart
               writeFutures += RapidsShuffleInternalManagerBase.queueWriteTask(slotNum, () => {
                 withResource(cb) { _ =>
                   try {
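A self-contained sketch of the bookkeeping pattern this hunk adds: a single System.nanoTime() call is reused as the end of the "size computation" phase and the start of the "blocked on the limiter" phase, so the two accumulators cover adjacent spans without double counting. The java.util.concurrent.Semaphore below is only a stand-in I am assuming for illustration; it is not the project's limiter or its acquireOrBlock implementation.

import java.util.concurrent.Semaphore

object LimiterTimingSketch {
  def main(args: Array[String]): Unit = {
    val limiter = new Semaphore(2)             // pretend budget of two in-flight batches
    var sizeComputeNs = 0L
    var limiterWaitNs = 0L

    for (_ <- 1 to 5) {
      val sizeComputeStart = System.nanoTime()
      val size = 1                             // stand-in for computing a batch's host size
      val waitStart = System.nanoTime()        // ends the size phase, starts the wait phase
      sizeComputeNs += waitStart - sizeComputeStart
      limiter.acquire(size)                    // may block, in the spirit of acquireOrBlock(size)
      limiterWaitNs += System.nanoTime() - waitStart
      limiter.release(size)                    // the real code releases once the write completes
    }
    println(s"size compute: $sizeComputeNs ns, limiter wait: $limiterWaitNs ns")
  }
}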
@@ -371,9 +407,15 @@ abstract class RapidsShuffleThreadedWriterBase[K, V](
             }
           }

-        // writeTime is the amount of time it took to push bytes through the stream
-        // minus the amount of time it took to get the batch from the upstream execs
-        val writeTimeNs = (System.nanoTime() - writeTimeStart) - computeTime
+        // writeTimeNs is an approximation of the amount of time we spent in
+        // DiskBlockObjectWriter.write, which involves serializing records and writing them
+        // on disk. As we use multiple threads for writing, writeTimeNs is
+        // estimated by 'the total amount of time it took to finish processing the entire logic
+        // above' minus 'the amount of time it took to do anything expensive other than the
+        // serialization and the write'. The latter involves computations in upstream execs,
+        // ColumnarBatch size estimation, and the time blocked on the limiter.
+        val writeTimeNs = (System.nanoTime() - processingStart) -
+          records.getIterateTimeNs - batchSizeComputeTimeNs - waitTimeOnLimiterNs

         val combineTimeStart = System.nanoTime()
         val pl = writePartitionedData(mapOutputWriter)
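A quick worked example of the estimate described in the new comment; the numbers below are made up purely for illustration.

object WriteTimeEstimateSketch {
  def main(args: Array[String]): Unit = {
    val totalProcessingNs  = 10000000000L // 10.0 s of wall clock since processingStart
    val iterateTimeNs      =  4000000000L //  4.0 s pulling batches from upstream execs
    val batchSizeComputeNs =   200000000L //  0.2 s computing ColumnarBatch host sizes
    val limiterWaitNs      =  1300000000L //  1.3 s blocked in the limiter
    // Whatever remains is attributed to serialization plus the disk writes.
    val writeTimeNs = totalProcessingNs - iterateTimeNs - batchSizeComputeNs - limiterWaitNs
    println(s"estimated serialization + write time: ${writeTimeNs / 1e9} s") // 4.5 s
  }
}

Because the record writes run on multiple threads, this wall-clock based figure is an approximation rather than a sum of per-thread serialization times, which is what the new comment in the diff states.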
Review comment: nit, let's mark this `private`, I like the addition of the new method.
Reply: Made this and a couple of others private.
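For readers unfamiliar with the convention being requested, here is a tiny, self-contained sketch (hypothetical names, not the PR's code) of keeping the Iterator-based method as the public entry point while hiding the helper overload:

class VisibilitySketch {
  def write(records: Iterator[Int]): Unit = writeTracked(records.buffered)

  // private: the wrapper-specific overload is an implementation detail
  private def writeTracked(records: BufferedIterator[Int]): Unit =
    records.foreach(_ => ())
}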