Skip to content

Commit

Permalink
Use Guava's top k implementation rather than our BoundedPriorityQueue…
Browse files Browse the repository at this point in the history
… based implementation. Also updated the documentation for top and takeOrdered.

Guava's top k implementation (in Ordering) is much faster than the BoundedPriorityQueue implementation for roughly sorted input (10 - 20X faster), and still faster for purely random input (2 - 5X).
  • Loading branch information
rxin committed Mar 26, 2014
1 parent 007a733 commit 0d11844
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 16 deletions.
49 changes: 33 additions & 16 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -927,32 +927,49 @@ abstract class RDD[T: ClassTag](
}

/**
* Returns the top K elements from this RDD as defined by
* the specified implicit Ordering[T].
* Returns the top K (largest) elements from this RDD as defined by the specified
* implicit Ordering[T]. This does the opposite of [[takeOrdered]]. For example:
* {{{
* sc.parallelize([10, 4, 2, 12, 3]).top(1)
* // returns [12]
*
* sc.parallelize([2, 3, 4, 5, 6]).top(2)
* // returns [6, 5]
* }}}
*
* @param num the number of top elements to return
* @param ord the implicit ordering for T
* @return an array of top elements
*/
def top(num: Int)(implicit ord: Ordering[T]): Array[T] = {
mapPartitions { items =>
val queue = new BoundedPriorityQueue[T](num)
queue ++= items
Iterator.single(queue)
}.reduce { (queue1, queue2) =>
queue1 ++= queue2
queue1
}.toArray.sorted(ord.reverse)
}
def top(num: Int)(implicit ord: Ordering[T]): Array[T] = takeOrdered(num)(ord.reverse)

/**
* Returns the first K elements from this RDD as defined by
* the specified implicit Ordering[T] and maintains the
* ordering.
* Returns the first K (smallest) elements from this RDD as defined by the specified
* implicit Ordering[T] and maintains the ordering. This does the opposite of [[top]].
* For example:
* {{{
* sc.parallelize([10, 4, 2, 12, 3]).takeOrdered(1)
* // returns [12]
*
* sc.parallelize([2, 3, 4, 5, 6]).takeOrdered(2)
* // returns [2, 3]
* }}}
*
* @param num the number of top elements to return
* @param ord the implicit ordering for T
* @return an array of top elements
*/
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = top(num)(ord.reverse)
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = {
mapPartitions { items =>
// Priority keeps the largest elements, so let's reverse the ordering.
val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
Iterator.single(queue)
}.reduce { (queue1, queue2) =>
queue1 ++= queue2
queue1
}.toArray.sorted(ord)
}

/**
* Returns the max of this RDD as defined by the implicit Ordering[T].
Expand Down
39 changes: 39 additions & 0 deletions core/src/main/scala/org/apache/spark/util/collection/Utils.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.util.collection

import scala.collection.JavaConversions.{collectionAsScalaIterable, asJavaIterator}

import com.google.common.collect.{Ordering => GuavaOrdering}

/**
* Utility functions for collections.
*/
private[spark] object Utils {

/**
* Returns the first K elements from the input as defined by the specified implicit Ordering[T]
* and maintains the ordering.
*/
def takeOrdered[T](input: Iterator[T], num: Int)(implicit ord: Ordering[T]): Iterator[T] = {
val ordering = new GuavaOrdering[T] {
override def compare(l: T, r: T) = ord.compare(l, r)
}
collectionAsScalaIterable(ordering.leastOf(asJavaIterator(input), num)).iterator
}
}

0 comments on commit 0d11844

Please sign in to comment.