diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala index bef5a605f173b..433c45d51fd6b 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala @@ -18,7 +18,7 @@ package org.apache.spark.deploy.kubernetes import java.io.{File, FileInputStream} import java.security.{KeyStore, SecureRandom} -import java.util.concurrent.{TimeoutException, TimeUnit} +import java.util.concurrent.{CountDownLatch, TimeoutException, TimeUnit} import java.util.concurrent.atomic.AtomicBoolean import javax.net.ssl.{SSLContext, TrustManagerFactory, X509TrustManager} @@ -26,7 +26,7 @@ import com.google.common.base.Charsets import com.google.common.io.Files import com.google.common.util.concurrent.SettableFuture import io.fabric8.kubernetes.api.model._ -import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient, KubernetesClient, KubernetesClientException, Watcher} +import io.fabric8.kubernetes.client.{ConfigBuilder => K8SConfigBuilder, DefaultKubernetesClient, KubernetesClient, KubernetesClientException, Watcher} import io.fabric8.kubernetes.client.Watcher.Action import org.apache.commons.codec.binary.Base64 import scala.collection.JavaConverters._ @@ -67,6 +67,8 @@ private[spark] class Client( private val uiPort = sparkConf.getInt("spark.ui.port", DEFAULT_UI_PORT) private val driverSubmitTimeoutSecs = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_TIMEOUT) + private val waitForAppCompletion: Boolean = sparkConf.get(WAIT_FOR_APP_COMPLETION) + private val secretBase64String = { val secretBytes = new Array[Byte](128) SECURE_RANDOM.nextBytes(secretBytes) @@ -81,9 +83,11 @@ private[spark] class Client( ThreadUtils.newDaemonSingleThreadExecutor("kubernetes-client-retryable-futures")) 
def run(): Unit = { + logInfo(s"Starting application $kubernetesAppId in Kubernetes...") val (driverSubmitSslOptions, isKeyStoreLocalFile) = parseDriverSubmitSslOptions() + val parsedCustomLabels = parseCustomLabels(customLabels) - var k8ConfBuilder = new ConfigBuilder() + var k8ConfBuilder = new K8SConfigBuilder() .withApiVersion("v1") .withMasterUrl(master) .withNamespace(namespace) @@ -116,73 +120,97 @@ private[spark] class Client( SPARK_APP_NAME_LABEL -> appName) ++ parsedCustomLabels).asJava val containerPorts = buildContainerPorts() - val submitCompletedFuture = SettableFuture.create[Boolean] - val submitPending = new AtomicBoolean(false) - val podWatcher = new DriverPodWatcher( - submitCompletedFuture, - submitPending, - kubernetesClient, - driverSubmitSslOptions, - Array(submitServerSecret) ++ sslSecrets, - driverKubernetesSelectors) + + // start outer watch for status logging of driver pod + val driverPodCompletedLatch = new CountDownLatch(1) + // only enable interval logging if in waitForAppCompletion mode + val loggingInterval = if (waitForAppCompletion) sparkConf.get(REPORT_INTERVAL) else 0 + val loggingWatch = new LoggingPodStatusWatcher(driverPodCompletedLatch, kubernetesAppId, + loggingInterval) Utils.tryWithResource(kubernetesClient .pods() .withLabels(driverKubernetesSelectors) - .watch(podWatcher)) { _ => - kubernetesClient.pods().createNew() - .withNewMetadata() - .withName(kubernetesAppId) + .watch(loggingWatch)) { _ => + + // launch driver pod with inner watch to upload jars when it's ready + val submitCompletedFuture = SettableFuture.create[Boolean] + val submitPending = new AtomicBoolean(false) + val podWatcher = new DriverPodWatcher( + submitCompletedFuture, + submitPending, + kubernetesClient, + driverSubmitSslOptions, + Array(submitServerSecret) ++ sslSecrets, + driverKubernetesSelectors) + Utils.tryWithResource(kubernetesClient + .pods() .withLabels(driverKubernetesSelectors) - .endMetadata() - .withNewSpec() - 
.withRestartPolicy("OnFailure") - .addNewVolume() - .withName(SUBMISSION_APP_SECRET_VOLUME_NAME) - .withNewSecret() - .withSecretName(submitServerSecret.getMetadata.getName) - .endSecret() - .endVolume - .addToVolumes(sslVolumes: _*) - .withServiceAccount(serviceAccount) - .addNewContainer() - .withName(DRIVER_CONTAINER_NAME) - .withImage(driverDockerImage) - .withImagePullPolicy("IfNotPresent") - .addNewVolumeMount() + .watch(podWatcher)) { _ => + kubernetesClient.pods().createNew() + .withNewMetadata() + .withName(kubernetesAppId) + .withLabels(driverKubernetesSelectors) + .endMetadata() + .withNewSpec() + .withRestartPolicy("OnFailure") + .addNewVolume() .withName(SUBMISSION_APP_SECRET_VOLUME_NAME) - .withMountPath(secretDirectory) - .withReadOnly(true) - .endVolumeMount() - .addToVolumeMounts(sslVolumeMounts: _*) - .addNewEnv() - .withName(ENV_SUBMISSION_SECRET_LOCATION) - .withValue(s"$secretDirectory/$SUBMISSION_APP_SECRET_NAME") - .endEnv() - .addNewEnv() - .withName(ENV_SUBMISSION_SERVER_PORT) - .withValue(SUBMISSION_SERVER_PORT.toString) - .endEnv() - .addToEnv(sslEnvs: _*) - .withPorts(containerPorts.asJava) - .endContainer() - .endSpec() - .done() - var submitSucceeded = false - try { - submitCompletedFuture.get(driverSubmitTimeoutSecs, TimeUnit.SECONDS) - submitSucceeded = true - } catch { - case e: TimeoutException => - val finalErrorMessage: String = buildSubmitFailedErrorMessage(kubernetesClient, e) - logError(finalErrorMessage, e) - throw new SparkException(finalErrorMessage, e) - } finally { - if (!submitSucceeded) { - Utils.tryLogNonFatalError { - kubernetesClient.pods.withName(kubernetesAppId).delete() + .withNewSecret() + .withSecretName(submitServerSecret.getMetadata.getName) + .endSecret() + .endVolume + .addToVolumes(sslVolumes: _*) + .withServiceAccount(serviceAccount) + .addNewContainer() + .withName(DRIVER_CONTAINER_NAME) + .withImage(driverDockerImage) + .withImagePullPolicy("IfNotPresent") + .addNewVolumeMount() + 
.withName(SUBMISSION_APP_SECRET_VOLUME_NAME) + .withMountPath(secretDirectory) + .withReadOnly(true) + .endVolumeMount() + .addToVolumeMounts(sslVolumeMounts: _*) + .addNewEnv() + .withName(ENV_SUBMISSION_SECRET_LOCATION) + .withValue(s"$secretDirectory/$SUBMISSION_APP_SECRET_NAME") + .endEnv() + .addNewEnv() + .withName(ENV_SUBMISSION_SERVER_PORT) + .withValue(SUBMISSION_SERVER_PORT.toString) + .endEnv() + .addToEnv(sslEnvs: _*) + .withPorts(containerPorts.asJava) + .endContainer() + .endSpec() + .done() + var submitSucceeded = false + try { + submitCompletedFuture.get(driverSubmitTimeoutSecs, TimeUnit.SECONDS) + submitSucceeded = true + logInfo(s"Finished launching local resources to application $kubernetesAppId") + } catch { + case e: TimeoutException => + val finalErrorMessage: String = buildSubmitFailedErrorMessage(kubernetesClient, e) + logError(finalErrorMessage, e) + throw new SparkException(finalErrorMessage, e) + } finally { + if (!submitSucceeded) { + Utils.tryLogNonFatalError { + kubernetesClient.pods.withName(kubernetesAppId).delete() + } } } } + + // wait if configured to do so + if (waitForAppCompletion) { + logInfo(s"Waiting for application $kubernetesAppId to finish...") + driverPodCompletedLatch.await() + logInfo(s"Application $kubernetesAppId finished.") + } else { + logInfo(s"Application $kubernetesAppId successfully launched.") + } } } finally { Utils.tryLogNonFatalError { @@ -377,6 +405,8 @@ private[spark] class Client( Future { sparkConf.set("spark.driver.host", pod.getStatus.getPodIP) val submitRequest = buildSubmissionRequest() + logInfo(s"Submitting local resources to driver pod for application " + + s"$kubernetesAppId ...") driverSubmitter.submitApplication(submitRequest) } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/LoggingPodStatusWatcher.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/LoggingPodStatusWatcher.scala new file mode 100644 
index 0000000000000..cbacaf6bda854
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/LoggingPodStatusWatcher.scala
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.kubernetes
+
+import java.util.concurrent.{CountDownLatch, Executors, ThreadFactory, TimeUnit}
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.Pod
+import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
+
+import org.apache.spark.internal.Logging
+
+/**
+ * A monitor for the running Kubernetes pod of a Spark application. Status logging occurs on
+ * every state change and also at an interval for liveness.
+ *
+ * @param podCompletedFuture a CountDownLatch that is counted down once the watched pod reaches
+ *                           the "Succeeded" or "Failed" phase, or is deleted
+ * @param appId identifier of the application, used only in the log messages emitted here
+ * @param interval ms between each periodic status log. If set to 0 or a negative number, the
+ *                 periodic logging will be disabled.
+ */
+private[kubernetes] class LoggingPodStatusWatcher(podCompletedFuture: CountDownLatch,
+                                                  appId: String,
+                                                  interval: Long)
+  extends Watcher[Pod] with Logging {
+
+  // Written by the watch callback and read by the periodic logging thread, hence volatile.
+  // All state is declared before the scheduler is started: the first periodic run has no
+  // initial delay, so it could otherwise observe a not-yet-initialized (null) field, throw,
+  // and thereby cancel every later run of the periodic task.
+  @volatile private var pod: Option[Pod] = None
+  // Phase seen by the previous eventReceived call; None until the first event arrives.
+  private var prevPhase: Option[String] = None
+
+  // Daemon thread, so a watcher whose onClose is never invoked cannot keep the JVM alive.
+  private val scheduler = Executors.newScheduledThreadPool(1, new ThreadFactory {
+    override def newThread(r: Runnable): Thread = {
+      val thread = new Thread(r, "logging-pod-status-watcher")
+      thread.setDaemon(true)
+      thread
+    }
+  })
+  private val logRunnable: Runnable = new Runnable {
+    override def run(): Unit = logShortStatus()
+  }
+
+  // start timer for periodic logging; keep this after every field initializer above
+  if (interval > 0) {
+    scheduler.scheduleWithFixedDelay(logRunnable, 0, interval, TimeUnit.MILLISECONDS)
+  }
+
+  /** Phase of the last observed pod, or "unknown" if no pod, status or phase is available. */
+  private def phase: String = {
+    val observed = for {
+      p <- pod
+      status <- Option(p.getStatus)
+      phaseName <- Option(status.getPhase)
+    } yield phaseName
+    observed.getOrElse("unknown")
+  }
+
+  /**
+   * Records the pod, logs its status (in long form when the phase changed) and releases the
+   * latch once the pod has succeeded, failed or been deleted.
+   */
+  override def eventReceived(action: Action, pod: Pod): Unit = {
+    this.pod = Option(pod)
+    val currentPhase = phase
+
+    logShortStatus()
+    if (!prevPhase.exists(_ == currentPhase)) {
+      logLongStatus()
+    }
+    prevPhase = Some(currentPhase)
+
+    // A deleted pod never reports a terminal phase afterwards, so deletion must also release
+    // the latch; otherwise a caller awaiting it would block forever.
+    if (action == Action.DELETED || currentPhase == "Succeeded" || currentPhase == "Failed") {
+      podCompletedFuture.countDown()
+    }
+  }
+
+  /** Stops the periodic logging and reports the exception, if any, passed by the client. */
+  override def onClose(e: KubernetesClientException): Unit = {
+    scheduler.shutdown()
+    Option(e).foreach { cause =>
+      logError(s"Watch for application $appId was closed with an error", cause)
+    }
+    logDebug(s"Stopped watching application $appId with last-observed phase $phase")
+  }
+
+  private def logShortStatus(): Unit = {
+    logInfo(s"Application status for $appId (phase: $phase)")
+  }
+
+  private def logLongStatus(): Unit = {
+    logInfo("Phase changed, new state: " + pod.map(formatPodState(_)).getOrElse("unknown"))
+  }
+
+  /** Renders the pod's key metadata, spec and status fields as one multi-line string. */
+  private def formatPodState(pod: Pod): String = {
+    // Defensive: treat any of these Java getters as possibly returning null.
+    val metadata = Option(pod.getMetadata)
+    val spec = Option(pod.getSpec)
+    val status = Option(pod.getStatus)
+
+    val details = Seq[(String, Option[String])](
+      // pod metadata
+      ("pod name", metadata.flatMap(m => Option(m.getName))),
+      ("namespace", metadata.flatMap(m => Option(m.getNamespace))),
+      ("labels", metadata.flatMap(m => Option(m.getLabels)).map(_.asScala.mkString(", "))),
+      ("pod uid", metadata.flatMap(m => Option(m.getUid))),
+      ("creation time", metadata.flatMap(m => Option(m.getCreationTimestamp))),
+
+      // spec details
+      ("service account name", spec.flatMap(s => Option(s.getServiceAccountName))),
+      ("volumes",
+        spec.flatMap(s => Option(s.getVolumes)).map(_.asScala.map(_.getName).mkString(", "))),
+      ("node name", spec.flatMap(s => Option(s.getNodeName))),
+
+      // status
+      ("start time", status.flatMap(s => Option(s.getStartTime))),
+      ("container images",
+        status.flatMap(s => Option(s.getContainerStatuses))
+          .map(_.asScala.map(_.getImage).mkString(", "))),
+      ("phase", status.flatMap(s => Option(s.getPhase)))
+    )
+
+    // Use more loggable format if value is null or empty
+    details.map { case (k, v) =>
+      val newValue = v.filter(_.nonEmpty).getOrElse("N/A")
+      s"\n\t $k: $newValue"
+    }.mkString("")
+  }
+}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
index 3e0c400febca1..cb4cd42142ca4 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
@@ -188,4 +188,23 @@ package object config {
       .internal()
       .stringConf
       .createOptional
+
+  private[spark] val WAIT_FOR_APP_COMPLETION =
+    ConfigBuilder("spark.kubernetes.submit.waitAppCompletion")
+      .doc(
+        """
+          | In cluster mode, whether to wait for the application to finish before exiting the
+          | launcher process.
+        """.stripMargin)
+      .booleanConf
+      .createWithDefault(true)
+
+  private[spark] val REPORT_INTERVAL =
+    ConfigBuilder("spark.kubernetes.report.interval")
+      .doc(
+        """
+          | Interval between reports of the current app status in cluster mode.
+        """.stripMargin)
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString("1s")
 }