Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Some generics optimization in the MessagingMessageListenerAdapter #3620

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ void testMulti() {
}

@Test
@SuppressWarnings("unchecked")
void testTransactionReplicationFactor() {
EmbeddedKafka annotationWithPorts =
AnnotationUtils.findAnnotation(TestWithEmbeddedKafkaTransactionFactor.class, EmbeddedKafka.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import org.springframework.util.ClassUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import org.springframework.util.TypeUtils;

/**
* An abstract {@link org.springframework.kafka.listener.MessageListener} adapter
Expand Down Expand Up @@ -320,6 +321,20 @@ public void setBeanResolver(BeanResolver beanResolver) {
this.evaluationContext.addPropertyAccessor(new MapAccessor());
}

/**
* Set the retry callback for failures of both {@link CompletableFuture} and {@link Mono}.
* {@link MessagingMessageListenerAdapter#asyncFailure(Object, Acknowledgment, Consumer, Throwable, Message)}
* will invoke {@link MessagingMessageListenerAdapter#asyncRetryCallback} when
* {@link CompletableFuture} or {@link Mono} fails to complete.
* @param asyncRetryCallback the callback for async retry.
* @since 3.3
*/
public void setCallbackForAsyncFailure(
@Nullable BiConsumer<ConsumerRecord<K, V>, RuntimeException> asyncRetryCallback) {

this.asyncRetryCallback = asyncRetryCallback;
}

protected boolean isMessageList() {
return this.isMessageList;
}
Expand Down Expand Up @@ -392,6 +407,7 @@ public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekC

protected Message<?> toMessagingMessage(ConsumerRecord<K, V> cRecord, @Nullable Acknowledgment acknowledgment,
Consumer<?, ?> consumer) {

return getMessageConverter().toMessage(cRecord, acknowledgment, consumer, getType());
}

Expand Down Expand Up @@ -875,70 +891,47 @@ else if (isAck || isKotlinContinuation || isConsumer || annotationHeaderIsGroupI
private Type extractGenericParameterTypFromMethodParameter(MethodParameter methodParameter) {
Type genericParameterType = methodParameter.getGenericParameterType();
if (genericParameterType instanceof ParameterizedType parameterizedType) {
if (parameterizedType.getRawType().equals(Message.class)) {
Type rawType = parameterizedType.getRawType();
if (rawType.equals(Message.class)) {
genericParameterType = parameterizedType.getActualTypeArguments()[0];
}
else if (parameterizedType.getRawType().equals(List.class)
&& parameterizedType.getActualTypeArguments().length == 1) {

Type paramType = getTypeFromWildCardWithUpperBound(parameterizedType.getActualTypeArguments()[0]);
this.isConsumerRecordList = parameterIsType(paramType, ConsumerRecord.class);
boolean messageWithGeneric = rawByParameterIsType(paramType, Message.class);
this.isMessageList = Message.class.equals(paramType) || messageWithGeneric;
if (messageWithGeneric) {
else if (rawType.equals(List.class) && parameterizedType.getActualTypeArguments().length == 1) {
Type paramType = parameterizedType.getActualTypeArguments()[0];
boolean messageHasGeneric = paramType instanceof ParameterizedType pType
&& pType.getRawType().equals(Message.class);
this.isMessageList = TypeUtils.isAssignable(paramType, Message.class) || messageHasGeneric;
this.isConsumerRecordList = TypeUtils.isAssignable(paramType, ConsumerRecord.class);
if (messageHasGeneric) {
genericParameterType = ((ParameterizedType) paramType).getActualTypeArguments()[0];
}
}
else {
this.isConsumerRecords = parameterizedType.getRawType().equals(ConsumerRecords.class);
this.isConsumerRecords = rawType.equals(ConsumerRecords.class);
}
}
return genericParameterType;
}

private boolean annotationHeaderIsGroupId(MethodParameter methodParameter) {
private static boolean annotationHeaderIsGroupId(MethodParameter methodParameter) {
Header header = methodParameter.getParameterAnnotation(Header.class);
return header != null && KafkaHeaders.GROUP_ID.equals(header.value());
}

private Type getTypeFromWildCardWithUpperBound(Type paramType) {
if (paramType instanceof WildcardType wcType
&& wcType.getUpperBounds() != null
&& wcType.getUpperBounds().length > 0) {
paramType = wcType.getUpperBounds()[0];
}
return paramType;
}

private boolean isMessageWithNoTypeInfo(Type parameterType) {
private static boolean isMessageWithNoTypeInfo(Type parameterType) {
if (parameterType instanceof ParameterizedType pType && pType.getRawType().equals(Message.class)) {
return pType.getActualTypeArguments()[0] instanceof WildcardType;
}
return Message.class.equals(parameterType); // could be Message without a generic type
}

private boolean parameterIsType(Type parameterType, Type type) {
private static boolean parameterIsType(Type parameterType, Type type) {
return parameterType.equals(type) || rawByParameterIsType(parameterType, type);
}

private boolean rawByParameterIsType(Type parameterType, Type type) {
private static boolean rawByParameterIsType(Type parameterType, Type type) {
return parameterType instanceof ParameterizedType pType && pType.getRawType().equals(type);
}

/**
* Set the retry callback for failures of both {@link CompletableFuture} and {@link Mono}.
* {@link MessagingMessageListenerAdapter#asyncFailure(Object, Acknowledgment, Consumer, Throwable, Message)}
* will invoke {@link MessagingMessageListenerAdapter#asyncRetryCallback} when
* {@link CompletableFuture} or {@link Mono} fails to complete.
* @param asyncRetryCallback the callback for async retry.
* @since 3.3
*/
public void setCallbackForAsyncFailure(
@Nullable BiConsumer<ConsumerRecord<K, V>, RuntimeException> asyncRetryCallback) {

this.asyncRetryCallback = asyncRetryCallback;
}

/**
* Root object for reply expression evaluation.
* @param request the request.
Expand All @@ -947,6 +940,7 @@ public void setCallbackForAsyncFailure(
* @since 2.0
*/
public record ReplyExpressionRoot(Object request, Object source, Object result) {

}

static class NoOpAck implements Acknowledgment {
Expand Down