Skip to content

Commit

Permalink
GH-910: Fix return type headers for async returns
Browse files Browse the repository at this point in the history
Fixes #910
  • Loading branch information
garyrussell authored and artembilan committed Feb 19, 2019
1 parent dd66369 commit 891ba3b
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -344,8 +344,8 @@ else if (this.logger.isWarnEnabled()) {
}

private void asyncSuccess(InvocationResult resultArg, Message request, Channel channel, Object source, Object r) {
doHandleResult(new InvocationResult(r, resultArg.getSendTo(), resultArg.getReturnType()), request,
channel, source);
// Set the return type to null so the converter will use the actual returned object's class for type info
doHandleResult(new InvocationResult(r, resultArg.getSendTo(), null), request, channel, source);
try {
channel.basicAck(request.getMessageProperties().getDeliveryTag(), false);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018 the original author or authors.
* Copyright 2018-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,6 +19,7 @@
import java.lang.reflect.Type;

import org.springframework.expression.Expression;
import org.springframework.lang.Nullable;

/**
* The result of a listener method invocation.
Expand All @@ -33,9 +34,10 @@ public final class InvocationResult {

private final Expression sendTo;

@Nullable
private final Type returnType;

public InvocationResult(Object result, Expression sendTo, Type returnType) {
public InvocationResult(Object result, Expression sendTo, @Nullable Type returnType) {
this.returnValue = result;
this.sendTo = sendTo;
this.returnType = returnType;
Expand All @@ -49,7 +51,7 @@ public Expression getSendTo() {
return this.sendTo;
}


@Nullable
public Type getReturnType() {
return this.returnType;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018 the original author or authors.
* Copyright 2018-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,7 +16,7 @@

package org.springframework.amqp.rabbit.annotation;

import static org.junit.Assert.assertEquals;
import static org.assertj.core.api.Assertions.assertThat;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -36,6 +36,8 @@
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.junit.BrokerRunning;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
Expand All @@ -61,6 +63,9 @@ public class AsyncListenerTests {
@Rule
public BrokerRunning brokerRunning = BrokerRunning.isRunning();

@Autowired
private EnableRabbitConfig config;

@Autowired
private RabbitTemplate rabbitTemplate;

Expand All @@ -75,22 +80,31 @@ public class AsyncListenerTests {

@Test
public void testAsyncListener() throws Exception {
assertEquals("FOO", this.rabbitTemplate.convertSendAndReceive(this.queue1.getName(), "foo"));
assertThat(this.rabbitTemplate.convertSendAndReceive(this.queue1.getName(), "foo")).isEqualTo("FOO");
RabbitConverterFuture<Object> future = this.asyncTemplate.convertSendAndReceive(this.queue1.getName(), "foo");
assertEquals("FOO", future.get(10, TimeUnit.SECONDS));
assertEquals("FOO", this.rabbitTemplate.convertSendAndReceive(this.queue2.getName(), "foo"));
assertThat(future.get(10, TimeUnit.SECONDS)).isEqualTo("FOO");
assertThat(this.rabbitTemplate.convertSendAndReceive(this.queue2.getName(), "foo")).isEqualTo("FOO");
assertThat(this.config.typeId).isEqualTo("java.lang.String");
}

@Configuration
@EnableRabbit
public static class EnableRabbitConfig {

private volatile Object typeId;

@Bean
public MessageConverter converter() {
return new Jackson2JsonMessageConverter();
}

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(rabbitConnectionFactory());
factory.setMismatchedQueuesFatal(true);
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
factory.setMessageConverter(converter());
return factory;
}

Expand All @@ -103,7 +117,13 @@ public ConnectionFactory rabbitConnectionFactory() {

@Bean
public RabbitTemplate rabbitTemplate() {
return new RabbitTemplate(rabbitConnectionFactory());
RabbitTemplate template = new RabbitTemplate(rabbitConnectionFactory());
template.setMessageConverter(converter());
template.setAfterReceivePostProcessors(m -> {
this.typeId = m.getMessageProperties().getHeaders().get("__TypeId__");
return m;
});
return template;
}

@Bean
Expand Down Expand Up @@ -153,7 +173,7 @@ public ListenableFuture<String> listen1(String foo) {
}

@RabbitListener(id = "bar", queues = "#{queue2.name}")
public Mono<String> listen2(String foo) {
public Mono<?> listen2(String foo) {
if (barFirst.getAndSet(false)) {
return Mono.error(new RuntimeException("Mono.error()"));
}
Expand Down

0 comments on commit 891ba3b

Please sign in to comment.