Allow KafkaRoller to talk to controllers directly
Update AdminClientProvider to create an admin client for controllers with BOOTSTRAP_CONTROLLERS_CONFIG set.
Allow KafkaRoller to create an admin client against controller nodes if they are running Kafka 3.9.0 or later.
Remove the ceShouldBeFatal option, as it is never set to true.

Signed-off-by: Gantigmaa Selenge <tina.selenge@gmail.com>
tinaselenge committed Nov 15, 2024
1 parent 849ffe0 commit 99cc9e2
Showing 8 changed files with 232 additions and 106 deletions.
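
The description above hinges on Kafka's bootstrap.controllers admin setting (exposed as AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, introduced by KIP-919). As a minimal sketch of what a controller-facing provider method can look like, assuming the caller supplies prepared TLS/security properties, and with the class and parameter names here illustrative rather than Strimzi's actual wiring:

import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;

// Sketch only: a controller-facing admin client is configured with
// bootstrap.controllers instead of bootstrap.servers (setting both is an error).
public class ControllerAdminSketch {
    public static Admin createControllerAdminClient(String controllerBootstrapHostnames, Properties securityConfig) {
        Properties props = new Properties();
        props.putAll(securityConfig); // TLS trust/key material, assumed prepared by the caller
        props.put(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, controllerBootstrapHostnames);
        return Admin.create(props);
    }
}

A client bootstrapped this way sends its requests to the controller quorum rather than to brokers, which is what allows the quorum health check in the diffs below to run without a broker in the path.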
@@ -124,7 +124,10 @@ public class KafkaCluster extends AbstractModel implements SupportsMetrics, Supp
protected static final String REPLICATION_PORT_NAME = "tcp-replication";
protected static final int KAFKA_AGENT_PORT = 8443;
protected static final String KAFKA_AGENT_PORT_NAME = "tcp-kafkaagent";
protected static final int CONTROLPLANE_PORT = 9090;
/**
* Port number used for control plane
*/
public static final int CONTROLPLANE_PORT = 9090;
protected static final String CONTROLPLANE_PORT_NAME = "tcp-ctrlplane"; // port name is up to 15 characters

/**
@@ -35,10 +35,8 @@
import org.apache.kafka.clients.admin.AlterConfigsResult;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.SslAuthenticationException;

@@ -111,7 +109,6 @@ public class KafkaRoller {
private static final ReconciliationLogger LOGGER = ReconciliationLogger.create(KafkaRoller.class);
private static final String CONTROLLER_QUORUM_FETCH_TIMEOUT_MS_CONFIG_NAME = "controller.quorum.fetch.timeout.ms";
private static final String CONTROLLER_QUORUM_FETCH_TIMEOUT_MS_CONFIG_DEFAULT = "2000";

private final PodOperator podOperations;
private final long pollingIntervalMs;
protected final long operationTimeoutMs;
@@ -198,7 +195,7 @@ public KafkaRoller(Reconciliation reconciliation, Vertx vertx, PodOperator podOp
private boolean maybeInitBrokerAdminClient() {
if (this.brokerAdminClient == null) {
try {
this.brokerAdminClient = adminClient(nodes.stream().filter(NodeRef::broker).collect(Collectors.toSet()), false);
this.brokerAdminClient = brokerAdminClient(nodes);
} catch (ForceableProblem | FatalProblem e) {
LOGGER.warnCr(reconciliation, "Failed to create brokerAdminClient.", e);
return false;
@@ -213,14 +210,17 @@ private boolean maybeInitBrokerAdminClient() {
*/
private boolean maybeInitControllerAdminClient() {
if (this.controllerAdminClient == null) {
// Prior to 3.9.0, Kafka did not support directly connecting to controller nodes
// via the Kafka Admin API when running in KRaft mode.
// Therefore, use the brokers to initialise the admin client for the quorum health check
// when the version is older than 3.9.0.
try {
// TODO: Currently, when running in KRaft mode Kafka does not support using Kafka Admin API with controller
// nodes. This is tracked in https://github.com/strimzi/strimzi-kafka-operator/issues/9692.
// Therefore use broker nodes of the cluster to initialise adminClient for quorum health check.
// Once Kafka Admin API is supported for controllers, nodes.stream().filter(NodeRef:controller)
// can be used here. Until then pass an empty set of nodes so the client is initialized with
// the brokers service.
this.controllerAdminClient = adminClient(Set.of(), false);
if (KafkaVersion.compareDottedVersions(kafkaVersion.version(), "3.9.0") >= 0) {
this.controllerAdminClient = controllerAdminClient(nodes);
} else {
this.controllerAdminClient = brokerAdminClient(Set.of());
}
} catch (ForceableProblem | FatalProblem e) {
LOGGER.warnCr(reconciliation, "Failed to create controllerAdminClient.", e);
return false;
@@ -607,35 +607,13 @@ private void checkIfRestartOrReconfigureRequired(NodeRef nodeRef, boolean isCont
KafkaBrokerLoggingConfigurationDiff brokerLoggingDiff = null;
boolean needsReconfig = false;

if (isController) {
if (maybeInitControllerAdminClient()) {
String controllerQuorumFetchTimeout = CONTROLLER_QUORUM_FETCH_TIMEOUT_MS_CONFIG_DEFAULT;
String desiredConfig = kafkaConfigProvider.apply(nodeRef.nodeId());

if (desiredConfig != null) {
OrderedProperties orderedProperties = new OrderedProperties();
controllerQuorumFetchTimeout = orderedProperties.addStringPairs(desiredConfig).asMap().getOrDefault(CONTROLLER_QUORUM_FETCH_TIMEOUT_MS_CONFIG_NAME, CONTROLLER_QUORUM_FETCH_TIMEOUT_MS_CONFIG_DEFAULT);
}

restartContext.quorumCheck = quorumCheck(controllerAdminClient, Long.parseLong(controllerQuorumFetchTimeout));
} else {
//TODO When https://github.com/strimzi/strimzi-kafka-operator/issues/9692 is complete
// we should change this logic to immediately restart this pod because we cannot connect to it.
if (isBroker) {
// If it is a combined node (controller and broker) and the admin client cannot be initialised,
// restart this pod. There is no reason to continue as we won't be able to
// connect an admin client to this pod for other checks later.
LOGGER.infoCr(reconciliation, "KafkaQuorumCheck cannot be initialised for {} because none of the brokers do not seem to responding to connection attempts. " +
"Restarting pod because it is a combined node so it is one of the brokers that is not responding.", nodeRef);
reasonToRestartPod.add(RestartReason.POD_UNRESPONSIVE);
markRestartContextWithForceRestart(restartContext);
return;
} else {
// If it is a controller only node throw an UnforceableProblem, so we try again until the backOff
// is finished, then it will move on to the next controller and eventually the brokers.
throw new UnforceableProblem("KafkaQuorumCheck cannot be initialised for " + nodeRef + " because none of the brokers do not seem to responding to connection attempts");
}
// If it is a pure controller, initialise the admin client specifically for controllers
if (isController && !isBroker) {
if (!maybeInitControllerAdminClient()) {
handleFailedAdminClientForController(nodeRef, restartContext, reasonToRestartPod);
return;
}
restartContext.quorumCheck = quorumCheck(controllerAdminClient, nodeRef);
}

if (isBroker) {
@@ -646,6 +624,11 @@ private void checkIfRestartOrReconfigureRequired(NodeRef nodeRef, boolean isCont
return;
}

// If it is a mixed node, initialise quorum check with the broker admin client
if (isController) {
restartContext.quorumCheck = quorumCheck(brokerAdminClient, nodeRef);
}

// Always get the broker config. This request gets sent to that specific broker, so it's a proof that we can
// connect to the broker and that it's capable of responding.
Config brokerConfig;
@@ -691,6 +674,21 @@ private void checkIfRestartOrReconfigureRequired(NodeRef nodeRef, boolean isCont
restartContext.brokerLoggingDiff = brokerLoggingDiff;
}

private void handleFailedAdminClientForController(NodeRef nodeRef, RestartContext restartContext, RestartReasons reasonToRestartPod) throws UnforceableProblem {
if (KafkaVersion.compareDottedVersions(kafkaVersion.version(), "3.9.0") >= 0) {
// If the version supports talking to controllers, force restart this pod when the admin client cannot be initialised.
// There is no reason to continue as we won't be able to connect an admin client to this pod for other checks later.
LOGGER.infoCr(reconciliation, "KafkaQuorumCheck cannot be initialised for {} because none of the controllers do not seem to responding to connection attempts.", nodeRef);
reasonToRestartPod.add(RestartReason.POD_UNRESPONSIVE);
markRestartContextWithForceRestart(restartContext);
} else {
// If the version does not support talking to controllers, the admin client should be connecting to the broker nodes.
// Since the connection to the brokers failed, throw an UnforceableProblem so that the broker nodes can be checked later,
// which may resolve the connection issue.
throw new UnforceableProblem("KafkaQuorumCheck cannot be initialised for " + nodeRef + " because none of the brokers seem to be responding to connection attempts");
}
}

/**
* Returns a config of the given broker.
* @param nodeRef The reference of the broker.
@@ -905,34 +903,48 @@ protected Future<Void> restart(Pod pod, RestartContext restartContext) {
* Returns an AdminClient instance bootstrapped from the given nodes. If nodes is an
* empty set, use the brokers service to bootstrap the client.
*/
/* test */ Admin adminClient(Set<NodeRef> nodes, boolean ceShouldBeFatal) throws ForceableProblem, FatalProblem {
// If no nodes are passed initialize the admin client using the brokers service
// TODO when https://github.com/strimzi/strimzi-kafka-operator/issues/9692 is completed review whether
// this function can be reverted to expect nodes to be non empty
/* test */ Admin brokerAdminClient(Set<NodeRef> nodes) throws ForceableProblem, FatalProblem {
// If no nodes are passed, initialize the admin client using the bootstrap service.
// This is still needed for versions older than 3.9.0, so that when only controller nodes are being rolled,
// the roller can use brokers to get quorum information via the Admin client.
String bootstrapHostnames;
if (nodes.isEmpty()) {
bootstrapHostnames = String.format("%s:%s", DnsNameGenerator.of(namespace, KafkaResources.bootstrapServiceName(cluster)).serviceDnsName(), KafkaCluster.REPLICATION_PORT);
} else {
bootstrapHostnames = nodes.stream().map(node -> DnsNameGenerator.podDnsName(namespace, KafkaResources.brokersServiceName(cluster), node.podName()) + ":" + KafkaCluster.REPLICATION_PORT).collect(Collectors.joining(","));
bootstrapHostnames = nodes.stream().filter(NodeRef::broker).map(node -> DnsNameGenerator.podDnsName(namespace, KafkaResources.brokersServiceName(cluster), node.podName()) + ":" + KafkaCluster.REPLICATION_PORT).collect(Collectors.joining(","));
}

try {
LOGGER.debugCr(reconciliation, "Creating AdminClient for {}", bootstrapHostnames);
return adminClientProvider.createAdminClient(bootstrapHostnames, coTlsPemIdentity.pemTrustSet(), coTlsPemIdentity.pemAuthIdentity());
} catch (KafkaException e) {
if (ceShouldBeFatal && (e instanceof ConfigException
|| e.getCause() instanceof ConfigException)) {
throw new FatalProblem("An error while try to create an admin client with bootstrap brokers " + bootstrapHostnames, e);
} else {
throw new ForceableProblem("An error while try to create an admin client with bootstrap brokers " + bootstrapHostnames, e);
}
} catch (RuntimeException e) {
throw new ForceableProblem("An error while try to create an admin client with bootstrap brokers " + bootstrapHostnames, e);
}
}

/* test */ KafkaQuorumCheck quorumCheck(Admin ac, long controllerQuorumFetchTimeoutMs) {
return new KafkaQuorumCheck(reconciliation, ac, vertx, controllerQuorumFetchTimeoutMs);
/**
* Returns an AdminClient instance bootstrapped from the given controller nodes.
*/
/* test */ Admin controllerAdminClient(Set<NodeRef> nodes) throws ForceableProblem, FatalProblem {
String bootstrapHostnames = nodes.stream().filter(NodeRef::controller).map(node -> DnsNameGenerator.podDnsName(namespace, KafkaResources.brokersServiceName(cluster), node.podName()) + ":" + KafkaCluster.CONTROLPLANE_PORT).collect(Collectors.joining(","));

try {
LOGGER.debugCr(reconciliation, "Creating AdminClient for {}", bootstrapHostnames);
return adminClientProvider.createControllerAdminClient(bootstrapHostnames, coTlsPemIdentity.pemTrustSet(), coTlsPemIdentity.pemAuthIdentity());
} catch (RuntimeException e) {
throw new ForceableProblem("An error while try to create an admin client with bootstrap controllers " + bootstrapHostnames, e);
}
}

/* test */ KafkaQuorumCheck quorumCheck(Admin ac, NodeRef nodeRef) {
String controllerQuorumFetchTimeout = CONTROLLER_QUORUM_FETCH_TIMEOUT_MS_CONFIG_DEFAULT;
String desiredConfig = kafkaConfigProvider.apply(nodeRef.nodeId());

if (desiredConfig != null) {
OrderedProperties orderedProperties = new OrderedProperties();
controllerQuorumFetchTimeout = orderedProperties.addStringPairs(desiredConfig).asMap().getOrDefault(CONTROLLER_QUORUM_FETCH_TIMEOUT_MS_CONFIG_NAME, CONTROLLER_QUORUM_FETCH_TIMEOUT_MS_CONFIG_DEFAULT);
}
return new KafkaQuorumCheck(reconciliation, ac, vertx, Long.parseLong(controllerQuorumFetchTimeout));
}

/* test */ KafkaAvailability availability(Admin ac) {
@@ -975,7 +987,7 @@ int controller(NodeRef nodeRef, long timeout, TimeUnit unit, RestartContext rest
// This is tracked in https://github.com/strimzi/strimzi-kafka-operator/issues/9373.
// Use admin client connected directly to this broker here, then any exception or timeout trying to connect to
// the current node will be caught and handled from this method, rather than appearing elsewhere.
try (Admin ac = adminClient(Set.of(nodeRef), false)) {
try (Admin ac = brokerAdminClient(Set.of(nodeRef))) {
Node controllerNode = null;

try {
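To make the controller path concrete, here is a minimal standalone probe of the metadata quorum, which is the kind of query KafkaQuorumCheck performs. The controller address and the default plaintext security are assumptions for illustration; a real Strimzi cluster serves the control plane port 9090 over TLS:

import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.QuorumInfo;

public class QuorumProbe {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        // Hypothetical controller address; in-cluster this would be a pod DNS name on port 9090.
        props.put(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, "my-cluster-controller-0:9090");
        try (Admin admin = Admin.create(props)) {
            // describeMetadataQuorum (Kafka 3.3+) reports the quorum leader and per-voter
            // fetch timestamps, which a health check can compare against
            // controller.quorum.fetch.timeout.ms to judge voter liveness.
            QuorumInfo quorum = admin.describeMetadataQuorum().quorumInfo().get();
            System.out.println("Quorum leader: " + quorum.leaderId());
            for (QuorumInfo.ReplicaState voter : quorum.voters()) {
                System.out.println("Voter " + voter.replicaId() + " lastFetchTimestamp=" + voter.lastFetchTimestamp());
            }
        }
    }
}
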
@@ -518,10 +518,20 @@ public Admin createAdminClient(String bootstrapHostnames, PemTrustSet kafkaCaTru
return createAdminClient(bootstrapHostnames, kafkaCaTrustSet, authIdentity, new Properties());
}

@Override
public Admin createControllerAdminClient(String controllerBootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity) {
return createControllerAdminClient(controllerBootstrapHostnames, kafkaCaTrustSet, authIdentity, new Properties());
}

@Override
public Admin createAdminClient(String bootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity, Properties config) {
return mockAdminClient;
}

@Override
public Admin createControllerAdminClient(String controllerBootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity, Properties config) {
return mockAdminClient;
}
};
}
