
Commit

Rework
tgravescs committed May 3, 2014
1 parent 9398853 commit 44163d4
Showing 7 changed files with 22 additions and 28 deletions.
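
In short: the SPARK_USER lookup that every entry point previously duplicated moves into SparkHadoopUtil itself, runAsUser(user) becomes the parameterless runAsSparkUser, the inline HDFS-3545 caveat is promoted to a doc comment, and the "running as" log messages drop from info to debug level. The six call sites (the coarse-grained and Mesos executor backends, plus the two YARN ApplicationMaster and ExecutorLauncher variants) switch to the new method, and four of them drop the now-unused SparkContext import.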
18 changes: 12 additions & 6 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala

@@ -35,19 +35,25 @@ class SparkHadoopUtil extends Logging {
   val conf: Configuration = newConfiguration()
   UserGroupInformation.setConfiguration(conf)
 
-  // IMPORTANT NOTE: If this function is going to be called repeatedly in the same process
-  // you need to look at https://issues.apache.org/jira/browse/HDFS-3545 and possibly
-  // do a FileSystem.closeAllForUGI in order to avoid leaking FileSystems
-  def runAsUser(user: String)(func: () => Unit) {
+  /**
+   * Runs the given function with a Hadoop UserGroupInformation as a thread local variable
+   * (distributed to child threads), used for authenticating HDFS and YARN calls.
+   *
+   * IMPORTANT NOTE: If this function is going to be called repeatedly in the same process
+   * you need to look at https://issues.apache.org/jira/browse/HDFS-3545 and possibly
+   * do a FileSystem.closeAllForUGI in order to avoid leaking FileSystems
+   */
+  def runAsSparkUser(func: () => Unit) {
+    val user = Option(System.getenv("SPARK_USER")).getOrElse(SparkContext.SPARK_UNKNOWN_USER)
     if (user != SparkContext.SPARK_UNKNOWN_USER) {
-      logInfo("running as user: " + user)
+      logDebug("running as user: " + user)
       val ugi = UserGroupInformation.createRemoteUser(user)
       transferCredentials(UserGroupInformation.getCurrentUser(), ugi)
       ugi.doAs(new PrivilegedExceptionAction[Unit] {
         def run: Unit = func()
       })
     } else {
-      logInfo("running as SPARK_UNKNOWN_USER")
+      logDebug("running as SPARK_UNKNOWN_USER")
       func()
     }
   }
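For context on the UGI mechanics this method wraps, here is a minimal, self-contained sketch of the same doAs pattern, including the FileSystem.closeAllForUGI cleanup the doc comment recommends for repeated calls. It is an illustration, not code from this commit: the object name RunAsUserSketch, the runAs helper, and the "nobody" fallback are made up for the example; only UserGroupInformation.createRemoteUser, ugi.doAs, and FileSystem.closeAllForUGI are real Hadoop APIs.

import java.security.PrivilegedExceptionAction

import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.UserGroupInformation

// Illustrative sketch (not commit code) of the pattern runAsSparkUser wraps.
object RunAsUserSketch {
  def runAs(user: String)(func: () => Unit): Unit = {
    // Build a UGI for the target user and run func under it, so Hadoop RPC
    // (HDFS, YARN) authenticates the calls made inside func as that user.
    val ugi = UserGroupInformation.createRemoteUser(user)
    try {
      ugi.doAs(new PrivilegedExceptionAction[Unit] {
        override def run(): Unit = func()
      })
    } finally {
      // The HDFS-3545 caveat: when this runs repeatedly in one process, close
      // the FileSystem instances cached for this UGI so they are not leaked.
      FileSystem.closeAllForUGI(ugi)
    }
  }

  def main(args: Array[String]): Unit = {
    // Same SPARK_USER convention the commit centralizes; "nobody" is only a
    // placeholder fallback for this sketch.
    runAs(sys.env.getOrElse("SPARK_USER", "nobody")) { () =>
      println("running as: " + UserGroupInformation.getCurrentUser.getUserName)
    }
  }
}

Note that runAsSparkUser itself does not close the cached FileSystems; the doc comment deliberately leaves that cleanup to processes that invoke it repeatedly.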
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

@@ -22,7 +22,7 @@ import java.nio.ByteBuffer
 import akka.actor._
 import akka.remote._
 
-import org.apache.spark.{SparkContext, Logging, SecurityManager, SparkConf}
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.worker.WorkerWatcher
@@ -97,9 +97,7 @@ private[spark] object CoarseGrainedExecutorBackend {
   def run(driverUrl: String, executorId: String, hostname: String, cores: Int,
     workerUrl: Option[String]) {
 
-    val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(SparkContext.SPARK_UNKNOWN_USER)
-    SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
-
+    SparkHadoopUtil.get.runAsSparkUser { () =>
       // Debug code
       Utils.checkHost(hostname)
 
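The same mechanical change repeats in each file below: the two-line SPARK_USER lookup is deleted in favor of runAsSparkUser, and where SPARK_UNKNOWN_USER was the file's only use of SparkContext, that import goes away too.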
core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala

@@ -23,7 +23,7 @@ import com.google.protobuf.ByteString
 import org.apache.mesos.{Executor => MesosExecutor, ExecutorDriver, MesosExecutorDriver, MesosNativeLibrary}
 import org.apache.mesos.Protos.{TaskStatus => MesosTaskStatus, _}
 
-import org.apache.spark.{SparkContext, Logging, TaskState}
+import org.apache.spark.{Logging, TaskState}
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.util.Utils
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -95,9 +95,7 @@ private[spark] class MesosExecutorBackend
  */
 private[spark] object MesosExecutorBackend {
   def main(args: Array[String]) {
-    val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(SparkContext.SPARK_UNKNOWN_USER)
-    SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
-
+    SparkHadoopUtil.get.runAsSparkUser { () =>
       MesosNativeLibrary.load()
       // Create a new Executor and start it running
       val runner = new MesosExecutorBackend()
yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala

@@ -478,9 +478,7 @@ object ApplicationMaster {

   def main(argStrings: Array[String]) {
     val args = new ApplicationMasterArguments(argStrings)
-    val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
-      SparkContext.SPARK_UNKNOWN_USER)
-    SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
+    SparkHadoopUtil.get.runAsSparkUser { () =>
       new ApplicationMaster(args).run()
     }
   }
yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala

@@ -29,7 +29,7 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 import akka.actor._
 import akka.remote._
 import akka.actor.Terminated
-import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.util.{Utils, AkkaUtils}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.scheduler.SplitInfo
@@ -280,9 +280,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
 object ExecutorLauncher {
   def main(argStrings: Array[String]) {
     val args = new ApplicationMasterArguments(argStrings)
-    val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
-      SparkContext.SPARK_UNKNOWN_USER)
-    SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
+    SparkHadoopUtil.get.runAsSparkUser { () =>
       new ExecutorLauncher(args).run()
     }
   }
yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala

@@ -460,9 +460,7 @@ object ApplicationMaster {

   def main(argStrings: Array[String]) {
     val args = new ApplicationMasterArguments(argStrings)
-    val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
-      SparkContext.SPARK_UNKNOWN_USER)
-    SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
+    SparkHadoopUtil.get.runAsSparkUser { () =>
       new ApplicationMaster(args).run()
     }
   }
yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala

@@ -28,7 +28,7 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 import akka.actor._
 import akka.remote._
 import akka.actor.Terminated
-import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.util.{Utils, AkkaUtils}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.scheduler.SplitInfo
@@ -256,9 +256,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
 object ExecutorLauncher {
   def main(argStrings: Array[String]) {
     val args = new ApplicationMasterArguments(argStrings)
-    val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
-      SparkContext.SPARK_UNKNOWN_USER)
-    SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
+    SparkHadoopUtil.get.runAsSparkUser { () =>
       new ExecutorLauncher(args).run()
     }
   }
