diff --git a/akkeeper-common/src/main/scala/akkeeper/common/config/Configs.scala b/akkeeper-common/src/main/scala/akkeeper/common/config/Configs.scala index 4113f38..499b6d9 100644 --- a/akkeeper-common/src/main/scala/akkeeper/common/config/Configs.scala +++ b/akkeeper-common/src/main/scala/akkeeper/common/config/Configs.scala @@ -41,7 +41,6 @@ private[akkeeper] final class AkkeeperAkkaConfig(akkeeperAkkaConfig: Config) { lazy val port: Int = akkeeperAkkaConfig.getInt("port") lazy val seedNodesNum: Int = akkeeperAkkaConfig.getInt("seed-nodes-num") lazy val joinClusterTimeout: FiniteDuration = akkeeperAkkaConfig.getDuration("join-cluster-timeout") - lazy val leaveClusterTimeout: FiniteDuration = akkeeperAkkaConfig.getDuration("leave-cluster-timeout") } private[akkeeper] final class MasterConfig(masterConfig: Config) { diff --git a/akkeeper/src/main/resources/reference.conf b/akkeeper/src/main/resources/reference.conf index 2ec08b5..38465db 100644 --- a/akkeeper/src/main/resources/reference.conf +++ b/akkeeper/src/main/resources/reference.conf @@ -55,9 +55,6 @@ akkeeper { # The timeout after which a node gives up its attempt to join a cluster. join-cluster-timeout = 90s - - # The timeout after which a node gives up its attempt to gracefully leave a cluster. - leave-cluster-timeout = 30s } api { diff --git a/akkeeper/src/main/scala/akkeeper/container/ContainerInstanceMain.scala b/akkeeper/src/main/scala/akkeeper/container/ContainerInstanceMain.scala index cc6c84b..2cd3227 100644 --- a/akkeeper/src/main/scala/akkeeper/container/ContainerInstanceMain.scala +++ b/akkeeper/src/main/scala/akkeeper/container/ContainerInstanceMain.scala @@ -90,8 +90,7 @@ object ContainerInstanceMain extends App with ContainerDefinitionJsonProtocol { ContainerInstanceService.createLocal(actorSystem, actors, instanceStorage, instanceArgs.instanceId, instanceArgs.masterAddress, - joinClusterTimeout = instanceConfig.akkeeperAkka.joinClusterTimeout, - leaveClusterTimeout = instanceConfig.akkeeperAkka.leaveClusterTimeout) + joinClusterTimeout = instanceConfig.akkeeperAkka.joinClusterTimeout) Await.result(actorSystem.whenTerminated, Duration.Inf) sys.exit(0) diff --git a/akkeeper/src/main/scala/akkeeper/container/service/ContainerInstanceService.scala b/akkeeper/src/main/scala/akkeeper/container/service/ContainerInstanceService.scala index d111fd1..a28a11b 100644 --- a/akkeeper/src/main/scala/akkeeper/container/service/ContainerInstanceService.scala +++ b/akkeeper/src/main/scala/akkeeper/container/service/ContainerInstanceService.scala @@ -33,8 +33,7 @@ class ContainerInstanceService(userActors: Seq[ActorLaunchContext], instanceId: InstanceId, masterAddress: Address, registrationRetryInterval: FiniteDuration, - joinClusterTimeout: FiniteDuration, - leaveClusterTimeout: FiniteDuration) + joinClusterTimeout: FiniteDuration) extends Actor with ActorLogging { private implicit val dispatcher = context.dispatcher @@ -96,10 +95,7 @@ class ContainerInstanceService(userActors: Seq[ActorLaunchContext], private def terminateThisInstance(): Unit = { cluster.leave(cluster.selfAddress) - cluster.registerOnMemberRemoved(context.system.terminate()) - // Scheduling a timeout command. - context.become(leavingClusterReceive) - context.system.scheduler.scheduleOnce(leaveClusterTimeout, self, LeaveClusterTimeout) + CoordinatedShutdown(cluster.system).run(CoordinatedShutdown.ClusterLeavingReason) } private def initializedReceive: Receive = { @@ -129,13 +125,6 @@ class ContainerInstanceService(userActors: Seq[ActorLaunchContext], // Safely ignore the timeout command. } - private def leavingClusterReceive: Receive = { - case LeaveClusterTimeout => - log.warning(s"Couldn't leave the cluster after ${leaveClusterTimeout.toSeconds} seconds. " + - "Terminating this instance...") - context.system.terminate() - } - private def joiningClusterReceive: Receive = { case InstanceJoinedCluster => log.debug("Successfully joined the cluster") @@ -145,7 +134,7 @@ class ContainerInstanceService(userActors: Seq[ActorLaunchContext], case JoinClusterTimeout => log.error(s"Couldn't join the cluster during ${joinClusterTimeout.toSeconds} seconds. " + "Terminating this instance...") - context.system.terminate() + terminateThisInstance() } private def waitingForJoinCommandReceive: Receive = { @@ -165,7 +154,6 @@ object ContainerInstanceService { private case object JoinCluster private case object RetryRegistration private case object JoinClusterTimeout - private case object LeaveClusterTimeout private case object InstanceJoinedCluster private[akkeeper] val DefaultRegistrationRetryInterval = 30 seconds private[akkeeper] val DefaultJoinClusterTimeout = 90 seconds @@ -179,10 +167,9 @@ object ContainerInstanceService { instanceId: InstanceId, masterAddress: Address, registrationRetryInterval: FiniteDuration = DefaultRegistrationRetryInterval, - joinClusterTimeout: FiniteDuration = DefaultJoinClusterTimeout, - leaveClusterTimeout: FiniteDuration = DefaultLeaveClusterTimeout): ActorRef = { + joinClusterTimeout: FiniteDuration = DefaultJoinClusterTimeout): ActorRef = { val props = Props(classOf[ContainerInstanceService], userActors, instanceStorage, - instanceId, masterAddress, registrationRetryInterval, joinClusterTimeout, leaveClusterTimeout) + instanceId, masterAddress, registrationRetryInterval, joinClusterTimeout) factory.actorOf(props, ActorName) } } diff --git a/akkeeper/src/test/scala/akkeeper/container/service/ContainerInstanceServiceSpec.scala b/akkeeper/src/test/scala/akkeeper/container/service/ContainerInstanceServiceSpec.scala index 5d5c52a..e1bb934 100644 --- a/akkeeper/src/test/scala/akkeeper/container/service/ContainerInstanceServiceSpec.scala +++ b/akkeeper/src/test/scala/akkeeper/container/service/ContainerInstanceServiceSpec.scala @@ -70,11 +70,10 @@ class ContainerInstanceServiceSpec(system: ActorSystem) extends TestKit(system) instanceId: InstanceId, masterAddress: Address, retryInterval: FiniteDuration = DefaultRegistrationRetryInterval, - joinClusterTimeout: FiniteDuration = DefaultJoinClusterTimeout, - leaveClusterTimeout: FiniteDuration = DefaultLeaveClusterTimeout + joinClusterTimeout: FiniteDuration = DefaultJoinClusterTimeout ): ActorRef = { val props = Props(classOf[ContainerInstanceService], userActors, instanceStorage, - instanceId, masterAddress, retryInterval, joinClusterTimeout, leaveClusterTimeout) + instanceId, masterAddress, retryInterval, joinClusterTimeout) childActorOf(props, ContainerInstanceService.ActorName) }