diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaBroker.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaBroker.java index 588735a91d..7d700a4390 100644 --- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaBroker.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaBroker.java @@ -89,6 +89,14 @@ default void afterPropertiesSet() { */ EmbeddedKafkaBroker brokerListProperty(String brokerListProperty); + /** + * Set the timeout in seconds for admin operations (e.g. topic creation, close). + * @param adminTimeout the timeout. + * @return the {@link EmbeddedKafkaBroker} + * @since 2.8.5 + */ + EmbeddedKafkaBroker adminTimeout(int adminTimeout); + /** * Get the bootstrap server addresses as a String. * @return the bootstrap servers. diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaBrokerFactory.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaBrokerFactory.java new file mode 100644 index 0000000000..d04c6dd1f1 --- /dev/null +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaBrokerFactory.java @@ -0,0 +1,146 @@ +/* + * Copyright 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.test; + +import java.io.IOException; +import java.io.InputStream; +import java.io.StringReader; +import java.util.Arrays; +import java.util.Map; +import java.util.Properties; +import java.util.function.Function; + +import org.springframework.core.io.Resource; +import org.springframework.core.io.support.PathMatchingResourcePatternResolver; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.util.StringUtils; + +/** + * The factory to encapsulate an {@link EmbeddedKafkaBroker} creation logic. + * + * @author Artem Bilan + * + * @since 3.2.6 + */ +public final class EmbeddedKafkaBrokerFactory { + + private static final String TRANSACTION_STATE_LOG_REPLICATION_FACTOR = "transaction.state.log.replication.factor"; + + /** + * Create an {@link EmbeddedKafkaBroker} based on the {@code EmbeddedKafka} annotation. + * @param embeddedKafka the {@code EmbeddedKafka} annotation. + * @return a new {@link EmbeddedKafkaBroker} instance. + */ + public static EmbeddedKafkaBroker create(EmbeddedKafka embeddedKafka) { + return create(embeddedKafka, Function.identity()); + } + + /** + * Create an {@link EmbeddedKafkaBroker} based on the {@code EmbeddedKafka} annotation. + * @param embeddedKafka the {@code EmbeddedKafka} annotation. + * @param propertyResolver the {@link Function} for placeholders in the annotation attributes. + * @return a new {@link EmbeddedKafkaBroker} instance. + */ + @SuppressWarnings("unchecked") + public static EmbeddedKafkaBroker create(EmbeddedKafka embeddedKafka, Function propertyResolver) { + String[] topics = + Arrays.stream(embeddedKafka.topics()) + .map(propertyResolver) + .toArray(String[]::new); + + EmbeddedKafkaBroker embeddedKafkaBroker; + if (embeddedKafka.kraft()) { + embeddedKafkaBroker = kraftBroker(embeddedKafka, topics); + } + else { + embeddedKafkaBroker = zkBroker(embeddedKafka, topics); + } + int[] ports = setupPorts(embeddedKafka); + + embeddedKafkaBroker.kafkaPorts(ports) + .adminTimeout(embeddedKafka.adminTimeout()); + + Properties properties = new Properties(); + + for (String pair : embeddedKafka.brokerProperties()) { + if (!StringUtils.hasText(pair)) { + continue; + } + try { + properties.load(new StringReader(propertyResolver.apply(pair))); + } + catch (Exception ex) { + throw new IllegalStateException("Failed to load broker property from [" + pair + "]", ex); + } + } + + String brokerPropertiesLocation = embeddedKafka.brokerPropertiesLocation(); + if (StringUtils.hasText(brokerPropertiesLocation)) { + String propertiesLocation = propertyResolver.apply(brokerPropertiesLocation); + Resource propertiesResource = new PathMatchingResourcePatternResolver().getResource(propertiesLocation); + if (!propertiesResource.exists()) { + throw new IllegalStateException( + "Failed to load broker properties from [" + propertiesResource + "]: resource does not exist."); + } + try (InputStream in = propertiesResource.getInputStream()) { + Properties p = new Properties(); + p.load(in); + p.forEach((key, value) -> properties.putIfAbsent(key, propertyResolver.apply((String) value))); + } + catch (IOException ex) { + throw new IllegalStateException("Failed to load broker properties from [" + propertiesResource + "]", ex); + } + } + + properties.putIfAbsent(TRANSACTION_STATE_LOG_REPLICATION_FACTOR, + String.valueOf(Math.min(3, embeddedKafka.count()))); + + embeddedKafkaBroker.brokerProperties((Map) (Map) properties); + String bootstrapServersProperty = embeddedKafka.bootstrapServersProperty(); + if (StringUtils.hasText(bootstrapServersProperty)) { + embeddedKafkaBroker.brokerListProperty(bootstrapServersProperty); + } + + // Safe to start an embedded broker eagerly before context refresh + embeddedKafkaBroker.afterPropertiesSet(); + + return embeddedKafkaBroker; + } + + private static int[] setupPorts(EmbeddedKafka embedded) { + int[] ports = embedded.ports(); + if (embedded.count() > 1 && ports.length == 1 && ports[0] == 0) { + ports = new int[embedded.count()]; + } + return ports; + } + + private static EmbeddedKafkaBroker kraftBroker(EmbeddedKafka embedded, String[] topics) { + return new EmbeddedKafkaKraftBroker(embedded.count(), embedded.partitions(), topics); + } + + private static EmbeddedKafkaBroker zkBroker(EmbeddedKafka embedded, String[] topics) { + return new EmbeddedKafkaZKBroker(embedded.count(), embedded.controlledShutdown(), embedded.partitions(), topics) + .zkPort(embedded.zookeeperPort()) + .zkConnectionTimeout(embedded.zkConnectionTimeout()) + .zkSessionTimeout(embedded.zkSessionTimeout()); + } + + private EmbeddedKafkaBrokerFactory() { + } + +} diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java index 35f80813c6..77b14b8a08 100644 --- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java @@ -176,12 +176,7 @@ public EmbeddedKafkaBroker brokerListProperty(String brokerListProperty) { return this; } - /** - * Set the timeout in seconds for admin operations (e.g. topic creation, close). - * @param adminTimeout the timeout. - * @return the {@link EmbeddedKafkaKraftBroker} - * @since 2.8.5 - */ + @Override public EmbeddedKafkaBroker adminTimeout(int adminTimeout) { this.adminTimeout = Duration.ofSeconds(adminTimeout); return this; diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaZKBroker.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaZKBroker.java index a9b070a7d2..a12eac978b 100644 --- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaZKBroker.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaZKBroker.java @@ -250,12 +250,7 @@ public void setZkPort(int zkPort) { this.zkPort = zkPort; } - /** - * Set the timeout in seconds for admin operations (e.g. topic creation, close). - * @param adminTimeout the timeout. - * @return the {@link EmbeddedKafkaBroker} - * @since 2.8.5 - */ + @Override public EmbeddedKafkaBroker adminTimeout(int adminTimeout) { this.adminTimeout = Duration.ofSeconds(adminTimeout); return this; diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/condition/EmbeddedKafkaCondition.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/condition/EmbeddedKafkaCondition.java index a5b779b021..f436c9b7b0 100644 --- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/condition/EmbeddedKafkaCondition.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/condition/EmbeddedKafkaCondition.java @@ -16,14 +16,9 @@ package org.springframework.kafka.test.condition; -import java.io.IOException; -import java.io.InputStream; -import java.io.StringReader; import java.lang.reflect.AnnotatedElement; import java.util.Arrays; -import java.util.Map; import java.util.Optional; -import java.util.Properties; import org.junit.jupiter.api.extension.AfterAllCallback; import org.junit.jupiter.api.extension.ConditionEvaluationResult; @@ -37,15 +32,11 @@ import org.junit.jupiter.api.extension.ParameterResolver; import org.springframework.core.annotation.AnnotatedElementUtils; -import org.springframework.core.io.Resource; -import org.springframework.core.io.support.PathMatchingResourcePatternResolver; import org.springframework.kafka.test.EmbeddedKafkaBroker; -import org.springframework.kafka.test.EmbeddedKafkaKraftBroker; -import org.springframework.kafka.test.EmbeddedKafkaZKBroker; +import org.springframework.kafka.test.EmbeddedKafkaBrokerFactory; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.test.context.junit.jupiter.SpringExtension; import org.springframework.util.Assert; -import org.springframework.util.StringUtils; /** * JUnit5 condition for an embedded broker. @@ -117,89 +108,22 @@ public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext con private boolean springTestContext(AnnotatedElement annotatedElement) { return AnnotatedElementUtils.findAllMergedAnnotations(annotatedElement, ExtendWith.class) .stream() - .filter(extended -> Arrays.asList(extended.value()).contains(SpringExtension.class)) - .findFirst() - .isPresent(); + .map(ExtendWith::value) + .flatMap(Arrays::stream) + .anyMatch(SpringExtension.class::isAssignableFrom); } - @SuppressWarnings("unchecked") private EmbeddedKafkaBroker createBroker(EmbeddedKafka embedded) { - int[] ports = setupPorts(embedded); - EmbeddedKafkaBroker broker; - if (embedded.kraft()) { - broker = kraftBroker(embedded, ports); - } - else { - broker = zkBroker(embedded, ports); - } - Properties properties = new Properties(); - - for (String pair : embedded.brokerProperties()) { - if (!StringUtils.hasText(pair)) { - continue; - } - try { - properties.load(new StringReader(pair)); - } - catch (Exception ex) { - throw new IllegalStateException("Failed to load broker property from [" + pair + "]", - ex); - } - } - if (StringUtils.hasText(embedded.brokerPropertiesLocation())) { - Resource propertiesResource = new PathMatchingResourcePatternResolver() - .getResource(embedded.brokerPropertiesLocation()); - if (!propertiesResource.exists()) { - throw new IllegalStateException( - "Failed to load broker properties from [" + propertiesResource - + "]: resource does not exist."); - } - try (InputStream in = propertiesResource.getInputStream()) { - Properties p = new Properties(); - p.load(in); - p.forEach(properties::putIfAbsent); - } - catch (IOException ex) { - throw new IllegalStateException( - "Failed to load broker properties from [" + propertiesResource + "]", ex); - } - } - broker.brokerProperties((Map) (Map) properties); - if (StringUtils.hasText(embedded.bootstrapServersProperty())) { - broker.brokerListProperty(embedded.bootstrapServersProperty()); - } - broker.afterPropertiesSet(); - return broker; - } - - private EmbeddedKafkaBroker kraftBroker(EmbeddedKafka embedded, int[] ports) { - return new EmbeddedKafkaKraftBroker(embedded.count(), embedded.partitions(), embedded.topics()) - .kafkaPorts(ports) - .adminTimeout(embedded.adminTimeout()); - } - - private EmbeddedKafkaBroker zkBroker(EmbeddedKafka embedded, int[] ports) { - return new EmbeddedKafkaZKBroker(embedded.count(), embedded.controlledShutdown(), - embedded.partitions(), embedded.topics()) - .zkPort(embedded.zookeeperPort()) - .kafkaPorts(ports) - .zkConnectionTimeout(embedded.zkConnectionTimeout()) - .zkSessionTimeout(embedded.zkSessionTimeout()) - .adminTimeout(embedded.adminTimeout()); - } - - private int[] setupPorts(EmbeddedKafka embedded) { - int[] ports = embedded.ports(); - if (embedded.count() > 1 && ports.length == 1 && ports[0] == 0) { - ports = new int[embedded.count()]; - } - return ports; + return EmbeddedKafkaBrokerFactory.create(embedded); } private EmbeddedKafkaBroker getBrokerFromStore(ExtensionContext context) { - return getParentStore(context).get(EMBEDDED_BROKER, EmbeddedKafkaBroker.class) == null + EmbeddedKafkaBroker embeddedKafkaBrokerFromParentStore = + getParentStore(context) + .get(EMBEDDED_BROKER, EmbeddedKafkaBroker.class); + return embeddedKafkaBrokerFromParentStore == null ? getStore(context).get(EMBEDDED_BROKER, EmbeddedKafkaBroker.class) - : getParentStore(context).get(EMBEDDED_BROKER, EmbeddedKafkaBroker.class); + : embeddedKafkaBrokerFromParentStore; } private Store getStore(ExtensionContext context) { diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafkaContextCustomizer.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafkaContextCustomizer.java index 7a4d82a57a..26b7521ca1 100644 --- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafkaContextCustomizer.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafkaContextCustomizer.java @@ -16,27 +16,17 @@ package org.springframework.kafka.test.context; -import java.io.IOException; -import java.io.InputStream; -import java.io.StringReader; -import java.util.Arrays; -import java.util.Map; -import java.util.Properties; - import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; import org.springframework.beans.factory.support.BeanDefinitionRegistry; import org.springframework.beans.factory.support.DefaultSingletonBeanRegistry; import org.springframework.beans.factory.support.RootBeanDefinition; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.core.env.ConfigurableEnvironment; -import org.springframework.core.io.Resource; import org.springframework.kafka.test.EmbeddedKafkaBroker; -import org.springframework.kafka.test.EmbeddedKafkaKraftBroker; -import org.springframework.kafka.test.EmbeddedKafkaZKBroker; +import org.springframework.kafka.test.EmbeddedKafkaBrokerFactory; import org.springframework.test.context.ContextCustomizer; import org.springframework.test.context.MergedContextConfiguration; import org.springframework.util.Assert; -import org.springframework.util.StringUtils; /** * The {@link ContextCustomizer} implementation for the {@link EmbeddedKafkaBroker} bean registration. @@ -55,97 +45,24 @@ class EmbeddedKafkaContextCustomizer implements ContextCustomizer { private final EmbeddedKafka embeddedKafka; - private final String TRANSACTION_STATE_LOG_REPLICATION_FACTOR = "transaction.state.log.replication.factor"; - EmbeddedKafkaContextCustomizer(EmbeddedKafka embeddedKafka) { this.embeddedKafka = embeddedKafka; } @Override - @SuppressWarnings("unchecked") public void customizeContext(ConfigurableApplicationContext context, MergedContextConfiguration mergedConfig) { ConfigurableListableBeanFactory beanFactory = context.getBeanFactory(); Assert.isInstanceOf(DefaultSingletonBeanRegistry.class, beanFactory); ConfigurableEnvironment environment = context.getEnvironment(); - String[] topics = - Arrays.stream(this.embeddedKafka.topics()) - .map(environment::resolvePlaceholders) - .toArray(String[]::new); - - int[] ports = setupPorts(); - EmbeddedKafkaBroker embeddedKafkaBroker; - if (this.embeddedKafka.kraft()) { - embeddedKafkaBroker = new EmbeddedKafkaKraftBroker(this.embeddedKafka.count(), - this.embeddedKafka.partitions(), - topics) - .kafkaPorts(ports); - } - else { - embeddedKafkaBroker = new EmbeddedKafkaZKBroker(this.embeddedKafka.count(), - this.embeddedKafka.controlledShutdown(), - this.embeddedKafka.partitions(), - topics) - .kafkaPorts(ports) - .zkPort(this.embeddedKafka.zookeeperPort()) - .zkConnectionTimeout(this.embeddedKafka.zkConnectionTimeout()) - .zkSessionTimeout(this.embeddedKafka.zkSessionTimeout()); - } - - Properties properties = new Properties(); - - for (String pair : this.embeddedKafka.brokerProperties()) { - if (!StringUtils.hasText(pair)) { - continue; - } - try { - properties.load(new StringReader(environment.resolvePlaceholders(pair))); - } - catch (Exception ex) { - throw new IllegalStateException("Failed to load broker property from [" + pair + "]", ex); - } - } - - if (StringUtils.hasText(this.embeddedKafka.brokerPropertiesLocation())) { - String propertiesLocation = environment.resolvePlaceholders(this.embeddedKafka.brokerPropertiesLocation()); - Resource propertiesResource = context.getResource(propertiesLocation); - if (!propertiesResource.exists()) { - throw new IllegalStateException( - "Failed to load broker properties from [" + propertiesResource + "]: resource does not exist."); - } - try (InputStream in = propertiesResource.getInputStream()) { - Properties p = new Properties(); - p.load(in); - p.forEach((key, value) -> properties.putIfAbsent(key, environment.resolvePlaceholders((String) value))); - } - catch (IOException ex) { - throw new IllegalStateException("Failed to load broker properties from [" + propertiesResource + "]", ex); - } - } - - properties.putIfAbsent(TRANSACTION_STATE_LOG_REPLICATION_FACTOR, String.valueOf(Math.min(3, embeddedKafka.count()))); - - embeddedKafkaBroker.brokerProperties((Map) (Map) properties); - if (StringUtils.hasText(this.embeddedKafka.bootstrapServersProperty())) { - embeddedKafkaBroker.brokerListProperty(this.embeddedKafka.bootstrapServersProperty()); - } - - // Safe to start an embedded broker eagerly before context refresh - embeddedKafkaBroker.afterPropertiesSet(); + EmbeddedKafkaBroker embeddedKafkaBroker = + EmbeddedKafkaBrokerFactory.create(this.embeddedKafka, environment::resolvePlaceholders); ((BeanDefinitionRegistry) beanFactory).registerBeanDefinition(EmbeddedKafkaBroker.BEAN_NAME, new RootBeanDefinition(EmbeddedKafkaBroker.class, () -> embeddedKafkaBroker)); } - private int[] setupPorts() { - int[] ports = this.embeddedKafka.ports(); - if (this.embeddedKafka.count() > 1 && ports.length == 1 && ports[0] == 0) { - ports = new int[this.embeddedKafka.count()]; - } - return ports; - } - @Override public int hashCode() { return this.embeddedKafka.hashCode(); diff --git a/spring-kafka-test/src/test/java/org/springframework/kafka/test/condition/EmbeddedKafkaConditionTests.java b/spring-kafka-test/src/test/java/org/springframework/kafka/test/condition/EmbeddedKafkaConditionTests.java index 4a0f0dcde9..5647227e57 100644 --- a/spring-kafka-test/src/test/java/org/springframework/kafka/test/condition/EmbeddedKafkaConditionTests.java +++ b/spring-kafka-test/src/test/java/org/springframework/kafka/test/condition/EmbeddedKafkaConditionTests.java @@ -16,6 +16,8 @@ package org.springframework.kafka.test.condition; +import java.time.Duration; + import org.junit.jupiter.api.Test; import org.springframework.kafka.test.EmbeddedKafkaBroker; @@ -28,12 +30,13 @@ /** * @author Gary Russell * @author MichaƂ Padula + * @author Artem Bilan * * @since 2.3 * */ @EmbeddedKafka(bootstrapServersProperty = "my.bss.property", count = 2, controlledShutdown = true, partitions = 3, - kraft = false) + adminTimeout = 67) public class EmbeddedKafkaConditionTests { @Test @@ -41,6 +44,7 @@ public void test(EmbeddedKafkaBroker broker) { assertThat(broker.getBrokersAsString()).isNotNull(); assertThat(KafkaTestUtils.getPropertyValue(broker, "brokerListProperty")).isEqualTo("my.bss.property"); assertThat(KafkaTestUtils.getPropertyValue(broker, "controlledShutdown")).isEqualTo(Boolean.TRUE); + assertThat(KafkaTestUtils.getPropertyValue(broker, "adminTimeout")).isEqualTo(Duration.ofSeconds(67)); assertThat(broker.getPartitionsPerTopic()).isEqualTo(3); } diff --git a/spring-kafka-test/src/test/java/org/springframework/kafka/test/context/EmbeddedKafkaContextCustomizerTests.java b/spring-kafka-test/src/test/java/org/springframework/kafka/test/context/EmbeddedKafkaContextCustomizerTests.java index 710fdfe914..536e23f8f1 100644 --- a/spring-kafka-test/src/test/java/org/springframework/kafka/test/context/EmbeddedKafkaContextCustomizerTests.java +++ b/spring-kafka-test/src/test/java/org/springframework/kafka/test/context/EmbeddedKafkaContextCustomizerTests.java @@ -16,6 +16,7 @@ package org.springframework.kafka.test.context; +import java.time.Duration; import java.util.Map; import org.junit.jupiter.api.BeforeEach; @@ -80,6 +81,8 @@ void testPorts() { .isEqualTo("127.0.0.1:" + annotationWithPorts.ports()[0]); assertThat(KafkaTestUtils.getPropertyValue(embeddedKafkaBroker, "brokerListProperty")) .isEqualTo("my.bss.prop"); + assertThat(KafkaTestUtils.getPropertyValue(embeddedKafkaBroker, "adminTimeout")) + .isEqualTo(Duration.ofSeconds(33)); } @Test @@ -121,7 +124,7 @@ private static final class SecondTestWithEmbeddedKafka { } - @EmbeddedKafka(kraft = false, ports = 8085, bootstrapServersProperty = "my.bss.prop") + @EmbeddedKafka(kraft = false, ports = 8085, bootstrapServersProperty = "my.bss.prop", adminTimeout = 33) private static final class TestWithEmbeddedKafkaPorts { }