diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java index 92150cccd7..c460f1b53b 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java @@ -213,6 +213,14 @@ public void setClusterId(String clusterId) { this.clusterId = clusterId; } + /** + * Get the clusterId property. + * @since 3.3.0 + */ + public String getClusterId() { + return clusterId; + } + @Override public Map getConfigurationProperties() { Map configs2 = new HashMap<>(this.configs); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java index a1e6043310..c69f40229a 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java @@ -35,6 +35,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.KafkaAdminClient; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; @@ -485,13 +486,17 @@ public void afterSingletonsInstantiated() { if (this.kafkaAdmin != null) { Object producerServers = this.producerFactory.getConfigurationProperties() .get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG); - String adminServers = this.kafkaAdmin.getBootstrapServers(); + String adminServers = getAdminBootstrapAddress(); if (!producerServers.equals(adminServers)) { Map props = new HashMap<>(this.kafkaAdmin.getConfigurationProperties()); props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, producerServers); int opTo = this.kafkaAdmin.getOperationTimeout(); + String clusterId = this.kafkaAdmin.getClusterId(); this.kafkaAdmin = new KafkaAdmin(props); this.kafkaAdmin.setOperationTimeout(opTo); + if (clusterId != null && !clusterId.isEmpty()) { + this.kafkaAdmin.setClusterId(clusterId); + } } } } @@ -501,6 +506,21 @@ else if (this.micrometerEnabled) { } } + private String getAdminBootstrapAddress() { + // Retrieve bootstrap servers from KafkaAdmin bootstrap supplier if available + String adminServers = this.kafkaAdmin.getBootstrapServers(); + + // Fallback to configuration properties if bootstrap servers are not set + if (adminServers == null) { + adminServers = this.kafkaAdmin.getConfigurationProperties().getOrDefault( + AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, + "" + ).toString(); + } + + return adminServers; + } + @Nullable private String clusterId() { if (this.kafkaAdmin != null && this.clusterId == null) {