From 50b9cb2495d59ae117861971d3181b077ef7afac Mon Sep 17 00:00:00 2001
From: Holden Karau
Date: Thu, 23 Jul 2020 19:12:51 -0700
Subject: [PATCH] rename WorkerDecommission message to DecommissionWorker. (CR feedback)

---
 .../main/scala/org/apache/spark/deploy/DeployMessage.scala | 2 +-
 .../main/scala/org/apache/spark/deploy/master/Master.scala | 6 +++---
 .../main/scala/org/apache/spark/deploy/worker/Worker.scala | 4 ++--
 3 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index b7a64d75a8d47..ec05496c9986e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -64,7 +64,7 @@ private[deploy] object DeployMessages {
    * @param id the worker id
    * @param worker the worker endpoint ref
    */
-  case class WorkerDecommission(
+  case class DecommissionWorker(
       id: String,
       worker: RpcEndpointRef)
     extends DeployMessage

diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 220e1c963d5ea..74c77468a19a2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -245,7 +245,7 @@ private[deploy] class Master(
       logError("Leadership has been revoked -- master shutting down.")
       System.exit(0)
 
-    case WorkerDecommission(id, workerRef) =>
+    case DecommissionWorker(id, workerRef) =>
       logInfo("Recording worker %s decommissioning".format(id))
       if (state == RecoveryState.STANDBY) {
         workerRef.send(MasterInStandby)
@@ -874,7 +874,7 @@ private[deploy] class Master(
 
   /**
    * Decommission all workers that are active on any of the given hostnames. The decommissioning is
-   * asynchronously done by enqueueing WorkerDecommission messages to self. No checks are done about
+   * asynchronously done by enqueueing DecommissionWorker messages to self. No checks are done about
    * the prior state of the worker. So an already decommissioned worker will match as well.
    *
    * @param hostnames: A list of hostnames without the ports. Like "localhost", "foo.bar.com" etc
@@ -893,7 +893,7 @@ private[deploy] class Master(
     // The workers are removed async to avoid blocking the receive loop for the entire batch
     workersToRemove.foreach(wi => {
       logInfo(s"Sending the worker decommission to ${wi.id} and ${wi.endpoint}")
-      self.send(WorkerDecommission(wi.id, wi.endpoint))
+      self.send(DecommissionWorker(wi.id, wi.endpoint))
     })
 
     // Return the count of workers actually removed
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 862e685c2dce6..36bac0835ac7e 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -668,7 +668,7 @@ private[deploy] class Worker(
       finishedApps += id
       maybeCleanupApplication(id)
 
-    case WorkerDecommission(_, _) =>
+    case DecommissionWorker(_, _) =>
       decommissionSelf()
   }
 
@@ -772,7 +772,7 @@ private[deploy] class Worker(
     if (conf.get(WORKER_DECOMMISSION_ENABLED)) {
       logDebug("Decommissioning self")
       decommissioned = true
-      sendToMaster(WorkerDecommission(workerId, self))
+      sendToMaster(DecommissionWorker(workerId, self))
     } else {
       logWarning("Asked to decommission self, but decommissioning not enabled")
     }
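
For context, the renamed message is an ordinary `DeployMessage` case class that the Worker sends to itself and to the Master, and that both endpoints pattern-match on in their receive loops. The sketch below is a minimal, self-contained illustration of that flow, not Spark's actual code: `DeployMessage`, `RpcEndpointRef`, and the `receive` method here are simplified stand-ins for the real Spark internals touched by this patch.

```scala
// Minimal sketch of the DecommissionWorker message flow after the rename.
// NOTE: these are stand-in types for illustration only, not Spark's real API.
object DecommissionSketch {
  sealed trait DeployMessage

  // Stub for Spark's RpcEndpointRef: just enough to "send" a reply.
  final class RpcEndpointRef(val name: String) {
    def send(msg: Any): Unit = println(s"$name received: $msg")
  }

  // The message as it appears after this patch: worker id plus endpoint ref.
  final case class DecommissionWorker(id: String, worker: RpcEndpointRef)
    extends DeployMessage

  // Roughly the shape of the Master's receive loop: record the
  // decommissioning keyed by worker id, using the ref to reply if needed.
  def receive(msg: DeployMessage): Unit = msg match {
    case DecommissionWorker(id, workerRef) =>
      println(s"Recording worker $id decommissioning")
      workerRef.send("decommission acknowledged")
  }

  def main(args: Array[String]): Unit = {
    // A worker enqueues the message for the master (hypothetical id/ref).
    val workerRef = new RpcEndpointRef("worker-endpoint")
    receive(DecommissionWorker("worker-20200723-0001", workerRef))
  }
}
```

The rename itself is purely mechanical: `DecommissionWorker` reads as a command sent *to* the handler ("decommission this worker"), matching the imperative naming of the other `DeployMessage` cases, whereas `WorkerDecommission` read as a noun describing an event.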