Skip to content

Commit

Permalink
spring-projectsGH-2430: Fix Unnecessary describeCluster()
Browse files Browse the repository at this point in the history
Resolves spring-projects#2430

Do not obtain the clusterId unless explicitly needed for observation.
  • Loading branch information
garyrussell committed Oct 8, 2022
1 parent 7290bc1 commit b4c1fe3
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -916,21 +916,22 @@ else if (listener instanceof MessageListener) {
this.lastAlertPartition = new HashMap<>();
this.wasIdlePartition = new HashMap<>();
this.kafkaAdmin = obtainAdmin();
obtainClusterId();
}

@Nullable
private KafkaAdmin obtainAdmin() {
ApplicationContext applicationContext = getApplicationContext();
if (applicationContext != null) {
return applicationContext.getBeanProvider(KafkaAdmin.class).getIfUnique();
if (this.containerProperties.isObservationEnabled()) {
ApplicationContext applicationContext = getApplicationContext();
if (applicationContext != null) {
return applicationContext.getBeanProvider(KafkaAdmin.class).getIfUnique();
}
}
return null;
}

@Nullable
private String clusterId() {
if (this.clusterId == null && this.kafkaAdmin != null) {
if (this.kafkaAdmin != null && this.clusterId == null) {
obtainClusterId();
}
return this.clusterId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
package org.springframework.kafka.annotation;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;

import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -28,6 +31,7 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.BytesDeserializer;
Expand All @@ -44,6 +48,7 @@
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.BatchListenerFailedException;
Expand Down Expand Up @@ -110,7 +115,7 @@ private void doTest(Listener listener, String topic) throws InterruptedException
}

@Test
public void testBatchOfPojoMessages() throws Exception {
public void testBatchOfPojoMessages(@Autowired KafkaAdmin admin) throws Exception {
String topic = "blc3";
this.template.send(new GenericMessage<>(
new Foo("bar"), Collections.singletonMap(KafkaHeaders.TOPIC, topic)));
Expand All @@ -119,6 +124,7 @@ public void testBatchOfPojoMessages() throws Exception {
assertThat(listener.received.size()).isGreaterThan(0);
assertThat(listener.received.get(0).getPayload()).isInstanceOf(Foo.class);
assertThat(listener.received.get(0).getPayload().getBar()).isEqualTo("bar");
verify(admin, never()).clusterId();
}

@Test
Expand Down Expand Up @@ -152,6 +158,11 @@ void conversionError() throws InterruptedException {
@EnableKafka
public static class Config {

@Bean
KafkaAdmin admin(EmbeddedKafkaBroker broker) {
return spy(new KafkaAdmin(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString())));
}

@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory(EmbeddedKafkaBroker embeddedKafka,
KafkaTemplate<Integer, Object> template) {
Expand Down

0 comments on commit b4c1fe3

Please sign in to comment.