Skip to content

Commit

Permalink
Topics: Fix config fetch for Azure EH (#105)
Browse files Browse the repository at this point in the history
Co-authored-by: Aleksei Zotov <azotcsit@gmail.com>
  • Loading branch information
Haarolean and azotcsit authored Feb 17, 2024
1 parent e280c8e commit ccf3103
Showing 1 changed file with 6 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.SecurityDisabledException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
Expand Down Expand Up @@ -246,8 +247,10 @@ public Mono<Map<String, List<ConfigEntry>>> getTopicsConfig() {
return listTopics(true).flatMap(topics -> getTopicsConfig(topics, false));
}

//NOTE: skips not-found topics (for which UnknownTopicOrPartitionException was thrown by AdminClient)
//and topics for which DESCRIBE_CONFIGS permission is not set (TopicAuthorizationException was thrown)
/*
NOTE: skips not-found topics (for which UnknownTopicOrPartitionException or UnknownServerException was thrown by
AdminClient) and topics for which DESCRIBE_CONFIGS permission is not set (TopicAuthorizationException was thrown)
*/
public Mono<Map<String, List<ConfigEntry>>> getTopicsConfig(Collection<String> topicNames, boolean includeDoc) {
var includeDocFixed = includeDoc && getClusterFeatures().contains(SupportedFeature.CONFIG_DOCUMENTATION_RETRIEVAL);
// we need to partition calls, because it can lead to AdminClient timeouts in case of large topics count
Expand All @@ -269,6 +272,7 @@ private Mono<Map<String, List<ConfigEntry>>> getTopicsConfigImpl(Collection<Stri
resources,
new DescribeConfigsOptions().includeSynonyms(true).includeDocumentation(includeDoc)).values(),
UnknownTopicOrPartitionException.class,
UnknownServerException.class,
TopicAuthorizationException.class
).map(config -> config.entrySet().stream()
.collect(toMap(
Expand Down

0 comments on commit ccf3103

Please sign in to comment.