diff --git a/cook/src/main/scala/org/apache/spark/scheduler/cluster/cook/CoarseCookSchedulerBackend.scala b/cook/src/main/scala/org/apache/spark/scheduler/cluster/cook/CoarseCookSchedulerBackend.scala
index a7bf06e413375..c9df48d069784 100644
--- a/cook/src/main/scala/org/apache/spark/scheduler/cluster/cook/CoarseCookSchedulerBackend.scala
+++ b/cook/src/main/scala/org/apache/spark/scheduler/cluster/cook/CoarseCookSchedulerBackend.scala
@@ -78,6 +78,17 @@ object CoarseCookSchedulerBackend {
  * a task is done. It launches Spark tasks within the coarse-grained Cook instances using the
  * CoarseGrainedSchedulerBackend mechanism. This class is useful for lower and more predictable
  * latency.
+ *
+ * Since Spark 2.0.0, the executor id must be an integer even though its type is string. This
+ * backend uses the task id, which is also an integer and is created via
+ * {{{
+ *   MesosCoarseGrainedSchedulerBackend.newMesosTaskId
+ * }}}
+ * as the executor id.
+ *
+ * To ensure the mapping from executor id (task id) to its Cook job instance is one-to-one and
+ * onto, we only allow one instance per Cook job and track the relationship with the mapping
+ * <executor id> -> <Cook job id>.
  */
 class CoarseCookSchedulerBackend(
     scheduler: TaskSchedulerImpl,
@@ -107,6 +118,7 @@ class CoarseCookSchedulerBackend(
   var totalFailures = 0
   val jobIds = mutable.Set[UUID]()
   val abortedJobIds = mutable.Set[UUID]()
+  private val executorIdToJobId = mutable.HashMap[String, UUID]()

   private[this] val jobClient = new JobClient.Builder()
     .setHost(cookHost)
@@ -184,15 +196,15 @@ class CoarseCookSchedulerBackend(
     import CoarseCookSchedulerBackend.fetchURI

     val jobId = UUID.randomUUID()
-    executorUUIDWriter(jobId)
-    logInfo(s"Creating job with id: $jobId")
+    val taskId = sparkMesosScheduler.newMesosTaskId()
+    executorIdToJobId += taskId -> jobId
+    logInfo(s"Creating job with id: $jobId. The corresponding executor id (task id) is $taskId")
     val fakeOffer = Offer.newBuilder()
       .setId(OfferID.newBuilder().setValue("Cook-id"))
       .setFrameworkId(FrameworkID.newBuilder().setValue("Cook"))
       .setHostname("$(hostname)")
-      .setSlaveId(SlaveID.newBuilder().setValue("${MESOS_EXECUTOR_ID}"))
+      .setSlaveId(SlaveID.newBuilder().setValue(jobId.toString))
       .build()
-    val taskId = sparkMesosScheduler.newMesosTaskId()
     val commandInfo = sparkMesosScheduler.createCommand(fakeOffer, numCores.toInt, taskId)
     val commandString = commandInfo.getValue
     val environmentInfo = commandInfo.getEnvironment
@@ -284,6 +296,7 @@ class CoarseCookSchedulerBackend(
       .setMemory(executorMemory(sc).toDouble)
       .setCpus(numCores)
       .setPriority(priority)
+      .disableMeaCulpaRetries()
       .setRetries(1)

     val container = conf.get("spark.executor.cook.container", null)
@@ -315,12 +328,6 @@ class CoarseCookSchedulerBackend(
     ret
   }

-  // In our fake offer mesos adds some autoincrementing ID per job but
-  // this sticks around in the executorId so we strop it out to get the actual executor ID
-  private def instanceIdFromExecutorId(executorId: String): UUID = {
-    UUID.fromString(executorId.split('/')(0))
-  }
-
   override def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
     new DriverEndpoint(rpcEnv, properties) {
       override def onDisconnected(remoteAddress: RpcAddress): Unit = {
@@ -330,32 +337,29 @@ class CoarseCookSchedulerBackend(
       }

       def handleDisconnectedExecutor(executorId: String): Unit = {
-        logInfo(s"Recieved disconnect message from executor with ID: ${executorId}")
+        logInfo(s"Received disconnect message from executor with id $executorId." +
+          s" Its related Cook job id is ${executorIdToJobId(executorId)}")
         // TODO: we end up querying for everything, not sure of the perf implications here
-        val allInstances = jobClient.query(jobIds.asJava).asScala.values
+        val jobId = executorIdToJobId(executorId)
+        val jobInstances = jobClient.query(Seq(jobId).asJava).asScala.values
           .flatMap(_.getInstances.asScala).toSeq
-        val instanceId = instanceIdFromExecutorId(executorId)
-        val correspondingInstance = allInstances.find(_.getTaskID == instanceId)
-        if (correspondingInstance.isEmpty) {
+        val slaveLostReason = SlaveLost("Remote RPC client disassociated likely due to " +
+          "containers exceeding thresholds or network issues. Check driver logs for WARN " +
+          "message.")
+        if (jobInstances.isEmpty) {
           // This can happen in the case of an aborted executor when the Listener removes it first.
           // We can just mark it as lost since it wouldn't be preempted anyways.
-          removeExecutor(executorId, SlaveLost("Remote RPC client disassociated likely due to " +
-            "containers exceeding thresholds or network issues. Check driver logs for WARN " +
-            "message."))
+          removeExecutor(executorId, slaveLostReason)
         }
-        correspondingInstance.foreach(instance => {
-          val wasPreempted = instance.getPreempted
-          val exitCode = instance.getReasonCode
-          if (wasPreempted) {
-            logInfo(s"Executor ${executorId} was removed due to preemption. Marking as killed.")
-            removeExecutor(executorId, ExecutorExited(exitCode.toInt,
-              false, "Executor was preempted by the scheduler."))
+        jobInstances.foreach { instance =>
+          if (instance.getPreempted) {
+            logInfo(s"Executor $executorId was removed due to preemption. Marking as killed.")
+            removeExecutor(executorId, ExecutorExited(instance.getReasonCode.toInt,
+              exitCausedByApp = false, "Executor was preempted by the scheduler."))
           } else {
-            removeExecutor(executorId, SlaveLost("Remote RPC client disassociated likely due to " +
-              "containers exceeding thresholds or network issues. Check driver logs for WARN " +
-              "message."))
+            removeExecutor(executorId, slaveLostReason)
           }
-        })
+        }
       }
     }
   }
@@ -365,13 +369,9 @@ class CoarseCookSchedulerBackend(
    * @return whether the kill request is acknowledged.
    */
   override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = Future.successful {
-    val instancesToKill = executorIds.map(instanceIdFromExecutorId).toSet
-    val jobsToInstances = jobClient.query(jobIds.asJava).asScala.values
-      .flatMap(job => job.getInstances.asScala.map((job.getUUID, _))).toSeq
-    val correspondingJobs = jobsToInstances.filter(i => instancesToKill.contains(i._2.getTaskID))
-      .map(_._1).toSet
-    jobClient.abort(correspondingJobs.asJava)
-    correspondingJobs.foreach(abortedJobIds.add)
+    val jobIdsToKill = executorIds.flatMap(executorIdToJobId.get)
+    jobClient.abort(jobIdsToKill.asJava)
+    jobIdsToKill.foreach(abortedJobIds.add)
     true
   }

diff --git a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
index 7d272b856f300..b458331eee34e 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
@@ -91,7 +91,7 @@ abstract List<String> buildCommand(Map<String, String> env)
    */
   List<String> buildJavaCommand(String extraClassPath) throws IOException {
     List<String> cmd = new ArrayList<>();
-    String javaCommand = System.getenv("JAVA_CMD");
+    String javaCommand = System.getenv("JAVA_COMMAND");
     if (javaCommand != null) {
       cmd.addAll(Arrays.asList(javaCommand.trim().split("\\s+")));
       return cmd;
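
Note (illustration only, not part of the patch): the class comment above describes a one-to-one mapping from executor id (the Mesos task id rendered as a string) to the Cook job UUID. The standalone sketch below shows that bookkeeping in isolation; the object and method names are hypothetical stand-ins for the backend's executorIdToJobId handling.

import java.util.UUID

import scala.collection.mutable

// Standalone sketch (hypothetical names): keep a 1-1 map from executor id
// (Mesos task id as a string) to the Cook job UUID launched for it.
object ExecutorJobMappingSketch {
  private val executorIdToJobId = mutable.HashMap[String, UUID]()

  // Register a new Cook job for an executor, mirroring
  // `executorIdToJobId += taskId -> jobId` in the patch.
  def register(taskId: String): UUID = {
    val jobId = UUID.randomUUID()
    executorIdToJobId += taskId -> jobId
    jobId
  }

  // doKillExecutors-style lookup: executor ids translate directly into the
  // Cook job UUIDs to abort, with no instance-id string parsing.
  def jobsToAbort(executorIds: Seq[String]): Seq[UUID] =
    executorIds.flatMap(executorIdToJobId.get)

  def main(args: Array[String]): Unit = {
    val jobId = register("7")
    assert(jobsToAbort(Seq("7")) == Seq(jobId))
    println(s"executor 7 -> Cook job $jobId")
  }
}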
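
A second sketch, also outside the patch, mirrors only the branching in the new handleDisconnectedExecutor: a job with no surviving instances, or a non-preempted instance, is treated as a generic slave-lost condition, while preemption is reported as an exit not caused by the application. InstanceView and RemovalReason are invented stand-ins for Cook's Instance and Spark's ExecutorLossReason hierarchy.

// Sketch only: stand-in types; only the decision logic mirrors the patch.
object DisconnectDecisionSketch {
  final case class InstanceView(preempted: Boolean, reasonCode: Long)

  sealed trait RemovalReason
  final case class Preempted(exitCode: Int) extends RemovalReason
  case object SlaveLostLike extends RemovalReason

  // An aborted job may have no instances left; that case and any
  // non-preempted instance fall back to the generic slave-lost reason.
  def removalReasons(instances: Seq[InstanceView]): Seq[RemovalReason] =
    if (instances.isEmpty) {
      Seq(SlaveLostLike)
    } else {
      instances.map { i =>
        if (i.preempted) Preempted(i.reasonCode.toInt) else SlaveLostLike
      }
    }

  def main(args: Array[String]): Unit = {
    println(removalReasons(Nil))                                            // List(SlaveLostLike)
    println(removalReasons(Seq(InstanceView(preempted = true, reasonCode = 17)))) // List(Preempted(17))
  }
}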