diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 6eaf6794764c7..255e5d0e98369 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -805,11 +805,12 @@ class SparkContext(config: SparkConf) extends Logging {
       case "local" => "file:" + uri.getPath
       case _ => path
     }
-    addedFiles(key) = System.currentTimeMillis
+    val timestamp = System.currentTimeMillis
+    addedFiles(key) = timestamp
 
     // Fetch the file locally in case a job is executed using DAGScheduler.runLocally().
-    Utils.fetchFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager,
-      hadoopConfiguration)
+    Utils.fetchCachedFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager,
+      hadoopConfiguration, timestamp, false)
 
     logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key))
     postEnvironmentUpdate()
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index da1121879c780..fac21582ffb1a 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -323,12 +323,13 @@ private[spark] class Executor(
       for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
         logInfo("Fetching " + name + " with timestamp " + timestamp)
         Utils.fetchCachedFile(name, new File(SparkFiles.getRootDirectory), conf,
-          env.securityManager, hadoopConf, timestamp)
+          env.securityManager, hadoopConf, timestamp, true)
+        currentFiles(name) = timestamp
       }
       for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp) {
         logInfo("Fetching " + name + " with timestamp " + timestamp)
         Utils.fetchCachedFile(name, new File(SparkFiles.getRootDirectory), conf,
-          env.securityManager, hadoopConf, timestamp)
+          env.securityManager, hadoopConf, timestamp, true)
         currentJars(name) = timestamp
         // Add it to our class loader
         val localName = name.split("/").last
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 5105bede24c35..55c23c46be722 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -314,30 +314,46 @@ private[spark] object Utils extends Logging {
 
   /**
    * Copy cached file to targetDir, if not exists, download it from url firstly.
+   * If useCache == false, download the file directly to targetDir instead.
    */
   def fetchCachedFile(url: String, targetDir: File, conf: SparkConf, securityMgr: SecurityManager,
-      hadoopConf: Configuration, timestamp: Long) {
+      hadoopConf: Configuration, timestamp: Long, useCache: Boolean) {
     val fileName = url.split("/").last
-    val cachedFileName = fileName + timestamp
     val targetFile = new File(targetDir, fileName)
-    val lockFileName = fileName + timestamp + "_lock"
-    val localDir = new File(getLocalDir(conf))
-    val lockFile = new File(localDir, lockFileName)
-    val raf = new RandomAccessFile(lockFile, "rw")
-    // Only one executor entry.
-    // The FileLock is only used to control synchronization for executors download file,
-    // it's always safe regardless of lock type(mandatory or advisory).
-    val lock = raf.getChannel().lock()
-    val cachedFile = new File(localDir, cachedFileName)
-    try {
-      if (!cachedFile.exists()) {
-        fetchFile(url, localDir, conf, securityMgr, hadoopConf)
-        Files.move(new File(localDir, fileName), cachedFile)
+    if (useCache) {
+      val cachedFileName = fileName + timestamp
+      val lockFileName = fileName + timestamp + "_lock"
+      val localDir = new File(getLocalDir(conf))
+      val lockFile = new File(localDir, lockFileName)
+      val raf = new RandomAccessFile(lockFile, "rw")
+      // Only one executor enters this block at a time.
+      // The FileLock is used only to synchronize downloads between executors,
+      // so it is safe regardless of the lock type (mandatory or advisory).
+      val lock = raf.getChannel().lock()
+      val cachedFile = new File(localDir, cachedFileName)
+      try {
+        if (!cachedFile.exists()) {
+          fetchFile(url, localDir, conf, securityMgr, hadoopConf)
+          Files.move(new File(localDir, fileName), cachedFile)
+        }
+      } finally {
+        lock.release()
       }
-    } finally {
-      lock.release()
+      Files.copy(cachedFile, targetFile)
+    } else {
+      fetchFile(url, targetDir, conf, securityMgr, hadoopConf)
+    }
+
+    // Decompress the file if it's a .tar or .tar.gz
+    if (fileName.endsWith(".tar.gz") || fileName.endsWith(".tgz")) {
+      logInfo("Untarring " + fileName)
+      Utils.execute(Seq("tar", "-xzf", fileName), targetDir)
+    } else if (fileName.endsWith(".tar")) {
+      logInfo("Untarring " + fileName)
+      Utils.execute(Seq("tar", "-xf", fileName), targetDir)
     }
-    Files.copy(cachedFile, targetFile)
+    // Make the file executable - That's necessary for scripts
+    FileUtil.chmod(targetFile.getAbsolutePath, "a+x")
   }
 
   /**
@@ -347,7 +363,7 @@ private[spark] object Utils extends Logging {
    * Throws SparkException if the target file already exists and has different contents than
    * the requested file.
    */
-  def fetchFile(url: String, targetDir: File, conf: SparkConf, securityMgr: SecurityManager,
+  private def fetchFile(url: String, targetDir: File, conf: SparkConf, securityMgr: SecurityManager,
       hadoopConf: Configuration) {
     val filename = url.split("/").last
     val tempDir = getLocalDir(conf)
@@ -437,16 +453,6 @@ private[spark] object Utils extends Logging {
       }
       Files.move(tempFile, targetFile)
     }
-    // Decompress the file if it's a .tar or .tar.gz
-    if (filename.endsWith(".tar.gz") || filename.endsWith(".tgz")) {
-      logInfo("Untarring " + filename)
-      Utils.execute(Seq("tar", "-xzf", filename), targetDir)
-    } else if (filename.endsWith(".tar")) {
-      logInfo("Untarring " + filename)
-      Utils.execute(Seq("tar", "-xf", filename), targetDir)
-    }
-    // Make the file executable - That's necessary for scripts
-    FileUtil.chmod(targetFile.getAbsolutePath, "a+x")
   }
 
   /**
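Reviewer note: below is a minimal, self-contained sketch of the download-once locking scheme this patch introduces in fetchCachedFile, for anyone evaluating the concurrency story. The names (CacheSketch, fetchOnce, download) are illustrative only and are not part of the patch. The first JVM on a host to acquire the FileLock performs the download and populates the per-timestamp cache entry; every other JVM blocks in lock() and, once through, finds cachedFile already present and skips the fetch.

    import java.io.{File, RandomAccessFile}

    object CacheSketch {
      // Fetch the payload for (fileName, timestamp) at most once per host.
      // `download` stands in for the actual transfer (HTTP/HDFS/local copy);
      // it is a hypothetical callback, not a Spark API.
      def fetchOnce(fileName: String, timestamp: Long, localDir: File)
                   (download: File => Unit): File = {
        val cachedFile = new File(localDir, fileName + timestamp)
        val lockFile = new File(localDir, fileName + timestamp + "_lock")
        val raf = new RandomAccessFile(lockFile, "rw")
        // OS-level file lock: blocks until no other process holds it, so only
        // one executor per host runs the download for this (file, timestamp).
        val lock = raf.getChannel().lock()
        try {
          if (!cachedFile.exists()) {
            download(cachedFile)
          }
        } finally {
          lock.release()
          raf.close()
        }
        cachedFile
      }
    }

Because the cache key includes the timestamp, a re-added file with a newer timestamp gets a fresh cache entry rather than reusing a stale download, which mirrors the currentFiles/currentJars timestamp checks in Executor.updateDependencies.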