From 74ca2469f021490fa2eec2e752ba20cf6485b2e6 Mon Sep 17 00:00:00 2001
From: Tianshuo Deng
Date: Sun, 16 Nov 2014 22:37:33 -0800
Subject: [PATCH 1/4] fix elements read count

---
 .../scala/org/apache/spark/util/collection/ExternalSorter.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index c1ce13683b569..01312931637bd 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -233,10 +233,12 @@ private[spark] class ExternalSorter[K, V, C](
     if (usingMap) {
       if (maybeSpill(map, map.estimateSize())) {
         map = new SizeTrackingAppendOnlyMap[(Int, K), C]
+        elementsRead = 0L
       }
     } else {
       if (maybeSpill(buffer, buffer.estimateSize())) {
         buffer = new SizeTrackingPairBuffer[(Int, K), C]
+        elementsRead = 0L
       }
     }
   }
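Why this first reset matters: Spillable.maybeSpill throttles its memory checks on elementsRead, so a counter that is never cleared after a spill skews how often the size check fires for the rest of the run. The sketch below is a condensed, illustrative model of that gating logic, not the real trait: SpillGate, estimateSize, and the threshold constants are stand-ins, and the actual method also negotiates with the shuffle memory manager.

    // Illustrative model of a Spillable-style gate (names and constants assumed).
    trait SpillGate[C] {
      protected def spill(collection: C): Unit          // write collection to disk
      protected def estimateSize(collection: C): Long   // sampled size estimate

      protected var elementsRead: Long = 0L             // reset is the caller's job here
      private val trackMemoryThreshold = 1000           // warm-up before size checks
      private val myMemoryThreshold = 5L * 1024 * 1024  // claimed memory budget

      protected def maybeSpill(collection: C): Boolean = {
        // Size estimation is throttled: only after the warm-up, and then only on
        // a subset of reads. If elementsRead is never reset, this branch fires on
        // a distorted schedule after the first spill.
        if (elementsRead > trackMemoryThreshold && elementsRead % 32 == 0 &&
            estimateSize(collection) >= myMemoryThreshold) {
          spill(collection)
          true  // at this point in the series, the caller must zero elementsRead
        } else {
          false
        }
      }
    }

As patch 1 shows, ExternalSorter performs that zeroing at each call site once maybeSpill returns true; the follow-up patches move the reset into the trait itself.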
From bb7ff28cab383b9773a7163c8afe938caa47720f Mon Sep 17 00:00:00 2001
From: Tianshuo Deng
Date: Mon, 17 Nov 2014 19:06:35 -0800
Subject: [PATCH 2/4] update elementsRead through addElementsRead method

---
 .../spark/util/collection/ExternalAppendOnlyMap.scala |  8 ++------
 .../apache/spark/util/collection/ExternalSorter.scala | 10 ++--------
 .../org/apache/spark/util/collection/Spillable.scala  |  7 ++++++-
 3 files changed, 10 insertions(+), 15 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 26fa0cb6d7bde..1227b6072cbdf 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -76,9 +76,7 @@ class ExternalAppendOnlyMap[K, V, C](
   private val sparkConf = SparkEnv.get.conf
   private val diskBlockManager = blockManager.diskBlockManager
 
-  // Number of pairs inserted since last spill; note that we count them even if a value is merged
-  // with a previous key in case we're doing something like groupBy where the result grows
-  protected[this] var elementsRead = 0L
+
 
   /**
    * Size of object batches when reading/writing from serializers.
@@ -132,7 +130,7 @@ class ExternalAppendOnlyMap[K, V, C](
         currentMap = new SizeTrackingAppendOnlyMap[K, C]
       }
       currentMap.changeValue(curEntry._1, update)
-      elementsRead += 1
+      addElementsRead
     }
   }
 
@@ -209,8 +207,6 @@ class ExternalAppendOnlyMap[K, V, C](
     }
 
     spilledMaps.append(new DiskMapIterator(file, blockId, batchSizes))
-
-    elementsRead = 0
   }
 
   def diskBytesSpilled: Long = _diskBytesSpilled
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 01312931637bd..2d8edb33486a5 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -119,10 +119,6 @@ private[spark] class ExternalSorter[K, V, C](
   private var map = new SizeTrackingAppendOnlyMap[(Int, K), C]
   private var buffer = new SizeTrackingPairBuffer[(Int, K), C]
 
-  // Number of pairs read from input since last spill; note that we count them even if a value is
-  // merged with a previous key in case we're doing something like groupBy where the result grows
-  protected[this] var elementsRead = 0L
-
   // Total spilling statistics
   private var _diskBytesSpilled = 0L
 
@@ -204,7 +200,7 @@ private[spark] class ExternalSorter[K, V, C](
         if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
       }
       while (records.hasNext) {
-        elementsRead += 1
+        addElementsRead
         kv = records.next()
         map.changeValue((getPartition(kv._1), kv._1), update)
         maybeSpillCollection(usingMap = true)
@@ -212,7 +208,7 @@ private[spark] class ExternalSorter[K, V, C](
     } else {
       // Stick values into our buffer
       while (records.hasNext) {
-        elementsRead += 1
+        addElementsRead
         val kv = records.next()
         buffer.insert((getPartition(kv._1), kv._1), kv._2.asInstanceOf[C])
         maybeSpillCollection(usingMap = false)
@@ -233,12 +229,10 @@ private[spark] class ExternalSorter[K, V, C](
     if (usingMap) {
       if (maybeSpill(map, map.estimateSize())) {
         map = new SizeTrackingAppendOnlyMap[(Int, K), C]
-        elementsRead = 0L
       }
     } else {
       if (maybeSpill(buffer, buffer.estimateSize())) {
         buffer = new SizeTrackingPairBuffer[(Int, K), C]
-        elementsRead = 0L
       }
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
index d7dccd4af8c6e..d3b8964c88915 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
@@ -36,7 +36,11 @@ private[spark] trait Spillable[C] {
   protected def spill(collection: C): Unit
 
   // Number of elements read from input since last spill
-  protected var elementsRead: Long
+  protected var elementsRead: Long = 0
+
+  // subclass calls this method to notify reading an element
+  // it's used to check spilling frequency
+  protected def addElementsRead = elementsRead += 1
 
   // Memory manager that can be used to acquire/release memory
   private[this] val shuffleMemoryManager = SparkEnv.get.shuffleMemoryManager
@@ -76,6 +80,7 @@ private[spark] trait Spillable[C] {
 
       spill(collection)
 
+      elementsRead = 0
       // Keep track of spills, and release memory
       _memoryBytesSpilled += currentMemory
       releaseMemoryForThisThread()
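Patch 2's approach: instead of asking every caller to remember the reset, move the counter into Spillable and clear it on the spill path itself, with addElementsRead as the single entry point for increments. Below is a condensed model of that shape; CountingSpillable and shouldSpill are illustrative stand-ins, and the method signature shown is the final form the series converges on.

    // Condensed model: the trait owns the counter and resets it when it spills.
    trait CountingSpillable[C] {
      protected def spill(collection: C): Unit
      protected def shouldSpill(collection: C): Boolean  // stand-in memory check

      protected var elementsRead: Long = 0L

      // Subclasses call this once per record read.
      protected def addElementsRead(): Unit = { elementsRead += 1 }

      protected def maybeSpill(collection: C): Boolean = {
        if (shouldSpill(collection)) {
          spill(collection)
          elementsRead = 0L  // the reset now lives in exactly one place
          true
        } else {
          false
        }
      }
    }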
From 782c7dec435f85219b911e4b12caa83b5348a587 Mon Sep 17 00:00:00 2001
From: Tianshuo Deng
Date: Tue, 18 Nov 2014 10:45:58 -0800
Subject: [PATCH 3/4] make elementsRead private, fix comment

---
 .../apache/spark/util/collection/Spillable.scala | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
index d3b8964c88915..ef37b8f63ceab 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
@@ -36,11 +36,11 @@ private[spark] trait Spillable[C] {
   protected def spill(collection: C): Unit
 
   // Number of elements read from input since last spill
-  protected var elementsRead: Long = 0
+  protected def elementsRead = _elementsRead
 
-  // subclass calls this method to notify reading an element
-  // it's used to check spilling frequency
-  protected def addElementsRead = elementsRead += 1
+  // Called by subclasses every time a record is read
+  // It's used for checking spilling frequency
+  protected def addElementsRead() = { _elementsRead += 1 }
 
   // Memory manager that can be used to acquire/release memory
   private[this] val shuffleMemoryManager = SparkEnv.get.shuffleMemoryManager
@@ -48,6 +48,9 @@ private[spark] trait Spillable[C] {
   // What threshold of elementsRead we start estimating collection size at
   private[this] val trackMemoryThreshold = 1000
 
+  // Number of elements read from input since last spill
+  private[this] var _elementsRead = 0L
+
   // How much of the shared memory pool this collection has claimed
   private[this] var myMemoryThreshold = 0L
 
@@ -80,7 +83,7 @@ private[spark] trait Spillable[C] {
 
       spill(collection)
 
-      elementsRead = 0
+      _elementsRead = 0
       // Keep track of spills, and release memory
       _memoryBytesSpilled += currentMemory
       releaseMemoryForThisThread()
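Patch 3 applies the standard Scala encapsulation idiom: a private[this] backing field behind a protected read accessor, so subclasses can observe and increment the counter but only the trait itself can zero it. A generic illustration of the idiom follows; Counted and its members are made up for this note, not Spark code.

    trait Counted {
      // Backing field: private[this], so not even subclasses can touch it.
      private[this] var _count = 0L

      // Read-only view for subclasses.
      protected def count: Long = _count

      // The only mutation subclasses may perform.
      protected def recordOne(): Unit = { _count += 1 }

      // Zeroing stays private to the trait, mirroring _elementsRead = 0
      // inside Spillable's spill path.
      private def reset(): Unit = { _count = 0L }

      // Owner-side operation that uses the private reset.
      protected def flush(): Unit = { reset() }
    }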
From 7b56ca021be58e6a3eb698ab2e4d2c50274286f9 Mon Sep 17 00:00:00 2001
From: Tianshuo Deng
Date: Tue, 18 Nov 2014 13:54:45 -0800
Subject: [PATCH 4/4] fix method signature

---
 .../apache/spark/util/collection/ExternalAppendOnlyMap.scala | 2 +-
 .../org/apache/spark/util/collection/ExternalSorter.scala    | 4 ++--
 .../scala/org/apache/spark/util/collection/Spillable.scala   | 4 ++--
 3 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 1227b6072cbdf..7afdc00e5b3cb 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -130,7 +130,7 @@ class ExternalAppendOnlyMap[K, V, C](
         currentMap = new SizeTrackingAppendOnlyMap[K, C]
       }
       currentMap.changeValue(curEntry._1, update)
-      addElementsRead
+      addElementsRead()
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 2d8edb33486a5..c617ff5c51d04 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -200,7 +200,7 @@ private[spark] class ExternalSorter[K, V, C](
         if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
       }
       while (records.hasNext) {
-        addElementsRead
+        addElementsRead()
         kv = records.next()
         map.changeValue((getPartition(kv._1), kv._1), update)
         maybeSpillCollection(usingMap = true)
@@ -208,7 +208,7 @@ private[spark] class ExternalSorter[K, V, C](
     } else {
       // Stick values into our buffer
       while (records.hasNext) {
-        addElementsRead
+        addElementsRead()
         val kv = records.next()
         buffer.insert((getPartition(kv._1), kv._1), kv._2.asInstanceOf[C])
         maybeSpillCollection(usingMap = false)
diff --git a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
index ef37b8f63ceab..4dbff95d8b957 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
@@ -36,11 +36,11 @@ private[spark] trait Spillable[C] {
   protected def spill(collection: C): Unit
 
   // Number of elements read from input since last spill
-  protected def elementsRead = _elementsRead
+  protected def elementsRead: Long = _elementsRead
 
   // Called by subclasses every time a record is read
   // It's used for checking spilling frequency
-  protected def addElementsRead() = { _elementsRead += 1 }
+  protected def addElementsRead(): Unit = { _elementsRead += 1 }
 
   // Memory manager that can be used to acquire/release memory
   private[this] val shuffleMemoryManager = SparkEnv.get.shuffleMemoryManager
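The final signature fixes follow the Scala convention that parameterless side-effecting methods are declared and invoked with empty parentheses, while pure accessors omit them; the explicit Long and Unit result types also pin down the trait's API. In miniature (ParenConvention is a made-up example):

    object ParenConvention {
      private var n = 0L

      def value: Long = n             // pure accessor: no parens at def or call site
      def bump(): Unit = { n += 1 }   // side effect: declared and called with ()

      def main(args: Array[String]): Unit = {
        bump()
        println(value)  // prints 1
      }
    }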