Batch listening does not work for typed Spring messaging messages #1145

Closed
KaiStapel opened this issue Jan 17, 2020 · 2 comments · Fixed by #1146

KaiStapel commented Jan 17, 2020

Affects Version(s): 2.2.2.RELEASE

Bug report

The following batch listener does not work as expected:

@RabbitListener(queues = "aQueue")
public void receive(List<Message<Foo>> batch) {
  batch.forEach(message -> System.out.println(message.getPayload().toString()));
}

Incoming messages fail to deserialize with the following error:

org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener threw exception
[...]
Caused by: org.springframework.amqp.support.converter.MessageConversionException: Failed to convert Message content
[...]
Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `org.springframework.messaging.Message` (no Creators, like default construct, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information
 at [Source: (String)"{"test": 1}"; line: 1, column: 1]

Steps to reproduce

@SpringBootApplication
@EnableRabbit
public class SpringAmqpBatchMessageBug {

  public static void main(String[] args) throws Exception {
    ConfigurableApplicationContext context = SpringApplication.run(SpringAmqpBatchMessageBug.class, args);
    RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
    rabbitTemplate.send("exchange", "", msg("{\"test\": 1}"));
    rabbitTemplate.send("exchange", "", msg("{\"test\": 2}"));
    rabbitTemplate.send("exchange", "", msg("{\"test\": 3}"));
    context.getBean(Listener.class).latch.await(60, TimeUnit.SECONDS);
    context.close();
  }

  private static org.springframework.amqp.core.Message msg(String body) throws UnsupportedEncodingException {
    MessageProperties properties = new MessageProperties();
    properties.setContentType("application/json");
    return new org.springframework.amqp.core.Message(body.getBytes(SimpleMessageConverter.DEFAULT_CHARSET), properties);
  }

  @Bean
  public FanoutExchange exchange() { return (FanoutExchange) ExchangeBuilder.fanoutExchange("exchange").build(); }
  @Bean
  public Queue workingBatchQueue() { return new Queue("workingBatchQueue"); }
  @Bean
  public Queue brokenBatchQueue() { return new Queue("brokenBatchQueue"); }
  @Bean
  public Binding workingQueueBinding(Queue workingBatchQueue, FanoutExchange exchange) { return BindingBuilder.bind(workingBatchQueue).to(exchange); }
  @Bean
  public Binding brokenQueueBinding(Queue brokenBatchQueue, FanoutExchange exchange) { return BindingBuilder.bind(brokenBatchQueue).to(exchange); }

  @Bean
  public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(
      SimpleRabbitListenerContainerFactoryConfigurer configurer,
      ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    configurer.configure(factory, connectionFactory);
    factory.setConnectionFactory(connectionFactory);
    factory.setMessageConverter(new Jackson2JsonMessageConverter());
    factory.setBatchListener(true);
    factory.setConsumerBatchEnabled(true);
    factory.setDeBatchingEnabled(true);
    factory.setBatchSize(3);
    factory.setReceiveTimeout(1000L);
    return factory;
  }

  @Bean
  public Listener listener() {
    return new Listener();
  }

  public static class Listener {

    private final CountDownLatch latch = new CountDownLatch(3);

    @RabbitListener(
        queues = "workingBatchQueue",
        containerFactory = "simpleRabbitListenerContainerFactory"
    )
    public void receiveWorkingBatch(List<Foo> batch) {
      System.out.println("Received batch of size " + batch.size() + " from 'workingBatchQueue'");
      batch.forEach(message -> {
        System.out.println("Message in 'workingBatchQueue' batch: " + message);
        latch.countDown();
      });
    }

    @RabbitListener(
        queues = "brokenBatchQueue",
        containerFactory = "simpleRabbitListenerContainerFactory"
    )
    public void receiveBrokenBatch(List<Message<Foo>> batch) {
      System.out.println("Received batch of size " + batch.size() + " from 'brokenBatchQueue'");
      batch.forEach(message -> {
        System.out.println("Message in 'brokenBatchQueue' batch: " + message.getPayload());
        latch.countDown();
      });
    }

  }

  public static class Foo {
    private int test;

    public int getTest() {
      return test;
    }

    public void setTest(int test) {
      this.test = test;
    }

    @Override
    public String toString() {
      return "test: " + test;
    }
  }

}
KaiStapel pushed a commit to KaiStapel/spring-amqp that referenced this issue Jan 17, 2020
… listeners with typed 'org.springframework.messaging.Message's
garyrussell commented

Hmmm - we have a test for this...

public static class Listener {

  List<Foo> foos;

  CountDownLatch foosLatch = new CountDownLatch(1);

  List<Message<Foo>> fooMessages;

  CountDownLatch fooMessagesLatch = new CountDownLatch(1);

  @RabbitListener(queues = "json.batch.1")
  public void listen1(List<Foo> in) {
    this.foos = in;
    this.foosLatch.countDown();
  }

  @RabbitListener(queues = "json.batch.2")
  public void listen2(List<Message<Foo>> in) {
    this.fooMessages = in;
    this.fooMessagesLatch.countDown();
  }

}

When you sign the CLA, I can look at your PR.

KaiStapel commented

Your test will also show this bug if you change it as follows:

  1. Actually use batch listening (i.e. BatchMessagingMessageListenerAdapter#onMessageBatch):

     factory.setConsumerBatchEnabled(true);
     factory.setBatchSize(2);

  2. Do not set a type header in the sent messages:

     this.template.send("json.batch.2", msg("{\"bar\":\"foo\"}"));
     this.template.send("json.batch.2", msg("{\"bar\":\"bar\"}"));
     [...]
     private org.springframework.amqp.core.Message msg(String body) throws UnsupportedEncodingException {
       MessageProperties properties = new MessageProperties();
       properties.setContentType("application/json");
       return new org.springframework.amqp.core.Message(body.getBytes(SimpleMessageConverter.DEFAULT_CHARSET), properties);
     }

KaiStapel pushed a commit to KaiStapel/spring-amqp that referenced this issue Jan 19, 2020
KaiStapel pushed a commit to KaiStapel/spring-amqp that referenced this issue Jan 22, 2020
garyrussell pushed a commit that referenced this issue Jan 22, 2020
…1146)

* [#1145] Fix generic parameter type detection for batch listeners with typed 'org.springframework.messaging.Message's

* [#1145] Update integration test

* [#1145] Add myself as author
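
For readers unfamiliar with the kind of type detection the commit title refers to, here is a minimal sketch in plain Java reflection. It is not the code from the fix. It only illustrates the general idea that a batch parameter declared as List<Message<Foo>> has to be unwrapped twice to reach Foo, whereas stopping after the first step yields Message, which matches the type Jackson complained about in the stack trace above. The names PayloadTypeSketch, firstTypeArgument, payloadType and resolve are hypothetical, and the nested Message interface is a local stand-in for org.springframework.messaging.Message so that the example runs without Spring on the classpath.

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

public class PayloadTypeSketch {

    // Hypothetical stand-in for org.springframework.messaging.Message<T>.
    interface Message<T> {
        T getPayload();
    }

    static class Foo {
    }

    // Listener signatures mirroring the working and the broken listener in the report.
    static class Listener {
        void plain(List<Foo> batch) {
        }

        void wrapped(List<Message<Foo>> batch) {
        }
    }

    // Returns the single type argument of a parameterized type, or null if there is none.
    static Type firstTypeArgument(Type type) {
        if (type instanceof ParameterizedType) {
            Type[] args = ((ParameterizedType) type).getActualTypeArguments();
            return args.length == 1 ? args[0] : null;
        }
        return null;
    }

    // Resolves the payload type of a batch parameter:
    // List<Foo> -> Foo, and List<Message<Foo>> -> Foo.
    static Type payloadType(Type parameterType) {
        Type element = firstTypeArgument(parameterType); // List<X> -> X
        if (element instanceof ParameterizedType
                && ((ParameterizedType) element).getRawType() == Message.class) {
            return firstTypeArgument(element); // Message<Foo> -> Foo
        }
        return element;
    }

    // Looks up a Listener method by name and resolves its payload type.
    static Type resolve(String methodName) {
        try {
            Method method = Listener.class.getDeclaredMethod(methodName, List.class);
            return payloadType(method.getGenericParameterTypes()[0]);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("No such listener method: " + methodName, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve("plain"));
        System.out.println(resolve("wrapped"));
    }
}
```

Both calls in main resolve to the Foo class. Skipping the second unwrapping step in payloadType would return Message<Foo> for the wrapped listener, which a JSON converter cannot instantiate because Message is an interface.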
KaiStapel pushed a commit to KaiStapel/spring-amqp that referenced this issue Nov 18, 2020