diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 1b43040c6d918..190820ab48f07 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -927,32 +927,49 @@ abstract class RDD[T: ClassTag]( } /** - * Returns the top K elements from this RDD as defined by - * the specified implicit Ordering[T]. + * Returns the top K (largest) elements from this RDD as defined by the specified + * implicit Ordering[T]. This does the opposite of [[takeOrdered]]. For example: + * {{{ + * sc.parallelize([10, 4, 2, 12, 3]).top(1) + * // returns [12] + * + * sc.parallelize([2, 3, 4, 5, 6]).top(2) + * // returns [6, 5] + * }}} + * * @param num the number of top elements to return * @param ord the implicit ordering for T * @return an array of top elements */ - def top(num: Int)(implicit ord: Ordering[T]): Array[T] = { - mapPartitions { items => - val queue = new BoundedPriorityQueue[T](num) - queue ++= items - Iterator.single(queue) - }.reduce { (queue1, queue2) => - queue1 ++= queue2 - queue1 - }.toArray.sorted(ord.reverse) - } + def top(num: Int)(implicit ord: Ordering[T]): Array[T] = takeOrdered(num)(ord.reverse) /** - * Returns the first K elements from this RDD as defined by - * the specified implicit Ordering[T] and maintains the - * ordering. + * Returns the first K (smallest) elements from this RDD as defined by the specified + * implicit Ordering[T] and maintains the ordering. This does the opposite of [[top]]. + * For example: + * {{{ + * sc.parallelize([10, 4, 2, 12, 3]).takeOrdered(1) + * // returns [12] + * + * sc.parallelize([2, 3, 4, 5, 6]).takeOrdered(2) + * // returns [2, 3] + * }}} + * * @param num the number of top elements to return * @param ord the implicit ordering for T * @return an array of top elements */ - def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = top(num)(ord.reverse) + def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = { + mapPartitions { items => + // Priority keeps the largest elements, so let's reverse the ordering. + val queue = new BoundedPriorityQueue[T](num)(ord.reverse) + queue ++= util.collection.Utils.takeOrdered(items, num)(ord) + Iterator.single(queue) + }.reduce { (queue1, queue2) => + queue1 ++= queue2 + queue1 + }.toArray.sorted(ord) + } /** * Returns the max of this RDD as defined by the implicit Ordering[T]. diff --git a/core/src/main/scala/org/apache/spark/util/collection/Utils.scala b/core/src/main/scala/org/apache/spark/util/collection/Utils.scala new file mode 100644 index 0000000000000..c5268c0fae0ef --- /dev/null +++ b/core/src/main/scala/org/apache/spark/util/collection/Utils.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util.collection + +import scala.collection.JavaConversions.{collectionAsScalaIterable, asJavaIterator} + +import com.google.common.collect.{Ordering => GuavaOrdering} + +/** + * Utility functions for collections. + */ +private[spark] object Utils { + + /** + * Returns the first K elements from the input as defined by the specified implicit Ordering[T] + * and maintains the ordering. + */ + def takeOrdered[T](input: Iterator[T], num: Int)(implicit ord: Ordering[T]): Iterator[T] = { + val ordering = new GuavaOrdering[T] { + override def compare(l: T, r: T) = ord.compare(l, r) + } + collectionAsScalaIterable(ordering.leastOf(asJavaIterator(input), num)).iterator + } +}