From 20b4dd81d758c13a15de0c6e75aef405ad6041ef Mon Sep 17 00:00:00 2001 From: iliax Date: Wed, 12 Jul 2023 16:10:29 +0400 Subject: [PATCH 1/2] ISSUE-3820: ACL enablement check fixes: 1. Setting 1 hour ConfigRelatedInfo update duration 2. logging ACL checking on debug level --- .../kafka/ui/service/ReactiveAdminClient.java | 67 ++++++++++--------- 1 file changed, 37 insertions(+), 30 deletions(-) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java index 0b6f16a2235..a6ead35750b 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java @@ -15,6 +15,8 @@ import com.provectus.kafka.ui.util.KafkaVersion; import com.provectus.kafka.ui.util.annotation.KafkaClientInternalsDependant; import java.io.Closeable; +import java.time.Duration; +import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -129,38 +131,41 @@ private record ConfigRelatedInfo(String version, Set features, boolean topicDeletionIsAllowed) { - private static Mono extract(AdminClient ac, int controllerId) { - return loadBrokersConfig(ac, List.of(controllerId)) - .map(map -> map.isEmpty() ? List.of() : map.get(controllerId)) - .flatMap(configs -> { - String version = "1.0-UNKNOWN"; - boolean topicDeletionEnabled = true; - for (ConfigEntry entry : configs) { - if (entry.name().contains("inter.broker.protocol.version")) { - version = entry.value(); - } - if (entry.name().equals("delete.topic.enable")) { - topicDeletionEnabled = Boolean.parseBoolean(entry.value()); - } - } - var builder = ConfigRelatedInfo.builder() - .version(version) - .topicDeletionIsAllowed(topicDeletionEnabled); - return SupportedFeature.forVersion(ac, version) - .map(features -> builder.features(features).build()); - }); + final static Duration UPDATE_DURATION = Duration.of(1, ChronoUnit.HOURS); + + private static Mono extract(AdminClient ac) { + return ReactiveAdminClient.describeClusterImpl(ac, Set.of()) + .flatMap(desc -> { + // choosing node from which we will get configs (starting with controller) + var targetNodeId = Optional.ofNullable(desc.controller) + .map(Node::id) + .orElse(desc.getNodes().iterator().next().id()); + return loadBrokersConfig(ac, List.of(targetNodeId)) + .map(map -> map.isEmpty() ? List.of() : map.get(targetNodeId)) + .flatMap(configs -> { + String version = "1.0-UNKNOWN"; + boolean topicDeletionEnabled = true; + for (ConfigEntry entry : configs) { + if (entry.name().contains("inter.broker.protocol.version")) { + version = entry.value(); + } + if (entry.name().equals("delete.topic.enable")) { + topicDeletionEnabled = Boolean.parseBoolean(entry.value()); + } + } + final String finalVersion = version; + final boolean finalTopicDeletionEnabled = topicDeletionEnabled; + return SupportedFeature.forVersion(ac, version) + .map(features -> new ConfigRelatedInfo(finalVersion, features, finalTopicDeletionEnabled)); + }); + }) + .cache(UPDATE_DURATION); } } public static Mono create(AdminClient adminClient) { - return describeClusterImpl(adminClient, Set.of()) - // choosing node from which we will get configs (starting with controller) - .flatMap(descr -> descr.controller != null - ? Mono.just(descr.controller) - : Mono.justOrEmpty(descr.nodes.stream().findFirst()) - ) - .flatMap(node -> ConfigRelatedInfo.extract(adminClient, node.id())) - .map(info -> new ReactiveAdminClient(adminClient, info)); + Mono configRelatedInfoMono = ConfigRelatedInfo.extract(adminClient); + return configRelatedInfoMono.map(info -> new ReactiveAdminClient(adminClient, configRelatedInfoMono, info)); } @@ -170,7 +175,7 @@ private static Mono isAuthorizedSecurityEnabled(AdminClient ac, @Nullab .doOnError(th -> !(th instanceof SecurityDisabledException) && !(th instanceof InvalidRequestException) && !(th instanceof UnsupportedVersionException), - th -> log.warn("Error checking if security enabled", th)) + th -> log.debug("Error checking if security enabled", th)) .onErrorReturn(false); } @@ -202,6 +207,8 @@ public static Mono toMono(KafkaFuture future) { @Getter(AccessLevel.PACKAGE) // visible for testing private final AdminClient client; + private final Mono configRelatedInfoMono; + private volatile ConfigRelatedInfo configRelatedInfo; public Set getClusterFeatures() { @@ -228,7 +235,7 @@ public Mono updateInternalStats(@Nullable Node controller) { if (controller == null) { return Mono.empty(); } - return ConfigRelatedInfo.extract(client, controller.id()) + return configRelatedInfoMono .doOnNext(info -> this.configRelatedInfo = info) .then(); } From 75c3880942e77773e42c881c90873871e000be32 Mon Sep 17 00:00:00 2001 From: iliax Date: Tue, 18 Jul 2023 11:23:38 +0400 Subject: [PATCH 2/2] checkstyle fix --- .../com/provectus/kafka/ui/service/ReactiveAdminClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java index a6ead35750b..9de908efa7f 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java @@ -131,7 +131,7 @@ private record ConfigRelatedInfo(String version, Set features, boolean topicDeletionIsAllowed) { - final static Duration UPDATE_DURATION = Duration.of(1, ChronoUnit.HOURS); + static final Duration UPDATE_DURATION = Duration.of(1, ChronoUnit.HOURS); private static Mono extract(AdminClient ac) { return ReactiveAdminClient.describeClusterImpl(ac, Set.of())