diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index dd903dc65d204..da1121879c780 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -322,14 +322,13 @@ private[spark] class Executor( // Fetch missing dependencies for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) { logInfo("Fetching " + name + " with timestamp " + timestamp) - Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf, env.securityManager, - hadoopConf) - currentFiles(name) = timestamp + Utils.fetchCachedFile(name, new File(SparkFiles.getRootDirectory), conf, + env.securityManager, hadoopConf, timestamp) } for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp) { logInfo("Fetching " + name + " with timestamp " + timestamp) - Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf, env.securityManager, - hadoopConf) + Utils.fetchCachedFile(name, new File(SparkFiles.getRootDirectory), conf, + env.securityManager, hadoopConf, timestamp) currentJars(name) = timestamp // Add it to our class loader val localName = name.split("/").last diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 0ae28f911e302..8c321300faa88 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -312,6 +312,28 @@ private[spark] object Utils extends Logging { uri.getQuery(), uri.getFragment()) } + /** + * Copy cached file to targetDir, if not exists, download it from url. + */ + def fetchCachedFile(url: String, targetDir: File, conf: SparkConf, securityMgr: SecurityManager, + timestamp: Long) { + val fileName = url.split("/").last + val cachedFileName = fileName + timestamp + val targetFile = new File(targetDir, fileName) + val lockFileName = fileName + timestamp + "_lock" + val localDir = new File(getLocalDir(conf)) + val lockFile = new File(localDir, lockFileName) + val raf = new RandomAccessFile(lockFile, "rw") + val lock = raf.getChannel().lock() // only one executor entry + val cachedFile = new File(localDir, cachedFileName) + if (!cachedFile.exists()) { + fetchFile(url, localDir, conf, securityMgr) + Files.move(new File(localDir, fileName), cachedFile) + } + Files.copy(cachedFile, targetFile) + lock.release() + } + /** * Download a file requested by the executor. Supports fetching the file in a variety of ways, * including HTTP, HDFS and files on a standard filesystem, based on the URL parameter.