Skip to content

Commit

Permalink
One loader workers.
Browse files Browse the repository at this point in the history
  • Loading branch information
holdenk committed Apr 8, 2014
1 parent f27e56a commit e1d9f71
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 11 deletions.
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/executor/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,8 @@ private[spark] class Executor(
val urls = currentJars.keySet.map { uri =>
new File(uri.split("/").last).toURI.toURL
}.toArray
new ExecutorURLClassLoader(urls, loader)
val userClassPathFirst = conf.getBoolean("spark.classpath.userClassPathFirst", false)
new ExecutorURLClassLoader(urls, loader, userClassPathFirst)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,48 @@ import java.net.{URLClassLoader, URL}

/**
* The addURL method in URLClassLoader is protected. We subclass it to make this accessible.
* We also make changes so user classes can come before the default classes.
*/
private[spark] class ExecutorURLClassLoader(urls: Array[URL], parent: ClassLoader)
extends URLClassLoader(urls, parent) {

override def addURL(url: URL) {
super.addURL(url)
private[spark] class ExecutorURLClassLoader(urls: Array[URL], parent: ClassLoader, userFirst: Boolean)
extends ClassLoader {
class OverridenURLClassLoader(urls: Array[URL], parent: ClassLoader) extends URLClassLoader(urls, parent){
override def addURL(url: URL) {
super.addURL(url)
}
override def findClass(name: String): Class[_] = {
super.findClass(name)
}
}
val userClassLoader = new OverridenURLClassLoader(urls, null)

object childClassLoader extends ClassLoader(parent) {
override def findClass(name: String): Class[_] = {
super.findClass(name)
}
}

override def findClass(name: String): Class[_] = {
if (!userFirst) {
try {
childClassLoader.findClass(name)
} catch {
case e: ClassNotFoundException => userClassLoader.findClass(name)
}
} else {
try {
userClassLoader.findClass(name)
} catch {
case e: ClassNotFoundException => childClassLoader.findClass(name)
}
}
}

def addURL(url: URL) {
userClassLoader.addURL(url)
}

def getURLs() = {
userClassLoader.getURLs()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,12 @@ import org.apache.spark.util.Utils
import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm._
import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes._


/**
* A ClassLoader that reads classes from a Hadoop FileSystem or HTTP URI,
* used to load classes defined by the interpreter when the REPL is used
*/
class ExecutorClassLoader(classUri: String, parent: ClassLoader)
extends ClassLoader(parent) {
class ExecutorClassLoader(classUri: String, parent: ClassLoader, userClassPathFirst: Boolean)
extends ClassLoader {
val uri = new URI(classUri)
val directory = uri.getPath

Expand All @@ -49,8 +48,27 @@ extends ClassLoader(parent) {
FileSystem.get(uri, new Configuration())
}
}

override def findClass(name: String): Class[_] = {
userClassPathFirst match {
case true => findClassLocally(name).getOrElse(parent.findClass(name))
case false => {
try {
parent.findClass(name)
} catch {
case e: ClassNotFoundException => {
val classOption = findClassLocally(name)
classOption match {
case None => throw new ClassNotFoundException(name, e)
case Some(a) => a
}
}
}
}
}
}

def findClassLocaly(name: String): Option[Class[_]] = {
try {
val pathInDirectory = name.replace('.', '/') + ".class"
val inputStream = {
Expand All @@ -68,9 +86,9 @@ extends ClassLoader(parent) {
}
val bytes = readAndTransformClass(name, inputStream)
inputStream.close()
return defineClass(name, bytes, 0, bytes.length)
Some(defineClass(name, bytes, 0, bytes.length))
} catch {
case e: Exception => throw new ClassNotFoundException(name, e)
case e: Exception => None
}
}

Expand Down

0 comments on commit e1d9f71

Please sign in to comment.