change to have doAs in executor higher up.
tgravescs committed May 2, 2014 · commit 9398853 (1 parent: 98b6559)
Showing 8 changed files with 72 additions and 43 deletions.
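In short, the commit moves the Hadoop UserGroupInformation doAs wrapping out of the per-task path in Executor.TaskRunner and up to the executor process entry points (CoarseGrainedExecutorBackend, MesosExecutorBackend, and the YARN ApplicationMaster/ExecutorLauncher mains), so the whole executor process runs under the SPARK_USER identity rather than entering a doAs for every task. Below is a minimal sketch of the resulting entry-point pattern, assuming the runAsUser signature shown in the SparkHadoopUtil diff; the object name, package, and startExecutor helper are illustrative placeholders, not part of the commit.

```scala
// Minimal sketch (not from the commit) of the entry-point pattern adopted here.
// MyExecutorBackend, the package name, and startExecutor(...) are illustrative.
// Note: SPARK_UNKNOWN_USER is private[spark] in this era, so real callers
// (like the backends in this diff) live under the org.apache.spark package.
package org.apache.spark.example

import org.apache.spark.SparkContext
import org.apache.spark.deploy.SparkHadoopUtil

object MyExecutorBackend {
  def main(args: Array[String]) {
    // Resolve the Hadoop user once, at process start.
    val sparkUser = Option(System.getenv("SPARK_USER"))
      .getOrElse(SparkContext.SPARK_UNKNOWN_USER)
    // Wrap the whole executor lifetime in a single Hadoop doAs,
    // instead of entering doAs once per task.
    SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
      startExecutor(args)
    }
  }

  // Illustrative placeholder: start the actor system, register with the
  // driver, and block until termination.
  private def startExecutor(args: Array[String]): Unit = { }
}
```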

core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -24,25 +24,30 @@ import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.UserGroupInformation

import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.{Logging, SparkContext, SparkException}

import scala.collection.JavaConversions._

/**
* Contains util methods to interact with Hadoop from Spark.
*/
class SparkHadoopUtil {
class SparkHadoopUtil extends Logging {
val conf: Configuration = newConfiguration()
UserGroupInformation.setConfiguration(conf)

// IMPORTANT NOTE: If this function is going to be called repeated in the same process
// you need to look https://issues.apache.org/jira/browse/HDFS-3545 and possibly
// do a FileSystem.closeAllForUGI in order to avoid leaking Filesystems
def runAsUser(user: String)(func: () => Unit) {
if (user != SparkContext.SPARK_UNKNOWN_USER) {
logInfo("running as user: " + user)
val ugi = UserGroupInformation.createRemoteUser(user)
transferCredentials(UserGroupInformation.getCurrentUser(), ugi)
ugi.doAs(new PrivilegedExceptionAction[Unit] {
def run: Unit = func()
})
} else {
logInfo("running as SPARK_UNKNOWN_USER")
func()
}
}
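The new IMPORTANT NOTE above points at HDFS-3545: each doAs with a fresh UGI caches FileSystem instances keyed by that UGI, so calling runAsUser repeatedly in one process can leak them. A hedged sketch of the mitigation the comment names, Hadoop's FileSystem.closeAllForUGI, follows; this helper is illustrative and not part of the commit.

```scala
// Illustrative helper (not part of this commit): run one block of work as
// `user`, then close the FileSystem instances cached for that UGI — the
// mitigation for HDFS-3545 referenced in the comment above.
import java.security.PrivilegedExceptionAction

import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.UserGroupInformation

object RunAsUserOnce {
  def apply(user: String)(func: () => Unit): Unit = {
    val ugi = UserGroupInformation.createRemoteUser(user)
    try {
      ugi.doAs(new PrivilegedExceptionAction[Unit] {
        override def run(): Unit = func()
      })
    } finally {
      // Without this, repeated doAs calls in the same process accumulate
      // cached FileSystem objects for each new UGI.
      FileSystem.closeAllForUGI(ugi)
    }
  }
}
```

With this commit the doAs happens once per executor process, so the leak mainly concerns callers that would invoke runAsUser repeatedly in the same JVM.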

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -22,8 +22,9 @@ import java.nio.ByteBuffer
import akka.actor._
import akka.remote._

import org.apache.spark.{SecurityManager, SparkConf, Logging}
import org.apache.spark.{SparkContext, Logging, SecurityManager, SparkConf}
import org.apache.spark.TaskState.TaskState
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.worker.WorkerWatcher
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.util.{AkkaUtils, Utils}
@@ -94,25 +95,32 @@ private[spark] class CoarseGrainedExecutorBackend(

private[spark] object CoarseGrainedExecutorBackend {
def run(driverUrl: String, executorId: String, hostname: String, cores: Int,
workerUrl: Option[String]) {
// Debug code
Utils.checkHost(hostname)

val conf = new SparkConf
// Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor
// before getting started with all our system properties, etc
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0,
indestructible = true, conf = conf, new SecurityManager(conf))
// set it
val sparkHostPort = hostname + ":" + boundPort
actorSystem.actorOf(
Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId,
sparkHostPort, cores),
name = "Executor")
workerUrl.foreach{ url =>
actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher")
workerUrl: Option[String]) {

val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(SparkContext.SPARK_UNKNOWN_USER)
SparkHadoopUtil.get.runAsUser(sparkUser) { () =>

// Debug code
Utils.checkHost(hostname)

val conf = new SparkConf
// Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor
// before getting started with all our system properties, etc
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0,
indestructible = true, conf = conf, new SecurityManager(conf))
// set it
val sparkHostPort = hostname + ":" + boundPort
actorSystem.actorOf(
Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId,
sparkHostPort, cores),
name = "Executor")
workerUrl.foreach {
url =>
actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher")
}
actorSystem.awaitTermination()

}
actorSystem.awaitTermination()
}

def main(args: Array[String]) {

core/src/main/scala/org/apache/spark/executor/Executor.scala (1 addition & 3 deletions)
@@ -128,8 +128,6 @@ private[spark] class Executor(
// Maintains the list of running tasks.
private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]

val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(SparkContext.SPARK_UNKNOWN_USER)

def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
val tr = new TaskRunner(context, taskId, serializedTask)
runningTasks.put(taskId, tr)
@@ -172,7 +170,7 @@
}
}

override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
override def run() {
val startTime = System.currentTimeMillis()
SparkEnv.set(env)
Thread.currentThread.setContextClassLoader(replClassLoader)

core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
@@ -23,10 +23,10 @@ import com.google.protobuf.ByteString
import org.apache.mesos.{Executor => MesosExecutor, ExecutorDriver, MesosExecutorDriver, MesosNativeLibrary}
import org.apache.mesos.Protos.{TaskStatus => MesosTaskStatus, _}

import org.apache.spark.Logging
import org.apache.spark.TaskState
import org.apache.spark.{SparkContext, Logging, TaskState}
import org.apache.spark.TaskState.TaskState
import org.apache.spark.util.Utils
import org.apache.spark.deploy.SparkHadoopUtil

private[spark] class MesosExecutorBackend
extends MesosExecutor
@@ -95,9 +95,13 @@ private[spark] class MesosExecutorBackend
*/
private[spark] object MesosExecutorBackend {
def main(args: Array[String]) {
MesosNativeLibrary.load()
// Create a new Executor and start it running
val runner = new MesosExecutorBackend()
new MesosExecutorDriver(runner).run()
val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(SparkContext.SPARK_UNKNOWN_USER)
SparkHadoopUtil.get.runAsUser(sparkUser) { () =>

MesosNativeLibrary.load()
// Create a new Executor and start it running
val runner = new MesosExecutorBackend()
new MesosExecutorDriver(runner).run()
}
}
}

yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -70,9 +70,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,

private var registered = false

private val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
SparkContext.SPARK_UNKNOWN_USER)

def run() {
// Setup the directories so things go to yarn approved directories rather
// then user specified and /tmp.
@@ -192,7 +189,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
false /* initialize */ ,
Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
val t = new Thread {
override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
override def run() {

var successed = false
try {
// Copy
@@ -480,6 +478,10 @@ object ApplicationMaster

def main(argStrings: Array[String]) {
val args = new ApplicationMasterArguments(argStrings)
new ApplicationMaster(args).run()
val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
SparkContext.SPARK_UNKNOWN_USER)
SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
new ApplicationMaster(args).run()
}
}
}

yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -33,6 +33,7 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
import org.apache.spark.util.{Utils, AkkaUtils}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.scheduler.SplitInfo
import org.apache.spark.deploy.SparkHadoopUtil

/**
* An application master that allocates executors on behalf of a driver that is running outside
@@ -279,6 +280,10 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
object ExecutorLauncher {
def main(argStrings: Array[String]) {
val args = new ApplicationMasterArguments(argStrings)
new ExecutorLauncher(args).run()
val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
SparkContext.SPARK_UNKNOWN_USER)
SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
new ExecutorLauncher(args).run()
}
}
}

yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -71,9 +71,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))

private var registered = false

private val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
SparkContext.SPARK_UNKNOWN_USER)

def run() {
// Setup the directories so things go to YARN approved directories rather
@@ -179,8 +176,9 @@
false /* initialize */ ,
Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
val t = new Thread {
override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
var successed = false
override def run() {

var successed = false
try {
// Copy
var mainArgs: Array[String] = new Array[String](args.userArgs.size)
@@ -462,6 +460,10 @@ object ApplicationMaster

def main(argStrings: Array[String]) {
val args = new ApplicationMasterArguments(argStrings)
new ApplicationMaster(args).run()
val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
SparkContext.SPARK_UNKNOWN_USER)
SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
new ApplicationMaster(args).run()
}
}
}

yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -34,6 +34,7 @@ import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.scheduler.SplitInfo
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.spark.deploy.SparkHadoopUtil

/**
* An application master that allocates executors on behalf of a driver that is running outside
@@ -255,6 +256,10 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
object ExecutorLauncher {
def main(argStrings: Array[String]) {
val args = new ApplicationMasterArguments(argStrings)
new ExecutorLauncher(args).run()
val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
SparkContext.SPARK_UNKNOWN_USER)
SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
new ExecutorLauncher(args).run()
}
}
}
