From 26b312b13de8f75684bb1d02898c31d9b29282d2 Mon Sep 17 00:00:00 2001 From: Ilya Ganelin Date: Fri, 9 Jan 2015 19:12:02 -0500 Subject: [PATCH] Debugging tests --- .../main/scala/org/apache/spark/Aggregator.scala | 8 ++++---- .../org/apache/spark/api/python/PythonRDD.scala | 4 ++-- .../scala/org/apache/spark/executor/Executor.scala | 2 +- .../org/apache/spark/executor/TaskMetrics.scala | 13 +++++-------- .../scala/org/apache/spark/rdd/CoGroupedRDD.scala | 4 ++-- .../spark/shuffle/hash/HashShuffleReader.scala | 4 ++-- .../apache/spark/storage/BlockObjectWriter.scala | 8 ++++---- .../spark/storage/ShuffleBlockFetcherIterator.scala | 8 ++++---- .../spark/util/collection/ExternalSorter.scala | 8 ++++---- 9 files changed, 28 insertions(+), 31 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala index 09eb9605fb799..3b684bbeceaf2 100644 --- a/core/src/main/scala/org/apache/spark/Aggregator.scala +++ b/core/src/main/scala/org/apache/spark/Aggregator.scala @@ -61,8 +61,8 @@ case class Aggregator[K, V, C] ( // Update task metrics if context is not null // TODO: Make context non optional in a future release Option(context).foreach { c => - c.taskMetrics.memoryBytesSpilled += combiners.memoryBytesSpilled - c.taskMetrics.diskBytesSpilled += combiners.diskBytesSpilled + c.taskMetrics.incMemoryBytesSpilled(combiners.memoryBytesSpilled) + c.taskMetrics.incDiskBytesSpilled(combiners.diskBytesSpilled) } combiners.iterator } @@ -95,8 +95,8 @@ case class Aggregator[K, V, C] ( // Update task metrics if context is not null // TODO: Make context non-optional in a future release Option(context).foreach { c => - c.taskMetrics.memoryBytesSpilled += combiners.memoryBytesSpilled - c.taskMetrics.diskBytesSpilled += combiners.diskBytesSpilled + c.taskMetrics.incMemoryBytesSpilled(combiners.memoryBytesSpilled) + c.taskMetrics.incDiskBytesSpilled(combiners.diskBytesSpilled) } combiners.iterator } diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index bad40e6529f74..4ac666c54fbcd 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -125,8 +125,8 @@ private[spark] class PythonRDD( init, finish)) val memoryBytesSpilled = stream.readLong() val diskBytesSpilled = stream.readLong() - context.taskMetrics.memoryBytesSpilled += memoryBytesSpilled - context.taskMetrics.diskBytesSpilled += diskBytesSpilled + context.taskMetrics.incMemoryBytesSpilled(memoryBytesSpilled) + context.taskMetrics.incDiskBytesSpilled(diskBytesSpilled) read() case SpecialLengths.PYTHON_EXCEPTION_THROWN => // Signals that an exception has been thrown in python diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 3a5d2638a32e3..4d95386b7db53 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -370,7 +370,7 @@ private[spark] class Executor( if (!taskRunner.attemptedTask.isEmpty) { Option(taskRunner.task).flatMap(_.metrics).foreach { metrics => metrics.updateShuffleReadMetrics - metrics.jvmGCTime = curGCTime - taskRunner.startGCTime + metrics.incJvmGCTime(curGCTime - taskRunner.startGCTime) if (isLocal) { // JobProgressListener will hold an reference of it during // onExecutorMetricsUpdate(), then JobProgressListener can not see diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index 5903c1091e795..869abe92b04f5 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -164,10 +164,10 @@ class TaskMetrics extends Serializable { private[spark] def updateShuffleReadMetrics() = synchronized { val merged = new ShuffleReadMetrics() for (depMetrics <- depsShuffleReadMetrics) { - merged.fetchWaitTime += depMetrics.fetchWaitTime - merged.localBlocksFetched += depMetrics.localBlocksFetched - merged.remoteBlocksFetched += depMetrics.remoteBlocksFetched - merged.remoteBytesRead += depMetrics.remoteBytesRead + merged.incFetchWaitTime(depMetrics.fetchWaitTime) + merged.incLocalBlocksFetched(depMetrics.localBlocksFetched) + merged.incRemoteBlocksFetched(depMetrics.remoteBlocksFetched) + merged.incRemoteBytesRead(depMetrics.remoteBytesRead) } _shuffleReadMetrics = Some(merged) } @@ -272,10 +272,7 @@ class ShuffleReadMetrics extends Serializable { /** * Number of blocks fetched in this shuffle by this task (remote or local) */ - private var _totalBlocksFetched: Int = remoteBlocksFetched + localBlocksFetched - def totalBlocksFetched = _totalBlocksFetched - def incTotalBlocksFetched(value: Int) = _totalBlocksFetched += value - def decTotalBlocksFetched(value: Int) = _totalBlocksFetched -= value + def totalBlocksFetched = _remoteBlocksFetched + _localBlocksFetched } /** diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala index 70edf191d928a..07398a6fa62f6 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala @@ -159,8 +159,8 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: for ((it, depNum) <- rddIterators) { map.insertAll(it.map(pair => (pair._1, new CoGroupValue(pair._2, depNum)))) } - context.taskMetrics.memoryBytesSpilled += map.memoryBytesSpilled - context.taskMetrics.diskBytesSpilled += map.diskBytesSpilled + context.taskMetrics.incMemoryBytesSpilled(map.memoryBytesSpilled) + context.taskMetrics.incDiskBytesSpilled(map.diskBytesSpilled) new InterruptibleIterator(context, map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]]) } diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala index de72148ccc7ac..41bafabde05b9 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala @@ -59,8 +59,8 @@ private[spark] class HashShuffleReader[K, C]( // the ExternalSorter won't spill to disk. val sorter = new ExternalSorter[K, C, C](ordering = Some(keyOrd), serializer = Some(ser)) sorter.insertAll(aggregatedIter) - context.taskMetrics.memoryBytesSpilled += sorter.memoryBytesSpilled - context.taskMetrics.diskBytesSpilled += sorter.diskBytesSpilled + context.taskMetrics.incMemoryBytesSpilled(sorter.memoryBytesSpilled) + context.taskMetrics.incDiskBytesSpilled(sorter.diskBytesSpilled) sorter.iterator case None => aggregatedIter diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala index 9c469370ffe1f..3198d766fca37 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala @@ -160,14 +160,14 @@ private[spark] class DiskBlockObjectWriter( } finalPosition = file.length() // In certain compression codecs, more bytes are written after close() is called - writeMetrics.shuffleBytesWritten += (finalPosition - reportedPosition) + writeMetrics.incShuffleBytesWritten(finalPosition - reportedPosition) } // Discard current writes. We do this by flushing the outstanding writes and then // truncating the file to its initial position. override def revertPartialWritesAndClose() { try { - writeMetrics.shuffleBytesWritten -= (reportedPosition - initialPosition) + writeMetrics.decShuffleBytesWritten(reportedPosition - initialPosition) if (initialized) { objOut.flush() @@ -212,14 +212,14 @@ private[spark] class DiskBlockObjectWriter( */ private def updateBytesWritten() { val pos = channel.position() - writeMetrics.shuffleBytesWritten += (pos - reportedPosition) + writeMetrics.incShuffleBytesWritten(pos - reportedPosition) reportedPosition = pos } private def callWithTiming(f: => Unit) = { val start = System.nanoTime() f - writeMetrics.shuffleWriteTime += (System.nanoTime() - start) + writeMetrics.incShuffleWriteTime(System.nanoTime() - start) } // For testing diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index 2499c11a65b0e..ab9ee4f0096bf 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -156,8 +156,8 @@ final class ShuffleBlockFetcherIterator( // This needs to be released after use. buf.retain() results.put(new SuccessFetchResult(BlockId(blockId), sizeMap(blockId), buf)) - shuffleMetrics.remoteBytesRead += buf.size - shuffleMetrics.remoteBlocksFetched += 1 + shuffleMetrics.incRemoteBytesRead(buf.size) + shuffleMetrics.incRemoteBlocksFetched(1) } logTrace("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime)) } @@ -233,7 +233,7 @@ final class ShuffleBlockFetcherIterator( val blockId = iter.next() try { val buf = blockManager.getBlockData(blockId) - shuffleMetrics.localBlocksFetched += 1 + shuffleMetrics.incLocalBlocksFetched(1) buf.retain() results.put(new SuccessFetchResult(blockId, 0, buf)) } catch { @@ -277,7 +277,7 @@ final class ShuffleBlockFetcherIterator( currentResult = results.take() val result = currentResult val stopFetchWait = System.currentTimeMillis() - shuffleMetrics.fetchWaitTime += (stopFetchWait - startFetchWait) + shuffleMetrics.incFetchWaitTime(stopFetchWait - startFetchWait) result match { case SuccessFetchResult(_, size, _) => bytesInFlight -= size diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index 15bda1c9cc29c..6ba03841f746b 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -757,12 +757,12 @@ private[spark] class ExternalSorter[K, V, C]( } } - context.taskMetrics.memoryBytesSpilled += memoryBytesSpilled - context.taskMetrics.diskBytesSpilled += diskBytesSpilled + context.taskMetrics.incMemoryBytesSpilled(memoryBytesSpilled) + context.taskMetrics.incDiskBytesSpilled(diskBytesSpilled) context.taskMetrics.shuffleWriteMetrics.filter(_ => bypassMergeSort).foreach { m => if (curWriteMetrics != null) { - m.shuffleBytesWritten += curWriteMetrics.shuffleBytesWritten - m.shuffleWriteTime += curWriteMetrics.shuffleWriteTime + m.incShuffleBytesWritten(curWriteMetrics.shuffleBytesWritten) + m.incShuffleWriteTime(curWriteMetrics.shuffleWriteTime) } }