Skip to content

Commit

Permalink
Predictable closure environment capture
Browse files Browse the repository at this point in the history
The environments of serializable closures are now captured as
part of closure cleaning. Since we already proactively check most
closures for serializability, ClosureCleaner.clean now returns
the result of deserializing the serialized version of the cleaned
closure.
  • Loading branch information
willb committed Mar 27, 2014
1 parent 12c63a7 commit 8ee3ee7
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 6 deletions.
5 changes: 2 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1031,17 +1031,16 @@ class SparkContext(
* Clean a closure to make it ready to be serialized and sent to tasks
* (removes unreferenced variables in $outer's, updates REPL variables)
*/
private[spark] def clean[F <: AnyRef](f: F): F = {
private[spark] def clean[F <: AnyRef : ClassTag](f: F): F = {
clean(f, true)
}

/**
* Clean a closure to make it ready to be serialized and sent to tasks
* (removes unreferenced variables in $outer's, updates REPL variables)
*/
private[spark] def clean[F <: AnyRef](f: F, checkSerializable: Boolean): F = {
private[spark] def clean[F <: AnyRef : ClassTag](f: F, checkSerializable: Boolean): F = {
ClosureCleaner.clean(f, checkSerializable)
f
}

/**
Expand Down
11 changes: 8 additions & 3 deletions core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import scala.collection.mutable.Map
import scala.collection.mutable.Set

import scala.reflect.ClassTag

import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.{ClassReader, ClassVisitor, MethodVisitor, Type}
import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes._

Expand Down Expand Up @@ -103,7 +105,7 @@ private[spark] object ClosureCleaner extends Logging {
}
}

def clean(func: AnyRef, checkSerializable: Boolean = true) {
def clean[F <: AnyRef : ClassTag](func: F, checkSerializable: Boolean = true): F = {
// TODO: cache outerClasses / innerClasses / accessedFields
val outerClasses = getOuterClasses(func)
val innerClasses = getInnerClasses(func)
Expand Down Expand Up @@ -155,12 +157,15 @@ private[spark] object ClosureCleaner extends Logging {

if (checkSerializable) {
ensureSerializable(func)
} else {
func
}
}

private def ensureSerializable(func: AnyRef) {
private def ensureSerializable[T: ClassTag](func: T) = {
try {
SparkEnv.get.closureSerializer.newInstance().serialize(func)
val serializer = SparkEnv.get.closureSerializer.newInstance()
serializer.deserialize[T](serializer.serialize[T](func))
} catch {
case ex: Exception => throw new SparkException("Task not serializable: " + ex.toString)
}
Expand Down

0 comments on commit 8ee3ee7

Please sign in to comment.