diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 1091ff54b0463..de55ede19c516 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -17,15 +17,20 @@ package org.apache.spark.deploy.yarn +import java.io.{ByteArrayInputStream, DataInputStream, File, FileOutputStream} import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException} import java.nio.ByteBuffer +import java.security.PrivilegedExceptionAction +import java.util.UUID +import java.util.zip.{ZipEntry, ZipOutputStream} import scala.collection.JavaConversions._ -import scala.collection.mutable.{ArrayBuffer, HashMap, ListBuffer, Map} +import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, ListBuffer, Map} import scala.reflect.runtime.universe import scala.util.{Try, Success, Failure} import com.google.common.base.Objects +import com.google.common.io.Files import org.apache.hadoop.io.DataOutputBuffer import org.apache.hadoop.conf.Configuration @@ -33,10 +38,9 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifie import org.apache.hadoop.fs._ import org.apache.hadoop.fs.permission.FsPermission import org.apache.hadoop.io.Text -import org.apache.hadoop.mapred.Master import org.apache.hadoop.mapreduce.MRJobConfig import org.apache.hadoop.security.{Credentials, UserGroupInformation} -import org.apache.hadoop.security.token.Token +import org.apache.hadoop.security.token.{TokenIdentifier, Token} import org.apache.hadoop.util.StringUtils import org.apache.hadoop.yarn.api._ import org.apache.hadoop.yarn.api.ApplicationConstants.Environment @@ -47,14 +51,14 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException import org.apache.hadoop.yarn.util.Records -import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkException} import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkException} import org.apache.spark.util.Utils private[spark] class Client( - val args: ClientArguments, - val hadoopConf: Configuration, - val sparkConf: SparkConf) + val args: ClientArguments, + val hadoopConf: Configuration, + val sparkConf: SparkConf) extends Logging { import Client._ @@ -66,23 +70,19 @@ private[spark] class Client( private val yarnClient = YarnClient.createYarnClient private val yarnConf = new YarnConfiguration(hadoopConf) - private val credentials = UserGroupInformation.getCurrentUser.getCredentials + private var credentials: Credentials = null private val amMemoryOverhead = args.amMemoryOverhead // MB private val executorMemoryOverhead = args.executorMemoryOverhead // MB private val distCacheMgr = new ClientDistributedCacheManager() private val isClusterMode = args.isClusterMode + + private var loginFromKeytab = false private val fireAndForget = isClusterMode && !sparkConf.getBoolean("spark.yarn.submit.waitAppCompletion", true) def stop(): Unit = yarnClient.stop() - /* ------------------------------------------------------------------------------------- * - | The following methods have much in common in the stable and alpha versions of Client, | - | but cannot be implemented in the parent trait due to subtle API differences across | - | hadoop versions. 
| - * ------------------------------------------------------------------------------------- */ - /** * Submit an application running our ApplicationMaster to the ResourceManager. * @@ -91,6 +91,8 @@ private[spark] class Client( * available in the alpha API. */ def submitApplication(): ApplicationId = { + // Setup the credentials before doing anything else, so we have don't have issues at any point. + setupCredentials() yarnClient.init(yarnConf) yarnClient.start() @@ -120,8 +122,8 @@ private[spark] class Client( * This uses the YarnClientApplication not available in the Yarn alpha API. */ def createApplicationSubmissionContext( - newApp: YarnClientApplication, - containerContext: ContainerLaunchContext): ApplicationSubmissionContext = { + newApp: YarnClientApplication, + containerContext: ContainerLaunchContext): ApplicationSubmissionContext = { val appContext = newApp.getApplicationSubmissionContext appContext.setApplicationName(args.appName) appContext.setQueue(args.amQueue) @@ -130,7 +132,7 @@ private[spark] class Client( sparkConf.getOption("spark.yarn.maxAppAttempts").map(_.toInt) match { case Some(v) => appContext.setMaxAppAttempts(v) case None => logDebug("spark.yarn.maxAppAttempts is not set. " + - "Cluster's default value will be used.") + "Cluster's default value will be used.") } val capability = Records.newRecord(classOf[Resource]) capability.setMemory(args.amMemory + amMemoryOverhead) @@ -188,9 +190,9 @@ private[spark] class Client( * for preparing resources for launching the ApplicationMaster container. Exposed for testing. */ private[yarn] def copyFileToRemote( - destDir: Path, - srcPath: Path, - replication: Short): Path = { + destDir: Path, + srcPath: Path, + replication: Short): Path = { val destFs = destDir.getFileSystem(hadoopConf) val srcFs = srcPath.getFileSystem(hadoopConf) var destPath = srcPath @@ -222,9 +224,14 @@ private[spark] class Client( // and add them as local resources to the application master. val fs = FileSystem.get(hadoopConf) val dst = new Path(fs.getHomeDirectory(), appStagingDir) - val nns = getNameNodesToAccess(sparkConf) + dst - obtainTokensForNamenodes(nns, hadoopConf, credentials) + val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + dst + YarnSparkHadoopUtil.get.obtainTokensForNamenodes(nns, hadoopConf, credentials) + // Used to keep track of URIs added to the distributed cache. If the same URI is added + // multiple times, YARN will fail to launch containers for the app with an internal + // error. + val distributedUris = new HashSet[String] obtainTokenForHiveMetastore(hadoopConf, credentials) + obtainTokenForHBase(hadoopConf, credentials) val replication = sparkConf.getInt("spark.yarn.submit.file.replication", fs.getDefaultReplication(dst)).toShort @@ -241,6 +248,31 @@ private[spark] class Client( "for alternatives.") } + // If we passed in a keytab, make sure we copy the keytab to the staging directory on + // HDFS, and setup the relevant environment vars, so the AM can login again. 
+ if (loginFromKeytab) { + logInfo("To enable the AM to login from keytab, credentials are being copied over to the AM" + + " via the YARN Secure Distributed Cache.") + val localUri = new URI(args.keytab) + val localPath = getQualifiedLocalPath(localUri, hadoopConf) + val destinationPath = copyFileToRemote(dst, localPath, replication) + val destFs = FileSystem.get(destinationPath.toUri(), hadoopConf) + distCacheMgr.addResource( + destFs, hadoopConf, destinationPath, localResources, LocalResourceType.FILE, + sparkConf.get("spark.yarn.keytab"), statCache, appMasterOnly = true) + } + + def addDistributedUri(uri: URI): Boolean = { + val uriStr = uri.toString() + if (distributedUris.contains(uriStr)) { + logWarning(s"Resource $uri added multiple times to distributed cache.") + false + } else { + distributedUris += uriStr + true + } + } + /** * Copy the given main resource to the distributed cache if the scheme is not "local". * Otherwise, set the corresponding key in our SparkConf to handle it downstream. @@ -258,11 +290,13 @@ private[spark] class Client( if (!localPath.isEmpty()) { val localURI = new URI(localPath) if (localURI.getScheme != LOCAL_SCHEME) { - val src = getQualifiedLocalPath(localURI, hadoopConf) - val destPath = copyFileToRemote(dst, src, replication) - val destFs = FileSystem.get(destPath.toUri(), hadoopConf) - distCacheMgr.addResource(destFs, hadoopConf, destPath, - localResources, LocalResourceType.FILE, destName, statCache) + if (addDistributedUri(localURI)) { + val src = getQualifiedLocalPath(localURI, hadoopConf) + val destPath = copyFileToRemote(dst, src, replication) + val destFs = FileSystem.get(destPath.toUri(), hadoopConf) + distCacheMgr.addResource(destFs, hadoopConf, destPath, + localResources, LocalResourceType.FILE, destName, statCache) + } } else if (confKey != null) { // If the resource is intended for local use only, handle this downstream // by setting the appropriate property @@ -271,6 +305,13 @@ private[spark] class Client( } } + createConfArchive().foreach { file => + require(addDistributedUri(file.toURI())) + val destPath = copyFileToRemote(dst, new Path(file.toURI()), replication) + distCacheMgr.addResource(fs, hadoopConf, destPath, localResources, LocalResourceType.ARCHIVE, + LOCALIZED_HADOOP_CONF_DIR, statCache, appMasterOnly = true) + } + /** * Do the same for any additional resources passed in through ClientArguments. 
* Each resource category is represented by a 3-tuple of: @@ -288,13 +329,15 @@ private[spark] class Client( flist.split(',').foreach { file => val localURI = new URI(file.trim()) if (localURI.getScheme != LOCAL_SCHEME) { - val localPath = new Path(localURI) - val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName()) - val destPath = copyFileToRemote(dst, localPath, replication) - distCacheMgr.addResource( - fs, hadoopConf, destPath, localResources, resType, linkname, statCache) - if (addToClasspath) { - cachedSecondaryJarLinks += linkname + if (addDistributedUri(localURI)) { + val localPath = new Path(localURI) + val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName()) + val destPath = copyFileToRemote(dst, localPath, replication) + distCacheMgr.addResource( + fs, hadoopConf, destPath, localResources, resType, linkname, statCache) + if (addToClasspath) { + cachedSecondaryJarLinks += linkname + } } } else if (addToClasspath) { // Resource is intended for local use only and should be added to the class path @@ -310,6 +353,81 @@ private[spark] class Client( localResources } + /** + * Create an archive with the Hadoop config files for distribution. + * + * These are only used by the AM, since executors will use the configuration object broadcast by + * the driver. The files are zipped and added to the job as an archive, so that YARN will explode + * it when distributing to the AM. This directory is then added to the classpath of the AM + * process, just to make sure that everybody is using the same default config. + * + * This follows the order of precedence set by the startup scripts, in which HADOOP_CONF_DIR + * shows up in the classpath before YARN_CONF_DIR. + * + * Currently this makes a shallow copy of the conf directory. If there are cases where a + * Hadoop config directory contains subdirectories, this code will have to be fixed. + */ + private def createConfArchive(): Option[File] = { + val hadoopConfFiles = new HashMap[String, File]() + Seq("HADOOP_CONF_DIR", "YARN_CONF_DIR").foreach { envKey => + sys.env.get(envKey).foreach { path => + val dir = new File(path) + if (dir.isDirectory()) { + dir.listFiles().foreach { file => + if (file.isFile && !hadoopConfFiles.contains(file.getName())) { + hadoopConfFiles(file.getName()) = file + } + } + } + } + } + + if (!hadoopConfFiles.isEmpty) { + val hadoopConfArchive = File.createTempFile(LOCALIZED_HADOOP_CONF_DIR, ".zip", + new File(Utils.getLocalDir(sparkConf))) + + val hadoopConfStream = new ZipOutputStream(new FileOutputStream(hadoopConfArchive)) + try { + hadoopConfStream.setLevel(0) + hadoopConfFiles.foreach { case (name, file) => + if (file.canRead()) { + hadoopConfStream.putNextEntry(new ZipEntry(name)) + Files.copy(file, hadoopConfStream) + hadoopConfStream.closeEntry() + } + } + } finally { + hadoopConfStream.close() + } + + Some(hadoopConfArchive) + } else { + None + } + } + + /** + * Get the renewal interval for tokens. + */ + private def getTokenRenewalInterval(stagingDirPath: Path): Long = { + // We cannot use the tokens generated above since those have renewer yarn. Trying to renew + // those will fail with an access control issue. So create new tokens with the logged in + // user as renewer. 
+ val creds = new Credentials() + val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + stagingDirPath + YarnSparkHadoopUtil.get.obtainTokensForNamenodes( + nns, hadoopConf, creds, Some(sparkConf.get("spark.yarn.principal"))) + val t = creds.getAllTokens + .filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND) + .head + val newExpiration = t.renew(hadoopConf) + val identifier = new DelegationTokenIdentifier() + identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier))) + val interval = newExpiration - identifier.getIssueDate + logInfo(s"Renewal Interval set to $interval") + interval + } + /** * Set up the environment for launching our ApplicationMaster container. */ @@ -317,11 +435,20 @@ private[spark] class Client( logInfo("Setting up the launch environment for our AM container") val env = new HashMap[String, String]() val extraCp = sparkConf.getOption("spark.driver.extraClassPath") - populateClasspath(args, yarnConf, sparkConf, env, extraCp) + populateClasspath(args, yarnConf, sparkConf, env, true, extraCp) env("SPARK_YARN_MODE") = "true" env("SPARK_YARN_STAGING_DIR") = stagingDir env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName() - + if (loginFromKeytab) { + val remoteFs = FileSystem.get(hadoopConf) + val stagingDirPath = new Path(remoteFs.getHomeDirectory, stagingDir) + val credentialsFile = "credentials-" + UUID.randomUUID().toString + sparkConf.set( + "spark.yarn.credentials.file", new Path(stagingDirPath, credentialsFile).toString) + logInfo(s"Credentials file set to: $credentialsFile") + val renewalInterval = getTokenRenewalInterval(stagingDirPath) + sparkConf.set("spark.yarn.token.renewal.interval", renewalInterval.toString) + } // Set the environment variables to be passed on to the executors. distCacheMgr.setDistFilesEnv(env) distCacheMgr.setDistArchivesEnv(env) @@ -335,12 +462,23 @@ private[spark] class Client( // Keep this for backwards compatibility but users should move to the config sys.env.get("SPARK_YARN_USER_ENV").foreach { userEnvs => - // Allow users to specify some environment variables. + // Allow users to specify some environment variables. YarnSparkHadoopUtil.setEnvFromInputString(env, userEnvs) // Pass SPARK_YARN_USER_ENV itself to the AM so it can use it to set up executor environments. env("SPARK_YARN_USER_ENV") = userEnvs } + // if spark.submit.pyArchives is in sparkConf, append pyArchives to PYTHONPATH + // that can be passed on to the ApplicationMaster and the executors. + if (sparkConf.contains("spark.submit.pyArchives")) { + var pythonPath = sparkConf.get("spark.submit.pyArchives") + if (env.contains("PYTHONPATH")) { + pythonPath = Seq(env.get("PYTHONPATH"), pythonPath).mkString(File.pathSeparator) + } + env("PYTHONPATH") = pythonPath + sparkConf.setExecutorEnv("PYTHONPATH", pythonPath) + } + // In cluster mode, if the deprecated SPARK_JAVA_OPTS is set, we need to propagate it to // executors. But we can't just set spark.executor.extraJavaOptions, because the driver's // SparkContext will not let that set spark* system properties, which is expected behavior for @@ -384,9 +522,8 @@ private[spark] class Client( * This sets up the launch environment, java options, and the command for launching the AM. 
*/ private def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse) - : ContainerLaunchContext = { + : ContainerLaunchContext = { logInfo("Setting up container launch context for our AM") - val appId = newAppResponse.getApplicationId val appStagingDir = getAppStagingDir(appId) val localResources = prepareLocalResources(appStagingDir) @@ -467,6 +604,10 @@ private[spark] class Client( } javaOpts ++= Utils.splitCommandString(opts).map(YarnSparkHadoopUtil.escapeForShell) } + + sparkConf.getOption("spark.yarn.am.extraLibraryPath").foreach { paths => + prefixEnv = Some(Utils.libraryPathEnvPrefix(Seq(paths))) + } } // For log4j configuration to reference @@ -520,14 +661,14 @@ private[spark] class Client( val amArgs = Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ pyFiles ++ primaryRFile ++ userArgs ++ Seq( - "--executor-memory", args.executorMemory.toString + "m", - "--executor-cores", args.executorCores.toString, - "--num-executors ", args.numExecutors.toString) + "--executor-memory", args.executorMemory.toString + "m", + "--executor-cores", args.executorCores.toString, + "--num-executors ", args.numExecutors.toString) // Command for the ApplicationMaster val commands = prefixEnv ++ Seq( - YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java", "-server" - ) ++ + YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java", "-server" + ) ++ javaOpts ++ amArgs ++ Seq( "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout", @@ -557,6 +698,24 @@ private[spark] class Client( amContainer } + def setupCredentials(): Unit = { + if (args.principal != null) { + require(args.keytab != null, "Keytab must be specified when principal is specified.") + logInfo("Attempting to login to the Kerberos" + + s" using principal: ${args.principal} and keytab: ${args.keytab}") + val f = new File(args.keytab) + // Generate a file name that can be used for the keytab file, that does not conflict + // with any user file. + val keytabFileName = f.getName + "-" + UUID.randomUUID().toString + UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab) + loginFromKeytab = true + sparkConf.set("spark.yarn.keytab", keytabFileName) + sparkConf.set("spark.yarn.principal", args.principal) + logInfo("Successfully logged into the KDC.") + } + credentials = UserGroupInformation.getCurrentUser.getCredentials + } + /** * Report the state of an application until it has exited, either successfully or * due to some failure, then return a pair of the yarn application state (FINISHED, FAILED, @@ -569,9 +728,9 @@ private[spark] class Client( * @return A pair of the yarn application state and the final application state. */ def monitorApplication( - appId: ApplicationId, - returnOnRunning: Boolean = false, - logApplicationReport: Boolean = true): (YarnApplicationState, FinalApplicationStatus) = { + appId: ApplicationId, + returnOnRunning: Boolean = false, + logApplicationReport: Boolean = true): (YarnApplicationState, FinalApplicationStatus) = { val interval = sparkConf.getLong("spark.yarn.report.interval", 1000) var lastState: YarnApplicationState = null while (true) { @@ -718,6 +877,9 @@ object Client extends Logging { // Distribution-defined classpath to add to processes val ENV_DIST_CLASSPATH = "SPARK_DIST_CLASSPATH" + // Subdirectory where the user's hadoop config files will be placed. + val LOCALIZED_HADOOP_CONF_DIR = "__hadoop_conf__" + /** * Find the user-defined Spark jar if configured, or return the jar containing this * class if not. 
@@ -750,7 +912,7 @@ object Client extends Logging { * classpath specified through the Hadoop and Yarn configurations. */ private[yarn] def populateHadoopClasspath(conf: Configuration, env: HashMap[String, String]) - : Unit = { + : Unit = { val classPathElementsToAdd = getYarnAppClasspath(conf) ++ getMRAppClasspath(conf) for (c <- classPathElementsToAdd.flatten) { YarnSparkHadoopUtil.addPathToEnvironment(env, Environment.CLASSPATH.name, c.trim) @@ -827,15 +989,23 @@ object Client extends Logging { * @param args Client arguments (when starting the AM) or null (when starting executors). */ private[yarn] def populateClasspath( - args: ClientArguments, - conf: Configuration, - sparkConf: SparkConf, - env: HashMap[String, String], - extraClassPath: Option[String] = None): Unit = { + args: ClientArguments, + conf: Configuration, + sparkConf: SparkConf, + env: HashMap[String, String], + isAM: Boolean, + extraClassPath: Option[String] = None): Unit = { extraClassPath.foreach(addClasspathEntry(_, env)) addClasspathEntry( YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), env ) + + if (isAM) { + addClasspathEntry( + YarnSparkHadoopUtil.expandEnvironment(Environment.PWD) + Path.SEPARATOR + + LOCALIZED_HADOOP_CONF_DIR, env) + } + if (sparkConf.getBoolean("spark.yarn.user.classpath.first", false)) { val userClassPath = if (args != null) { @@ -863,8 +1033,8 @@ object Client extends Logging { } private def getUserClasspath( - mainJar: Option[String], - secondaryJars: Option[String]): Array[URI] = { + mainJar: Option[String], + secondaryJars: Option[String]): Array[URI] = { val mainUri = mainJar.orElse(Some(APP_JAR)).map(new URI(_)) val secondaryUris = secondaryJars.map(_.split(",")).toSeq.flatten.map(new URI(_)) (mainUri ++ secondaryUris).toArray @@ -883,9 +1053,9 @@ object Client extends Logging { * @param env Map holding the environment variables. */ private def addFileToClasspath( - uri: URI, - fileName: String, - env: HashMap[String, String]): Unit = { + uri: URI, + fileName: String, + env: HashMap[String, String]): Unit = { if (uri != null && uri.getScheme == LOCAL_SCHEME) { addClasspathEntry(uri.getPath, env) } else if (fileName != null) { @@ -901,46 +1071,6 @@ object Client extends Logging { private def addClasspathEntry(path: String, env: HashMap[String, String]): Unit = YarnSparkHadoopUtil.addPathToEnvironment(env, Environment.CLASSPATH.name, path) - /** - * Get the list of namenodes the user may access. - */ - private[yarn] def getNameNodesToAccess(sparkConf: SparkConf): Set[Path] = { - sparkConf.get("spark.yarn.access.namenodes", "") - .split(",") - .map(_.trim()) - .filter(!_.isEmpty) - .map(new Path(_)) - .toSet - } - - private[yarn] def getTokenRenewer(conf: Configuration): String = { - val delegTokenRenewer = Master.getMasterPrincipal(conf) - logDebug("delegation token renewer is: " + delegTokenRenewer) - if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) { - val errorMessage = "Can't get Master Kerberos principal for use as renewer" - logError(errorMessage) - throw new SparkException(errorMessage) - } - delegTokenRenewer - } - - /** - * Obtains tokens for the namenodes passed in and adds them to the credentials. 
- */ - private def obtainTokensForNamenodes( - paths: Set[Path], - conf: Configuration, - creds: Credentials): Unit = { - if (UserGroupInformation.isSecurityEnabled()) { - val delegTokenRenewer = getTokenRenewer(conf) - paths.foreach { dst => - val dstFs = dst.getFileSystem(conf) - logDebug("getting token for namenode: " + dst) - dstFs.addDelegationTokens(delegTokenRenewer, creds) - } - } - } - /** * Obtains token for the Hive metastore and adds them to the credentials. */ @@ -999,6 +1129,41 @@ object Client extends Logging { } } + /** + * Obtain security token for HBase. + */ + def obtainTokenForHBase(conf: Configuration, credentials: Credentials): Unit = { + if (UserGroupInformation.isSecurityEnabled) { + val mirror = universe.runtimeMirror(getClass.getClassLoader) + + try { + val confCreate = mirror.classLoader. + loadClass("org.apache.hadoop.hbase.HBaseConfiguration"). + getMethod("create", classOf[Configuration]) + val obtainToken = mirror.classLoader. + loadClass("org.apache.hadoop.hbase.security.token.TokenUtil"). + getMethod("obtainToken", classOf[Configuration]) + + logDebug("Attempting to fetch HBase security token.") + + val hbaseConf = confCreate.invoke(null, conf) + val token = obtainToken.invoke(null, hbaseConf).asInstanceOf[Token[TokenIdentifier]] + credentials.addToken(token.getService, token) + + logInfo("Added HBase security token to credentials.") + } catch { + case e:java.lang.NoSuchMethodException => + logInfo("HBase Method not found: " + e) + case e:java.lang.ClassNotFoundException => + logDebug("HBase Class not found: " + e) + case e:java.lang.NoClassDefFoundError => + logDebug("HBase Class not found: " + e) + case e:Exception => + logError("Exception when obtaining HBase security token: " + e) + } + } + } + /** * Return whether the two file systems are the same. */ @@ -1052,8 +1217,7 @@ object Client extends Logging { if (isDriver) { conf.getBoolean("spark.driver.userClassPathFirst", false) } else { - conf.getBoolean("spark.executor.userClassPathFirst", - conf.getBoolean("spark.files.userClassPathFirst", false)) + conf.getBoolean("spark.executor.userClassPathFirst", false) } } @@ -1064,4 +1228,4 @@ object Client extends Logging { components.mkString(Path.SEPARATOR) } -} +} \ No newline at end of file
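
The duplicate-URI guard introduced in prepareLocalResources() is the piece of this patch that reads most naturally in isolation: YARN fails to launch containers with an internal error if the same resource URI is registered twice, so every candidate resource is first checked against the set of URIs already added. A minimal standalone sketch of that pattern follows; the class name is illustrative, and in the patch this is a local helper function plus a HashSet.

import java.net.URI
import scala.collection.mutable.HashSet

class DistCacheDedup {
  private val distributedUris = new HashSet[String]

  /** Returns true if the URI has not been seen yet and should be distributed. */
  def addDistributedUri(uri: URI): Boolean = {
    val uriStr = uri.toString
    if (distributedUris.contains(uriStr)) {
      // The patch logs a warning here and skips the resource.
      false
    } else {
      distributedUris += uriStr
      true
    }
  }
}

In the patch, every copy into the distributed cache (the Spark and application jars, the additional resources passed through ClientArguments, and the generated Hadoop conf archive) runs through this check before copyFileToRemote() is called.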
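
createConfArchive() is the other mechanism worth illustrating on its own: it takes a shallow copy of the files under HADOOP_CONF_DIR and YARN_CONF_DIR (the first directory wins on a name clash, giving HADOOP_CONF_DIR precedence as the scaladoc states), zips them without compression, and ships the archive so YARN explodes it into LOCALIZED_HADOOP_CONF_DIR for the AM. The sketch below restates that approach under the same assumptions; the object and method names are illustrative, and java.nio is used where the patch uses Guava's Files.copy.

import java.io.{File, FileOutputStream}
import java.util.zip.{ZipEntry, ZipOutputStream}
import scala.collection.mutable.HashMap

object ConfArchiveSketch {
  def buildConfArchive(tmpDir: File): Option[File] = {
    // Shallow copy only: the first file seen under a given name wins.
    val confFiles = new HashMap[String, File]()
    Seq("HADOOP_CONF_DIR", "YARN_CONF_DIR").foreach { envKey =>
      sys.env.get(envKey).foreach { path =>
        val dir = new File(path)
        if (dir.isDirectory) {
          dir.listFiles().foreach { file =>
            if (file.isFile && !confFiles.contains(file.getName)) {
              confFiles(file.getName) = file
            }
          }
        }
      }
    }

    if (confFiles.isEmpty) {
      None
    } else {
      val archive = File.createTempFile("__hadoop_conf__", ".zip", tmpDir)
      val zip = new ZipOutputStream(new FileOutputStream(archive))
      try {
        zip.setLevel(0) // store entries without compression, as the patch does
        confFiles.foreach { case (name, file) =>
          if (file.canRead) {
            zip.putNextEntry(new ZipEntry(name))
            java.nio.file.Files.copy(file.toPath, zip)
            zip.closeEntry()
          }
        }
      } finally {
        zip.close()
      }
      Some(archive)
    }
  }
}

In the patch, the resulting file is uploaded through copyFileToRemote() like any other resource, registered as a LocalResourceType.ARCHIVE for the AM only, and populateClasspath() appends $PWD/__hadoop_conf__ to the AM classpath when isAM is true.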
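
The keytab plumbing is only half visible in this file: Client copies the keytab to the staging directory, localizes it for the AM only, and records the localized file name and the principal in spark.yarn.keytab and spark.yarn.principal. How the AM consumes those settings lives in the companion ApplicationMaster change, which is not part of this diff, so the following is a hedged assumption about that side rather than actual Spark code.

import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.SparkConf

object AmKeytabLoginSketch {
  // Assumption: after YARN localization the keytab sits in the container's working
  // directory under the name stored in spark.yarn.keytab, so the AM can log in again
  // from it and keep refreshing credentials for a long-running application.
  def loginFromLocalizedKeytab(sparkConf: SparkConf): Unit = {
    val principal = sparkConf.get("spark.yarn.principal")
    val keytab = sparkConf.get("spark.yarn.keytab") // path relative to the container CWD
    UserGroupInformation.loginUserFromKeytab(principal, keytab)
  }
}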