Skip to content

Commit

Permalink
[SPARK-12220][CORE] Make Utils.fetchFile support files that contain special characters
Browse files Browse the repository at this point in the history

This PR encodes and decodes the file name to fix the issue.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #10208 from zsxwing/uri.
  • Loading branch information
zsxwing committed Dec 17, 2015
1 parent 6e07716 commit 86e405f
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 6 deletions.
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/HttpFileServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,12 @@ private[spark] class HttpFileServer(

/**
 * Passes `file` and `fileDir` to `addFileToDir`, then returns the string formed from
 * `serverUri`, the "/files/" segment and the file's name.
 *
 * This block is shown in diff form: the two trailing expressions are the pre-change and
 * post-change versions of the same return line.
 */
def addFile(file: File) : String = {
addFileToDir(file, fileDir)
// Pre-change line (removed by this commit): the file name is appended verbatim.
serverUri + "/files/" + file.getName
// Post-change line (added by this commit): the file name first goes through
// Utils.encodeFileNameToURIRawPath, so characters such as " " are encoded.
serverUri + "/files/" + Utils.encodeFileNameToURIRawPath(file.getName)
}

/**
 * Passes `file` and `jarDir` to `addFileToDir`, then returns the string formed from
 * `serverUri`, the "/jars/" segment and the file's name.
 *
 * This block is shown in diff form: the two trailing expressions are the pre-change and
 * post-change versions of the same return line.
 */
def addJar(file: File) : String = {
addFileToDir(file, jarDir)
// Pre-change line (removed by this commit): the file name is appended verbatim.
serverUri + "/jars/" + file.getName
// Post-change line (added by this commit): the file name first goes through
// Utils.encodeFileNameToURIRawPath, so characters such as " " are encoded.
serverUri + "/jars/" + Utils.encodeFileNameToURIRawPath(file.getName)
}

def addDirectory(path: String, resourceBase: String): String = {
Expand All @@ -85,7 +85,7 @@ private[spark] class HttpFileServer(
throw new IllegalArgumentException(s"$file cannot be a directory.")
}
Files.copy(file, new File(dir, file.getName))
dir + "/" + file.getName
dir + "/" + Utils.encodeFileNameToURIRawPath(file.getName)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentHashMap
import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
import org.apache.spark.network.server.StreamManager
import org.apache.spark.rpc.RpcEnvFileServer
import org.apache.spark.util.Utils

/**
* StreamManager implementation for serving files from a NettyRpcEnv.
Expand Down Expand Up @@ -64,13 +65,13 @@ private[netty] class NettyStreamManager(rpcEnv: NettyRpcEnv)
/**
 * Registers `file` in `files` under its name and returns a URL string built from
 * `rpcEnv.address.toSparkURL`, the "/files/" segment and the file's name.
 *
 * Fails the `require` check when `putIfAbsent` reports an existing entry for the same name.
 * This block is shown in diff form: the two trailing string expressions are the pre-change and
 * post-change versions of the same return line.
 */
override def addFile(file: File): String = {
// The requirement holds only when putIfAbsent returns null, i.e. the name was not yet mapped.
require(files.putIfAbsent(file.getName(), file) == null,
s"File ${file.getName()} already registered.")
// Pre-change line (removed by this commit): the file name is interpolated verbatim.
s"${rpcEnv.address.toSparkURL}/files/${file.getName()}"
// Post-change line (added by this commit): the file name is encoded with
// Utils.encodeFileNameToURIRawPath before being placed in the URL.
s"${rpcEnv.address.toSparkURL}/files/${Utils.encodeFileNameToURIRawPath(file.getName())}"
}

/**
 * Registers `file` in `jars` under its name and returns a URL string built from
 * `rpcEnv.address.toSparkURL`, the "/jars/" segment and the file's name.
 *
 * Fails the `require` check when `putIfAbsent` reports an existing entry for the same name.
 * This block is shown in diff form: the two trailing string expressions are the pre-change and
 * post-change versions of the same return line.
 */
override def addJar(file: File): String = {
// The requirement holds only when putIfAbsent returns null, i.e. the name was not yet mapped.
require(jars.putIfAbsent(file.getName(), file) == null,
s"JAR ${file.getName()} already registered.")
// Pre-change line (removed by this commit): the file name is interpolated verbatim.
s"${rpcEnv.address.toSparkURL}/jars/${file.getName()}"
// Post-change line (added by this commit): the file name is encoded with
// Utils.encodeFileNameToURIRawPath before being placed in the URL.
s"${rpcEnv.address.toSparkURL}/jars/${Utils.encodeFileNameToURIRawPath(file.getName())}"
}

override def addDirectory(baseUri: String, path: File): String = {
Expand Down
26 changes: 25 additions & 1 deletion core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,30 @@ private[spark] object Utils extends Logging {
}

/**
 * Converts a bare file name into the raw (encoded) path form accepted by
 * `java.net.URI(String)`, so that names containing characters that are invalid in a URI,
 * such as " ", can be embedded in one.
 *
 * Note: `fileName` must not contain "/" or "\"; the `require` check fails otherwise.
 *
 * @param fileName the file name to encode, without any path separators
 * @return the encoded file name, without a leading "/"
 */
def encodeFileNameToURIRawPath(fileName: String): String = {
  val hasSeparator = fileName.contains("/") || fileName.contains("\\")
  require(!hasSeparator)
  // "file" and "localhost" are placeholders only: they keep URI from parsing part of
  // `fileName` as a scheme or host. The leading "/" is needed because URI doesn't accept a
  // relative path here; it is stripped again once the raw path has been obtained.
  val placeholderUri = new URI("file", null, "localhost", -1, "/" + fileName, null, null)
  placeholderUri.getRawPath.substring(1)
}

/**
 * Extracts the last segment of the URI's raw path and returns it in decoded form. When the raw
 * path ends with "/", the segment before that final "/" is returned.
 *
 * @param uri the URI whose path names the file
 * @return the decoded file name
 */
def decodeFileNameInURI(uri: URI): String = {
  // String.split drops trailing empty segments, so a path ending in "/" still yields the
  // name that precedes it.
  val encodedName = uri.getRawPath.split("/").last
  // Re-parse the still-encoded name behind a placeholder "file:///" prefix so that URI does the
  // decoding; the leading "/" of the resulting path is then removed.
  val decodedPath = new URI("file:///" + encodedName).getPath
  decodedPath.substring(1)
}

/**
* Download a file or directory to target directory. Supports fetching the file in a variety of
* ways, including HTTP, Hadoop-compatible filesystems, and files on a standard filesystem, based
* on the URL parameter. Fetching directories is only supported from Hadoop-compatible
Expand All @@ -351,7 +375,7 @@ private[spark] object Utils extends Logging {
hadoopConf: Configuration,
timestamp: Long,
useCache: Boolean) {
val fileName = url.split("/").last
val fileName = decodeFileNameInURI(new URI(url))
val targetFile = new File(targetDir, fileName)
val fetchCacheEnabled = conf.getBoolean("spark.files.useFetchCache", defaultValue = true)
if (useCache && fetchCacheEnabled) {
Expand Down
4 changes: 4 additions & 0 deletions core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,8 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
val tempDir = Utils.createTempDir()
val file = new File(tempDir, "file")
Files.write(UUID.randomUUID().toString(), file, UTF_8)
val fileWithSpecialChars = new File(tempDir, "file name")
Files.write(UUID.randomUUID().toString(), fileWithSpecialChars, UTF_8)
val empty = new File(tempDir, "empty")
Files.write("", empty, UTF_8);
val jar = new File(tempDir, "jar")
Expand All @@ -787,6 +789,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
Files.write(UUID.randomUUID().toString(), subFile2, UTF_8)

val fileUri = env.fileServer.addFile(file)
val fileWithSpecialCharsUri = env.fileServer.addFile(fileWithSpecialChars)
val emptyUri = env.fileServer.addFile(empty)
val jarUri = env.fileServer.addJar(jar)
val dir1Uri = env.fileServer.addDirectory("/dir1", dir1)
Expand All @@ -805,6 +808,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {

val files = Seq(
(file, fileUri),
(fileWithSpecialChars, fileWithSpecialCharsUri),
(empty, emptyUri),
(jar, jarUri),
(subFile1, dir1Uri + "/file1"),
Expand Down
11 changes: 11 additions & 0 deletions core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -734,4 +734,15 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
conf.set("spark.executor.instances", "0")) === true)
}

test("encodeFileNameToURIRawPath") {
  // Each pair is (file name, expected raw path): a plain name, a name with a space that must
  // be encoded, and a name with ":" that must pass through unchanged.
  val expectations = Seq(
    "abc" -> "abc",
    "abc xyz" -> "abc%20xyz",
    "abc:xyz" -> "abc:xyz")
  expectations.foreach { case (fileName, expectedRawPath) =>
    assert(Utils.encodeFileNameToURIRawPath(fileName) === expectedRawPath)
  }
}

test("decodeFileNameInURI") {
  // Each pair is (URI text, expected decoded file name): a nested path, a single-segment path,
  // and a path whose name contains an encoded space.
  val expectations = Seq(
    "files:///abc/xyz" -> "xyz",
    "files:///abc" -> "abc",
    "files:///abc%20xyz" -> "abc xyz")
  expectations.foreach { case (uriText, expectedName) =>
    assert(Utils.decodeFileNameInURI(new URI(uriText)) === expectedName)
  }
}
}

0 comments on commit 86e405f

Please sign in to comment.