From 4005073884224084a7f1bffe62e0d9b9af9c19ef Mon Sep 17 00:00:00 2001 From: James Roper Date: Mon, 7 Jan 2019 12:33:34 +1100 Subject: [PATCH] Removed Kubernetes self contact points hack Fixes #424. Issue #238 introduced a hack that meant that cluster bootstrap used the pod host name when the kubernetes configuration was present, rather than the IP address, since Kubernetes discovery returned host names rather than IP addresses to work with Istio. That hack was made redundant however by #242, which introduced matching by either hostname, or IP address, so it didn't matter if the self host name was an IP address or hostname. However, the hack wasn't removed and stayed there, benign, until #415 changed the way namespace configuration is consumed. At that point, the hack was still benign, but resulted in some confusing log messages. This removes the hack. --- .../cluster/bootstrap/ClusterBootstrap.scala | 11 ++------ .../bootstrap/ClusterBootstrapSettings.scala | 5 ---- .../bootstrap/LowestAddressJoinDecider.scala | 6 ++--- .../bootstrap/SelfAwareJoinDecider.scala | 27 +++++++++---------- .../ClusterBootstrapSettingsSpec.scala | 1 - .../LowestAddressJoinDeciderSpec.scala | 6 ++--- 6 files changed, 20 insertions(+), 36 deletions(-) diff --git a/cluster-bootstrap/src/main/scala/akka/management/cluster/bootstrap/ClusterBootstrap.scala b/cluster-bootstrap/src/main/scala/akka/management/cluster/bootstrap/ClusterBootstrap.scala index ff03f6900..9190fb871 100644 --- a/cluster-bootstrap/src/main/scala/akka/management/cluster/bootstrap/ClusterBootstrap.scala +++ b/cluster-bootstrap/src/main/scala/akka/management/cluster/bootstrap/ClusterBootstrap.scala @@ -99,16 +99,9 @@ final class ClusterBootstrap(implicit system: ExtendedActorSystem) extends Exten _selfContactPointUri.success(baseUri) /** INTERNAL API */ - @InternalApi private[akka] def selfContactPoints: Future[Set[(String, Int)]] = + @InternalApi private[akka] def selfContactPoint: Future[(String, Int)] = _selfContactPointUri.future.map { uri => - settings.joinDecider.selfDerivedHost match { - case Some(selfDerivedHost) if uri.authority.host.isIPv4 => - val derivedHost = s"${uri.authority.host.toString.replace('.', '-')}.$selfDerivedHost" - log.info(s"Derived self contact point $derivedHost:${uri.authority.port}") - Set((uri.authority.host.toString, uri.authority.port), (derivedHost, uri.authority.port)) - case _ => - Set((uri.authority.host.toString, uri.authority.port)) - } + (uri.authority.host.toString, uri.authority.port) } } diff --git a/cluster-bootstrap/src/main/scala/akka/management/cluster/bootstrap/ClusterBootstrapSettings.scala b/cluster-bootstrap/src/main/scala/akka/management/cluster/bootstrap/ClusterBootstrapSettings.scala index 3a7f3aad7..25ab8d4a2 100644 --- a/cluster-bootstrap/src/main/scala/akka/management/cluster/bootstrap/ClusterBootstrapSettings.scala +++ b/cluster-bootstrap/src/main/scala/akka/management/cluster/bootstrap/ClusterBootstrapSettings.scala @@ -97,11 +97,6 @@ final class ClusterBootstrapSettings(config: Config, log: LoggingAdapter) { object joinDecider { val implClass: String = bootConfig.getString("join-decider.class") - - val selfDerivedHost: Option[String] = for { - domain <- config.optDefinedValue("akka.discovery.kubernetes-api.pod-domain") - namespace <- config.optDefinedValue("akka.discovery.kubernetes-api.pod-namespace") - } yield s"$namespace.pod.$domain" } } diff --git a/cluster-bootstrap/src/main/scala/akka/management/cluster/bootstrap/LowestAddressJoinDecider.scala b/cluster-bootstrap/src/main/scala/akka/management/cluster/bootstrap/LowestAddressJoinDecider.scala index 235ff119a..83fea02fd 100644 --- a/cluster-bootstrap/src/main/scala/akka/management/cluster/bootstrap/LowestAddressJoinDecider.scala +++ b/cluster-bootstrap/src/main/scala/akka/management/cluster/bootstrap/LowestAddressJoinDecider.scala @@ -62,15 +62,13 @@ class LowestAddressJoinDecider(system: ActorSystem, settings: ClusterBootstrapSe "Exceeded stable margins without locating seed-nodes, however this node {} is NOT the lowest address " + "out of the discovered endpoints in this deployment, thus NOT joining self. Expecting node [{}] " + "(out of [{}]) to perform the self-join and initiate the cluster.", - contactPointsString(selfContactPoints), lowestAddress.getOrElse(""), - info.contactPoints.mkString(", ")) + contactPointString(selfContactPoint), lowestAddress.getOrElse(""), info.contactPoints.mkString(", ")) else log.warning( "Exceeded stable margins without locating seed-nodes, however this node {} is configured with " + "new-cluster-enabled=off, thus NOT joining self. Expecting existing cluster or node [{}] " + "(out of [{}]) to perform the self-join and initiate the cluster.", - contactPointsString(selfContactPoints), lowestAddress.getOrElse(""), - info.contactPoints.mkString(", ")) + contactPointString(selfContactPoint), lowestAddress.getOrElse(""), info.contactPoints.mkString(", ")) } // the probing will continue until the lowest addressed node decides to join itself. diff --git a/cluster-bootstrap/src/main/scala/akka/management/cluster/bootstrap/SelfAwareJoinDecider.scala b/cluster-bootstrap/src/main/scala/akka/management/cluster/bootstrap/SelfAwareJoinDecider.scala index f381389c2..ac31ad120 100644 --- a/cluster-bootstrap/src/main/scala/akka/management/cluster/bootstrap/SelfAwareJoinDecider.scala +++ b/cluster-bootstrap/src/main/scala/akka/management/cluster/bootstrap/SelfAwareJoinDecider.scala @@ -22,42 +22,41 @@ private[bootstrap] abstract class SelfAwareJoinDecider(system: ActorSystem, sett protected val log = Logging(system, getClass) /** Returns the current `selfContactPoints` as a String for logging, e.g. [127.0.0.1:64714]. */ - protected def contactPointsString(contactPoints: Set[(String, Int)]): String = - contactPoints.map(_.productIterator.mkString(":")).mkString("[", ",", "]") + protected def contactPointString(contactPoint: (String, Int)): String = + contactPoint.productIterator.mkString(":") /** * The value `ClusterBootstrap(system).selfContactPoints` is set prior to HTTP binding, * during [[akka.management.AkkaManagement.start()]], hence we accept blocking on * this initialization. */ - private[bootstrap] def selfContactPoints: Set[(String, Int)] = - Try(Await.result(ClusterBootstrap(system).selfContactPoints, 10.seconds)) - .getOrElse(throw new IllegalStateException( - "'Bootstrap.selfContactPoint' was NOT set, but is required for the bootstrap to work " + - "if binding bootstrap routes manually and not via akka-management.")) + private[bootstrap] def selfContactPoint: (String, Int) = + Try(Await.result(ClusterBootstrap(system).selfContactPoint, 10.seconds)).getOrElse(throw new IllegalStateException( + "'Bootstrap.selfContactPoint' was NOT set, but is required for the bootstrap to work " + + "if binding bootstrap routes manually and not via akka-management.")) /** * Determines whether it has the need and ability to join self and create a new cluster. */ private[bootstrap] def canJoinSelf(target: ResolvedTarget, info: SeedNodesInformation): Boolean = { - val self = selfContactPoints + val self = selfContactPoint if (matchesSelf(target, self)) true else { if (!info.contactPoints.exists(matchesSelf(_, self))) { - log.warning("Self contact point [{}] not found in targets {}", contactPointsString(selfContactPoints), + log.warning("Self contact point [{}] not found in targets {}", contactPointString(selfContactPoint), info.contactPoints.mkString(", ")) } false } } - private[bootstrap] def matchesSelf(target: ResolvedTarget, contactPoints: Set[(String, Int)]): Boolean = + private[bootstrap] def matchesSelf(target: ResolvedTarget, contactPoint: (String, Int)): Boolean = { + val (host, port) = contactPoint target.port match { - case None => - contactPoints.exists { case (host, _) => hostMatches(host, target) } - case Some(lowestPort) => - contactPoints.exists { case (host, port) => hostMatches(host, target) && port == lowestPort } + case None => hostMatches(host, target) + case Some(lowestPort) => hostMatches(host, target) && port == lowestPort } + } /** * Checks for both host name and IP address for discovery mechanisms that return both. diff --git a/cluster-bootstrap/src/test/scala/akka/management/cluster/bootstrap/ClusterBootstrapSettingsSpec.scala b/cluster-bootstrap/src/test/scala/akka/management/cluster/bootstrap/ClusterBootstrapSettingsSpec.scala index 0f0135775..6b8ba30cc 100644 --- a/cluster-bootstrap/src/test/scala/akka/management/cluster/bootstrap/ClusterBootstrapSettingsSpec.scala +++ b/cluster-bootstrap/src/test/scala/akka/management/cluster/bootstrap/ClusterBootstrapSettingsSpec.scala @@ -17,7 +17,6 @@ class ClusterBootstrapSettingsSpec extends AbstractBootstrapSpec { "have the expected defaults " in { val settings = ClusterBootstrapSettings(config, NoLogging) settings.newClusterEnabled should ===(true) - settings.joinDecider.selfDerivedHost.isEmpty should ===(true) } "have the expected overrides " in { diff --git a/cluster-bootstrap/src/test/scala/akka/management/cluster/bootstrap/LowestAddressJoinDeciderSpec.scala b/cluster-bootstrap/src/test/scala/akka/management/cluster/bootstrap/LowestAddressJoinDeciderSpec.scala index f18e29ff9..86daaea74 100644 --- a/cluster-bootstrap/src/test/scala/akka/management/cluster/bootstrap/LowestAddressJoinDeciderSpec.scala +++ b/cluster-bootstrap/src/test/scala/akka/management/cluster/bootstrap/LowestAddressJoinDeciderSpec.scala @@ -202,13 +202,13 @@ class SelfAwareJoinDeciderSpec extends JoinDeciderSpec { "SelfAwareJoinDecider" should { - "return true if a target matches selfContactPoints" in { + "return true if a target matches selfContactPoint" in { ClusterBootstrap(system).setSelfContactPoint(s"http://10.0.0.2:$managementPort/test") val decider = new LowestAddressJoinDecider(system, settings) - val selfContactPoints = decider.selfContactPoints + val selfContactPoint = decider.selfContactPoint val info = seedNodes val target = info.seedNodesObservations.toList.map(_.contactPoint).sorted.headOption - target.exists(decider.matchesSelf(_, selfContactPoints)) should ===(true) + target.exists(decider.matchesSelf(_, selfContactPoint)) should ===(true) } "be able to join self if all conditions met" in {