From 10274feebe815e3a5b424873b5c302ff491cd016 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Mon, 25 Jul 2022 11:41:21 -0400 Subject: [PATCH] GH-1474: Fix MessageProperties.lastInBatch Resolves https://github.com/spring-projects/spring-amqp/issues/1474 When consuming the whole debatched batch as a list, all messages had the `lastInBatch` property set to true. Clone the message properties for the last record. **cherry-pick to 2.4.x** --- .../amqp/core/MessageProperties.java | 11 ++++++++++- .../rabbit/batch/SimpleBatchingStrategy.java | 17 ++++++++++++----- .../core/BatchingRabbitTemplateTests.java | 6 +++--- 3 files changed, 25 insertions(+), 9 deletions(-) diff --git a/spring-amqp/src/main/java/org/springframework/amqp/core/MessageProperties.java b/spring-amqp/src/main/java/org/springframework/amqp/core/MessageProperties.java index 42e92054ca..6f955d381e 100644 --- a/spring-amqp/src/main/java/org/springframework/amqp/core/MessageProperties.java +++ b/spring-amqp/src/main/java/org/springframework/amqp/core/MessageProperties.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -140,6 +140,15 @@ public void setHeader(String key, Object value) { this.headers.put(key, value); } + /** + * Set headers. + * @param headers the headers. + * @since 2.4.7 + */ + public void setHeaders(Map headers) { + this.headers.putAll(headers); + } + /** * Typed getter for a header. * @param headerName the header name. diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/batch/SimpleBatchingStrategy.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/batch/SimpleBatchingStrategy.java index 504fe548a7..d1558f7a10 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/batch/SimpleBatchingStrategy.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/batch/SimpleBatchingStrategy.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2019 the original author or authors. + * Copyright 2014-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,6 +29,7 @@ import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.amqp.support.converter.MessageConversionException; +import org.springframework.beans.BeanUtils; import org.springframework.util.Assert; /** @@ -184,10 +185,16 @@ public void deBatch(Message message, Consumer fragmentConsumer) { byte[] body = new byte[length]; byteBuffer.get(body); messageProperties.setContentLength(length); - // Caveat - shared MessageProperties. - Message fragment = new Message(body, messageProperties); - if (!byteBuffer.hasRemaining()) { - messageProperties.setLastInBatch(true); + // Caveat - shared MessageProperties, except for last + Message fragment; + if (byteBuffer.hasRemaining()) { + fragment = new Message(body, messageProperties); + } + else { + MessageProperties lastProperties = new MessageProperties(); + BeanUtils.copyProperties(messageProperties, lastProperties); + lastProperties.setLastInBatch(true); + fragment = new Message(body, lastProperties); } fragmentConsumer.accept(fragment); } diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/BatchingRabbitTemplateTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/BatchingRabbitTemplateTests.java index 12cf466696..469dfcd587 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/BatchingRabbitTemplateTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/BatchingRabbitTemplateTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2020 the original author or authors. + * Copyright 2014-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -261,8 +261,8 @@ private void testDebatchByContainer(AbstractMessageListenerContainer container, if (asList) { container.setMessageListener((BatchMessageListener) messages -> { received.addAll(messages); - lastInBatch.add(false); - lastInBatch.add(true); + lastInBatch.add(messages.get(0).getMessageProperties().isLastInBatch()); + lastInBatch.add(messages.get(1).getMessageProperties().isLastInBatch()); batchSize.set(messages.size()); latch.countDown(); });