Skip to content

Commit

Permalink
Executors of same application in same host should only download files…
Browse files Browse the repository at this point in the history
… & jars once
  • Loading branch information
li-zhihui committed Sep 4, 2014
1 parent 1bed0a3 commit 6b997bf
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 5 deletions.
9 changes: 4 additions & 5 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -322,14 +322,13 @@ private[spark] class Executor(
// Fetch missing dependencies
for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
logInfo("Fetching " + name + " with timestamp " + timestamp)
Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf, env.securityManager,
hadoopConf)
currentFiles(name) = timestamp
Utils.fetchCachedFile(name, new File(SparkFiles.getRootDirectory), conf,
env.securityManager, hadoopConf, timestamp)
}
for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp) {
logInfo("Fetching " + name + " with timestamp " + timestamp)
Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf, env.securityManager,
hadoopConf)
Utils.fetchCachedFile(name, new File(SparkFiles.getRootDirectory), conf,
env.securityManager, hadoopConf, timestamp)
currentJars(name) = timestamp
// Add it to our class loader
val localName = name.split("/").last
Expand Down
22 changes: 22 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,28 @@ private[spark] object Utils extends Logging {
uri.getQuery(), uri.getFragment())
}

/**
* Copy cached file to targetDir, if not exists, download it from url.
*/
def fetchCachedFile(url: String, targetDir: File, conf: SparkConf, securityMgr: SecurityManager,
timestamp: Long) {
val fileName = url.split("/").last
val cachedFileName = fileName + timestamp
val targetFile = new File(targetDir, fileName)
val lockFileName = fileName + timestamp + "_lock"
val localDir = new File(getLocalDir(conf))
val lockFile = new File(localDir, lockFileName)
val raf = new RandomAccessFile(lockFile, "rw")
val lock = raf.getChannel().lock() // only one executor entry
val cachedFile = new File(localDir, cachedFileName)
if (!cachedFile.exists()) {
fetchFile(url, localDir, conf, securityMgr)
Files.move(new File(localDir, fileName), cachedFile)
}
Files.copy(cachedFile, targetFile)
lock.release()
}

/**
* Download a file requested by the executor. Supports fetching the file in a variety of ways,
* including HTTP, HDFS and files on a standard filesystem, based on the URL parameter.
Expand Down

0 comments on commit 6b997bf

Please sign in to comment.