update elementsRead through addElementsRead method
tsdeng committed Nov 18, 2014
1 parent 74ca246 commit bb7ff28
Showing 3 changed files with 10 additions and 15 deletions.
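
In outline, the commit hoists the elementsRead counter out of ExternalAppendOnlyMap and ExternalSorter into the shared Spillable trait: call sites now report each record through the new addElementsRead method, and the trait itself clears the counter after a spill. Below is a minimal sketch of the resulting pattern, assuming a simplified spill policy and a toy subclass (the real maybeSpill in Spillable.scala also consults the shuffle memory manager):

import scala.collection.mutable.ArrayBuffer

trait Spillable[C] {
  // Subclasses implement the actual spill-to-disk step.
  protected def spill(collection: C): Unit

  // Number of elements read from input since last spill
  protected var elementsRead: Long = 0

  // Called by subclasses once per record read; used to check spill frequency
  protected def addElementsRead = elementsRead += 1

  // Illustrative policy only: spill after every 1000 reads.
  protected def maybeSpill(collection: C): Boolean = {
    if (elementsRead >= 1000) {
      spill(collection)
      elementsRead = 0 // the reset now lives here, not in each subclass
      true
    } else {
      false
    }
  }
}

// Toy stand-in for ExternalAppendOnlyMap / ExternalSorter.
class BufferingSink extends Spillable[ArrayBuffer[Int]] {
  private var buffer = new ArrayBuffer[Int]

  override protected def spill(collection: ArrayBuffer[Int]): Unit = {
    println(s"spilling ${collection.size} elements")
    buffer = new ArrayBuffer[Int]
  }

  def insert(x: Int): Unit = {
    buffer += x
    addElementsRead    // replaces the old `elementsRead += 1`
    maybeSpill(buffer) // no manual `elementsRead = 0` afterwards
  }
}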
ExternalAppendOnlyMap.scala
@@ -76,9 +76,7 @@ class ExternalAppendOnlyMap[K, V, C](
   private val sparkConf = SparkEnv.get.conf
   private val diskBlockManager = blockManager.diskBlockManager
 
-  // Number of pairs inserted since last spill; note that we count them even if a value is merged
-  // with a previous key in case we're doing something like groupBy where the result grows
-  protected[this] var elementsRead = 0L
 
   /**
    * Size of object batches when reading/writing from serializers.
@@ -132,7 +130,7 @@ class ExternalAppendOnlyMap[K, V, C](
         currentMap = new SizeTrackingAppendOnlyMap[K, C]
       }
       currentMap.changeValue(curEntry._1, update)
-      elementsRead += 1
+      addElementsRead
     }
   }

@@ -209,8 +207,6 @@ class ExternalAppendOnlyMap[K, V, C](
     }
 
     spilledMaps.append(new DiskMapIterator(file, blockId, batchSizes))
-
-    elementsRead = 0
   }
 
   def diskBytesSpilled: Long = _diskBytesSpilled
ExternalSorter.scala
@@ -119,10 +119,6 @@ private[spark] class ExternalSorter[K, V, C](
   private var map = new SizeTrackingAppendOnlyMap[(Int, K), C]
   private var buffer = new SizeTrackingPairBuffer[(Int, K), C]
 
-  // Number of pairs read from input since last spill; note that we count them even if a value is
-  // merged with a previous key in case we're doing something like groupBy where the result grows
-  protected[this] var elementsRead = 0L
-
   // Total spilling statistics
   private var _diskBytesSpilled = 0L
 
@@ -204,15 +200,15 @@ private[spark] class ExternalSorter[K, V, C](
         if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
       }
       while (records.hasNext) {
-        elementsRead += 1
+        addElementsRead
        kv = records.next()
        map.changeValue((getPartition(kv._1), kv._1), update)
        maybeSpillCollection(usingMap = true)
      }
    } else {
      // Stick values into our buffer
      while (records.hasNext) {
-        elementsRead += 1
+        addElementsRead
        val kv = records.next()
        buffer.insert((getPartition(kv._1), kv._1), kv._2.asInstanceOf[C])
        maybeSpillCollection(usingMap = false)
@@ -233,12 +229,10 @@ private[spark] class ExternalSorter[K, V, C](
     if (usingMap) {
       if (maybeSpill(map, map.estimateSize())) {
         map = new SizeTrackingAppendOnlyMap[(Int, K), C]
-        elementsRead = 0L
       }
     } else {
       if (maybeSpill(buffer, buffer.estimateSize())) {
         buffer = new SizeTrackingPairBuffer[(Int, K), C]
-        elementsRead = 0L
       }
     }
   }
Spillable.scala
@@ -36,7 +36,11 @@ private[spark] trait Spillable[C] {
   protected def spill(collection: C): Unit
 
   // Number of elements read from input since last spill
-  protected var elementsRead: Long
+  protected var elementsRead: Long = 0
+
+  // Subclasses call this method to record that an element has been read;
+  // it is used to check the spill frequency.
+  protected def addElementsRead = elementsRead += 1
 
   // Memory manager that can be used to acquire/release memory
   private[this] val shuffleMemoryManager = SparkEnv.get.shuffleMemoryManager
@@ -76,6 +80,7 @@ private[spark] trait Spillable[C] {
 
     spill(collection)
 
+    elementsRead = 0
     // Keep track of spills, and release memory
     _memoryBytesSpilled += currentMemory
     releaseMemoryForThisThread()
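
Centralizing the reset in maybeSpill means every spill path clears the counter exactly once; before this change, ExternalAppendOnlyMap reset it at the end of its spill routine and ExternalSorter in maybeSpillCollection, and any new subclass had to remember to do the same. Per the new comment, the counter is used to check spilling frequency; a self-contained sketch of what such a rate-limited check can look like (the modulus, threshold, and size estimate below are illustrative assumptions, not values taken from this diff):

// Sketch: consult the (comparatively expensive) size estimate only on every
// 32nd read, using elementsRead as the rate limiter. Constants are assumed.
object SpillCheckDemo {
  private var elementsRead: Long = 0
  private val memoryThreshold: Long = 5L * 1024 * 1024

  private def addElementsRead = elementsRead += 1

  private def shouldSpill(estimateSize: () => Long): Boolean =
    elementsRead % 32 == 0 && estimateSize() >= memoryThreshold

  def main(args: Array[String]): Unit = {
    var spills = 0
    for (i <- 1 to 100000) {
      addElementsRead
      if (shouldSpill(() => i * 100L)) { // pretend the structure grows with i
        spills += 1
        elementsRead = 0                 // cleared after each spill, as above
      }
    }
    println(s"spilled $spills times")
  }
}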
