Skip to content

Commit

Permalink
GH-1474: Fix MessageProperties.lastInBatch
Browse files Browse the repository at this point in the history
Resolves #1474

When consuming the whole debatched batch as a list, all messages
had the `lastInBatch` property set to true.

Clone the message properties for the last record.

**cherry-pick to 2.4.x**
  • Loading branch information
garyrussell authored and artembilan committed Jul 25, 2022
1 parent 765e011 commit 10274fe
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -140,6 +140,15 @@ public void setHeader(String key, Object value) {
this.headers.put(key, value);
}

/**
* Set headers.
* @param headers the headers.
* @since 2.4.7
*/
public void setHeaders(Map<String, Object> headers) {
this.headers.putAll(headers);
}

/**
* Typed getter for a header.
* @param headerName the header name.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2019 the original author or authors.
* Copyright 2014-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -29,6 +29,7 @@
import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.beans.BeanUtils;
import org.springframework.util.Assert;

/**
Expand Down Expand Up @@ -184,10 +185,16 @@ public void deBatch(Message message, Consumer<Message> fragmentConsumer) {
byte[] body = new byte[length];
byteBuffer.get(body);
messageProperties.setContentLength(length);
// Caveat - shared MessageProperties.
Message fragment = new Message(body, messageProperties);
if (!byteBuffer.hasRemaining()) {
messageProperties.setLastInBatch(true);
// Caveat - shared MessageProperties, except for last
Message fragment;
if (byteBuffer.hasRemaining()) {
fragment = new Message(body, messageProperties);
}
else {
MessageProperties lastProperties = new MessageProperties();
BeanUtils.copyProperties(messageProperties, lastProperties);
lastProperties.setLastInBatch(true);
fragment = new Message(body, lastProperties);
}
fragmentConsumer.accept(fragment);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2020 the original author or authors.
* Copyright 2014-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -261,8 +261,8 @@ private void testDebatchByContainer(AbstractMessageListenerContainer container,
if (asList) {
container.setMessageListener((BatchMessageListener) messages -> {
received.addAll(messages);
lastInBatch.add(false);
lastInBatch.add(true);
lastInBatch.add(messages.get(0).getMessageProperties().isLastInBatch());
lastInBatch.add(messages.get(1).getMessageProperties().isLastInBatch());
batchSize.set(messages.size());
latch.countDown();
});
Expand Down

0 comments on commit 10274fe

Please sign in to comment.