[SPARK-4818][Core] Add 'iterator' to reduce memory consumed by join
In Scala, `map` and `flatMap` on an `Iterable` are strict: they eagerly copy the results into a new collection (here a `Seq`). For example,
```Scala
  val iterable = Seq(1, 2, 3).map(v => {
    println(v)
    v
  })
  println("Iterable map done")

  val iterator = Seq(1, 2, 3).iterator.map(v => {
    println(v)
    v
  })
  println("Iterator map done")
```
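prints the following (the `println` inside the iterator's `map` never runs, because nothing consumes the iterator):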
```
1
2
3
Iterable map done
Iterator map done
```
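So `join` should call `iterator` on the cogrouped values, so that the joined pairs are produced lazily instead of being copied into a new collection for each key. This reduces the memory consumed by join.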
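
A minimal sketch of the same effect for the for-comprehension used in `join`, on plain Scala collections rather than Spark. The names `left`, `right`, and `produced` are illustrative assumptions, not part of the patch.

```scala
object LazyCrossProduct {
  def main(args: Array[String]): Unit = {
    val left: Iterable[Int] = Seq(1, 2, 3)
    val right: Iterable[String] = Seq("a", "b")

    var produced = 0 // counts how many pairs have actually been built

    // Strict: all 3 * 2 pairs are materialized before `strict` is assigned.
    val strict = for (v <- left; w <- right) yield { produced += 1; (v, w) }
    assert(strict.size == 6)
    assert(produced == 6)

    produced = 0
    // Lazy: nothing is built until the iterator is consumed.
    val lazyPairs =
      for (v <- left.iterator; w <- right.iterator) yield { produced += 1; (v, w) }
    assert(produced == 0)

    // Pulling one element builds exactly one pair.
    val first = lazyPairs.next()
    val expected = (1, "a")
    assert(first == expected)
    assert(produced == 1)
  }
}
```

With the strict form, the whole cross product for a key exists in memory at once; with the iterator form, only the pair currently being consumed does.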

Found by Johannes Simon in http://mail-archives.apache.org/mod_mbox/spark-user/201412.mbox/%3C5BE70814-9D03-4F61-AE2C-0D63F2DE4446%40mail.de%3E

Author: zsxwing <zsxwing@gmail.com>

Closes #3671 from zsxwing/SPARK-4824 and squashes the following commits:

48ee7b9 [zsxwing] Remove the explicit types
95d59d6 [zsxwing] Add 'iterator' to reduce memory consumed by join

(cherry picked from commit c233ab3)
Signed-off-by: Josh Rosen <joshrosen@databricks.com>

Conflicts:
	core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
zsxwing authored and JoshRosen committed Dec 22, 2014
1 parent e5f2752 commit 3bce43f
Showing 1 changed file with 5 additions and 5 deletions.
10 changes: 5 additions & 5 deletions core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -472,7 +472,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    */
   def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
     this.cogroup(other, partitioner).flatMapValues( pair =>
-      for (v <- pair._1; w <- pair._2) yield (v, w)
+      for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
     )
   }

@@ -485,9 +485,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
   def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = {
     this.cogroup(other, partitioner).flatMapValues { pair =>
       if (pair._2.isEmpty) {
-        pair._1.map(v => (v, None))
+        pair._1.iterator.map(v => (v, None))
       } else {
-        for (v <- pair._1; w <- pair._2) yield (v, Some(w))
+        for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, Some(w))
       }
     }
   }
@@ -502,9 +502,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       : RDD[(K, (Option[V], W))] = {
     this.cogroup(other, partitioner).flatMapValues { pair =>
       if (pair._1.isEmpty) {
-        pair._2.map(w => (None, w))
+        pair._2.iterator.map(w => (None, w))
       } else {
-        for (v <- pair._1; w <- pair._2) yield (Some(v), w)
+        for (v <- pair._1.iterator; w <- pair._2.iterator) yield (Some(v), w)
       }
     }
   }
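
The per-key logic of the patched `leftOuterJoin` can be sketched without Spark as follows. `leftOuterPairs` is a hypothetical helper written for illustration; it mirrors the function body passed to `flatMapValues` but is not code from the commit. Both branches return an `Iterator`, so no intermediate collection is built for a key.

```scala
object LeftOuterSketch {
  // Hypothetical helper: mirrors the body passed to flatMapValues in leftOuterJoin.
  // vs holds the left-side values for one key, ws the right-side values.
  def leftOuterPairs[V, W](vs: Iterable[V], ws: Iterable[W]): Iterator[(V, Option[W])] =
    if (ws.isEmpty) {
      // No match on the right: emit each left value paired with None.
      vs.iterator.map(v => (v, None))
    } else {
      // Lazily emit the cross product of left and right values.
      for (v <- vs.iterator; w <- ws.iterator) yield (v, Some(w))
    }

  def main(args: Array[String]): Unit = {
    val unmatched = leftOuterPairs(Seq(1, 2), Seq.empty[String]).toList
    assert(unmatched == List((1, None), (2, None)))

    val matched = leftOuterPairs(Seq(1), Seq("a", "b")).toList
    assert(matched == List((1, Some("a")), (1, Some("b"))))
  }
}
```

The `rightOuterJoin` hunk follows the same pattern with the roles of the two sides swapped.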