Skip to content

Commit

Permalink
SPARK-1259 Make RDD locally iterable
Browse files Browse the repository at this point in the history
  • Loading branch information
epahomov committed Mar 16, 2014
1 parent 0283665 commit 33ecb17
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 0 deletions.
16 changes: 16 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,22 @@ abstract class RDD[T: ClassTag](
Array.concat(results: _*)
}

/**
 * Return a Stream that contains all of the elements in this RDD.
 *
 * The stream is materialized lazily: a Spark job is run for one partition at a time,
 * only when that partition's elements are first demanded. Peak driver memory usage is
 * therefore bounded by the largest single partition rather than the whole RDD.
 *
 * @return a lazy Stream of all elements, in partition order
 */
def toStream(): Stream[T] = {
  // Run a job on exactly one partition and collect its elements to the driver.
  // runJob returns one result per requested partition; Seq(p) yields a single Array[T].
  def collectPartition(p: Int): Array[T] =
    sc.runJob(this, (iter: Iterator[T]) => iter.toArray, Seq(p), allowLocal = false).head
  // toStream on the partition-index range defers each collectPartition call until the
  // corresponding section of the stream is actually traversed, preserving laziness
  // without the var/loop mutation of a hand-built Stream.
  (0 until partitions.length).toStream.flatMap(p => collectPartition(p).toStream)
}

/**
* Return an array that contains all of the elements in this RDD.
*/
Expand Down
1 change: 1 addition & 0 deletions core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
test("basic operations") {
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
assert(nums.collect().toList === List(1, 2, 3, 4))
assert(nums.toStream().toList === List(1, 2, 3, 4))
val dups = sc.makeRDD(Array(1, 1, 2, 2, 3, 3, 4, 4), 2)
assert(dups.distinct().count() === 4)
assert(dups.distinct.count === 4) // Can distinct and count be called without parentheses?
Expand Down

0 comments on commit 33ecb17

Please sign in to comment.