SPARK-1051. On YARN, executors don't doAs submitting user #29

Closed · wants to merge 1 commit
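Summary of the change, as captured in the diffs below: on YARN, work runs as the NodeManager's OS user instead of doAs the user who submitted the job. The YARN client now exports the submitter's short user name as SPARK_USER in the container environment; both copies of the YARN ApplicationMaster read it back and wrap the user class's main() in SparkHadoopUtil.runAsUser. runAsUser itself now transfers the launching user's credentials onto the doAs UGI via a new transferCredentials hook (overridden on YARN to copy the whole Credentials object), so secure HDFS access keeps working inside the doAs.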
core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala (18 changes: 10 additions & 8 deletions)

@@ -25,6 +25,8 @@ import org.apache.hadoop.security.UserGroupInformation
 
 import org.apache.spark.{SparkContext, SparkException}
 
+import scala.collection.JavaConversions._
+
 /**
  * Contains util methods to interact with Hadoop from Spark.
  */
@@ -33,15 +35,9 @@ class SparkHadoopUtil {
   UserGroupInformation.setConfiguration(conf)
 
   def runAsUser(user: String)(func: () => Unit) {
-    // if we are already running as the user intended there is no reason to do the doAs. It
-    // will actually break secure HDFS access as it doesn't fill in the credentials. Also if
-    // the user is UNKNOWN then we shouldn't be creating a remote unknown user
-    // (this is actually the path spark on yarn takes) since SPARK_USER is initialized only
-    // in SparkContext.
-    val currentUser = Option(System.getProperty("user.name")).
-      getOrElse(SparkContext.SPARK_UNKNOWN_USER)
-    if (user != SparkContext.SPARK_UNKNOWN_USER && currentUser != user) {
+    if (user != SparkContext.SPARK_UNKNOWN_USER) {
       val ugi = UserGroupInformation.createRemoteUser(user)
+      transferCredentials(UserGroupInformation.getCurrentUser(), ugi)
       ugi.doAs(new PrivilegedExceptionAction[Unit] {
         def run: Unit = func()
       })
@@ -50,6 +46,12 @@ class SparkHadoopUtil {
     }
   }
 
+  def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation) {
+    for (token <- source.getTokens()) {
+      dest.addToken(token)
+    }
+  }
+
   /**
    * Return an appropriate (subclass) of Configuration. Creating config can initializes some Hadoop
    * subsystems.
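To see why the credential transfer matters, here is a minimal standalone sketch; it is not part of the patch, and the user name and path are made up. A freshly created remote UGI carries no credentials, so without copying the launcher's delegation tokens onto it, HDFS calls inside the doAs block fail on a secure cluster:

    import java.security.PrivilegedExceptionAction

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.hadoop.security.UserGroupInformation

    import scala.collection.JavaConversions._

    object RunAsSketch {
      def main(args: Array[String]) {
        // A remote UGI starts out with no credentials at all.
        val ugi = UserGroupInformation.createRemoteUser("alice")

        // What the new transferCredentials hook does: carry the
        // launching user's delegation tokens over to the doAs UGI.
        for (token <- UserGroupInformation.getCurrentUser().getTokens()) {
          ugi.addToken(token)
        }

        ugi.doAs(new PrivilegedExceptionAction[Unit] {
          def run(): Unit = {
            // HDFS access here authenticates with the transferred tokens.
            val fs = FileSystem.get(new Configuration())
            println(fs.getFileStatus(new Path("/user/alice")))
          }
        })
      }
    }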

ApplicationMaster.scala
@@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC
 import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 
 import org.apache.spark.{SparkConf, SparkContext, Logging}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.util.Utils
 
 class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
@@ -67,6 +68,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
 
   private var registered = false
 
+  private val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
+    SparkContext.SPARK_UNKNOWN_USER)
+
   def run() {
     // Setup the directories so things go to yarn approved directories rather
     // then user specified and /tmp.
@@ -180,7 +184,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
       false /* initialize */ ,
       Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
     val t = new Thread {
-      override def run() {
+      override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
        var successed = false
        try {
          // Copy
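The behavioural change here is that the thread invoking the user class's main() is now wrapped in runAsUser, so the whole driver runs inside a doAs for the submitting user. A small sketch of the call shape (assumed standalone usage, not the PR's exact code):

    import org.apache.hadoop.security.UserGroupInformation
    import org.apache.spark.SparkContext
    import org.apache.spark.deploy.SparkHadoopUtil

    // SPARK_USER is set by the client (see ClientBase below); if it is
    // absent we fall back to the "unknown user" sentinel, in which case
    // runAsUser skips the doAs and just invokes the closure directly.
    val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
      SparkContext.SPARK_UNKNOWN_USER)

    SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
      // Inside the closure the current UGI reports the submitting user,
      // not the NodeManager's OS user (e.g. "yarn").
      println(UserGroupInformation.getCurrentUser().getShortUserName())
    }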

ClientBase.scala
@@ -272,6 +272,7 @@ trait ClientBase extends Logging {
     ClientBase.populateClasspath(yarnConf, sparkConf, log4jConfLocalRes != null, env)
     env("SPARK_YARN_MODE") = "true"
     env("SPARK_YARN_STAGING_DIR") = stagingDir
+    env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()
 
     // Set the environment variables to be passed on to the Workers.
     distCacheMgr.setDistFilesEnv(env)
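getShortUserName() yields the simple account name (for a Kerberos login, the principal with the realm and host parts mapped away by the auth_to_local rules), which is the form the ApplicationMaster expects to find in SPARK_USER. A rough sketch of the handshake, with an assumed user name:

    import scala.collection.mutable
    import org.apache.hadoop.security.UserGroupInformation

    // Client side: publish the submitter's short name into the container
    // environment, e.g. "alice" for a client kinit'd as alice@EXAMPLE.COM.
    val env = mutable.HashMap[String, String]()
    env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()

    // AM/executor side: the same value later surfaces as a process
    // environment variable and is read back with System.getenv("SPARK_USER").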

YarnSparkHadoopUtil.scala
@@ -28,6 +28,10 @@ import org.apache.hadoop.conf.Configuration
  */
 class YarnSparkHadoopUtil extends SparkHadoopUtil {
 
+  override def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation) {
+    dest.addCredentials(source.getCredentials())
+  }
+
   // Note that all params which start with SPARK are propagated all the way through, so if in yarn mode, this MUST be set to true.
   override def isYarnMode(): Boolean = { true }
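The override matters because a Hadoop Credentials object holds secret keys as well as tokens, and the base-class loop moves only the tokens. A small comparison sketch (the user name is illustrative, not from the PR):

    import scala.collection.JavaConversions._
    import org.apache.hadoop.security.UserGroupInformation

    val source = UserGroupInformation.getCurrentUser()
    val dest   = UserGroupInformation.createRemoteUser("alice")

    // Base SparkHadoopUtil: copy delegation tokens one at a time.
    for (token <- source.getTokens()) {
      dest.addToken(token)
    }

    // YarnSparkHadoopUtil: copy the whole Credentials object, which
    // brings the secret keys along too.
    dest.addCredentials(source.getCredentials())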

ApplicationMaster.scala
@@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC
 import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 
 import org.apache.spark.{SparkConf, SparkContext, Logging}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.util.Utils
 
 
@@ -68,6 +69,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
     math.max(args.numWorkers * 2, 3))
 
   private var registered = false
+
+  private val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
+    SparkContext.SPARK_UNKNOWN_USER)
 
   def run() {
     // Setup the directories so things go to YARN approved directories rather
@@ -152,7 +156,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
       false /* initialize */ ,
       Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
     val t = new Thread {
-      override def run() {
+      override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
         var successed = false
         try {
           // Copy