Skip to content

Commit

Permalink
Clean function in several RDD methods
Browse files Browse the repository at this point in the history
  • Loading branch information
tedyu committed May 7, 2015
1 parent 14502d5 commit 6c124a9
Showing 1 changed file with 8 additions and 4 deletions.
12 changes: 8 additions & 4 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -741,9 +741,10 @@ abstract class RDD[T: ClassTag](
/**
 * Maps `f` over this RDD, where `f` takes an additional parameter of type `A`.
 * That parameter is produced by `constructA`, invoked once per partition with
 * the partition's index.
 *
 * @param constructA builds the per-partition extra argument from the partition index
 * @param preservesPartitioning whether the resulting RDD keeps this RDD's partitioner
 * @param f mapping applied to each element together with the per-partition value
 */
def mapWith[A, U: ClassTag]
    (constructA: Int => A, preservesPartitioning: Boolean = false)
    (f: (T, A) => U): RDD[U] = withScope {
  // Clean f once, up front, before it is captured by the partition closure —
  // presumably sc.clean strips non-serializable outer references (closure
  // cleaning); cleaning here avoids doing that work per task.
  // NOTE(review): constructA is also shipped with the task but is not cleaned
  // here — confirm it is cleaned downstream (e.g. by mapPartitionsWithIndex).
  val cleanF = sc.clean(f)
  mapPartitionsWithIndex((index, iter) => {
    val a = constructA(index)
    iter.map(t => cleanF(t, a))
  }, preservesPartitioning)
}

Expand All @@ -756,9 +757,10 @@ abstract class RDD[T: ClassTag](
/**
 * FlatMaps `f` over this RDD, where `f` takes an additional parameter of type
 * `A`. That parameter is produced by `constructA`, invoked once per partition
 * with the partition's index.
 *
 * @param constructA builds the per-partition extra argument from the partition index
 * @param preservesPartitioning whether the resulting RDD keeps this RDD's partitioner
 * @param f flat-mapping applied to each element together with the per-partition value
 */
def flatMapWith[A, U: ClassTag]
    (constructA: Int => A, preservesPartitioning: Boolean = false)
    (f: (T, A) => Seq[U]): RDD[U] = withScope {
  // Clean f once before capture, mirroring mapWith — presumably sc.clean
  // removes non-serializable outer references from the closure.
  val cleanF = sc.clean(f)
  mapPartitionsWithIndex((index, iter) => {
    val a = constructA(index)
    iter.flatMap(t => cleanF(t, a))
  }, preservesPartitioning)
}

Expand All @@ -769,9 +771,10 @@ abstract class RDD[T: ClassTag](
*/
/**
 * Applies `f` to each element of this RDD, where `f` takes an additional
 * parameter of type `A` produced by `constructA` once per partition.
 */
@deprecated("use mapPartitionsWithIndex and foreach", "1.0.0")
def foreachWith[A](constructA: Int => A)(f: (T, A) => Unit): Unit = withScope {
  // Clean f once before it is captured by the partition closure, consistent
  // with mapWith/flatMapWith.
  val cleanF = sc.clean(f)
  // NOTE(review): mapPartitionsWithIndex is a transformation and its result is
  // discarded here, so nothing visible in this view forces evaluation of
  // cleanF — confirm an action (e.g. a trailing .foreach) exists in the full
  // file or that this hunk was trimmed by the diff.
  mapPartitionsWithIndex { (index, iter) =>
    val a = constructA(index)
    // Yield t back so the side-effecting call fits an iterator pipeline.
    iter.map(t => {cleanF(t, a); t})
  }
}

Expand Down Expand Up @@ -901,7 +904,8 @@ abstract class RDD[T: ClassTag](
* Return an RDD containing `f` applied to every element on which the partial function `f` is defined.
*/
def collect[U: ClassTag](f: PartialFunction[T, U]): RDD[U] = withScope {
  // Clean the partial function once and reuse it for both the isDefinedAt
  // filter and the map, so both stages share the same cleaned closure instead
  // of serializing the uncleaned original twice.
  val cleanF = sc.clean(f)
  filter(cleanF.isDefinedAt).map(cleanF)
}

/**
Expand Down

0 comments on commit 6c124a9

Please sign in to comment.