Keep fetchFile private
li-zhihui committed Sep 4, 2014
1 parent 2ffd742 commit 3510eb0
Showing 3 changed files with 42 additions and 34 deletions.
core/src/main/scala/org/apache/spark/SparkContext.scala (7 changes: 4 additions & 3 deletions)
@@ -805,11 +805,12 @@ class SparkContext(config: SparkConf) extends Logging {
       case "local" => "file:" + uri.getPath
       case _ => path
     }
-    addedFiles(key) = System.currentTimeMillis
+    val timestamp = System.currentTimeMillis
+    addedFiles(key) = timestamp
 
     // Fetch the file locally in case a job is executed using DAGScheduler.runLocally().
-    Utils.fetchFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager,
-      hadoopConfiguration)
+    Utils.fetchCachedFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager,
+      timestamp, hadoopConfiguration, false)
 
     logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key))
     postEnvironmentUpdate()
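Why the driver now reads the clock once: the timestamp recorded in addedFiles is presumably what executors later combine with the file name to form the shared cache key (fileName + timestamp, see Utils.scala below), so the driver captures one reading and reuses it for both the map entry and the fetch. The driver itself passes useCache = false, so its own local fetch downloads straight to the target directory. A minimal, hypothetical Scala sketch of the mismatch the val avoids (names are illustrative, not Spark's):

```scala
object CacheKeyDemo {
  def main(args: Array[String]): Unit = {
    val fileName = "app.jar"
    // Two separate reads of the clock may differ, so building the cache key
    // twice could yield two different keys for the same added file.
    val t1 = System.currentTimeMillis
    Thread.sleep(2)
    val t2 = System.currentTimeMillis
    println((fileName + t1) == (fileName + t2)) // usually false
  }
}
```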
core/src/main/scala/org/apache/spark/executor/Executor.scala (5 changes: 3 additions & 2 deletions)
@@ -323,12 +323,13 @@ private[spark] class Executor(
     for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
       logInfo("Fetching " + name + " with timestamp " + timestamp)
       Utils.fetchCachedFile(name, new File(SparkFiles.getRootDirectory), conf,
-        env.securityManager, hadoopConf, timestamp)
+        env.securityManager, hadoopConf, timestamp, true)
       currentFiles(name) = timestamp
     }
     for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp) {
       logInfo("Fetching " + name + " with timestamp " + timestamp)
       Utils.fetchCachedFile(name, new File(SparkFiles.getRootDirectory), conf,
-        env.securityManager, hadoopConf, timestamp)
+        env.securityManager, hadoopConf, timestamp, true)
       currentJars(name) = timestamp
       // Add it to our class loader
       val localName = name.split("/").last
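On the executor side the only change is the trailing true, opting these fetches into the shared per-host cache. The enclosing loops are untouched; their guard re-fetches a dependency only when the driver advertises a strictly newer timestamp. A self-contained sketch of that guard (object and method names are made up for illustration):

```scala
import scala.collection.mutable

object RefreshGuardDemo {
  private val currentFiles = mutable.HashMap.empty[String, Long]

  // Re-fetch only when the advertised timestamp is strictly newer than the
  // last one fetched, mirroring the newFiles/newJars loops above.
  def refresh(newFiles: Map[String, Long])(fetch: String => Unit): Unit = {
    for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
      fetch(name)
      currentFiles(name) = timestamp
    }
  }

  def main(args: Array[String]): Unit = {
    refresh(Map("data.txt" -> 100L))(n => println(s"fetching $n")) // first time: fetches
    refresh(Map("data.txt" -> 100L))(_ => println("not reached"))  // same timestamp: skipped
    refresh(Map("data.txt" -> 200L))(n => println(s"fetching $n")) // newer: fetches again
  }
}
```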
core/src/main/scala/org/apache/spark/util/Utils.scala (64 changes: 35 additions & 29 deletions)
@@ -314,30 +314,46 @@ private[spark] object Utils extends Logging {
 
   /**
    * Copy cached file to targetDir, if not exists, download it from url firstly.
+   * If useCache == false, download file to targetDir directly.
    */
   def fetchCachedFile(url: String, targetDir: File, conf: SparkConf, securityMgr: SecurityManager,
-      timestamp: Long) {
+      timestamp: Long, useCache: Boolean) {
     val fileName = url.split("/").last
-    val cachedFileName = fileName + timestamp
     val targetFile = new File(targetDir, fileName)
-    val lockFileName = fileName + timestamp + "_lock"
-    val localDir = new File(getLocalDir(conf))
-    val lockFile = new File(localDir, lockFileName)
-    val raf = new RandomAccessFile(lockFile, "rw")
-    // Only one executor entry.
-    // The FileLock is only used to control synchronization for executors download file,
-    // it's always safe regardless of lock type(mandatory or advisory).
-    val lock = raf.getChannel().lock()
-    val cachedFile = new File(localDir, cachedFileName)
-    try {
-      if (!cachedFile.exists()) {
-        fetchFile(url, localDir, conf, securityMgr)
-        Files.move(new File(localDir, fileName), cachedFile)
-      }
-    } finally {
-      lock.release()
-    }
-    Files.copy(cachedFile, targetFile)
+    if (useCache) {
+      val cachedFileName = fileName + timestamp
+      val lockFileName = fileName + timestamp + "_lock"
+      val localDir = new File(getLocalDir(conf))
+      val lockFile = new File(localDir, lockFileName)
+      val raf = new RandomAccessFile(lockFile, "rw")
+      // Only one executor entry.
+      // The FileLock is only used to control synchronization for executors download file,
+      // it's always safe regardless of lock type(mandatory or advisory).
+      val lock = raf.getChannel().lock()
+      val cachedFile = new File(localDir, cachedFileName)
+      try {
+        if (!cachedFile.exists()) {
+          fetchFile(url, localDir, conf, securityMgr)
+          Files.move(new File(localDir, fileName), cachedFile)
+        }
+      } finally {
+        lock.release()
+      }
+      Files.copy(cachedFile, targetFile)
+    } else {
+      fetchFile(url, targetDir, conf, securityMgr)
+    }
+
+    // Decompress the file if it's a .tar or .tar.gz
+    if (fileName.endsWith(".tar.gz") || fileName.endsWith(".tgz")) {
+      logInfo("Untarring " + fileName)
+      Utils.execute(Seq("tar", "-xzf", fileName), targetDir)
+    } else if (fileName.endsWith(".tar")) {
+      logInfo("Untarring " + fileName)
+      Utils.execute(Seq("tar", "-xf", fileName), targetDir)
+    }
+    // Make the file executable - That's necessary for scripts
+    FileUtil.chmod(targetFile.getAbsolutePath, "a+x")
   }
 
   /**
@@ -347,7 +363,7 @@ private[spark] object Utils extends Logging {
    * Throws SparkException if the target file already exists and has different contents than
    * the requested file.
    */
-  def fetchFile(url: String, targetDir: File, conf: SparkConf, securityMgr: SecurityManager,
+  private def fetchFile(url: String, targetDir: File, conf: SparkConf, securityMgr: SecurityManager,
       hadoopConf: Configuration) {
     val filename = url.split("/").last
     val tempDir = getLocalDir(conf)
@@ -437,16 +453,6 @@ private[spark] object Utils extends Logging {
       }
       Files.move(tempFile, targetFile)
     }
-    // Decompress the file if it's a .tar or .tar.gz
-    if (filename.endsWith(".tar.gz") || filename.endsWith(".tgz")) {
-      logInfo("Untarring " + filename)
-      Utils.execute(Seq("tar", "-xzf", filename), targetDir)
-    } else if (filename.endsWith(".tar")) {
-      logInfo("Untarring " + filename)
-      Utils.execute(Seq("tar", "-xf", filename), targetDir)
-    }
-    // Make the file executable - That's necessary for scripts
-    FileUtil.chmod(targetFile.getAbsolutePath, "a+x")
   }
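Taken together, the useCache branch of fetchCachedFile implements a download-once-per-host cache guarded by an OS file lock: the first caller to acquire the lock populates the fileName + timestamp entry in the local directory, and every later caller finds the cached copy and merely copies it into its own working directory. Below is a minimal, self-contained sketch of that pattern under stated assumptions: the real download is replaced by a stub, java.nio stands in for Spark's helpers, and all names are illustrative rather than Spark's.

```scala
import java.io.{File, RandomAccessFile}
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, StandardCopyOption}

object CachedFetchSketch {
  // Stand-in for the real fetchFile: pretend to download `url` into `dir`.
  private def download(url: String, dir: File): Unit = {
    val name = url.split("/").last
    Files.write(new File(dir, name).toPath,
      s"contents of $url".getBytes(StandardCharsets.UTF_8))
    println(s"downloaded $url")
  }

  def fetchCached(url: String, targetDir: File, cacheDir: File, timestamp: Long): File = {
    val fileName = url.split("/").last
    val cachedFile = new File(cacheDir, fileName + timestamp)
    val lockFile = new File(cacheDir, fileName + timestamp + "_lock")
    val raf = new RandomAccessFile(lockFile, "rw")
    // Blocks until no other process holds the lock, so at most one downloader
    // populates the cache entry; everyone else waits, then copies.
    val lock = raf.getChannel.lock()
    try {
      if (!cachedFile.exists()) {
        download(url, cacheDir)
        Files.move(new File(cacheDir, fileName).toPath, cachedFile.toPath)
      }
    } finally {
      lock.release()
      raf.close()
    }
    val targetFile = new File(targetDir, fileName)
    Files.copy(cachedFile.toPath, targetFile.toPath, StandardCopyOption.REPLACE_EXISTING)
    targetFile
  }

  def main(args: Array[String]): Unit = {
    val cache = Files.createTempDirectory("cache").toFile
    val execA = Files.createTempDirectory("execA").toFile
    val execB = Files.createTempDirectory("execB").toFile
    val ts = System.currentTimeMillis
    fetchCached("http://example.com/app.jar", execA, cache, ts) // downloads once
    fetchCached("http://example.com/app.jar", execB, cache, ts) // served from cache
  }
}
```

Because every cooperating process takes the lock through the same lock-file path, it does not matter whether the platform enforces locks (mandatory) or merely advertises them (advisory), which is what the in-line comment in the patch is getting at.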
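The last hunk relocates the untar/chmod post-processing out of fetchFile (it now runs at the end of fetchCachedFile, as shown above), leaving fetchFile to end at the download-to-temp-then-move step. Its doc comment promises a SparkException when the target already exists with different contents; a hedged sketch of that contract follows, with a plain exception and made-up names standing in for Spark's:

```scala
import java.io.File
import java.nio.file.Files
import java.util.Arrays

object MoveIntoPlaceSketch {
  // Promote a downloaded temp file to its final name, failing loudly if a
  // different file is already there (cf. the doc comment on fetchFile).
  def moveIntoPlace(tempFile: File, targetFile: File): Unit = {
    if (targetFile.exists()) {
      val sameContents = Arrays.equals(
        Files.readAllBytes(tempFile.toPath), Files.readAllBytes(targetFile.toPath))
      tempFile.delete() // either way the temp copy is no longer needed
      if (!sameContents) {
        throw new IllegalStateException(
          s"File $targetFile exists and does not match contents of $tempFile")
      }
    } else {
      Files.move(tempFile.toPath, targetFile.toPath)
    }
  }

  def main(args: Array[String]): Unit = {
    val dir = Files.createTempDirectory("demo").toFile
    val tmp = new File(dir, "part.tmp")
    Files.write(tmp.toPath, "payload".getBytes)
    moveIntoPlace(tmp, new File(dir, "final.txt"))
    println("moved into place")
  }
}
```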