Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-194: Make KafkaEmbedded as a bean #294

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ project ('spring-kafka-test') {
description = 'Spring Kafka Test Support'

dependencies {
compile "org.springframework:spring-beans:$springVersion"
compile "org.springframework:spring-context:$springVersion"
compile "org.springframework:spring-test:$springVersion"
compile "org.springframework.retry:spring-retry:$springRetryVersion"

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.test.context;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import org.springframework.core.annotation.AliasFor;
import org.springframework.kafka.test.rule.KafkaEmbedded;

/**
* Annotation that can be specified on a test class that runs Spring Kafka based tests.
* Provides the following features over and above the regular <em>Spring TestContext
* Framework</em>:
* <ul>
* <li>Registers a {@link KafkaEmbedded} bean with the {@link KafkaEmbedded#BEAN_NAME} bean name.
* </li>
* </ul>
* <p>
* The typical usage of this annotation is like:
* <pre class="code">
* &#064;RunWith(SpringRunner.class)
* &#064;EmbeddedKafka
* public class MyKafkaTests {
*
* &#064;Autowired
* private KafkaEmbedded kafkaEmbedded;
*
* &#064;Value("${spring.embedded.kafka.brokers}")
* private String brokerAddresses;
* }
* </pre>
*
* @author Artem Bilan
*
* @since 2.0
*
* @see KafkaEmbedded
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface EmbeddedKafka {

/**
* @return the number of brokers
*/
@AliasFor("count")
int value() default 1;

/**
* @return the number of brokers
*/
@AliasFor("value")
int count() default 1;

/**
* @return passed into {@code kafka.utils.TestUtils.createBrokerConfig())
*/
boolean controlledShutdown() default false;

/**
* @return partitions per topic.
*/
int partitions() default 2;

/**
* @return the topics to create..
*/
String[] topics() default {};

}

Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.test.context;

import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.DefaultSingletonBeanRegistry;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.test.context.ContextCustomizer;
import org.springframework.test.context.MergedContextConfiguration;
import org.springframework.util.Assert;

/**
* The {@link ContextCustomizer} implementation for Spring Integration specific environment.
* <p>
* Registers {@link KafkaEmbedded} bean.
*
* @author Artem Bilan
*
* @since 2.0
*/
class EmbeddedKafkaContextCustomizer implements ContextCustomizer {

private final EmbeddedKafka embeddedKafka;

EmbeddedKafkaContextCustomizer(EmbeddedKafka embeddedKafka) {
this.embeddedKafka = embeddedKafka;
}

@Override
public void customizeContext(ConfigurableApplicationContext context, MergedContextConfiguration mergedConfig) {
ConfigurableListableBeanFactory beanFactory = context.getBeanFactory();
Assert.isInstanceOf(DefaultSingletonBeanRegistry.class, beanFactory);

KafkaEmbedded kafkaEmbedded = new KafkaEmbedded(this.embeddedKafka.count(),
this.embeddedKafka.controlledShutdown(),
this.embeddedKafka.partitions(),
this.embeddedKafka.topics());

beanFactory.initializeBean(kafkaEmbedded, KafkaEmbedded.BEAN_NAME);
beanFactory.registerSingleton(KafkaEmbedded.BEAN_NAME, kafkaEmbedded);
((DefaultSingletonBeanRegistry) beanFactory).registerDisposableBean(KafkaEmbedded.BEAN_NAME, kafkaEmbedded);
}

}

Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.test.context;

import java.util.List;

import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.test.context.ContextConfigurationAttributes;
import org.springframework.test.context.ContextCustomizer;
import org.springframework.test.context.ContextCustomizerFactory;

/**
* The {@link ContextCustomizerFactory} implementation to produce a
* {@link EmbeddedKafkaContextCustomizer} if a {@link EmbeddedKafka} annotation
* is present on the test class.
*
* @author Artem Bilan
*
* @since 2.0
*/
class EmbeddedKafkaContextCustomizerFactory implements ContextCustomizerFactory {

@Override
public ContextCustomizer createContextCustomizer(Class<?> testClass,
List<ContextConfigurationAttributes> configAttributes) {
EmbeddedKafka embeddedKafka =
AnnotatedElementUtils.findMergedAnnotation(testClass, EmbeddedKafka.class);
return embeddedKafka != null ? new EmbeddedKafkaContextCustomizer(embeddedKafka) : null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import org.apache.kafka.common.utils.Time;
import org.junit.rules.ExternalResource;

import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.kafka.test.core.BrokerAddress;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
Expand Down Expand Up @@ -73,7 +75,9 @@
* @author Gary Russell
*/
@SuppressWarnings("serial")
public class KafkaEmbedded extends ExternalResource implements KafkaRule {
public class KafkaEmbedded extends ExternalResource implements KafkaRule, InitializingBean, DisposableBean {

public static final String BEAN_NAME = "kafkaEmbedded";

public static final String SPRING_EMBEDDED_KAFKA_BROKERS = "spring.embedded.kafka.brokers";

Expand Down Expand Up @@ -129,6 +133,11 @@ public KafkaEmbedded(int count, boolean controlledShutdown, int partitions, Stri
this.partitionsPerTopic = partitions;
}

@Override
public void afterPropertiesSet() throws Exception {
before();
}

@Override
public void before() throws Exception { //NOSONAR
startZookeeper();
Expand Down Expand Up @@ -164,6 +173,12 @@ public void before() throws Exception { //NOSONAR
System.setProperty(SPRING_EMBEDDED_KAFKA_BROKERS, getBrokersAsString());
}


@Override
public void destroy() throws Exception {
after();
}

@Override
public void after() {
System.getProperties().remove(SPRING_EMBEDDED_KAFKA_BROKERS);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Spring Test ContextCustomizerFactories
org.springframework.test.context.ContextCustomizerFactory=\
org.springframework.kafka.test.context.EmbeddedKafkaContextCustomizerFactory
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
Expand All @@ -50,6 +50,7 @@
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.annotation.DirtiesContext;
Expand All @@ -63,21 +64,25 @@
*/
@RunWith(SpringRunner.class)
@DirtiesContext
@EmbeddedKafka(partitions = 1,
topics = {
KafkaStreamsTests.STREAMING_TOPIC1,
KafkaStreamsTests.STREAMING_TOPIC2 })
public class KafkaStreamsTests {

private static final String STREAMING_TOPIC1 = "streamingTopic1";
static final String STREAMING_TOPIC1 = "streamingTopic1";

private static final String STREAMING_TOPIC2 = "streamingTopic2";

@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, 1, STREAMING_TOPIC1, STREAMING_TOPIC2);
static final String STREAMING_TOPIC2 = "streamingTopic2";

@Autowired
private KafkaTemplate<Integer, String> kafkaTemplate;

@Autowired
private SettableListenableFuture<String> resultFuture;

@Autowired
private KafkaEmbedded kafkaEmbedded;

@Test
public void testKStreams() throws Exception {
String payload = "foo" + UUID.randomUUID().toString();
Expand All @@ -99,14 +104,17 @@ public void testKStreams() throws Exception {
@EnableKafkaStreams
public static class KafkaStreamsConfiguration {

@Value("${" + KafkaEmbedded.SPRING_EMBEDDED_KAFKA_BROKERS + "}")
private String brokerAddresses;

@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public Map<String, Object> producerConfigs() {
return KafkaTestUtils.producerProps(embeddedKafka);
return KafkaTestUtils.senderProps(this.brokerAddresses);
}

@Bean
Expand All @@ -120,7 +128,7 @@ public KafkaTemplate<Integer, String> template() {
public StreamsConfig kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddresses);
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
Expand Down Expand Up @@ -148,7 +156,7 @@ public KStream<Integer, String> kStream(KStreamBuilder kStreamBuilder) {

@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup", "false", embeddedKafka);
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(this.brokerAddresses, "testGroup", "false");
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return consumerProps;
}
Expand Down
Loading