Skip to content

Commit

Permalink
Use Coordinated shutdown for a proper termination of a Container instance (#68)
Browse files Browse the repository at this point in the history
  • Loading branch information
izeigerman authored Jan 28, 2019
1 parent f299f62 commit a1a445e
Show file tree
Hide file tree
Showing 5 changed files with 8 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ private[akkeeper] final class AkkeeperAkkaConfig(akkeeperAkkaConfig: Config) {
lazy val port: Int = akkeeperAkkaConfig.getInt("port")
lazy val seedNodesNum: Int = akkeeperAkkaConfig.getInt("seed-nodes-num")
lazy val joinClusterTimeout: FiniteDuration = akkeeperAkkaConfig.getDuration("join-cluster-timeout")
lazy val leaveClusterTimeout: FiniteDuration = akkeeperAkkaConfig.getDuration("leave-cluster-timeout")
}

private[akkeeper] final class MasterConfig(masterConfig: Config) {
Expand Down
3 changes: 0 additions & 3 deletions akkeeper/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,6 @@ akkeeper {

# The timeout after which a node gives up its attempt to join a cluster.
join-cluster-timeout = 90s

# The timeout after which a node gives up its attempt to gracefully leave a cluster.
leave-cluster-timeout = 30s
}

api {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,7 @@ object ContainerInstanceMain extends App with ContainerDefinitionJsonProtocol {

ContainerInstanceService.createLocal(actorSystem, actors,
instanceStorage, instanceArgs.instanceId, instanceArgs.masterAddress,
joinClusterTimeout = instanceConfig.akkeeperAkka.joinClusterTimeout,
leaveClusterTimeout = instanceConfig.akkeeperAkka.leaveClusterTimeout)
joinClusterTimeout = instanceConfig.akkeeperAkka.joinClusterTimeout)

Await.result(actorSystem.whenTerminated, Duration.Inf)
sys.exit(0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ class ContainerInstanceService(userActors: Seq[ActorLaunchContext],
instanceId: InstanceId,
masterAddress: Address,
registrationRetryInterval: FiniteDuration,
joinClusterTimeout: FiniteDuration,
leaveClusterTimeout: FiniteDuration)
joinClusterTimeout: FiniteDuration)
extends Actor with ActorLogging {

private implicit val dispatcher = context.dispatcher
Expand Down Expand Up @@ -96,10 +95,7 @@ class ContainerInstanceService(userActors: Seq[ActorLaunchContext],

private def terminateThisInstance(): Unit = {
cluster.leave(cluster.selfAddress)
cluster.registerOnMemberRemoved(context.system.terminate())
// Scheduling a timeout command.
context.become(leavingClusterReceive)
context.system.scheduler.scheduleOnce(leaveClusterTimeout, self, LeaveClusterTimeout)
CoordinatedShutdown(cluster.system).run(CoordinatedShutdown.ClusterLeavingReason)
}

private def initializedReceive: Receive = {
Expand Down Expand Up @@ -129,13 +125,6 @@ class ContainerInstanceService(userActors: Seq[ActorLaunchContext],
// Safely ignore the timeout command.
}

private def leavingClusterReceive: Receive = {
case LeaveClusterTimeout =>
log.warning(s"Couldn't leave the cluster after ${leaveClusterTimeout.toSeconds} seconds. " +
"Terminating this instance...")
context.system.terminate()
}

private def joiningClusterReceive: Receive = {
case InstanceJoinedCluster =>
log.debug("Successfully joined the cluster")
Expand All @@ -145,7 +134,7 @@ class ContainerInstanceService(userActors: Seq[ActorLaunchContext],
case JoinClusterTimeout =>
log.error(s"Couldn't join the cluster during ${joinClusterTimeout.toSeconds} seconds. " +
"Terminating this instance...")
context.system.terminate()
terminateThisInstance()
}

private def waitingForJoinCommandReceive: Receive = {
Expand All @@ -165,7 +154,6 @@ object ContainerInstanceService {
private case object JoinCluster
private case object RetryRegistration
private case object JoinClusterTimeout
private case object LeaveClusterTimeout
private case object InstanceJoinedCluster
private[akkeeper] val DefaultRegistrationRetryInterval = 30 seconds
private[akkeeper] val DefaultJoinClusterTimeout = 90 seconds
Expand All @@ -179,10 +167,9 @@ object ContainerInstanceService {
instanceId: InstanceId,
masterAddress: Address,
registrationRetryInterval: FiniteDuration = DefaultRegistrationRetryInterval,
joinClusterTimeout: FiniteDuration = DefaultJoinClusterTimeout,
leaveClusterTimeout: FiniteDuration = DefaultLeaveClusterTimeout): ActorRef = {
joinClusterTimeout: FiniteDuration = DefaultJoinClusterTimeout): ActorRef = {
val props = Props(classOf[ContainerInstanceService], userActors, instanceStorage,
instanceId, masterAddress, registrationRetryInterval, joinClusterTimeout, leaveClusterTimeout)
instanceId, masterAddress, registrationRetryInterval, joinClusterTimeout)
factory.actorOf(props, ActorName)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,10 @@ class ContainerInstanceServiceSpec(system: ActorSystem) extends TestKit(system)
instanceId: InstanceId,
masterAddress: Address,
retryInterval: FiniteDuration = DefaultRegistrationRetryInterval,
joinClusterTimeout: FiniteDuration = DefaultJoinClusterTimeout,
leaveClusterTimeout: FiniteDuration = DefaultLeaveClusterTimeout
joinClusterTimeout: FiniteDuration = DefaultJoinClusterTimeout
): ActorRef = {
val props = Props(classOf[ContainerInstanceService], userActors, instanceStorage,
instanceId, masterAddress, retryInterval, joinClusterTimeout, leaveClusterTimeout)
instanceId, masterAddress, retryInterval, joinClusterTimeout)
childActorOf(props, ContainerInstanceService.ActorName)
}

Expand Down

0 comments on commit a1a445e

Please sign in to comment.