[WIP] SPARK-1676: Cache Hadoop UGIs by default to prevent FileSystem leak

Move the doAs in Executor higher up so that we only have one UGI and aren't leaking FileSystems.
Fix Spark on YARN to work when the cluster is running as user "yarn" but the clients are launched as the submitting user and need to read/write HDFS as that user.
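
To make the change concrete, here is a minimal sketch (not the actual Spark code) of the pattern being adopted: one process-wide UGI is created at executor startup and every task thread inherits it, instead of a doAs wrapping each individual task. The object and method names below are illustrative only.

import java.security.PrivilegedExceptionAction
import org.apache.hadoop.security.UserGroupInformation

object SingleUgiExecutorSketch {
  def main(args: Array[String]): Unit = {
    // Create the UGI once per process, based on SPARK_USER, and run everything under it.
    val user = Option(System.getenv("SPARK_USER")).getOrElse("unknown")
    val ugi = UserGroupInformation.createRemoteUser(user)
    ugi.doAs(new PrivilegedExceptionAction[Unit] {
      // Threads spawned inside doAs (including task threads) inherit this UGI, so
      // Hadoop's FileSystem cache holds one entry instead of one per task.
      def run(): Unit = startBackendAndProcessTasks()
    })
  }

  // Placeholder for starting the executor backend and its task loop.
  private def startBackendAndProcessTasks(): Unit = ()
}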

Note: this hasn't been fully tested yet; it still needs to be tested in standalone mode.

Putting this up for people to look at and possibly test; I don't have access to a Mesos cluster.

This is an alternative to apache#607.

Author: Thomas Graves <tgraves@apache.org>

Closes apache#621 from tgravescs/SPARK-1676 and squashes the following commits:

244d55a [Thomas Graves] fix line length
44163d4 [Thomas Graves] Rework
9398853 [Thomas Graves] change to have doAs in executor higher up.
tgravescs authored and pdeyhim committed Jun 25, 2014
1 parent 25ea311 commit 18da078
Showing 8 changed files with 69 additions and 46 deletions.
17 changes: 14 additions & 3 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -24,25 +24,36 @@ import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.UserGroupInformation

import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.{Logging, SparkContext, SparkException}

import scala.collection.JavaConversions._

/**
* Contains util methods to interact with Hadoop from Spark.
*/
class SparkHadoopUtil {
class SparkHadoopUtil extends Logging {
val conf: Configuration = newConfiguration()
UserGroupInformation.setConfiguration(conf)

def runAsUser(user: String)(func: () => Unit) {
/**
* Runs the given function with a Hadoop UserGroupInformation as a thread local variable
* (distributed to child threads), used for authenticating HDFS and YARN calls.
*
* IMPORTANT NOTE: If this function is going to be called repeatedly in the same process
* you need to look at https://issues.apache.org/jira/browse/HDFS-3545 and possibly
* do a FileSystem.closeAllForUGI in order to avoid leaking FileSystems
*/
def runAsSparkUser(func: () => Unit) {
val user = Option(System.getenv("SPARK_USER")).getOrElse(SparkContext.SPARK_UNKNOWN_USER)
if (user != SparkContext.SPARK_UNKNOWN_USER) {
logDebug("running as user: " + user)
val ugi = UserGroupInformation.createRemoteUser(user)
transferCredentials(UserGroupInformation.getCurrentUser(), ugi)
ugi.doAs(new PrivilegedExceptionAction[Unit] {
def run: Unit = func()
})
} else {
logDebug("running as SPARK_UNKNOWN_USER")
func()
}
}
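
For the HDFS-3545 caveat in the note above, a minimal sketch of the cleanup a repeat caller would need, assuming a hypothetical withRemoteUser helper (not part of this patch): create the UGI, run the work under doAs, then close the FileSystem instances cached for that UGI.

import java.security.PrivilegedExceptionAction
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.UserGroupInformation

def withRemoteUser(user: String)(work: () => Unit): Unit = {
  val ugi = UserGroupInformation.createRemoteUser(user)
  try {
    ugi.doAs(new PrivilegedExceptionAction[Unit] {
      def run(): Unit = work()
    })
  } finally {
    // Each remote UGI gets its own slot in Hadoop's FileSystem cache (HDFS-3545),
    // so release those instances once the work under this UGI is done.
    FileSystem.closeAllForUGI(ugi)
  }
}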
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -22,8 +22,9 @@ import java.nio.ByteBuffer
import akka.actor._
import akka.remote._

import org.apache.spark.{SecurityManager, SparkConf, Logging}
import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.TaskState.TaskState
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.worker.WorkerWatcher
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.util.{AkkaUtils, Utils}
@@ -94,25 +95,30 @@ private[spark] class CoarseGrainedExecutorBackend(

private[spark] object CoarseGrainedExecutorBackend {
def run(driverUrl: String, executorId: String, hostname: String, cores: Int,
workerUrl: Option[String]) {
// Debug code
Utils.checkHost(hostname)

val conf = new SparkConf
// Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor
// before getting started with all our system properties, etc
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0,
indestructible = true, conf = conf, new SecurityManager(conf))
// set it
val sparkHostPort = hostname + ":" + boundPort
actorSystem.actorOf(
Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId,
sparkHostPort, cores),
name = "Executor")
workerUrl.foreach{ url =>
actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher")
workerUrl: Option[String]) {

SparkHadoopUtil.get.runAsSparkUser { () =>
// Debug code
Utils.checkHost(hostname)

val conf = new SparkConf
// Create a new ActorSystem to run the backend, because we can't create a
// SparkEnv / Executor before getting started with all our system properties, etc
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0,
indestructible = true, conf = conf, new SecurityManager(conf))
// set it
val sparkHostPort = hostname + ":" + boundPort
actorSystem.actorOf(
Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId,
sparkHostPort, cores),
name = "Executor")
workerUrl.foreach {
url =>
actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher")
}
actorSystem.awaitTermination()

}
actorSystem.awaitTermination()
}

def main(args: Array[String]) {
4 changes: 1 addition & 3 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -128,8 +128,6 @@ private[spark] class Executor(
// Maintains the list of running tasks.
private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]

val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(SparkContext.SPARK_UNKNOWN_USER)

def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
val tr = new TaskRunner(context, taskId, serializedTask)
runningTasks.put(taskId, tr)
@@ -172,7 +170,7 @@
}
}

override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
override def run() {
val startTime = System.currentTimeMillis()
SparkEnv.set(env)
Thread.currentThread.setContextClassLoader(replClassLoader)
core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
@@ -23,10 +23,10 @@ import com.google.protobuf.ByteString
import org.apache.mesos.{Executor => MesosExecutor, ExecutorDriver, MesosExecutorDriver, MesosNativeLibrary}
import org.apache.mesos.Protos.{TaskStatus => MesosTaskStatus, _}

import org.apache.spark.Logging
import org.apache.spark.TaskState
import org.apache.spark.{Logging, TaskState}
import org.apache.spark.TaskState.TaskState
import org.apache.spark.util.Utils
import org.apache.spark.deploy.SparkHadoopUtil

private[spark] class MesosExecutorBackend
extends MesosExecutor
@@ -95,9 +95,11 @@ private[spark] class MesosExecutorBackend
*/
private[spark] object MesosExecutorBackend {
def main(args: Array[String]) {
MesosNativeLibrary.load()
// Create a new Executor and start it running
val runner = new MesosExecutorBackend()
new MesosExecutorDriver(runner).run()
SparkHadoopUtil.get.runAsSparkUser { () =>
MesosNativeLibrary.load()
// Create a new Executor and start it running
val runner = new MesosExecutorBackend()
new MesosExecutorDriver(runner).run()
}
}
}
yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -70,9 +70,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,

private var registered = false

private val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
SparkContext.SPARK_UNKNOWN_USER)

def run() {
// Setup the directories so things go to yarn approved directories rather
// than user specified and /tmp.
@@ -192,7 +189,8 @@
false /* initialize */ ,
Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
val t = new Thread {
override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
override def run() {

var successed = false
try {
// Copy
@@ -480,6 +478,8 @@ object ApplicationMaster {

def main(argStrings: Array[String]) {
val args = new ApplicationMasterArguments(argStrings)
new ApplicationMaster(args).run()
SparkHadoopUtil.get.runAsSparkUser { () =>
new ApplicationMaster(args).run()
}
}
}
yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -29,10 +29,11 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
import akka.actor._
import akka.remote._
import akka.actor.Terminated
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.util.{Utils, AkkaUtils}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.scheduler.SplitInfo
import org.apache.spark.deploy.SparkHadoopUtil

/**
* An application master that allocates executors on behalf of a driver that is running outside
@@ -279,6 +280,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
object ExecutorLauncher {
def main(argStrings: Array[String]) {
val args = new ApplicationMasterArguments(argStrings)
new ExecutorLauncher(args).run()
SparkHadoopUtil.get.runAsSparkUser { () =>
new ExecutorLauncher(args).run()
}
}
}
yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -71,9 +71,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))

private var registered = false

private val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
SparkContext.SPARK_UNKNOWN_USER)

def run() {
// Setup the directories so things go to YARN approved directories rather
@@ -179,8 +176,9 @@
false /* initialize */ ,
Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
val t = new Thread {
override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
var successed = false
override def run() {

var successed = false
try {
// Copy
var mainArgs: Array[String] = new Array[String](args.userArgs.size)
@@ -462,6 +460,8 @@ object ApplicationMaster {

def main(argStrings: Array[String]) {
val args = new ApplicationMasterArguments(argStrings)
new ApplicationMaster(args).run()
SparkHadoopUtil.get.runAsSparkUser { () =>
new ApplicationMaster(args).run()
}
}
}
yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -28,12 +28,13 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
import akka.actor._
import akka.remote._
import akka.actor.Terminated
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.util.{Utils, AkkaUtils}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.scheduler.SplitInfo
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.spark.deploy.SparkHadoopUtil

/**
* An application master that allocates executors on behalf of a driver that is running outside
@@ -255,6 +256,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
object ExecutorLauncher {
def main(argStrings: Array[String]) {
val args = new ApplicationMasterArguments(argStrings)
new ExecutorLauncher(args).run()
SparkHadoopUtil.get.runAsSparkUser { () =>
new ExecutorLauncher(args).run()
}
}
}
