Skip to content

Commit

Permalink
SPARK-1321 Use Guava's top k implementation rather than our BoundedPr…
Browse files Browse the repository at this point in the history
…iorityQueue based implementation

Also updated the documentation for top and takeOrdered.

On my simple test of sorting 100 million (Int, Int) tuples using Spark, Guava's top k implementation (in Ordering) is much faster than the BoundedPriorityQueue implementation for roughly sorted input (10 - 20X faster), and still faster for purely random input (2 - 5X).

Author: Reynold Xin <rxin@apache.org>

Closes #229 from rxin/takeOrdered and squashes the following commits:

0d11844 [Reynold Xin] Use Guava's top k implementation rather than our BoundedPriorityQueue based implementation. Also updated the documentation for top and takeOrdered.
  • Loading branch information
rxin committed Mar 26, 2014
1 parent 4f7d547 commit b859853
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 16 deletions.
49 changes: 33 additions & 16 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -927,32 +927,49 @@ abstract class RDD[T: ClassTag](
}

/**
* Returns the top K elements from this RDD as defined by
* the specified implicit Ordering[T].
* Returns the top K (largest) elements from this RDD as defined by the specified
* implicit Ordering[T]. This does the opposite of [[takeOrdered]]. For example:
* {{{
* sc.parallelize([10, 4, 2, 12, 3]).top(1)
* // returns [12]
*
* sc.parallelize([2, 3, 4, 5, 6]).top(2)
* // returns [6, 5]
* }}}
*
* @param num the number of top elements to return
* @param ord the implicit ordering for T
* @return an array of top elements
*/
def top(num: Int)(implicit ord: Ordering[T]): Array[T] = {
mapPartitions { items =>
val queue = new BoundedPriorityQueue[T](num)
queue ++= items
Iterator.single(queue)
}.reduce { (queue1, queue2) =>
queue1 ++= queue2
queue1
}.toArray.sorted(ord.reverse)
}
def top(num: Int)(implicit ord: Ordering[T]): Array[T] = takeOrdered(num)(ord.reverse)

/**
* Returns the first K elements from this RDD as defined by
* the specified implicit Ordering[T] and maintains the
* ordering.
* Returns the first K (smallest) elements from this RDD as defined by the specified
* implicit Ordering[T] and maintains the ordering. This does the opposite of [[top]].
* For example:
* {{{
* sc.parallelize([10, 4, 2, 12, 3]).takeOrdered(1)
* // returns [12]
*
* sc.parallelize([2, 3, 4, 5, 6]).takeOrdered(2)
* // returns [2, 3]
* }}}
*
* @param num the number of top elements to return
* @param ord the implicit ordering for T
* @return an array of top elements
*/
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = top(num)(ord.reverse)
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = {
mapPartitions { items =>
// Priority keeps the largest elements, so let's reverse the ordering.
val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
Iterator.single(queue)
}.reduce { (queue1, queue2) =>
queue1 ++= queue2
queue1
}.toArray.sorted(ord)
}

/**
* Returns the max of this RDD as defined by the implicit Ordering[T].
Expand Down
39 changes: 39 additions & 0 deletions core/src/main/scala/org/apache/spark/util/collection/Utils.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.util.collection

import scala.collection.JavaConversions.{collectionAsScalaIterable, asJavaIterator}

import com.google.common.collect.{Ordering => GuavaOrdering}

/**
* Utility functions for collections.
*/
private[spark] object Utils {

/**
* Returns the first K elements from the input as defined by the specified implicit Ordering[T]
* and maintains the ordering.
*/
def takeOrdered[T](input: Iterator[T], num: Int)(implicit ord: Ordering[T]): Iterator[T] = {
val ordering = new GuavaOrdering[T] {
override def compare(l: T, r: T) = ord.compare(l, r)
}
collectionAsScalaIterable(ordering.leastOf(asJavaIterator(input), num)).iterator
}
}

0 comments on commit b859853

Please sign in to comment.