Allow adding arbitrary files (apache#71)
* Allow adding arbitrary files

* Address comments and add documentation
mccheah authored and foxish committed Jul 24, 2017
1 parent 261a624 commit ab731f1
Showing 12 changed files with 243 additions and 32 deletions.
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -485,6 +485,8 @@ object SparkSubmit extends CommandLineUtils {
sysProp = "spark.kubernetes.namespace"),
OptionAssigner(args.kubernetesUploadJars, KUBERNETES, CLUSTER,
sysProp = "spark.kubernetes.driver.uploads.jars"),
OptionAssigner(args.kubernetesUploadFiles, KUBERNETES, CLUSTER,
sysProp = "spark.kubernetes.driver.uploads.files"),

// Other options
OptionAssigner(args.executorCores, STANDALONE | YARN, ALL_DEPLOY_MODES,
@@ -74,6 +74,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
// Kubernetes only
var kubernetesNamespace: String = null
var kubernetesUploadJars: String = null
var kubernetesUploadFiles: String = null

// Standalone cluster mode only
var supervise: Boolean = false
@@ -203,6 +204,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
kubernetesUploadJars = Option(kubernetesUploadJars)
.orElse(sparkProperties.get("spark.kubernetes.driver.uploads.jars"))
.orNull
kubernetesUploadFiles = Option(kubernetesUploadFiles)
.orElse(sparkProperties.get("spark.kubernetes.driver.uploads.files"))
.orNull

// Try to set main class from JAR if no --class argument is given
if (mainClass == null && !isPython && !isR && primaryResource != null) {
@@ -447,6 +451,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
case KUBERNETES_UPLOAD_JARS =>
kubernetesUploadJars = value

case KUBERNETES_UPLOAD_FILES =>
kubernetesUploadFiles = value

case HELP =>
printUsageAndExit(0)

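A small sketch of the precedence applied above when resolving the new field: a value given on the command line wins, otherwise the corresponding Spark property is used, otherwise the field stays null. The helper name and the paths below are made up for illustration.

    // Mirrors the Option(...).orElse(...).orNull chain shown above (illustration only).
    def resolveUploadFiles(cliValue: String, sparkProperties: Map[String, String]): String =
      Option(cliValue)
        .orElse(sparkProperties.get("spark.kubernetes.driver.uploads.files"))
        .orNull

    resolveUploadFiles("/local/data.csv", Map("spark.kubernetes.driver.uploads.files" -> "/conf/other.csv"))
    // => "/local/data.csv"   (the --upload-files value wins)
    resolveUploadFiles(null, Map("spark.kubernetes.driver.uploads.files" -> "/conf/other.csv"))
    // => "/conf/other.csv"
    resolveUploadFiles(null, Map.empty)
    // => null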
12 changes: 11 additions & 1 deletion docs/running-on-kubernetes.md
@@ -217,10 +217,20 @@ from the other deployment modes. See the [configuration page](configuration.html
<td><code>spark.kubernetes.driver.uploads.jars</code></td>
<td>(none)</td>
<td>
Comma-separated list of jars to sent to the driver and all executors when submitting the application in cluster
Comma-separated list of jars to send to the driver and all executors when submitting the application in cluster
mode. Refer to <a href="running-on-kubernetes.html#adding-other-jars">adding other jars</a> for more information.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.driver.uploads.files</code></td>
<td>(none)</td>
<td>
Comma-separated list of files to send to the driver and all executors when submitting the application in cluster
mode. The files are added in a flat hierarchy to the current working directory of the driver, having the same
names as the names of the original files. Note that two files with the same name cannot be added, even if they
were in different source directories on the client disk.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.executor.memoryOverhead</code></td>
<td>executorMemory * 0.10, with minimum of 384 </td>
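A minimal, hypothetical sketch of a cluster-mode submission that sets both upload properties documented above. Only the two spark.kubernetes.driver.uploads.* keys are taken from this commit; the master URL format, paths, and class name are placeholders and may not match a real deployment.

    import org.apache.spark.launcher.SparkLauncher

    // Hypothetical submission using the properties documented above.
    val driverProcess = new SparkLauncher()
      .setMaster("k8s://https://example-apiserver:443")  // assumed master URL format
      .setDeployMode("cluster")
      .setAppResource("/opt/app/my-app.jar")
      .setMainClass("com.example.MyApp")
      .setConf("spark.kubernetes.driver.uploads.jars", "/opt/deps/extra-lib.jar")
      .setConf("spark.kubernetes.driver.uploads.files",
        "/opt/conf/app.properties,/opt/data/lookup.csv")
      .launch()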
@@ -80,6 +80,7 @@ class SparkSubmitOptionParser {
protected final String KUBERNETES_MASTER = "--kubernetes-master";
protected final String KUBERNETES_NAMESPACE = "--kubernetes-namespace";
protected final String KUBERNETES_UPLOAD_JARS = "--upload-jars";
protected final String KUBERNETES_UPLOAD_FILES = "--upload-files";

/**
* This is the canonical list of spark-submit options. Each entry in the array contains the
@@ -122,7 +123,8 @@ class SparkSubmitOptionParser {
{ TOTAL_EXECUTOR_CORES },
{ KUBERNETES_MASTER },
{ KUBERNETES_NAMESPACE },
{ KUBERNETES_UPLOAD_JARS }
{ KUBERNETES_UPLOAD_JARS },
{ KUBERNETES_UPLOAD_FILES }
};

/**
@@ -61,7 +61,9 @@ private[spark] class Client(
private val sslSecretsDirectory = s"$DRIVER_CONTAINER_SECRETS_BASE_DIR/$kubernetesAppId-ssl"
private val sslSecretsName = s"$SUBMISSION_SSL_SECRETS_PREFIX-$kubernetesAppId"
private val driverDockerImage = sparkConf.get(DRIVER_DOCKER_IMAGE)
private val uploadedJars = sparkConf.get(KUBERNETES_DRIVER_UPLOAD_JARS)
private val uploadedJars = sparkConf.get(KUBERNETES_DRIVER_UPLOAD_JARS).filter(_.nonEmpty)
private val uploadedFiles = sparkConf.get(KUBERNETES_DRIVER_UPLOAD_FILES).filter(_.nonEmpty)
uploadedFiles.foreach(validateNoDuplicateUploadFileNames)
private val uiPort = sparkConf.getInt("spark.ui.port", DEFAULT_UI_PORT)
private val driverSubmitTimeoutSecs = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_TIMEOUT)

@@ -513,18 +515,40 @@ private[spark] class Client(
case "container" => ContainerAppResource(appResourceUri.getPath)
case other => RemoteAppResource(other)
}

val uploadJarsBase64Contents = compressJars(uploadedJars)
val uploadJarsBase64Contents = compressFiles(uploadedJars)
val uploadFilesBase64Contents = compressFiles(uploadedFiles)
KubernetesCreateSubmissionRequest(
appResource = resolvedAppResource,
mainClass = mainClass,
appArgs = appArgs,
secret = secretBase64String,
sparkProperties = sparkConf.getAll.toMap,
uploadedJarsBase64Contents = uploadJarsBase64Contents)
uploadedJarsBase64Contents = uploadJarsBase64Contents,
uploadedFilesBase64Contents = uploadFilesBase64Contents)
}

// Because uploaded files should be added to the working directory of the driver, they
// need to not have duplicate file names. They are added to the working directory so the
// user can reliably locate them in their application. This is similar in principle to how
// YARN handles its `spark.files` setting.
private def validateNoDuplicateUploadFileNames(uploadedFilesCommaSeparated: String): Unit = {
val pathsWithDuplicateNames = uploadedFilesCommaSeparated
.split(",")
.groupBy(new File(_).getName)
.filter(_._2.length > 1)
if (pathsWithDuplicateNames.nonEmpty) {
val pathsWithDuplicateNamesSorted = pathsWithDuplicateNames
.values
.flatten
.toList
.sortBy(new File(_).getName)
throw new SparkException("Cannot upload files with duplicate names via" +
s" ${KUBERNETES_DRIVER_UPLOAD_FILES.key}. The following paths have a duplicated" +
s" file name: ${pathsWithDuplicateNamesSorted.mkString(",")}")
}
}

private def compressJars(maybeFilePaths: Option[String]): Option[TarGzippedData] = {
private def compressFiles(maybeFilePaths: Option[String]): Option[TarGzippedData] = {
maybeFilePaths
.map(_.split(","))
.map(CompressionUtils.createTarGzip(_))
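An illustration of the duplicate-name rule enforced by validateNoDuplicateUploadFileNames above: paths are grouped by their base file name, so two entries with the same name are rejected even when they come from different directories on the client disk. The example paths are made up.

    import java.io.File

    // Same grouping idea as the validator above (illustration only).
    def duplicateNames(commaSeparatedPaths: String): Map[String, Array[String]] =
      commaSeparatedPaths
        .split(",")
        .groupBy(new File(_).getName)
        .filter(_._2.length > 1)

    duplicateNames("/etc/app/config.json,/tmp/data/lookup.csv")
    // => Map()                               accepted: base names are distinct
    duplicateNames("/etc/app/config.json,/home/user/config.json")
    // => Map("config.json" -> Array(...))    rejected: same name, different directories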
@@ -94,13 +94,27 @@ package object config {
private[spark] val KUBERNETES_DRIVER_UPLOAD_JARS =
ConfigBuilder("spark.kubernetes.driver.uploads.jars")
.doc("""
| Comma-separated list of jars to sent to the driver and
| Comma-separated list of jars to send to the driver and
| all executors when submitting the application in cluster
| mode.
""".stripMargin)
.stringConf
.createOptional

private[spark] val KUBERNETES_DRIVER_UPLOAD_FILES =
ConfigBuilder("spark.kubernetes.driver.uploads.files")
.doc("""
| Comma-separated list of files to send to the driver and
| all executors when submitting the application in cluster
| mode. The files are added in a flat hierarchy to the
| current working directory of the driver, having the same
| names as the names of the original files. Note that two
| files with the same name cannot be added, even if they
| were in different source directories on the client disk.
""".stripMargin)
.stringConf
.createOptional

// Note that while we set a default for this when we start up the
// scheduler, the specific default value is dynamically determined
// based on the executor memory.
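A small sketch of the read-side behaviour of these optional entries: createOptional surfaces the value as an Option[String], and the Client change earlier in this commit additionally applies .filter(_.nonEmpty), so a blank value behaves the same as an unset one. The values below are illustrative, with plain Options standing in for the real config entries.

    val unset: Option[String] = None
    val blank: Option[String] = Some("")
    val set: Option[String]   = Some("/opt/conf/app.properties,/opt/data/lookup.csv")

    // Blank values are treated like unset ones once .filter(_.nonEmpty) is applied.
    Seq(unset, blank, set).map(_.filter(_.nonEmpty))
    // => List(None, None, Some("/opt/conf/app.properties,/opt/data/lookup.csv"))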
@@ -26,7 +26,8 @@ case class KubernetesCreateSubmissionRequest(
appArgs: Array[String],
sparkProperties: Map[String, String],
secret: String,
uploadedJarsBase64Contents: Option[TarGzippedData]) extends SubmitRestProtocolRequest {
uploadedJarsBase64Contents: Option[TarGzippedData],
uploadedFilesBase64Contents: Option[TarGzippedData]) extends SubmitRestProtocolRequest {
message = "create"
clientSparkVersion = SPARK_VERSION
}
@@ -68,8 +68,8 @@ private[spark] object CompressionUtils extends Logging {
while (usedFileNames.contains(resolvedFileName)) {
val oldResolvedFileName = resolvedFileName
resolvedFileName = s"$nameWithoutExtension-$deduplicationCounter.$extension"
logWarning(s"File with name $oldResolvedFileName already exists. Trying to add with" +
s" file name $resolvedFileName instead.")
logWarning(s"File with name $oldResolvedFileName already exists. Trying to add" +
s" with file name $resolvedFileName instead.")
deduplicationCounter += 1
}
usedFileNames += resolvedFileName
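A hedged sketch of the name de-duplication performed by the loop above when two entries in the tarball share a file name (the jar-upload path renames colliding entries, whereas the new files path in Client.scala rejects duplicates outright). The helper is for illustration only and assumes every name has an extension.

    import scala.collection.mutable

    // Repeated base names get a "-<counter>" suffix, mirroring the warning above.
    def deduplicate(names: Seq[String]): Seq[String] = {
      val used = mutable.Set.empty[String]
      names.map { name =>
        val dot = name.lastIndexOf('.')                    // assumes an extension exists
        val (base, ext) = (name.substring(0, dot), name.substring(dot + 1))
        var resolved = name
        var counter = 1
        while (used.contains(resolved)) {
          resolved = s"$base-$counter.$ext"
          counter += 1
        }
        used += resolved
        resolved
      }
    }

    deduplicate(Seq("lib.jar", "lib.jar", "util.jar"))
    // => List("lib.jar", "lib-1.jar", "util.jar")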
@@ -18,6 +18,7 @@ package org.apache.spark.deploy.rest.kubernetes

import java.io.File
import java.net.URI
import java.nio.file.Paths
import java.util.concurrent.CountDownLatch
import javax.servlet.http.{HttpServletRequest, HttpServletResponse}

@@ -27,7 +28,7 @@ import org.apache.commons.codec.binary.Base64
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.{SecurityManager, SPARK_VERSION => sparkVersion, SparkConf, SparkException, SSLOptions}
import org.apache.spark.{SecurityManager, SPARK_VERSION => sparkVersion, SparkConf, SSLOptions}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.rest._
import org.apache.spark.util.{ShutdownHookManager, ThreadUtils, Utils}
@@ -149,37 +150,42 @@ private[spark] class KubernetesSparkRestServer(
appArgs,
sparkProperties,
secret,
uploadedJars) =>
uploadedJars,
uploadedFiles) =>
val decodedSecret = Base64.decodeBase64(secret)
if (!expectedApplicationSecret.sameElements(decodedSecret)) {
responseServlet.setStatus(HttpServletResponse.SC_UNAUTHORIZED)
handleError("Unauthorized to submit application.")
} else {
val tempDir = Utils.createTempDir()
val appResourcePath = resolvedAppResource(appResource, tempDir)
val jarsDirectory = new File(tempDir, "jars")
if (!jarsDirectory.mkdir) {
throw new IllegalStateException("Failed to create jars dir at" +
s"${jarsDirectory.getAbsolutePath}")
}
val writtenJars = writeBase64ContentsToFiles(uploadedJars, jarsDirectory)
val driverExtraClasspath = sparkProperties
.get("spark.driver.extraClassPath")
.map(_.split(","))
.getOrElse(Array.empty[String])
val writtenJars = writeUploadedJars(uploadedJars, tempDir)
val writtenFiles = writeUploadedFiles(uploadedFiles)
val resolvedSparkProperties = new mutable.HashMap[String, String]
resolvedSparkProperties ++= sparkProperties

// Resolve driver classpath and jars
val originalJars = sparkProperties.get("spark.jars")
.map(_.split(","))
.getOrElse(Array.empty[String])
val resolvedJars = writtenJars ++ originalJars ++ Array(appResourcePath)
val sparkJars = new File(sparkHome, "jars").listFiles().map(_.getAbsolutePath)
val driverExtraClasspath = sparkProperties
.get("spark.driver.extraClassPath")
.map(_.split(","))
.getOrElse(Array.empty[String])
val driverClasspath = driverExtraClasspath ++
resolvedJars ++
sparkJars ++
Array(appResourcePath)
val resolvedSparkProperties = new mutable.HashMap[String, String]
resolvedSparkProperties ++= sparkProperties
sparkJars
resolvedSparkProperties("spark.jars") = resolvedJars.mkString(",")

// Resolve spark.files
val originalFiles = sparkProperties.get("spark.files")
.map(_.split(","))
.getOrElse(Array.empty[String])
val resolvedFiles = originalFiles ++ writtenFiles
resolvedSparkProperties("spark.files") = resolvedFiles.mkString(",")

val command = new ArrayBuffer[String]
command += javaExecutable
command += "-cp"
@@ -229,6 +235,21 @@
}
}

private def writeUploadedJars(files: Option[TarGzippedData], rootTempDir: File):
Seq[String] = {
val resolvedDirectory = new File(rootTempDir, "jars")
if (!resolvedDirectory.mkdir()) {
throw new IllegalStateException(s"Failed to create jars dir at " +
resolvedDirectory.getAbsolutePath)
}
writeBase64ContentsToFiles(files, resolvedDirectory)
}

private def writeUploadedFiles(files: Option[TarGzippedData]): Seq[String] = {
val workingDir = Paths.get("").toFile.getAbsoluteFile
writeBase64ContentsToFiles(files, workingDir)
}

def resolvedAppResource(appResource: AppResource, tempDir: File): String = {
val appResourcePath = appResource match {
case UploadedAppResource(resourceContentsBase64, resourceName) =>
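A sketch of the property resolution the server performs above: uploaded jars are extracted into a jars/ subdirectory of the submission temp dir, while uploaded files land in the driver's current working directory and are appended to whatever spark.files the user already set. The concrete paths below are hypothetical.

    // Illustration of the spark.files merge above (made-up values).
    val sparkProperties = Map("spark.files" -> "hdfs://namenode/shared/ref-data.csv")

    // Paths returned by a writeUploadedFiles-style step, rooted at the driver's
    // working directory (hypothetical location).
    val writtenFiles = Seq(
      "/opt/spark/work-dir/app.properties",
      "/opt/spark/work-dir/lookup.csv")

    val originalFiles = sparkProperties.get("spark.files")
      .map(_.split(",").toSeq)
      .getOrElse(Seq.empty[String])
    val resolvedSparkFiles = (originalFiles ++ writtenFiles).mkString(",")
    // => "hdfs://namenode/shared/ref-data.csv,/opt/spark/work-dir/app.properties,/opt/spark/work-dir/lookup.csv"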
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes.integrationtest.jobs

import java.nio.file.Paths

import com.google.common.base.Charsets
import com.google.common.io.Files

import org.apache.spark.SparkException
import org.apache.spark.sql.SparkSession

private[spark] object FileExistenceTest {

def main(args: Array[String]): Unit = {
if (args.length < 2) {
throw new IllegalArgumentException("Usage: FileExistenceTest <source-file> <expected contents>")
}
// Can't use SparkContext.textFile since the file is local to the driver
val file = Paths.get(args(0)).toFile
if (!file.exists()) {
throw new SparkException(s"Failed to find file at ${file.getAbsolutePath}")
} else {
// scalastyle:off println
val contents = Files.toString(file, Charsets.UTF_8)
if (args(1) != contents) {
throw new SparkException(s"Contents do not match. Expected: ${args(1)}," +
s" actual: $contents")
} else {
println(s"File found at ${file.getAbsolutePath} with correct contents.")
}
// scalastyle:on println
}
val spark = SparkSession.builder()
.appName("Test")
.getOrCreate()
spark.stop()
}

}