From 891ba3bd338e0eb5a97e3d99f4dc7f1e0809f2cf Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Tue, 19 Feb 2019 15:41:46 -0500 Subject: [PATCH] GH-910: Fix return type headers for async returns Fixes https://github.com/spring-projects/spring-amqp/issues/910 --- .../AbstractAdaptableMessageListener.java | 4 +-- .../listener/adapter/InvocationResult.java | 8 +++-- .../rabbit/annotation/AsyncListenerTests.java | 34 +++++++++++++++---- 3 files changed, 34 insertions(+), 12 deletions(-) diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/adapter/AbstractAdaptableMessageListener.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/adapter/AbstractAdaptableMessageListener.java index c02c24fe22..ae4b69aebd 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/adapter/AbstractAdaptableMessageListener.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/adapter/AbstractAdaptableMessageListener.java @@ -344,8 +344,8 @@ else if (this.logger.isWarnEnabled()) { } private void asyncSuccess(InvocationResult resultArg, Message request, Channel channel, Object source, Object r) { - doHandleResult(new InvocationResult(r, resultArg.getSendTo(), resultArg.getReturnType()), request, - channel, source); + // Set the return type to null so the converter will use the actual returned object's class for type info + doHandleResult(new InvocationResult(r, resultArg.getSendTo(), null), request, channel, source); try { channel.basicAck(request.getMessageProperties().getDeliveryTag(), false); } diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/adapter/InvocationResult.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/adapter/InvocationResult.java index c5947c4c6e..b7deb03270 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/adapter/InvocationResult.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/adapter/InvocationResult.java @@ -1,5 +1,5 @@ /* - * Copyright 2018 the original author or authors. + * Copyright 2018-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,6 +19,7 @@ import java.lang.reflect.Type; import org.springframework.expression.Expression; +import org.springframework.lang.Nullable; /** * The result of a listener method invocation. @@ -33,9 +34,10 @@ public final class InvocationResult { private final Expression sendTo; + @Nullable private final Type returnType; - public InvocationResult(Object result, Expression sendTo, Type returnType) { + public InvocationResult(Object result, Expression sendTo, @Nullable Type returnType) { this.returnValue = result; this.sendTo = sendTo; this.returnType = returnType; @@ -49,7 +51,7 @@ public Expression getSendTo() { return this.sendTo; } - + @Nullable public Type getReturnType() { return this.returnType; } diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/annotation/AsyncListenerTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/annotation/AsyncListenerTests.java index 734ac33ff1..68d2ed958f 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/annotation/AsyncListenerTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/annotation/AsyncListenerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2018 the original author or authors. + * Copyright 2018-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,7 +16,7 @@ package org.springframework.amqp.rabbit.annotation; -import static org.junit.Assert.assertEquals; +import static org.assertj.core.api.Assertions.assertThat; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -36,6 +36,8 @@ import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.junit.BrokerRunning; +import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; +import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -61,6 +63,9 @@ public class AsyncListenerTests { @Rule public BrokerRunning brokerRunning = BrokerRunning.isRunning(); + @Autowired + private EnableRabbitConfig config; + @Autowired private RabbitTemplate rabbitTemplate; @@ -75,22 +80,31 @@ public class AsyncListenerTests { @Test public void testAsyncListener() throws Exception { - assertEquals("FOO", this.rabbitTemplate.convertSendAndReceive(this.queue1.getName(), "foo")); + assertThat(this.rabbitTemplate.convertSendAndReceive(this.queue1.getName(), "foo")).isEqualTo("FOO"); RabbitConverterFuture future = this.asyncTemplate.convertSendAndReceive(this.queue1.getName(), "foo"); - assertEquals("FOO", future.get(10, TimeUnit.SECONDS)); - assertEquals("FOO", this.rabbitTemplate.convertSendAndReceive(this.queue2.getName(), "foo")); + assertThat(future.get(10, TimeUnit.SECONDS)).isEqualTo("FOO"); + assertThat(this.rabbitTemplate.convertSendAndReceive(this.queue2.getName(), "foo")).isEqualTo("FOO"); + assertThat(this.config.typeId).isEqualTo("java.lang.String"); } @Configuration @EnableRabbit public static class EnableRabbitConfig { + private volatile Object typeId; + + @Bean + public MessageConverter converter() { + return new Jackson2JsonMessageConverter(); + } + @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(rabbitConnectionFactory()); factory.setMismatchedQueuesFatal(true); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); + factory.setMessageConverter(converter()); return factory; } @@ -103,7 +117,13 @@ public ConnectionFactory rabbitConnectionFactory() { @Bean public RabbitTemplate rabbitTemplate() { - return new RabbitTemplate(rabbitConnectionFactory()); + RabbitTemplate template = new RabbitTemplate(rabbitConnectionFactory()); + template.setMessageConverter(converter()); + template.setAfterReceivePostProcessors(m -> { + this.typeId = m.getMessageProperties().getHeaders().get("__TypeId__"); + return m; + }); + return template; } @Bean @@ -153,7 +173,7 @@ public ListenableFuture listen1(String foo) { } @RabbitListener(id = "bar", queues = "#{queue2.name}") - public Mono listen2(String foo) { + public Mono listen2(String foo) { if (barFirst.getAndSet(false)) { return Mono.error(new RuntimeException("Mono.error()")); }