Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Removed Kubernetes self contact points hack #433

Merged
merged 1 commit into from
Jan 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -99,16 +99,9 @@ final class ClusterBootstrap(implicit system: ExtendedActorSystem) extends Exten
_selfContactPointUri.success(baseUri)

/** INTERNAL API */
@InternalApi private[akka] def selfContactPoints: Future[Set[(String, Int)]] =
@InternalApi private[akka] def selfContactPoint: Future[(String, Int)] =
_selfContactPointUri.future.map { uri =>
settings.joinDecider.selfDerivedHost match {
case Some(selfDerivedHost) if uri.authority.host.isIPv4 =>
val derivedHost = s"${uri.authority.host.toString.replace('.', '-')}.$selfDerivedHost"
log.info(s"Derived self contact point $derivedHost:${uri.authority.port}")
Set((uri.authority.host.toString, uri.authority.port), (derivedHost, uri.authority.port))
case _ =>
Set((uri.authority.host.toString, uri.authority.port))
}
(uri.authority.host.toString, uri.authority.port)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,6 @@ final class ClusterBootstrapSettings(config: Config, log: LoggingAdapter) {

object joinDecider {
val implClass: String = bootConfig.getString("join-decider.class")

val selfDerivedHost: Option[String] = for {
domain <- config.optDefinedValue("akka.discovery.kubernetes-api.pod-domain")
namespace <- config.optDefinedValue("akka.discovery.kubernetes-api.pod-namespace")
} yield s"$namespace.pod.$domain"
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,13 @@ class LowestAddressJoinDecider(system: ActorSystem, settings: ClusterBootstrapSe
"Exceeded stable margins without locating seed-nodes, however this node {} is NOT the lowest address " +
"out of the discovered endpoints in this deployment, thus NOT joining self. Expecting node [{}] " +
"(out of [{}]) to perform the self-join and initiate the cluster.",
contactPointsString(selfContactPoints), lowestAddress.getOrElse(""),
info.contactPoints.mkString(", "))
contactPointString(selfContactPoint), lowestAddress.getOrElse(""), info.contactPoints.mkString(", "))
else
log.warning(
"Exceeded stable margins without locating seed-nodes, however this node {} is configured with " +
"new-cluster-enabled=off, thus NOT joining self. Expecting existing cluster or node [{}] " +
"(out of [{}]) to perform the self-join and initiate the cluster.",
contactPointsString(selfContactPoints), lowestAddress.getOrElse(""),
info.contactPoints.mkString(", "))
contactPointString(selfContactPoint), lowestAddress.getOrElse(""), info.contactPoints.mkString(", "))
}

// the probing will continue until the lowest addressed node decides to join itself.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,42 +22,41 @@ private[bootstrap] abstract class SelfAwareJoinDecider(system: ActorSystem, sett
protected val log = Logging(system, getClass)

/** Returns the current `selfContactPoints` as a String for logging, e.g. [127.0.0.1:64714]. */
protected def contactPointsString(contactPoints: Set[(String, Int)]): String =
contactPoints.map(_.productIterator.mkString(":")).mkString("[", ",", "]")
protected def contactPointString(contactPoint: (String, Int)): String =
contactPoint.productIterator.mkString(":")

/**
* The value `ClusterBootstrap(system).selfContactPoints` is set prior to HTTP binding,
* during [[akka.management.AkkaManagement.start()]], hence we accept blocking on
* this initialization.
*/
private[bootstrap] def selfContactPoints: Set[(String, Int)] =
Try(Await.result(ClusterBootstrap(system).selfContactPoints, 10.seconds))
.getOrElse(throw new IllegalStateException(
"'Bootstrap.selfContactPoint' was NOT set, but is required for the bootstrap to work " +
"if binding bootstrap routes manually and not via akka-management."))
private[bootstrap] def selfContactPoint: (String, Int) =
Try(Await.result(ClusterBootstrap(system).selfContactPoint, 10.seconds)).getOrElse(throw new IllegalStateException(
"'Bootstrap.selfContactPoint' was NOT set, but is required for the bootstrap to work " +
"if binding bootstrap routes manually and not via akka-management."))

/**
* Determines whether it has the need and ability to join self and create a new cluster.
*/
private[bootstrap] def canJoinSelf(target: ResolvedTarget, info: SeedNodesInformation): Boolean = {
val self = selfContactPoints
val self = selfContactPoint
if (matchesSelf(target, self)) true
else {
if (!info.contactPoints.exists(matchesSelf(_, self))) {
log.warning("Self contact point [{}] not found in targets {}", contactPointsString(selfContactPoints),
log.warning("Self contact point [{}] not found in targets {}", contactPointString(selfContactPoint),
info.contactPoints.mkString(", "))
}
false
}
}

private[bootstrap] def matchesSelf(target: ResolvedTarget, contactPoints: Set[(String, Int)]): Boolean =
private[bootstrap] def matchesSelf(target: ResolvedTarget, contactPoint: (String, Int)): Boolean = {
val (host, port) = contactPoint
target.port match {
case None =>
contactPoints.exists { case (host, _) => hostMatches(host, target) }
case Some(lowestPort) =>
contactPoints.exists { case (host, port) => hostMatches(host, target) && port == lowestPort }
case None => hostMatches(host, target)
case Some(lowestPort) => hostMatches(host, target) && port == lowestPort
}
}

/**
* Checks for both host name and IP address for discovery mechanisms that return both.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ class ClusterBootstrapSettingsSpec extends AbstractBootstrapSpec {
"have the expected defaults " in {
val settings = ClusterBootstrapSettings(config, NoLogging)
settings.newClusterEnabled should ===(true)
settings.joinDecider.selfDerivedHost.isEmpty should ===(true)
}

"have the expected overrides " in {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,13 +202,13 @@ class SelfAwareJoinDeciderSpec extends JoinDeciderSpec {

"SelfAwareJoinDecider" should {

"return true if a target matches selfContactPoints" in {
"return true if a target matches selfContactPoint" in {
ClusterBootstrap(system).setSelfContactPoint(s"http://10.0.0.2:$managementPort/test")
val decider = new LowestAddressJoinDecider(system, settings)
val selfContactPoints = decider.selfContactPoints
val selfContactPoint = decider.selfContactPoint
val info = seedNodes
val target = info.seedNodesObservations.toList.map(_.contactPoint).sorted.headOption
target.exists(decider.matchesSelf(_, selfContactPoints)) should ===(true)
target.exists(decider.matchesSelf(_, selfContactPoint)) should ===(true)
}

"be able to join self if all conditions met" in {
Expand Down