Skip proactive closure capture for runJob
There are two possible cases for runJob calls: either they are called
by RDD action methods from inside Spark or they are called from client
code. There's no need to proactively check the closure argument to
runJob for serializability or force variable capture in either case:

1. if they are called by RDD actions, their closure arguments consist
of mapping an already-serializable closure (with an already-frozen
environment) to each element in the RDD;

2. in both cases, the closure is about to execute and thus the benefit
of proactively checking for serializability (or ensuring immediate
variable capture) is nonexistent.

(Note that ensuring capture via serializability on closure arguments to
runJob also causes pyspark accumulators to fail to update.)
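To illustrate the trade-off described above, here is a minimal, self-contained sketch of what an eager serializability check with an opt-out flag could look like. It assumes, as the added code comment in this commit suggests, that the second argument to `clean` toggles the proactive check. The names `ClosureCheckSketch`, `ensureSerializable`, `NotSer`, and `badClosure` are hypothetical and exist only for this example; this is not Spark's implementation.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import scala.util.Try

object ClosureCheckSketch {
  // Hypothetical helper: attempt Java serialization and let any
  // NotSerializableException propagate to the caller.
  def ensureSerializable(func: AnyRef): Unit = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try out.writeObject(func)
    finally out.close()
  }

  // Hypothetical stand-in for a clean(f, checkSerializable) helper.
  // With the flag set to false, the closure is returned untouched.
  def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = {
    if (checkSerializable) ensureSerializable(f)
    f
  }

  // A class that deliberately does not extend Serializable.
  class NotSer { val x = 1 }

  // Builds a closure that captures a non-serializable local value.
  def badClosure(): Int => Int = {
    val handle = new NotSer
    n => n + handle.x
  }

  def main(args: Array[String]): Unit = {
    val bad = badClosure()
    // Eager check: fails at the call site, before any job would run.
    println(Try(clean(bad)).isFailure)
    // Check skipped: the closure is handed back as-is; any serialization
    // problem would only surface later, when the closure is shipped.
    println(clean(bad, checkSerializable = false) eq bad)
  }
}
```

The eager check is useful for transformations, where the closure may run long after it was defined. For a closure that is about to execute anyway, as the commit message argues, the same failure would surface almost immediately, so the extra serialization pass buys nothing.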
willb committed Mar 27, 2014
1 parent 8ee3ee7 commit 12ef6e3
Showing 2 changed files with 7 additions and 3 deletions.
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -896,7 +896,9 @@ class SparkContext(
       require(p >= 0 && p < rdd.partitions.size, s"Invalid partition requested: $p")
     }
     val callSite = getCallSite
-    val cleanedFunc = clean(func)
+    // There's no need to check this function for serializability,
+    // since it will be run right away.
+    val cleanedFunc = clean(func, false)
     logInfo("Starting job: " + callSite)
     val start = System.nanoTime
     dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
6 changes: 4 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -640,14 +640,16 @@ abstract class RDD[T: ClassTag](
    * Applies a function f to all elements of this RDD.
    */
   def foreach(f: T => Unit) {
-    sc.runJob(this, (iter: Iterator[T]) => iter.foreach(f))
+    val cleanF = sc.clean(f)
+    sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
   }
 
   /**
    * Applies a function f to each partition of this RDD.
    */
   def foreachPartition(f: Iterator[T] => Unit) {
-    sc.runJob(this, (iter: Iterator[T]) => f(iter))
+    val cleanF = sc.clean(f)
+    sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
   }
 
   /**
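The RDD.scala change follows a clean-then-wrap pattern: the user-supplied function is cleaned once in the action method, and only the cleaned function is captured by the per-partition closure handed to `runJob`. The sketch below shows that shape without any Spark dependency. `ForeachSketch`, its placeholder `clean`, and its local `runJob` over in-memory partitions are assumptions made for illustration only.

```scala
object ForeachSketch {
  // Placeholder for closure cleaning; a real cleaner would prune
  // unneeded references from the closure's environment.
  def clean[A](f: A => Unit): A => Unit = f

  // Hypothetical local stand-in for runJob: applies func to each partition.
  def runJob[A](partitions: Seq[Seq[A]], func: Iterator[A] => Unit): Unit =
    partitions.foreach(p => func(p.iterator))

  // Clean the user function first, then wrap it, so that the wrapper
  // passed to runJob captures only the already-cleaned function.
  def foreach[A](partitions: Seq[Seq[A]])(f: A => Unit): Unit = {
    val cleanF = clean(f)
    runJob(partitions, (iter: Iterator[A]) => iter.foreach(cleanF))
  }
}
```

Because the wrapper's only captured value is `cleanF`, the job-level closure needs no further cleaning or checking of its own, which matches the reasoning in the commit message.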
