From 00775a590c140de6f66407f037bdf678f9275acf Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Mon, 11 Nov 2024 14:07:34 -0500 Subject: [PATCH] Some generics optimization in the `MessagingMessageListenerAdapter` * Fix warning in the `EmbeddedKafkaContextCustomizerTests.testTransactionReplicationFactor()` --- .../EmbeddedKafkaContextCustomizerTests.java | 1 + .../MessagingMessageListenerAdapter.java | 68 +++++++++---------- 2 files changed, 32 insertions(+), 37 deletions(-) diff --git a/spring-kafka-test/src/test/java/org/springframework/kafka/test/context/EmbeddedKafkaContextCustomizerTests.java b/spring-kafka-test/src/test/java/org/springframework/kafka/test/context/EmbeddedKafkaContextCustomizerTests.java index 20deb9cb8b..710fdfe914 100644 --- a/spring-kafka-test/src/test/java/org/springframework/kafka/test/context/EmbeddedKafkaContextCustomizerTests.java +++ b/spring-kafka-test/src/test/java/org/springframework/kafka/test/context/EmbeddedKafkaContextCustomizerTests.java @@ -96,6 +96,7 @@ void testMulti() { } @Test + @SuppressWarnings("unchecked") void testTransactionReplicationFactor() { EmbeddedKafka annotationWithPorts = AnnotationUtils.findAnnotation(TestWithEmbeddedKafkaTransactionFactor.class, EmbeddedKafka.class); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java index 5059141c69..59440ccda3 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java @@ -77,6 +77,7 @@ import org.springframework.util.ClassUtils; import org.springframework.util.ObjectUtils; import org.springframework.util.StringUtils; +import org.springframework.util.TypeUtils; /** * An abstract {@link org.springframework.kafka.listener.MessageListener} adapter @@ -320,6 +321,20 @@ public void setBeanResolver(BeanResolver beanResolver) { this.evaluationContext.addPropertyAccessor(new MapAccessor()); } + /** + * Set the retry callback for failures of both {@link CompletableFuture} and {@link Mono}. + * {@link MessagingMessageListenerAdapter#asyncFailure(Object, Acknowledgment, Consumer, Throwable, Message)} + * will invoke {@link MessagingMessageListenerAdapter#asyncRetryCallback} when + * {@link CompletableFuture} or {@link Mono} fails to complete. + * @param asyncRetryCallback the callback for async retry. + * @since 3.3 + */ + public void setCallbackForAsyncFailure( + @Nullable BiConsumer, RuntimeException> asyncRetryCallback) { + + this.asyncRetryCallback = asyncRetryCallback; + } + protected boolean isMessageList() { return this.isMessageList; } @@ -392,6 +407,7 @@ public void onIdleContainer(Map assignments, ConsumerSeekC protected Message toMessagingMessage(ConsumerRecord cRecord, @Nullable Acknowledgment acknowledgment, Consumer consumer) { + return getMessageConverter().toMessage(cRecord, acknowledgment, consumer, getType()); } @@ -875,70 +891,47 @@ else if (isAck || isKotlinContinuation || isConsumer || annotationHeaderIsGroupI private Type extractGenericParameterTypFromMethodParameter(MethodParameter methodParameter) { Type genericParameterType = methodParameter.getGenericParameterType(); if (genericParameterType instanceof ParameterizedType parameterizedType) { - if (parameterizedType.getRawType().equals(Message.class)) { + Type rawType = parameterizedType.getRawType(); + if (rawType.equals(Message.class)) { genericParameterType = parameterizedType.getActualTypeArguments()[0]; } - else if (parameterizedType.getRawType().equals(List.class) - && parameterizedType.getActualTypeArguments().length == 1) { - - Type paramType = getTypeFromWildCardWithUpperBound(parameterizedType.getActualTypeArguments()[0]); - this.isConsumerRecordList = parameterIsType(paramType, ConsumerRecord.class); - boolean messageWithGeneric = rawByParameterIsType(paramType, Message.class); - this.isMessageList = Message.class.equals(paramType) || messageWithGeneric; - if (messageWithGeneric) { + else if (rawType.equals(List.class) && parameterizedType.getActualTypeArguments().length == 1) { + Type paramType = parameterizedType.getActualTypeArguments()[0]; + boolean messageHasGeneric = paramType instanceof ParameterizedType pType + && pType.getRawType().equals(Message.class); + this.isMessageList = TypeUtils.isAssignable(paramType, Message.class) || messageHasGeneric; + this.isConsumerRecordList = TypeUtils.isAssignable(paramType, ConsumerRecord.class); + if (messageHasGeneric) { genericParameterType = ((ParameterizedType) paramType).getActualTypeArguments()[0]; } } else { - this.isConsumerRecords = parameterizedType.getRawType().equals(ConsumerRecords.class); + this.isConsumerRecords = rawType.equals(ConsumerRecords.class); } } return genericParameterType; } - private boolean annotationHeaderIsGroupId(MethodParameter methodParameter) { + private static boolean annotationHeaderIsGroupId(MethodParameter methodParameter) { Header header = methodParameter.getParameterAnnotation(Header.class); return header != null && KafkaHeaders.GROUP_ID.equals(header.value()); } - private Type getTypeFromWildCardWithUpperBound(Type paramType) { - if (paramType instanceof WildcardType wcType - && wcType.getUpperBounds() != null - && wcType.getUpperBounds().length > 0) { - paramType = wcType.getUpperBounds()[0]; - } - return paramType; - } - - private boolean isMessageWithNoTypeInfo(Type parameterType) { + private static boolean isMessageWithNoTypeInfo(Type parameterType) { if (parameterType instanceof ParameterizedType pType && pType.getRawType().equals(Message.class)) { return pType.getActualTypeArguments()[0] instanceof WildcardType; } return Message.class.equals(parameterType); // could be Message without a generic type } - private boolean parameterIsType(Type parameterType, Type type) { + private static boolean parameterIsType(Type parameterType, Type type) { return parameterType.equals(type) || rawByParameterIsType(parameterType, type); } - private boolean rawByParameterIsType(Type parameterType, Type type) { + private static boolean rawByParameterIsType(Type parameterType, Type type) { return parameterType instanceof ParameterizedType pType && pType.getRawType().equals(type); } - /** - * Set the retry callback for failures of both {@link CompletableFuture} and {@link Mono}. - * {@link MessagingMessageListenerAdapter#asyncFailure(Object, Acknowledgment, Consumer, Throwable, Message)} - * will invoke {@link MessagingMessageListenerAdapter#asyncRetryCallback} when - * {@link CompletableFuture} or {@link Mono} fails to complete. - * @param asyncRetryCallback the callback for async retry. - * @since 3.3 - */ - public void setCallbackForAsyncFailure( - @Nullable BiConsumer, RuntimeException> asyncRetryCallback) { - - this.asyncRetryCallback = asyncRetryCallback; - } - /** * Root object for reply expression evaluation. * @param request the request. @@ -947,6 +940,7 @@ public void setCallbackForAsyncFailure( * @since 2.0 */ public record ReplyExpressionRoot(Object request, Object source, Object result) { + } static class NoOpAck implements Acknowledgment {