GH-1817: Allow Annotation Attribute Modification
Resolves #1817

* Fix flaky reactor test - add a delay to prevent canceling the consumer before offsets sent.

* Simplify for developers; support multiple enhancers; add coverage for repeated listeners.

* Docs for new changes.

* Doc that enhancer bean definitions must be static; add test.
garyrussell authored Jun 4, 2021
1 parent cb4ad95 commit ad1ae60
Showing 7 changed files with 216 additions and 22 deletions.
@@ -45,7 +45,7 @@ public static void main(String[] args) {
}

@RetryableTopic(attempts = "5", backoff = @Backoff(delay = 2_000, maxDelay = 10_000, multiplier = 2))
@KafkaListener(id = "fooGroup", topics = "topic4")
@KafkaListener(id = "fooGroup", topics = "topic4", clientIdPrefix = "test")
public void listen(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.OFFSET) long offset) {

@@ -1,3 +1,4 @@
logging.level.root=off
logging.level.org.apache.kafka=info
logging.level.com.example=info
#logging.level.org.springframework.kafka=error
29 changes: 29 additions & 0 deletions spring-kafka-docs/src/main/asciidoc/kafka.adoc
@@ -1863,6 +1863,35 @@ void listen(Object in, @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetad
----
====

[[kafkalistener-attrs]]
===== `@KafkaListener` Attribute Modification

Starting with version 2.7.2, you can now programmatically modify annotation attributes before the container is created.
To do so, add one or more `KafkaListenerAnnotationBeanPostProcessor.AnnotationEnhancer` to the application context.
`AnnotationEnhancer` is a `BiFunction<Map<String, Object>, AnnotatedElement, Map<String, Object>>` and must return a map of attributes.
The attribute values can contain SpEL and/or property placeholders; the enhancer is called before any resolution is performed.
If more than one enhancer is present, and they implement `Ordered`, they will be invoked in order.

IMPORTANT: `AnnotationEnhancer` bean definitions must be declared `static` because they are required very early in the application context's lifecycle.

An example follows:

====
[source, java]
----
@Bean
public static AnnotationEnhancer groupIdEnhancer() {
return (attrs, element) -> {
attrs.put("groupId", attrs.get("id") + "." + (element instanceof Class
? ((Class<?>) element).getSimpleName()
: ((Method) element).getDeclaringClass().getSimpleName()
+ "." + ((Method) element).getName()));
return attrs;
};
}
----
====
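
If you register more than one enhancer, you can control the order in which they are applied by also implementing `Ordered`.
The following is a simple sketch (the bean methods, the nested `Enhancer` class, and the attribute values are hypothetical, not part of the framework); both enhancers set the `concurrency` attribute and the one with the higher order value runs last, so its value wins:

====
[source, java]
----
@Bean
public static AnnotationEnhancer firstEnhancer() {
    return new Enhancer(0, "3"); // order 0 runs first
}

@Bean
public static AnnotationEnhancer secondEnhancer() {
    return new Enhancer(1, "2"); // order 1 runs second and overrides the first
}

static class Enhancer implements AnnotationEnhancer, Ordered {

    private final int order;

    private final String concurrency;

    Enhancer(int order, String concurrency) {
        this.order = order;
        this.concurrency = concurrency;
    }

    @Override
    public Map<String, Object> apply(Map<String, Object> attrs, AnnotatedElement element) {
        attrs.put("concurrency", this.concurrency);
        return attrs;
    }

    @Override
    public int getOrder() {
        return this.order;
    }

}
----
====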

[[kafkalistener-lifecycle]]
===== `@KafkaListener` Lifecycle Management

3 changes: 3 additions & 0 deletions spring-kafka-docs/src/main/asciidoc/whats-new.adoc
@@ -48,6 +48,9 @@ You can now set the `rawRecordHeader` property on the `MessagingMessageConverter
This is useful, for example, if you wish to use a `DeadLetterPublishingRecoverer` in a listener error handler.
See <<listener-error-handlers>> for more information.

You can now modify `@KafkaListener` annotations during application initialization.
See <<kafkalistener-attrs>> for more information.

[[x27-dlt]]
==== `DeadLetterPublishingRecoverer` Changes

@@ -18,6 +18,7 @@

import java.io.IOException;
import java.io.StringReader;
import java.lang.reflect.AnnotatedElement;
import java.lang.reflect.Method;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
@@ -33,6 +34,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -43,8 +45,8 @@
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.BeanInitializationException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.ListableBeanFactory;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.beans.factory.ObjectFactory;
@@ -56,9 +58,13 @@
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.config.Scope;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.expression.StandardBeanExpressionResolver;
import org.springframework.core.MethodIntrospector;
import org.springframework.core.MethodParameter;
import org.springframework.core.OrderComparator;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.core.annotation.AnnotationUtils;
@@ -134,7 +140,7 @@
* @see MethodKafkaListenerEndpoint
*/
public class KafkaListenerAnnotationBeanPostProcessor<K, V>
implements BeanPostProcessor, Ordered, BeanFactoryAware, SmartInitializingSingleton {
implements BeanPostProcessor, Ordered, ApplicationContextAware, InitializingBean, SmartInitializingSingleton {

private static final String GENERATED_ID_PREFIX = "org.springframework.kafka.KafkaListenerEndpointContainer#";

@@ -149,25 +155,29 @@ public class KafkaListenerAnnotationBeanPostProcessor<K, V>

private final ListenerScope listenerScope = new ListenerScope();

private KafkaListenerEndpointRegistry endpointRegistry;

private String defaultContainerFactoryBeanName = DEFAULT_KAFKA_LISTENER_CONTAINER_FACTORY_BEAN_NAME;

private BeanFactory beanFactory;

private final KafkaHandlerMethodFactoryAdapter messageHandlerMethodFactory =
new KafkaHandlerMethodFactoryAdapter();

private final KafkaListenerEndpointRegistrar registrar = new KafkaListenerEndpointRegistrar();

private final AtomicInteger counter = new AtomicInteger();

private KafkaListenerEndpointRegistry endpointRegistry;

private String defaultContainerFactoryBeanName = DEFAULT_KAFKA_LISTENER_CONTAINER_FACTORY_BEAN_NAME;

private ApplicationContext applicationContext;

private BeanFactory beanFactory;

private BeanExpressionResolver resolver = new StandardBeanExpressionResolver();

private BeanExpressionContext expressionContext;

private Charset charset = StandardCharsets.UTF_8;

private AnnotationEnhancer enhancer;

@Override
public int getOrder() {
return LOWEST_PRECEDENCE;
@@ -213,13 +223,23 @@ public MessageHandlerMethodFactory getMessageHandlerMethodFactory() {
return this.messageHandlerMethodFactory;
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
if (applicationContext instanceof ConfigurableApplicationContext) {
setBeanFactory(((ConfigurableApplicationContext) applicationContext).getBeanFactory());
}
else {
setBeanFactory(applicationContext);
}
}

/**
* Making a {@link BeanFactory} available is optional; if not set,
* {@link KafkaListenerConfigurer} beans won't get autodetected and an
* {@link #setEndpointRegistry endpoint registry} has to be explicitly configured.
* @param beanFactory the {@link BeanFactory} to be used.
*/
@Override
public void setBeanFactory(BeanFactory beanFactory) {
this.beanFactory = beanFactory;
if (beanFactory instanceof ConfigurableListableBeanFactory) {
@@ -240,6 +260,11 @@ public void setCharset(Charset charset) {
this.charset = charset;
}

@Override
public void afterPropertiesSet() throws Exception {
buildEnhancer();
}

@Override
public void afterSingletonsInstantiated() {
this.registrar.setBeanFactory(this.beanFactory);
@@ -280,6 +305,25 @@ public void afterSingletonsInstantiated() {
this.registrar.afterPropertiesSet();
}

private void buildEnhancer() {
if (this.applicationContext != null) {
Map<String, AnnotationEnhancer> enhancersMap =
this.applicationContext.getBeansOfType(AnnotationEnhancer.class, false, false);
if (enhancersMap.size() > 0) {
List<AnnotationEnhancer> enhancers = enhancersMap.values()
.stream()
.sorted(new OrderComparator())
.collect(Collectors.toList());
this.enhancer = (attrs, element) -> {
Map<String, Object> newAttrs = attrs;
for (AnnotationEnhancer enhancer : enhancers) {
newAttrs = enhancer.apply(newAttrs, element);
}
return newAttrs;
};
}
}
}

@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
@@ -333,11 +377,14 @@ private Collection<KafkaListener> findListenerAnnotations(Class<?> clazz) {
Set<KafkaListener> listeners = new HashSet<>();
KafkaListener ann = AnnotatedElementUtils.findMergedAnnotation(clazz, KafkaListener.class);
if (ann != null) {
ann = enhance(clazz, ann);
listeners.add(ann);
}
KafkaListeners anns = AnnotationUtils.findAnnotation(clazz, KafkaListeners.class);
if (anns != null) {
listeners.addAll(Arrays.asList(anns.value()));
listeners.addAll(Arrays.stream(anns.value())
.map(anno -> enhance(clazz, anno))
.collect(Collectors.toList()));
}
return listeners;
}
@@ -349,15 +396,28 @@ private Set<KafkaListener> findListenerAnnotations(Method method) {
Set<KafkaListener> listeners = new HashSet<>();
KafkaListener ann = AnnotatedElementUtils.findMergedAnnotation(method, KafkaListener.class);
if (ann != null) {
ann = enhance(method, ann);
listeners.add(ann);
}
KafkaListeners anns = AnnotationUtils.findAnnotation(method, KafkaListeners.class);
if (anns != null) {
listeners.addAll(Arrays.asList(anns.value()));
listeners.addAll(Arrays.stream(anns.value())
.map(anno -> enhance(method, anno))
.collect(Collectors.toList()));
}
return listeners;
}

private KafkaListener enhance(AnnotatedElement element, KafkaListener ann) {
if (this.enhancer == null) {
return ann;
}
else {
return AnnotationUtils.synthesizeAnnotation(
this.enhancer.apply(AnnotationUtils.getAnnotationAttributes(ann), element), KafkaListener.class, null);
}
}

private void processMultiMethodListeners(Collection<KafkaListener> classLevelListeners, List<Method> multiMethods,
Object bean, String beanName) {

@@ -1070,4 +1130,14 @@ protected boolean isEmptyPayload(Object payload) {

}

/**
* Post processes each set of annotation attributes.
*
* @since 2.7.2
*
*/
public interface AnnotationEnhancer extends BiFunction<Map<String, Object>, AnnotatedElement, Map<String, Object>> {

}

}