[SPARK-33925][CORE] Remove unused SecurityManager in Utils.fetchFile
### What changes were proposed in this pull request?

This is a followup of #24033.
The first and last usage of the `SecurityManager` argument was removed in #24033.
After that, we no longer need to pass a `SecurityManager` into `Utils.fetchFile` and the related code paths.

This PR proposes to remove it.
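For illustration, a typical call site changes as follows (a minimal sketch lifted from the executor code path in the diff below; `name`, `conf`, `hadoopConf`, `timestamp`, and `isLocal` are assumed to be in scope at the call site):

```scala
// Before: a SecurityManager had to be threaded through even though
// Utils.fetchFile stopped using it after #24033.
Utils.fetchFile(name, new File(SparkFiles.getRootDirectory()), conf,
  env.securityManager, hadoopConf, timestamp, useCache = !isLocal)

// After: the unused parameter is dropped from Utils.fetchFile, Utils.doFetchFile,
// and the DependencyUtils download helpers, so callers simply omit it.
Utils.fetchFile(name, new File(SparkFiles.getRootDirectory()), conf,
  hadoopConf, timestamp, useCache = !isLocal)
```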

### Why are the changes needed?

For better readability of the code.

### Does this PR introduce _any_ user-facing change?

No, dev-only.

### How was this patch tested?

Manually compiled. GitHub Actions and Jenkins builds should test it as well.

Closes #30945 from HyukjinKwon/SPARK-33925.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
HyukjinKwon authored and dongjoon-hyun committed Dec 29, 2020
1 parent c2eac1d commit b33fa53
Showing 11 changed files with 35 additions and 54 deletions.
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1631,7 +1631,7 @@ class SparkContext(config: SparkConf) extends Logging {
  // Fetch the file locally so that closures which are run on the driver can still use the
  // SparkFiles API to access files.
  Utils.fetchFile(uri.toString, new File(SparkFiles.getRootDirectory()), conf,
- env.securityManager, hadoopConfiguration, timestamp, useCache = false)
+ hadoopConfiguration, timestamp, useCache = false)
  postEnvironmentUpdate()
  } else if (
  isArchive &&
@@ -1643,7 +1643,7 @@ class SparkContext(config: SparkConf) extends Logging {
  val uriToUse = if (!isLocal && scheme == "file") uri else new URI(key)
  val uriToDownload = UriBuilder.fromUri(uriToUse).fragment(null).build()
  val source = Utils.fetchFile(uriToDownload.toString, Utils.createTempDir(), conf,
- env.securityManager, hadoopConfiguration, timestamp, useCache = false, shouldUntar = false)
+ hadoopConfiguration, timestamp, useCache = false, shouldUntar = false)
  val dest = new File(
  SparkFiles.getRootDirectory(),
  if (uri.getFragment != null) uri.getFragment else source.getName)
12 changes: 6 additions & 6 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -374,13 +374,13 @@ private[spark] class SparkSubmit extends Logging {
  var localPyFiles: String = null
  if (deployMode == CLIENT) {
  localPrimaryResource = Option(args.primaryResource).map {
- downloadFile(_, targetDir, sparkConf, hadoopConf, secMgr)
+ downloadFile(_, targetDir, sparkConf, hadoopConf)
  }.orNull
  localJars = Option(args.jars).map {
- downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
+ downloadFileList(_, targetDir, sparkConf, hadoopConf)
  }.orNull
  localPyFiles = Option(args.pyFiles).map {
- downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
+ downloadFileList(_, targetDir, sparkConf, hadoopConf)
  }.orNull

  if (isKubernetesClusterModeDriver) {
@@ -389,14 +389,14 @@ private[spark] class SparkSubmit extends Logging {
  // Explicitly download the related files here
  args.jars = localJars
  val filesLocalFiles = Option(args.files).map {
- downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
+ downloadFileList(_, targetDir, sparkConf, hadoopConf)
  }.orNull
  val archiveLocalFiles = Option(args.archives).map { uris =>
  val resolvedUris = Utils.stringToSeq(uris).map(Utils.resolveURI)
  val localArchives = downloadFileList(
  resolvedUris.map(
  UriBuilder.fromUri(_).fragment(null).build().toString).mkString(","),
- targetDir, sparkConf, hadoopConf, secMgr)
+ targetDir, sparkConf, hadoopConf)

  // SPARK-33748: this mimics the behaviour of Yarn cluster mode. If the driver is running
  // in cluster mode, the archives should be available in the driver's current working
@@ -447,7 +447,7 @@ private[spark] class SparkSubmit extends Logging {
  if (file.exists()) {
  file.toURI.toString
  } else {
- downloadFile(resource, targetDir, sparkConf, hadoopConf, secMgr)
+ downloadFile(resource, targetDir, sparkConf, hadoopConf)
  }
  case _ => uri.toString
  }
@@ -160,7 +160,6 @@ private[deploy] class DriverRunner(
  driverDesc.jarUrl,
  driverDir,
  conf,
- securityManager,
  SparkHadoopUtil.get.newConfiguration(conf),
  System.currentTimeMillis(),
  useCache = false)
@@ -91,8 +91,7 @@ object DriverWrapper extends Logging {
  jarsProp
  }
  }
- val localJars = DependencyUtils.resolveAndDownloadJars(jars, userJar, sparkConf, hadoopConf,
- secMgr)
+ val localJars = DependencyUtils.resolveAndDownloadJars(jars, userJar, sparkConf, hadoopConf)
  DependencyUtils.addJarsToClassPath(localJars, loader)
  }
  }
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -924,15 +924,15 @@ private[spark] class Executor(
  logInfo(s"Fetching $name with timestamp $timestamp")
  // Fetch file with useCache mode, close cache for local mode.
  Utils.fetchFile(name, new File(SparkFiles.getRootDirectory()), conf,
- env.securityManager, hadoopConf, timestamp, useCache = !isLocal)
+ hadoopConf, timestamp, useCache = !isLocal)
  currentFiles(name) = timestamp
  }
  for ((name, timestamp) <- newArchives if currentArchives.getOrElse(name, -1L) < timestamp) {
  logInfo(s"Fetching $name with timestamp $timestamp")
  val sourceURI = new URI(name)
  val uriToDownload = UriBuilder.fromUri(sourceURI).fragment(null).build()
  val source = Utils.fetchFile(uriToDownload.toString, Utils.createTempDir(), conf,
- env.securityManager, hadoopConf, timestamp, useCache = !isLocal, shouldUntar = false)
+ hadoopConf, timestamp, useCache = !isLocal, shouldUntar = false)
  val dest = new File(
  SparkFiles.getRootDirectory(),
  if (sourceURI.getFragment != null) sourceURI.getFragment else source.getName)
@@ -951,7 +951,7 @@ private[spark] class Executor(
  logInfo(s"Fetching $name with timestamp $timestamp")
  // Fetch file with useCache mode, close cache for local mode.
  Utils.fetchFile(name, new File(SparkFiles.getRootDirectory()), conf,
- env.securityManager, hadoopConf, timestamp, useCache = !isLocal)
+ hadoopConf, timestamp, useCache = !isLocal)
  currentJars(name) = timestamp
  // Add it to our class loader
  val url = new File(SparkFiles.getRootDirectory(), localName).toURI.toURL
20 changes: 7 additions & 13 deletions core/src/main/scala/org/apache/spark/util/DependencyUtils.scala
@@ -24,7 +24,7 @@ import org.apache.commons.lang3.StringUtils
  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.{FileSystem, Path}

- import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+ import org.apache.spark.{SparkConf, SparkException}
  import org.apache.spark.deploy.SparkSubmitUtils
  import org.apache.spark.internal.Logging

@@ -187,8 +187,7 @@ private[spark] object DependencyUtils extends Logging {
  jars: String,
  userJar: String,
  sparkConf: SparkConf,
- hadoopConf: Configuration,
- secMgr: SecurityManager): String = {
+ hadoopConf: Configuration): String = {
  val targetDir = Utils.createTempDir()
  val userJarName = userJar.split(File.separatorChar).last
  Option(jars)
@@ -199,7 +198,7 @@ private[spark] object DependencyUtils extends Logging {
  .mkString(",")
  }
  .filterNot(_ == "")
- .map(downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr))
+ .map(downloadFileList(_, targetDir, sparkConf, hadoopConf))
  .orNull
  }

@@ -219,18 +218,16 @@ private[spark] object DependencyUtils extends Logging {
  * @param targetDir A temporary directory for which downloaded files.
  * @param sparkConf Spark configuration.
  * @param hadoopConf Hadoop configuration.
- * @param secMgr Spark security manager.
  * @return A comma separated local files list.
  */
  def downloadFileList(
  fileList: String,
  targetDir: File,
  sparkConf: SparkConf,
- hadoopConf: Configuration,
- secMgr: SecurityManager): String = {
+ hadoopConf: Configuration): String = {
  require(fileList != null, "fileList cannot be null.")
  Utils.stringToSeq(fileList)
- .map(downloadFile(_, targetDir, sparkConf, hadoopConf, secMgr))
+ .map(downloadFile(_, targetDir, sparkConf, hadoopConf))
  .mkString(",")
  }

@@ -242,15 +239,13 @@ private[spark] object DependencyUtils extends Logging {
  * @param targetDir A temporary directory for which downloaded files.
  * @param sparkConf Spark configuration.
  * @param hadoopConf Hadoop configuration.
- * @param secMgr Spark security manager.
  * @return Path to the local file.
  */
  def downloadFile(
  path: String,
  targetDir: File,
  sparkConf: SparkConf,
- hadoopConf: Configuration,
- secMgr: SecurityManager): String = {
+ hadoopConf: Configuration): String = {
  require(path != null, "path cannot be null.")
  val uri = Utils.resolveURI(path)

@@ -263,8 +258,7 @@ private[spark] object DependencyUtils extends Logging {
  new File(targetDir, file.getName).toURI.toString
  case _ =>
  val fname = new Path(uri).getName()
- val localFile = Utils.doFetchFile(uri.toString(), targetDir, fname, sparkConf, secMgr,
- hadoopConf)
+ val localFile = Utils.doFetchFile(uri.toString(), targetDir, fname, sparkConf, hadoopConf)
  localFile.toURI().toString()
  }
  }
6 changes: 2 additions & 4 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -496,7 +496,6 @@ private[spark] object Utils extends Logging {
  url: String,
  targetDir: File,
  conf: SparkConf,
- securityMgr: SecurityManager,
  hadoopConf: Configuration,
  timestamp: Long,
  useCache: Boolean,
@@ -525,7 +524,7 @@
  val cachedFile = new File(localDir, cachedFileName)
  try {
  if (!cachedFile.exists()) {
- doFetchFile(url, localDir, cachedFileName, conf, securityMgr, hadoopConf)
+ doFetchFile(url, localDir, cachedFileName, conf, hadoopConf)
  }
  } finally {
  lock.release()
@@ -538,7 +537,7 @@
  conf.getBoolean("spark.files.overwrite", false)
  )
  } else {
- doFetchFile(url, targetDir, fileName, conf, securityMgr, hadoopConf)
+ doFetchFile(url, targetDir, fileName, conf, hadoopConf)
  }

  if (shouldUntar) {
@@ -741,7 +740,7 @@
  targetDir: File,
  filename: String,
  conf: SparkConf,
- securityMgr: SecurityManager,
  hadoopConf: Configuration): File = {
  val targetFile = new File(targetDir, filename)
  val uri = new URI(url)
23 changes: 8 additions & 15 deletions core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -1121,8 +1121,7 @@ class SparkSubmitSuite
  val sparkConf = new SparkConf(false)
  intercept[IOException] {
  DependencyUtils.downloadFile(
- "abc:/my/file", Utils.createTempDir(), sparkConf, new Configuration(),
- new SecurityManager(sparkConf))
+ "abc:/my/file", Utils.createTempDir(), sparkConf, new Configuration())
  }
  }

@@ -1132,19 +1131,17 @@
  val tmpDir = Utils.createTempDir()
  updateConfWithFakeS3Fs(hadoopConf)
  intercept[FileNotFoundException] {
- DependencyUtils.downloadFile("s3a:/no/such/file", tmpDir, sparkConf, hadoopConf,
- new SecurityManager(sparkConf))
+ DependencyUtils.downloadFile("s3a:/no/such/file", tmpDir, sparkConf, hadoopConf)
  }
  }

  test("downloadFile does not download local file") {
  val sparkConf = new SparkConf(false)
- val secMgr = new SecurityManager(sparkConf)
  // empty path is considered as local file.
  val tmpDir = Files.createTempDirectory("tmp").toFile
- assert(DependencyUtils.downloadFile("", tmpDir, sparkConf, new Configuration(), secMgr) === "")
- assert(DependencyUtils.downloadFile("/local/file", tmpDir, sparkConf, new Configuration(),
- secMgr) === "/local/file")
+ assert(DependencyUtils.downloadFile("", tmpDir, sparkConf, new Configuration()) === "")
+ assert(DependencyUtils.downloadFile(
+ "/local/file", tmpDir, sparkConf, new Configuration()) === "/local/file")
  }

  test("download one file to local") {
@@ -1157,8 +1154,7 @@
  val tmpDir = Files.createTempDirectory("tmp").toFile
  updateConfWithFakeS3Fs(hadoopConf)
  val sourcePath = s"s3a://${jarFile.toURI.getPath}"
- val outputPath = DependencyUtils.downloadFile(sourcePath, tmpDir, sparkConf, hadoopConf,
- new SecurityManager(sparkConf))
+ val outputPath = DependencyUtils.downloadFile(sourcePath, tmpDir, sparkConf, hadoopConf)
  checkDownloadedFile(sourcePath, outputPath)
  deleteTempOutputFile(outputPath)
  }
@@ -1174,8 +1170,7 @@
  updateConfWithFakeS3Fs(hadoopConf)
  val sourcePaths = Seq("/local/file", s"s3a://${jarFile.toURI.getPath}")
  val outputPaths = DependencyUtils
- .downloadFileList(sourcePaths.mkString(","), tmpDir, sparkConf, hadoopConf,
- new SecurityManager(sparkConf))
+ .downloadFileList(sourcePaths.mkString(","), tmpDir, sparkConf, hadoopConf)
  .split(",")

  assert(outputPaths.length === sourcePaths.length)
@@ -1189,16 +1184,14 @@
  val fs = File.separator
  val sparkConf = new SparkConf(false)
  val hadoopConf = new Configuration()
- val secMgr = new SecurityManager(sparkConf)

  val appJarName = "myApp.jar"
  val jar1Name = "myJar1.jar"
  val jar2Name = "myJar2.jar"
  val userJar = s"file:/path${fs}to${fs}app${fs}jar$fs$appJarName"
  val jars = s"file:/$jar1Name,file:/$appJarName,file:/$jar2Name"

- val resolvedJars = DependencyUtils
- .resolveAndDownloadJars(jars, userJar, sparkConf, hadoopConf, secMgr)
+ val resolvedJars = DependencyUtils.resolveAndDownloadJars(jars, userJar, sparkConf, hadoopConf)

  assert(!resolvedJars.contains(appJarName))
  assert(resolvedJars.contains(jar1Name) && resolvedJars.contains(jar2Name))
7 changes: 3 additions & 4 deletions core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -33,7 +33,7 @@ import org.mockito.Mockito.{mock, never, verify, when}
  import org.scalatest.BeforeAndAfterAll
  import org.scalatest.concurrent.Eventually._

- import org.apache.spark.{SecurityManager, SparkConf, SparkEnv, SparkException, SparkFunSuite}
+ import org.apache.spark.{SparkConf, SparkEnv, SparkException, SparkFunSuite}
  import org.apache.spark.deploy.SparkHadoopUtil
  import org.apache.spark.internal.config._
  import org.apache.spark.util.{ThreadUtils, Utils}
@@ -901,7 +900,6 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
  }
  }

- val sm = new SecurityManager(conf)
  val hc = SparkHadoopUtil.get.conf

  val files = Seq(
@@ -913,15 +912,15 @@
  (subFile2, dir2Uri + "/file2"))
  files.foreach { case (f, uri) =>
  val destFile = new File(destDir, f.getName())
- Utils.fetchFile(uri, destDir, conf, sm, hc, 0L, false)
+ Utils.fetchFile(uri, destDir, conf, hc, 0L, false)
  assert(Files.equal(f, destFile))
  }

  // Try to download files that do not exist.
  Seq("files", "jars", "dir1").foreach { root =>
  intercept[Exception] {
  val uri = env.address.toSparkURL + s"/$root/doesNotExist"
- Utils.fetchFile(uri, destDir, conf, sm, hc, 0L, false)
+ Utils.fetchFile(uri, destDir, conf, hc, 0L, false)
  }
  }
  }
@@ -27,7 +27,7 @@ import scala.util.control.NonFatal
  import org.apache.commons.lang3.{JavaVersion, SystemUtils}
  import org.apache.hadoop.conf.Configuration

- import org.apache.spark.{SecurityManager, SparkConf, TestUtils}
+ import org.apache.spark.{SparkConf, TestUtils}
  import org.apache.spark.internal.config.MASTER_REST_SERVER_ENABLED
  import org.apache.spark.internal.config.UI.UI_ENABLED
  import org.apache.spark.sql.{QueryTest, Row, SparkSession}
@@ -134,7 +134,6 @@ class HiveExternalCatalogVersionsSuite extends SparkSubmitTestUtils {
  // if the caller passes the name of an existing file, we want doFetchFile to write over it with
  // the contents from the specified url.
  conf.set("spark.files.overwrite", "true")
- val securityManager = new SecurityManager(conf)
  val hadoopConf = new Configuration

  val outDir = new File(targetDir)
@@ -143,7 +142,7 @@
  }

  // propagate exceptions up to the caller of getFileFromUrl
- Utils.doFetchFile(urlString, outDir, filename, conf, securityManager, hadoopConf)
+ Utils.doFetchFile(urlString, outDir, filename, conf, hadoopConf)
  }

  private def getStringFromUrl(urlString: String): String = {
@@ -686,7 +686,7 @@ private[hive] object HiveTestJars {
  val fileName = urlString.split("/").last
  val targetFile = new File(hiveTestJarsDir, fileName)
  if (!targetFile.exists()) {
- Utils.doFetchFile(urlString, hiveTestJarsDir, fileName, new SparkConf, null, null)
+ Utils.doFetchFile(urlString, hiveTestJarsDir, fileName, new SparkConf, null)
  }
  targetFile
  }
