diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index aae340953c5b2..f6e8a5694dbdf 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1631,7 +1631,7 @@ class SparkContext(config: SparkConf) extends Logging {
       // Fetch the file locally so that closures which are run on the driver can still use the
       // SparkFiles API to access files.
       Utils.fetchFile(uri.toString, new File(SparkFiles.getRootDirectory()), conf,
-        env.securityManager, hadoopConfiguration, timestamp, useCache = false)
+        hadoopConfiguration, timestamp, useCache = false)
       postEnvironmentUpdate()
     } else if (
       isArchive &&
@@ -1643,7 +1643,7 @@ class SparkContext(config: SparkConf) extends Logging {
       val uriToUse = if (!isLocal && scheme == "file") uri else new URI(key)
       val uriToDownload = UriBuilder.fromUri(uriToUse).fragment(null).build()
       val source = Utils.fetchFile(uriToDownload.toString, Utils.createTempDir(), conf,
-        env.securityManager, hadoopConfiguration, timestamp, useCache = false, shouldUntar = false)
+        hadoopConfiguration, timestamp, useCache = false, shouldUntar = false)
       val dest = new File(
         SparkFiles.getRootDirectory(),
         if (uri.getFragment != null) uri.getFragment else source.getName)
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 7b7d9dd72a344..0cf309f148156 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -374,13 +374,13 @@ private[spark] class SparkSubmit extends Logging {
     var localPyFiles: String = null
     if (deployMode == CLIENT) {
       localPrimaryResource = Option(args.primaryResource).map {
-        downloadFile(_, targetDir, sparkConf, hadoopConf, secMgr)
+        downloadFile(_, targetDir, sparkConf, hadoopConf)
       }.orNull
       localJars = Option(args.jars).map {
-        downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
+        downloadFileList(_, targetDir, sparkConf, hadoopConf)
       }.orNull
       localPyFiles = Option(args.pyFiles).map {
-        downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
+        downloadFileList(_, targetDir, sparkConf, hadoopConf)
       }.orNull

       if (isKubernetesClusterModeDriver) {
@@ -389,14 +389,14 @@ private[spark] class SparkSubmit extends Logging {
         // Explicitly download the related files here
         args.jars = localJars
         val filesLocalFiles = Option(args.files).map {
-          downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
+          downloadFileList(_, targetDir, sparkConf, hadoopConf)
         }.orNull
         val archiveLocalFiles = Option(args.archives).map { uris =>
           val resolvedUris = Utils.stringToSeq(uris).map(Utils.resolveURI)
           val localArchives = downloadFileList(
             resolvedUris.map(
               UriBuilder.fromUri(_).fragment(null).build().toString).mkString(","),
-            targetDir, sparkConf, hadoopConf, secMgr)
+            targetDir, sparkConf, hadoopConf)
           // SPARK-33748: this mimics the behaviour of Yarn cluster mode. If the driver is running
           // in cluster mode, the archives should be available in the driver's current working
@@ -447,7 +447,7 @@ private[spark] class SparkSubmit extends Logging {
           if (file.exists()) {
             file.toURI.toString
           } else {
-            downloadFile(resource, targetDir, sparkConf, hadoopConf, secMgr)
+            downloadFile(resource, targetDir, sparkConf, hadoopConf)
           }
         case _ => uri.toString
       }
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index 4f9c497fc3d76..776d9164cdbbe 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -160,7 +160,6 @@ private[deploy] class DriverRunner(
       driverDesc.jarUrl,
       driverDir,
       conf,
-      securityManager,
       SparkHadoopUtil.get.newConfiguration(conf),
       System.currentTimeMillis(),
       useCache = false)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
index 7cf961f42112c..61fb92999cfe3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
@@ -91,8 +91,7 @@ object DriverWrapper extends Logging {
           jarsProp
         }
       }
-    val localJars = DependencyUtils.resolveAndDownloadJars(jars, userJar, sparkConf, hadoopConf,
-      secMgr)
+    val localJars = DependencyUtils.resolveAndDownloadJars(jars, userJar, sparkConf, hadoopConf)
     DependencyUtils.addJarsToClassPath(localJars, loader)
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index e7f1b8f3cf17a..c58009c166a60 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -924,7 +924,7 @@ private[spark] class Executor(
         logInfo(s"Fetching $name with timestamp $timestamp")
         // Fetch file with useCache mode, close cache for local mode.
         Utils.fetchFile(name, new File(SparkFiles.getRootDirectory()), conf,
-          env.securityManager, hadoopConf, timestamp, useCache = !isLocal)
+          hadoopConf, timestamp, useCache = !isLocal)
         currentFiles(name) = timestamp
       }
       for ((name, timestamp) <- newArchives if currentArchives.getOrElse(name, -1L) < timestamp) {
@@ -932,7 +932,7 @@ private[spark] class Executor(
         val sourceURI = new URI(name)
         val uriToDownload = UriBuilder.fromUri(sourceURI).fragment(null).build()
         val source = Utils.fetchFile(uriToDownload.toString, Utils.createTempDir(), conf,
-          env.securityManager, hadoopConf, timestamp, useCache = !isLocal, shouldUntar = false)
+          hadoopConf, timestamp, useCache = !isLocal, shouldUntar = false)
         val dest = new File(
           SparkFiles.getRootDirectory(),
           if (sourceURI.getFragment != null) sourceURI.getFragment else source.getName)
@@ -951,7 +951,7 @@ private[spark] class Executor(
         logInfo(s"Fetching $name with timestamp $timestamp")
         // Fetch file with useCache mode, close cache for local mode.
         Utils.fetchFile(name, new File(SparkFiles.getRootDirectory()), conf,
-          env.securityManager, hadoopConf, timestamp, useCache = !isLocal)
+          hadoopConf, timestamp, useCache = !isLocal)
         currentJars(name) = timestamp
         // Add it to our class loader
         val url = new File(SparkFiles.getRootDirectory(), localName).toURI.toURL
diff --git a/core/src/main/scala/org/apache/spark/util/DependencyUtils.scala b/core/src/main/scala/org/apache/spark/util/DependencyUtils.scala
index 0d78af2dafc99..789811fa5f3a4 100644
--- a/core/src/main/scala/org/apache/spark/util/DependencyUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/DependencyUtils.scala
@@ -24,7 +24,7 @@ import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}

-import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.deploy.SparkSubmitUtils
 import org.apache.spark.internal.Logging
@@ -187,8 +187,7 @@ private[spark] object DependencyUtils extends Logging {
       jars: String,
       userJar: String,
       sparkConf: SparkConf,
-      hadoopConf: Configuration,
-      secMgr: SecurityManager): String = {
+      hadoopConf: Configuration): String = {
     val targetDir = Utils.createTempDir()
     val userJarName = userJar.split(File.separatorChar).last
     Option(jars)
@@ -199,7 +198,7 @@ private[spark] object DependencyUtils extends Logging {
           .mkString(",")
       }
       .filterNot(_ == "")
-      .map(downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr))
+      .map(downloadFileList(_, targetDir, sparkConf, hadoopConf))
       .orNull
   }

@@ -219,18 +218,16 @@ private[spark] object DependencyUtils extends Logging {
   * @param targetDir A temporary directory for which downloaded files.
   * @param sparkConf Spark configuration.
   * @param hadoopConf Hadoop configuration.
-  * @param secMgr Spark security manager.
   * @return A comma separated local files list.
   */
  def downloadFileList(
      fileList: String,
      targetDir: File,
      sparkConf: SparkConf,
-      hadoopConf: Configuration,
-      secMgr: SecurityManager): String = {
+      hadoopConf: Configuration): String = {
    require(fileList != null, "fileList cannot be null.")
    Utils.stringToSeq(fileList)
-      .map(downloadFile(_, targetDir, sparkConf, hadoopConf, secMgr))
+      .map(downloadFile(_, targetDir, sparkConf, hadoopConf))
      .mkString(",")
  }

@@ -242,15 +239,13 @@ private[spark] object DependencyUtils extends Logging {
   * @param targetDir A temporary directory for which downloaded files.
   * @param sparkConf Spark configuration.
   * @param hadoopConf Hadoop configuration.
-  * @param secMgr Spark security manager.
   * @return Path to the local file.
   */
  def downloadFile(
      path: String,
      targetDir: File,
      sparkConf: SparkConf,
-      hadoopConf: Configuration,
-      secMgr: SecurityManager): String = {
+      hadoopConf: Configuration): String = {
    require(path != null, "path cannot be null.")

    val uri = Utils.resolveURI(path)
@@ -263,8 +258,7 @@ private[spark] object DependencyUtils extends Logging {
        new File(targetDir, file.getName).toURI.toString
      case _ =>
        val fname = new Path(uri).getName()
-        val localFile = Utils.doFetchFile(uri.toString(), targetDir, fname, sparkConf, secMgr,
-          hadoopConf)
+        val localFile = Utils.doFetchFile(uri.toString(), targetDir, fname, sparkConf, hadoopConf)
        localFile.toURI().toString()
    }
  }
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 416fc43dc44aa..5e68dcd9df7fc 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -496,7 +496,6 @@ private[spark] object Utils extends Logging {
       url: String,
       targetDir: File,
       conf: SparkConf,
-      securityMgr: SecurityManager,
       hadoopConf: Configuration,
       timestamp: Long,
       useCache: Boolean,
@@ -525,7 +524,7 @@ private[spark] object Utils extends Logging {
       val cachedFile = new File(localDir, cachedFileName)
       try {
         if (!cachedFile.exists()) {
-          doFetchFile(url, localDir, cachedFileName, conf, securityMgr, hadoopConf)
+          doFetchFile(url, localDir, cachedFileName, conf, hadoopConf)
         }
       } finally {
         lock.release()
@@ -538,7 +537,7 @@ private[spark] object Utils extends Logging {
         conf.getBoolean("spark.files.overwrite", false)
       )
     } else {
-      doFetchFile(url, targetDir, fileName, conf, securityMgr, hadoopConf)
+      doFetchFile(url, targetDir, fileName, conf, hadoopConf)
     }

     if (shouldUntar) {
@@ -741,7 +740,6 @@ private[spark] object Utils extends Logging {
       targetDir: File,
       filename: String,
       conf: SparkConf,
-      securityMgr: SecurityManager,
       hadoopConf: Configuration): File = {
     val targetFile = new File(targetDir, filename)
     val uri = new URI(url)
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index c64f1b5814c20..edcebf5fc60dd 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -1121,8 +1121,7 @@ class SparkSubmitSuite
     val sparkConf = new SparkConf(false)
     intercept[IOException] {
       DependencyUtils.downloadFile(
-        "abc:/my/file", Utils.createTempDir(), sparkConf, new Configuration(),
-        new SecurityManager(sparkConf))
+        "abc:/my/file", Utils.createTempDir(), sparkConf, new Configuration())
     }
   }

@@ -1132,19 +1131,17 @@ class SparkSubmitSuite
     val tmpDir = Utils.createTempDir()
     updateConfWithFakeS3Fs(hadoopConf)
     intercept[FileNotFoundException] {
-      DependencyUtils.downloadFile("s3a:/no/such/file", tmpDir, sparkConf, hadoopConf,
-        new SecurityManager(sparkConf))
+      DependencyUtils.downloadFile("s3a:/no/such/file", tmpDir, sparkConf, hadoopConf)
     }
   }

   test("downloadFile does not download local file") {
     val sparkConf = new SparkConf(false)
-    val secMgr = new SecurityManager(sparkConf)
     // empty path is considered as local file.
     val tmpDir = Files.createTempDirectory("tmp").toFile
-    assert(DependencyUtils.downloadFile("", tmpDir, sparkConf, new Configuration(), secMgr) === "")
-    assert(DependencyUtils.downloadFile("/local/file", tmpDir, sparkConf, new Configuration(),
-      secMgr) === "/local/file")
+    assert(DependencyUtils.downloadFile("", tmpDir, sparkConf, new Configuration()) === "")
+    assert(DependencyUtils.downloadFile(
+      "/local/file", tmpDir, sparkConf, new Configuration()) === "/local/file")
   }

   test("download one file to local") {
@@ -1157,8 +1154,7 @@ class SparkSubmitSuite
     val tmpDir = Files.createTempDirectory("tmp").toFile
     updateConfWithFakeS3Fs(hadoopConf)
     val sourcePath = s"s3a://${jarFile.toURI.getPath}"
-    val outputPath = DependencyUtils.downloadFile(sourcePath, tmpDir, sparkConf, hadoopConf,
-      new SecurityManager(sparkConf))
+    val outputPath = DependencyUtils.downloadFile(sourcePath, tmpDir, sparkConf, hadoopConf)
     checkDownloadedFile(sourcePath, outputPath)
     deleteTempOutputFile(outputPath)
   }
@@ -1174,8 +1170,7 @@ class SparkSubmitSuite
     updateConfWithFakeS3Fs(hadoopConf)
     val sourcePaths = Seq("/local/file", s"s3a://${jarFile.toURI.getPath}")
     val outputPaths = DependencyUtils
-      .downloadFileList(sourcePaths.mkString(","), tmpDir, sparkConf, hadoopConf,
-        new SecurityManager(sparkConf))
+      .downloadFileList(sourcePaths.mkString(","), tmpDir, sparkConf, hadoopConf)
       .split(",")

     assert(outputPaths.length === sourcePaths.length)
@@ -1189,7 +1184,6 @@ class SparkSubmitSuite
     val fs = File.separator
     val sparkConf = new SparkConf(false)
     val hadoopConf = new Configuration()
-    val secMgr = new SecurityManager(sparkConf)

     val appJarName = "myApp.jar"
     val jar1Name = "myJar1.jar"
@@ -1197,8 +1191,7 @@
     val userJar = s"file:/path${fs}to${fs}app${fs}jar$fs$appJarName"
     val jars = s"file:/$jar1Name,file:/$appJarName,file:/$jar2Name"

-    val resolvedJars = DependencyUtils
-      .resolveAndDownloadJars(jars, userJar, sparkConf, hadoopConf, secMgr)
+    val resolvedJars = DependencyUtils.resolveAndDownloadJars(jars, userJar, sparkConf, hadoopConf)

     assert(!resolvedJars.contains(appJarName))
     assert(resolvedJars.contains(jar1Name) && resolvedJars.contains(jar2Name))
diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
index d25fd20340d48..bec96e523e9e5 100644
--- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -33,7 +33,7 @@ import org.mockito.Mockito.{mock, never, verify, when}
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.concurrent.Eventually._

-import org.apache.spark.{SecurityManager, SparkConf, SparkEnv, SparkException, SparkFunSuite}
+import org.apache.spark.{SparkConf, SparkEnv, SparkException, SparkFunSuite}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.config._
 import org.apache.spark.util.{ThreadUtils, Utils}
@@ -901,7 +900,6 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
       }
     }

-    val sm = new SecurityManager(conf)
     val hc = SparkHadoopUtil.get.conf

     val files = Seq(
@@ -913,7 +912,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
       (subFile2, dir2Uri + "/file2"))
     files.foreach { case (f, uri) =>
       val destFile = new File(destDir, f.getName())
-      Utils.fetchFile(uri, destDir, conf, sm, hc, 0L, false)
+      Utils.fetchFile(uri, destDir, conf, hc, 0L, false)
       assert(Files.equal(f, destFile))
     }

@@ -921,7 +920,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
     Seq("files", "jars", "dir1").foreach { root =>
       intercept[Exception] {
         val uri = env.address.toSparkURL + s"/$root/doesNotExist"
-        Utils.fetchFile(uri, destDir, conf, sm, hc, 0L, false)
+        Utils.fetchFile(uri, destDir, conf, hc, 0L, false)
       }
     }
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
index 07d8dacf98252..37287fc394647 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
@@ -27,7 +27,7 @@ import scala.util.control.NonFatal
 import org.apache.commons.lang3.{JavaVersion, SystemUtils}
 import org.apache.hadoop.conf.Configuration

-import org.apache.spark.{SecurityManager, SparkConf, TestUtils}
+import org.apache.spark.{SparkConf, TestUtils}
 import org.apache.spark.internal.config.MASTER_REST_SERVER_ENABLED
 import org.apache.spark.internal.config.UI.UI_ENABLED
 import org.apache.spark.sql.{QueryTest, Row, SparkSession}
@@ -134,7 +134,6 @@ class HiveExternalCatalogVersionsSuite extends SparkSubmitTestUtils {
     // if the caller passes the name of an existing file, we want doFetchFile to write over it with
     // the contents from the specified url.
     conf.set("spark.files.overwrite", "true")
-    val securityManager = new SecurityManager(conf)
     val hadoopConf = new Configuration

     val outDir = new File(targetDir)
@@ -143,7 +142,7 @@ class HiveExternalCatalogVersionsSuite extends SparkSubmitTestUtils {
     }

     // propagate exceptions up to the caller of getFileFromUrl
-    Utils.doFetchFile(urlString, outDir, filename, conf, securityManager, hadoopConf)
+    Utils.doFetchFile(urlString, outDir, filename, conf, hadoopConf)
   }

   private def getStringFromUrl(urlString: String): String = {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 962efa8303f9b..b70afd3e6b98f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -686,7 +686,7 @@ private[hive] object HiveTestJars {
     val fileName = urlString.split("/").last
     val targetFile = new File(hiveTestJarsDir, fileName)
     if (!targetFile.exists()) {
-      Utils.doFetchFile(urlString, hiveTestJarsDir, fileName, new SparkConf, null, null)
+      Utils.doFetchFile(urlString, hiveTestJarsDir, fileName, new SparkConf, null)
     }
     targetFile
   }
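
Note (not part of the patch): the net effect of this change is that none of the file-download helpers take a `SecurityManager` any more. Below is a minimal sketch of what call sites look like after the change; it assumes Spark-internal access (both `Utils` and `DependencyUtils` are `private[spark]`), and the object name `FetchFileSketch`, the example URLs, and the timestamp value are hypothetical.

```scala
// Hypothetical sketch of post-change call sites; not part of the diff above.
// Utils and DependencyUtils are private[spark], so this only compiles from a
// package inside org.apache.spark.
package org.apache.spark.deploy

import java.io.File

import org.apache.hadoop.conf.Configuration

import org.apache.spark.SparkConf
import org.apache.spark.util.{DependencyUtils, Utils}

object FetchFileSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(false)
    val hadoopConf = new Configuration()
    val targetDir: File = Utils.createTempDir()

    // Before this patch: Utils.fetchFile(url, targetDir, conf, securityMgr, hadoopConf, ...)
    // After: the SecurityManager argument is simply dropped.
    val fetched: File = Utils.fetchFile(
      "https://example.org/some.jar",   // hypothetical URL
      targetDir,
      conf,
      hadoopConf,
      System.currentTimeMillis(),
      useCache = false)

    // The DependencyUtils helpers lose their trailing secMgr argument the same way.
    val localPath: String = DependencyUtils.downloadFile(
      "https://example.org/dep.jar", targetDir, conf, hadoopConf)

    println(s"fetched=$fetched localPath=$localPath")
  }
}
```

Callers that previously constructed a `new SecurityManager(conf)` only to pass it to these helpers (as the removed test code did) can drop that construction entirely.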