From 6befb2d8bdc5743d0333f4839cf301af165582ce Mon Sep 17 00:00:00 2001 From: Akshat Bordia Date: Tue, 9 Jun 2020 09:29:37 -0500 Subject: [PATCH] [SPARK-31486][CORE] spark.submit.waitAppCompletion flag to control spark-submit exit in Standalone Cluster Mode ### What changes were proposed in this pull request? These changes implement an application wait mechanism which will allow spark-submit to wait until the application finishes in Standalone Spark Mode. This will delay the exit of spark-submit JVM until the job is completed. This implementation will keep monitoring the application until it is either finished, failed or killed. This will be controlled via a flag (spark.submit.waitForCompletion) which will be set to false by default. ### Why are the changes needed? Currently, Livy API for Standalone Cluster Mode doesn't know when the job has finished. If this flag is enabled, this can be used by Livy API (/batches/{batchId}/state) to find out when the application has finished/failed. This flag is Similar to spark.yarn.submit.waitAppCompletion. ### Does this PR introduce any user-facing change? Yes, this PR introduces a new flag but it will be disabled by default. ### How was this patch tested? Couldn't implement unit tests since the pollAndReportStatus method has System.exit() calls. Please provide any suggestions. Tested spark-submit locally for the following scenarios: 1. With the flag enabled, spark-submit exits once the job is finished. 2. With the flag enabled and job failed, spark-submit exits when the job fails. 3. With the flag disabled, spark-submit exists post submitting the job (existing behavior). 4. Existing behavior is unchanged when the flag is not added explicitly. Closes #28258 from akshatb1/master. Lead-authored-by: Akshat Bordia Co-authored-by: Akshat Bordia Signed-off-by: Sean Owen --- .../org/apache/spark/deploy/Client.scala | 95 ++++++++++++++----- docs/spark-standalone.md | 19 ++++ 2 files changed, 88 insertions(+), 26 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index 7022b986ea025..6beea5646f63b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -17,6 +17,8 @@ package org.apache.spark.deploy +import java.util.concurrent.TimeUnit + import scala.collection.mutable.HashSet import scala.concurrent.ExecutionContext import scala.reflect.ClassTag @@ -27,6 +29,7 @@ import org.apache.log4j.Logger import org.apache.spark.{SecurityManager, SparkConf} import org.apache.spark.deploy.DeployMessages._ import org.apache.spark.deploy.master.{DriverState, Master} +import org.apache.spark.deploy.master.DriverState.DriverState import org.apache.spark.internal.{config, Logging} import org.apache.spark.internal.config.Network.RPC_ASK_TIMEOUT import org.apache.spark.resource.ResourceUtils @@ -61,6 +64,12 @@ private class ClientEndpoint( private val lostMasters = new HashSet[RpcAddress] private var activeMasterEndpoint: RpcEndpointRef = null + private val waitAppCompletion = conf.getBoolean("spark.standalone.submit.waitAppCompletion", + false) + private val REPORT_DRIVER_STATUS_INTERVAL = 10000 + private var submittedDriverID = "" + private var driverStatusReported = false + private def getProperty(key: String, conf: SparkConf): Option[String] = { sys.props.get(key).orElse(conf.getOption(key)) @@ -107,8 +116,13 @@ private class ClientEndpoint( case "kill" => val driverId = driverArgs.driverId + submittedDriverID = driverId asyncSendToMasterAndForwardReply[KillDriverResponse](RequestKillDriver(driverId)) } + logInfo("... waiting before polling master for driver state") + forwardMessageThread.scheduleAtFixedRate(() => Utils.tryLogNonFatalError { + monitorDriverStatus() + }, 5000, REPORT_DRIVER_STATUS_INTERVAL, TimeUnit.MILLISECONDS) } /** @@ -124,58 +138,87 @@ private class ClientEndpoint( } } - /* Find out driver status then exit the JVM */ - def pollAndReportStatus(driverId: String): Unit = { - // Since ClientEndpoint is the only RpcEndpoint in the process, blocking the event loop thread - // is fine. - logInfo("... waiting before polling master for driver state") - Thread.sleep(5000) - logInfo("... polling master for driver state") - val statusResponse = - activeMasterEndpoint.askSync[DriverStatusResponse](RequestDriverStatus(driverId)) - if (statusResponse.found) { - logInfo(s"State of $driverId is ${statusResponse.state.get}") - // Worker node, if present - (statusResponse.workerId, statusResponse.workerHostPort, statusResponse.state) match { - case (Some(id), Some(hostPort), Some(DriverState.RUNNING)) => - logInfo(s"Driver running on $hostPort ($id)") - case _ => + private def monitorDriverStatus(): Unit = { + if (submittedDriverID != "") { + asyncSendToMasterAndForwardReply[DriverStatusResponse](RequestDriverStatus(submittedDriverID)) + } + } + + /** + * Processes and reports the driver status then exit the JVM if the + * waitAppCompletion is set to false, else reports the driver status + * if debug logs are enabled. + */ + + def reportDriverStatus( + found: Boolean, + state: Option[DriverState], + workerId: Option[String], + workerHostPort: Option[String], + exception: Option[Exception]): Unit = { + if (found) { + // Using driverStatusReported to avoid writing following + // logs again when waitAppCompletion is set to true + if (!driverStatusReported) { + driverStatusReported = true + logInfo(s"State of $submittedDriverID is ${state.get}") + // Worker node, if present + (workerId, workerHostPort, state) match { + case (Some(id), Some(hostPort), Some(DriverState.RUNNING)) => + logInfo(s"Driver running on $hostPort ($id)") + case _ => + } } // Exception, if present - statusResponse.exception match { + exception match { case Some(e) => logError(s"Exception from cluster was: $e") e.printStackTrace() System.exit(-1) case _ => - System.exit(0) + state.get match { + case DriverState.FINISHED | DriverState.FAILED | + DriverState.ERROR | DriverState.KILLED => + logInfo(s"State of driver $submittedDriverID is ${state.get}, " + + s"exiting spark-submit JVM.") + System.exit(0) + case _ => + if (!waitAppCompletion) { + logInfo(s"spark-submit not configured to wait for completion, " + + s"exiting spark-submit JVM.") + System.exit(0) + } else { + logDebug(s"State of driver $submittedDriverID is ${state.get}, " + + s"continue monitoring driver status.") + } + } + } + } else { + logError(s"ERROR: Cluster master did not recognize $submittedDriverID") + System.exit(-1) } - } else { - logError(s"ERROR: Cluster master did not recognize $driverId") - System.exit(-1) } - } - override def receive: PartialFunction[Any, Unit] = { case SubmitDriverResponse(master, success, driverId, message) => logInfo(message) if (success) { activeMasterEndpoint = master - pollAndReportStatus(driverId.get) + submittedDriverID = driverId.get } else if (!Utils.responseFromBackup(message)) { System.exit(-1) } - case KillDriverResponse(master, driverId, success, message) => logInfo(message) if (success) { activeMasterEndpoint = master - pollAndReportStatus(driverId) } else if (!Utils.responseFromBackup(message)) { System.exit(-1) } + + case DriverStatusResponse(found, state, workerId, workerHostPort, exception) => + reportDriverStatus(found, state, workerId, workerHostPort, exception) } override def onDisconnected(remoteAddress: RpcAddress): Unit = { diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md index 1e6f8c586d546..1f70d46d587a8 100644 --- a/docs/spark-standalone.md +++ b/docs/spark-standalone.md @@ -374,6 +374,25 @@ To run an interactive Spark shell against the cluster, run the following command You can also pass an option `--total-executor-cores ` to control the number of cores that spark-shell uses on the cluster. +# Client Properties + +Spark applications supports the following configuration properties specific to standalone mode: + + + + + + + + + +
Property NameDefault ValueMeaningSince Version
spark.standalone.submit.waitAppCompletionfalse + In standalone cluster mode, controls whether the client waits to exit until the application completes. + If set to true, the client process will stay alive polling the driver's status. + Otherwise, the client process will exit after submission. + 3.1.0
+ + # Launching Spark Applications The [`spark-submit` script](submitting-applications.html) provides the most straightforward way to