Skip to content

Commit

Permalink
[SPARK-6144]When in cluster mode using ADD JAR with a hdfs:// sourced…
Browse files Browse the repository at this point in the history
… jar will fail
  • Loading branch information
trystanleftwich authored and Marcelo Vanzin committed Mar 4, 2015
1 parent f6773ed commit 91733b7
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 4 deletions.
10 changes: 6 additions & 4 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,8 @@ private[spark] object Utils extends Logging {
case _ =>
val fs = getHadoopFileSystem(uri, hadoopConf)
val path = new Path(uri)
fetchHcfsFile(path, new File(targetDir, path.getName), fs, conf, hadoopConf, fileOverwrite)
fetchHcfsFile(path, targetDir, fs, conf, hadoopConf, fileOverwrite,
Some(filename))
}
}

Expand All @@ -639,8 +640,9 @@ private[spark] object Utils extends Logging {
fs: FileSystem,
conf: SparkConf,
hadoopConf: Configuration,
fileOverwrite: Boolean): Unit = {
if (!targetDir.mkdir()) {
fileOverwrite: Boolean,
filename: Option[String] = None): Unit = {
if (!targetDir.exists() && !targetDir.mkdir()) {
throw new IOException(s"Failed to create directory ${targetDir.getPath}")
}
fs.listStatus(path).foreach { fileStatus =>
Expand All @@ -650,7 +652,7 @@ private[spark] object Utils extends Logging {
fileOverwrite)
} else {
val in = fs.open(innerPath)
val targetFile = new File(targetDir, innerPath.getName)
val targetFile = new File(targetDir, filename.getOrElse(innerPath.getName))
downloadFile(innerPath.toString, in, targetFile, fileOverwrite)
}
}
Expand Down
12 changes: 12 additions & 0 deletions core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,7 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
val innerTempDir = Utils.createTempDir(tempDir.getPath)
val tempFile = File.createTempFile("someprefix", "somesuffix", innerTempDir)
val targetDir = new File("target-dir")
val testFileDir = new File("test-filename")
Files.write("some text", tempFile, UTF_8)

try {
Expand All @@ -399,6 +400,8 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
Utils.fetchHcfsFile(path, targetDir, fs, new SparkConf(), conf, false)
assert(targetDir.exists())
assert(targetDir.isDirectory())
// Testing to make sure it doesn't error if the dir already exists
Utils.fetchHcfsFile(path, targetDir, fs, new SparkConf(), conf, false)
val newInnerDir = new File(targetDir, innerTempDir.getName)
println("inner temp dir: " + innerTempDir.getName)
targetDir.listFiles().map(_.getName).foreach(println)
Expand All @@ -407,9 +410,18 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
val newInnerFile = new File(newInnerDir, tempFile.getName)
assert(newInnerFile.exists())
assert(newInnerFile.isFile())
val filePath = new Path("file://" + tempFile.getAbsolutePath)
val testFileName = "testFName"
val testFilefs = Utils.getHadoopFileSystem(filePath.toString, conf)
Utils.fetchHcfsFile(filePath, testFileDir, testFilefs, new SparkConf(),
conf, false, Some(testFileName))
val newFileName = new File(testFileDir, testFileName)
assert(newFileName.exists())
assert(newFileName.isFile())
} finally {
Utils.deleteRecursively(tempDir)
Utils.deleteRecursively(targetDir)
Utils.deleteRecursively(testFileDir)
}
}
}

0 comments on commit 91733b7

Please sign in to comment.