Skip to content

Commit

Permalink
SPARK-2685. Update ExternalAppendOnlyMap to avoid buffer.remove()
Browse files Browse the repository at this point in the history
Replaces this with an O(1) operation that does not have to shift over
the whole tail of the array into the gap produced by the element removed.

Author: Matei Zaharia <matei@databricks.com>

Closes #1773 from mateiz/SPARK-2685 and squashes the following commits:

1ea028a [Matei Zaharia] Update comments in StreamBuffer and EAOM, and reuse ArrayBuffers
eb1abfd [Matei Zaharia] Update ExternalAppendOnlyMap to avoid buffer.remove()

(cherry picked from commit 066765d)
Signed-off-by: Matei Zaharia <matei@databricks.com>
  • Loading branch information
mateiz committed Aug 5, 2014
1 parent 4ed7b5a commit a092285
Showing 1 changed file with 35 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -286,30 +286,32 @@ class ExternalAppendOnlyMap[K, V, C](
private val inputStreams = (Seq(sortedMap) ++ spilledMaps).map(it => it.buffered)

inputStreams.foreach { it =>
val kcPairs = getMorePairs(it)
val kcPairs = new ArrayBuffer[(K, C)]
readNextHashCode(it, kcPairs)
if (kcPairs.length > 0) {
mergeHeap.enqueue(new StreamBuffer(it, kcPairs))
}
}

/**
* Fetch from the given iterator until a key of different hash is retrieved.
* Fill a buffer with the next set of keys with the same hash code from a given iterator. We
* read streams one hash code at a time to ensure we don't miss elements when they are merged.
*
* Assumes the given iterator is in sorted order of hash code.
*
* In the event of key hash collisions, this ensures no pairs are hidden from being merged.
* Assume the given iterator is in sorted order.
* @param it iterator to read from
* @param buf buffer to write the results into
*/
private def getMorePairs(it: BufferedIterator[(K, C)]): ArrayBuffer[(K, C)] = {
val kcPairs = new ArrayBuffer[(K, C)]
private def readNextHashCode(it: BufferedIterator[(K, C)], buf: ArrayBuffer[(K, C)]): Unit = {
if (it.hasNext) {
// The first pair read fixes the hash code that defines this run.
var kc = it.next()
kcPairs += kc
buf += kc
val minHash = hashKey(kc)
// Peek at the head without consuming it, so that the first pair with a different hash code
// stays in the iterator for the next call.
// NOTE(review): minHash comes from hashKey(kc) but the comparison below calls
// _1.hashCode() directly; presumably the two always agree -- confirm against hashKey
// (e.g. how it treats null keys).
while (it.hasNext && it.head._1.hashCode() == minHash) {
kc = it.next()
kcPairs += kc
buf += kc
}
}
kcPairs
}

/**
Expand All @@ -321,14 +323,29 @@ class ExternalAppendOnlyMap[K, V, C](
while (i < buffer.pairs.length) {
val pair = buffer.pairs(i)
if (pair._1 == key) {
buffer.pairs.remove(i)
// Note that there's at most one pair in the buffer with a given key, because we always
// merge values in a map before spilling; it is therefore safe to return after the first match
removeFromBuffer(buffer.pairs, i)
return mergeCombiners(baseCombiner, pair._2)
}
i += 1
}
baseCombiner
}

/**
 * Remove the index'th element from an ArrayBuffer in constant time by moving the last element
 * into its place and then dropping the last slot. This is more efficient than calling
 * ArrayBuffer.remove on an arbitrary index because no tail of elements has to be shifted over.
 * It works for our array buffers because we don't care about the order of elements inside,
 * we just want to search them for a key.
 *
 * @param buffer buffer to remove from; the order of its remaining elements may change
 * @param index position of the element to remove
 * @return the element that was stored at `index`
 */
private def removeFromBuffer[T](buffer: ArrayBuffer[T], index: Int): T = {
  val elem = buffer(index)
  val lastIndex = buffer.size - 1
  buffer(index) = buffer(lastIndex) // This is a harmless self-assignment if index == lastIndex
  // Removing the final element shifts nothing, so this stays O(1). It is used instead of
  // reduceToSize because remove(Int) is part of the public Buffer API on every Scala version.
  buffer.remove(lastIndex)
  elem
}

/**
* Return true if there exists an input stream that still has unvisited pairs.
*/
Expand All @@ -346,7 +363,7 @@ class ExternalAppendOnlyMap[K, V, C](
val minBuffer = mergeHeap.dequeue()
val minPairs = minBuffer.pairs
val minHash = minBuffer.minKeyHash
val minPair = minPairs.remove(0)
val minPair = removeFromBuffer(minPairs, 0)
val minKey = minPair._1
var minCombiner = minPair._2
assert(hashKey(minPair) == minHash)
Expand All @@ -363,7 +380,7 @@ class ExternalAppendOnlyMap[K, V, C](
// Repopulate each visited stream buffer and add it back to the queue if it is non-empty
mergedBuffers.foreach { buffer =>
if (buffer.isEmpty) {
buffer.pairs ++= getMorePairs(buffer.iterator)
readNextHashCode(buffer.iterator, buffer.pairs)
}
if (!buffer.isEmpty) {
mergeHeap.enqueue(buffer)
Expand All @@ -375,10 +392,13 @@ class ExternalAppendOnlyMap[K, V, C](

/**
* A buffer for streaming from a map iterator (in-memory or on-disk) sorted by key hash.
* Each buffer maintains the lowest-ordered keys in the corresponding iterator. Due to
* hash collisions, it is possible for multiple keys to be "tied" for being the lowest.
* Each buffer maintains all of the key-value pairs with what is currently the lowest hash
* code among keys in the stream. There may be multiple keys if there are hash collisions.
* Note that, because we only spill one value for each key when we spill data out, there is
* at most one element for each key.
*
* StreamBuffers are ordered by the minimum key hash found across all of their own pairs.
* StreamBuffers are ordered by the minimum key hash currently available in their stream so
* that we can put them into a heap and sort that.
*/
private class StreamBuffer(
val iterator: BufferedIterator[(K, C)],
Expand Down

0 comments on commit a092285

Please sign in to comment.