From 5a5e9881ff9fc6d3f3b74d954ffcb69333c5d276 Mon Sep 17 00:00:00 2001 From: raylax Date: Thu, 18 Jan 2024 00:23:13 +0800 Subject: [PATCH 1/7] Fix `x-delay` header to `Long` --- .../amqp/core/MessageProperties.java | 35 ++++++++++++++++++- .../amqp/support/SimpleAmqpHeaderMapper.java | 4 ++- .../support/SimpleAmqpHeaderMapperTests.java | 22 +++++++++++- 3 files changed, 58 insertions(+), 3 deletions(-) diff --git a/spring-amqp/src/main/java/org/springframework/amqp/core/MessageProperties.java b/spring-amqp/src/main/java/org/springframework/amqp/core/MessageProperties.java index eba31fb895..5b87db2743 100644 --- a/spring-amqp/src/main/java/org/springframework/amqp/core/MessageProperties.java +++ b/spring-amqp/src/main/java/org/springframework/amqp/core/MessageProperties.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -434,6 +434,7 @@ public void setConsumerQueue(String consumerQueue) { * The x-delay header (outbound). * @return the delay. * @since 1.6 + * @deprecated in favor of {@link #getDelayLong()} * @see #getReceivedDelay() */ public Integer getDelay() { @@ -450,6 +451,7 @@ public Integer getDelay() { * Set the x-delay header. * @param delay the delay. * @since 1.6 + * @deprecated in favor of {@link #setDelayLong(Long)} */ public void setDelay(Integer delay) { if (delay == null || delay < 0) { @@ -460,6 +462,37 @@ public void setDelay(Integer delay) { } } + /** + * Get the x-delay header long value. + * + * @return the delay. + * @since 3.1.2 + */ + public Long getDelayLong() { + Object delay = this.headers.get(X_DELAY); + if (delay instanceof Long) { + return (Long) delay; + } + else { + return null; + } + } + + /** + * Set the x-delay header to a long value. + * + * @param delay the delay. + * @since 3.1.2 + */ + public void setDelayLong(Long delay) { + if (delay == null || delay < 0) { + this.headers.remove(X_DELAY); + } + else { + this.headers.put(X_DELAY, delay); + } + } + public boolean isFinalRetryForMessageWithNoId() { return this.finalRetryForMessageWithNoId; } diff --git a/spring-amqp/src/main/java/org/springframework/amqp/support/SimpleAmqpHeaderMapper.java b/spring-amqp/src/main/java/org/springframework/amqp/support/SimpleAmqpHeaderMapper.java index 3031d3d87c..14d9a17809 100644 --- a/spring-amqp/src/main/java/org/springframework/amqp/support/SimpleAmqpHeaderMapper.java +++ b/spring-amqp/src/main/java/org/springframework/amqp/support/SimpleAmqpHeaderMapper.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -71,6 +71,8 @@ public void fromHeaders(MessageHeaders headers, MessageProperties amqpMessagePro javaUtils .acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.DELAY, Integer.class), amqpMessageProperties::setDelay) + .acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.DELAY, Long.class), + amqpMessageProperties::setDelayLong) .acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.DELIVERY_MODE, MessageDeliveryMode.class), amqpMessageProperties::setDeliveryMode) .acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.DELIVERY_TAG, Long.class), diff --git a/spring-amqp/src/test/java/org/springframework/amqp/support/SimpleAmqpHeaderMapperTests.java b/spring-amqp/src/test/java/org/springframework/amqp/support/SimpleAmqpHeaderMapperTests.java index 2e9c768279..e1f7eebe83 100644 --- a/spring-amqp/src/test/java/org/springframework/amqp/support/SimpleAmqpHeaderMapperTests.java +++ b/spring-amqp/src/test/java/org/springframework/amqp/support/SimpleAmqpHeaderMapperTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -95,6 +95,26 @@ public void fromHeaders() { assertThat(amqpProperties.getDelay()).isEqualTo(Integer.valueOf(1234)); } + @Test + public void fromHeadersWithLongDelay() { + SimpleAmqpHeaderMapper headerMapper = new SimpleAmqpHeaderMapper(); + Map headerMap = new HashMap<>(); + headerMap.put(AmqpHeaders.DELAY, 1234L); + MessageHeaders messageHeaders = new MessageHeaders(headerMap); + MessageProperties amqpProperties = new MessageProperties(); + headerMapper.fromHeaders(messageHeaders, amqpProperties); + assertThat(amqpProperties.getDelayLong()).isEqualTo(Long.valueOf(1234)); + + amqpProperties.setDelayLong(5678L); + assertThat(amqpProperties.getDelayLong()).isEqualTo(Long.valueOf(5678)); + + amqpProperties.setDelay(123); + assertThat(amqpProperties.getDelayLong()).isNull(); + + + } + + @Test public void fromHeadersWithContentTypeAsMediaType() { SimpleAmqpHeaderMapper headerMapper = new SimpleAmqpHeaderMapper(); From ed92329fa2a686895a64bbba2ba115069454f03a Mon Sep 17 00:00:00 2001 From: raylax Date: Thu, 18 Jan 2024 10:25:10 +0800 Subject: [PATCH 2/7] Fix `x-delay` header to `Long` --- .../amqp/core/MessageProperties.java | 63 +++++++++++++------ .../amqp/support/SimpleAmqpHeaderMapper.java | 5 +- .../amqp/core/MessagePropertiesTests.java | 5 +- .../support/SimpleAmqpHeaderMapperTests.java | 16 +++-- .../DefaultMessagePropertiesConverter.java | 9 ++- 5 files changed, 68 insertions(+), 30 deletions(-) diff --git a/spring-amqp/src/main/java/org/springframework/amqp/core/MessageProperties.java b/spring-amqp/src/main/java/org/springframework/amqp/core/MessageProperties.java index 5b87db2743..a9cae9998f 100644 --- a/spring-amqp/src/main/java/org/springframework/amqp/core/MessageProperties.java +++ b/spring-amqp/src/main/java/org/springframework/amqp/core/MessageProperties.java @@ -24,6 +24,8 @@ import java.util.List; import java.util.Map; +import org.springframework.util.Assert; + /** * Message Properties for an AMQP message. * @@ -33,6 +35,7 @@ * @author Dmitry Chernyshov * @author Artem Bilan * @author Csaba Soti + * @author Raylax Grey */ public class MessageProperties implements Serializable { @@ -66,6 +69,8 @@ public class MessageProperties implements Serializable { public static final Integer DEFAULT_PRIORITY = 0; + public static final long X_DELAY_MAX = 0xffffffffL; // (2 ^ 32) - 1 + private final Map headers = new HashMap<>(); private Date timestamp; @@ -118,7 +123,7 @@ public class MessageProperties implements Serializable { private String consumerQueue; - private Integer receivedDelay; + private Long receivedDelay; private MessageDeliveryMode receivedDeliveryMode; @@ -352,10 +357,13 @@ public String getReceivedRoutingKey() { * received message contains the delay. * @return the received delay. * @since 1.6 + * @deprecated in favor of {@link #getReceivedDelayLong()} * @see #getDelay() */ + @Deprecated(since = "3.1.2", forRemoval = true) public Integer getReceivedDelay() { - return this.receivedDelay; + Long receivedDelay = getReceivedDelayLong(); + return receivedDelay != null ? Math.toIntExact(receivedDelay) : null; } /** @@ -363,8 +371,34 @@ public Integer getReceivedDelay() { * received message contains the delay. * @param receivedDelay the received delay. * @since 1.6 + * @deprecated in favor of {@link #setReceivedDelayLong(Long)} */ + @Deprecated(since = "3.1.2", forRemoval = true) public void setReceivedDelay(Integer receivedDelay) { + setReceivedDelayLong(receivedDelay != null ? receivedDelay.longValue() : null); + } + + /** + * When a delayed message exchange is used the x-delay header on a + * received message contains the delay. + * + * @return the received delay. + * @since 3.1.2 + * @see #getDelayLong() + */ + public Long getReceivedDelayLong() { + return this.receivedDelay; + } + + /** + * When a delayed message exchange is used the x-delay header on a + * received message contains the delay. + * + * @param receivedDelay the received delay. + * @since 3.1.2 + * @see #setDelayLong(Long) + */ + public void setReceivedDelayLong(Long receivedDelay) { this.receivedDelay = receivedDelay; } @@ -437,14 +471,10 @@ public void setConsumerQueue(String consumerQueue) { * @deprecated in favor of {@link #getDelayLong()} * @see #getReceivedDelay() */ + @Deprecated(since = "3.1.2", forRemoval = true) public Integer getDelay() { - Object delay = this.headers.get(X_DELAY); - if (delay instanceof Integer) { - return (Integer) delay; - } - else { - return null; - } + Long delay = getDelayLong(); + return delay != null ? Math.toIntExact(delay) : null; } /** @@ -453,13 +483,9 @@ public Integer getDelay() { * @since 1.6 * @deprecated in favor of {@link #setDelayLong(Long)} */ + @Deprecated(since = "3.1.2", forRemoval = true) public void setDelay(Integer delay) { - if (delay == null || delay < 0) { - this.headers.remove(X_DELAY); - } - else { - this.headers.put(X_DELAY, delay); - } + setDelayLong(delay != null ? delay.longValue() : null); } /** @@ -487,10 +513,11 @@ public Long getDelayLong() { public void setDelayLong(Long delay) { if (delay == null || delay < 0) { this.headers.remove(X_DELAY); + return; } - else { - this.headers.put(X_DELAY, delay); - } + + Assert.isTrue(delay <= X_DELAY_MAX, "Delay cannot exceed " + X_DELAY_MAX); + this.headers.put(X_DELAY, delay); } public boolean isFinalRetryForMessageWithNoId() { diff --git a/spring-amqp/src/main/java/org/springframework/amqp/support/SimpleAmqpHeaderMapper.java b/spring-amqp/src/main/java/org/springframework/amqp/support/SimpleAmqpHeaderMapper.java index 14d9a17809..51af201fa8 100644 --- a/spring-amqp/src/main/java/org/springframework/amqp/support/SimpleAmqpHeaderMapper.java +++ b/spring-amqp/src/main/java/org/springframework/amqp/support/SimpleAmqpHeaderMapper.java @@ -48,6 +48,7 @@ * @author Gary Russell * @author Artem Bilan * @author Stephane Nicoll + * @author Raylax Grey * @since 1.4 */ public class SimpleAmqpHeaderMapper extends AbstractHeaderMapper implements AmqpHeaderMapper { @@ -69,8 +70,6 @@ public void fromHeaders(MessageHeaders headers, MessageProperties amqpMessagePro amqpMessageProperties.setCorrelationId((String) correlationId); } javaUtils - .acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.DELAY, Integer.class), - amqpMessageProperties::setDelay) .acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.DELAY, Long.class), amqpMessageProperties::setDelayLong) .acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.DELIVERY_MODE, MessageDeliveryMode.class), @@ -152,7 +151,7 @@ public MessageHeaders toHeaders(MessageProperties amqpMessageProperties) { javaUtils .acceptIfCondition(priority != null && priority > 0, AmqpMessageHeaderAccessor.PRIORITY, priority, putObject) - .acceptIfNotNull(AmqpHeaders.RECEIVED_DELAY, amqpMessageProperties.getReceivedDelay(), putObject) + .acceptIfNotNull(AmqpHeaders.RECEIVED_DELAY, amqpMessageProperties.getReceivedDelayLong(), putObject) .acceptIfHasText(AmqpHeaders.RECEIVED_EXCHANGE, amqpMessageProperties.getReceivedExchange(), putString) .acceptIfHasText(AmqpHeaders.RECEIVED_ROUTING_KEY, amqpMessageProperties.getReceivedRoutingKey(), diff --git a/spring-amqp/src/test/java/org/springframework/amqp/core/MessagePropertiesTests.java b/spring-amqp/src/test/java/org/springframework/amqp/core/MessagePropertiesTests.java index 6a1df9d995..58ed50f4f5 100644 --- a/spring-amqp/src/test/java/org/springframework/amqp/core/MessagePropertiesTests.java +++ b/spring-amqp/src/test/java/org/springframework/amqp/core/MessagePropertiesTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,6 +30,7 @@ * @author Artem Bilan * @author Gary Russell * @author Csaba Soti + * @author Raylax Grey * */ public class MessagePropertiesTests { @@ -55,7 +56,7 @@ public void testDelayHeader() { MessageProperties properties = new MessageProperties(); Integer delay = 100; properties.setDelay(delay); - assertThat(properties.getHeaders().get(MessageProperties.X_DELAY)).isEqualTo(delay); + assertThat(properties.getHeaders().get(MessageProperties.X_DELAY)).isEqualTo(100L); properties.setDelay(null); assertThat(properties.getHeaders().containsKey(MessageProperties.X_DELAY)).isFalse(); } diff --git a/spring-amqp/src/test/java/org/springframework/amqp/support/SimpleAmqpHeaderMapperTests.java b/spring-amqp/src/test/java/org/springframework/amqp/support/SimpleAmqpHeaderMapperTests.java index e1f7eebe83..d99e2d1ad0 100644 --- a/spring-amqp/src/test/java/org/springframework/amqp/support/SimpleAmqpHeaderMapperTests.java +++ b/spring-amqp/src/test/java/org/springframework/amqp/support/SimpleAmqpHeaderMapperTests.java @@ -17,6 +17,7 @@ package org.springframework.amqp.support; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assertions.fail; import java.util.Date; @@ -37,6 +38,7 @@ * @author Mark Fisher * @author Gary Russell * @author Oleg Zhurakousky + * @author Raylax Grey */ public class SimpleAmqpHeaderMapperTests { @@ -51,7 +53,7 @@ public void fromHeaders() { headerMap.put(AmqpHeaders.CONTENT_TYPE, "test.contentType"); String testCorrelationId = "foo"; headerMap.put(AmqpHeaders.CORRELATION_ID, testCorrelationId); - headerMap.put(AmqpHeaders.DELAY, 1234); + headerMap.put(AmqpHeaders.DELAY, 1234L); headerMap.put(AmqpHeaders.DELIVERY_MODE, MessageDeliveryMode.NON_PERSISTENT); headerMap.put(AmqpHeaders.DELIVERY_TAG, 1234L); headerMap.put(AmqpHeaders.EXPIRATION, "test.expiration"); @@ -108,9 +110,15 @@ public void fromHeadersWithLongDelay() { amqpProperties.setDelayLong(5678L); assertThat(amqpProperties.getDelayLong()).isEqualTo(Long.valueOf(5678)); - amqpProperties.setDelay(123); - assertThat(amqpProperties.getDelayLong()).isNull(); + amqpProperties.setDelayLong(null); + assertThat(amqpProperties.getHeaders().containsKey(AmqpHeaders.DELAY)).isFalse(); + + amqpProperties.setDelayLong(MessageProperties.X_DELAY_MAX); + assertThat(amqpProperties.getDelayLong()).isEqualTo(Long.valueOf(MessageProperties.X_DELAY_MAX)); + assertThatThrownBy(() -> amqpProperties.setDelayLong(MessageProperties.X_DELAY_MAX + 1)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Delay cannot exceed"); } @@ -171,7 +179,7 @@ public void toHeaders() { assertThat(headerMap.get(AmqpHeaders.EXPIRATION)).isEqualTo("test.expiration"); assertThat(headerMap.get(AmqpHeaders.MESSAGE_COUNT)).isEqualTo(42); assertThat(headerMap.get(AmqpHeaders.MESSAGE_ID)).isEqualTo("test.messageId"); - assertThat(headerMap.get(AmqpHeaders.RECEIVED_DELAY)).isEqualTo(1234); + assertThat(headerMap.get(AmqpHeaders.RECEIVED_DELAY)).isEqualTo(1234L); assertThat(headerMap.get(AmqpHeaders.RECEIVED_EXCHANGE)).isEqualTo("test.receivedExchange"); assertThat(headerMap.get(AmqpHeaders.RECEIVED_ROUTING_KEY)).isEqualTo("test.receivedRoutingKey"); assertThat(headerMap.get(AmqpHeaders.REPLY_TO)).isEqualTo("test.replyTo"); diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/support/DefaultMessagePropertiesConverter.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/support/DefaultMessagePropertiesConverter.java index 72053d2846..79ab7dc061 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/support/DefaultMessagePropertiesConverter.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/support/DefaultMessagePropertiesConverter.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -40,6 +40,7 @@ * @author Mark Fisher * @author Gary Russell * @author Soeren Unruh + * @author Raylax Grey * @since 1.0 */ public class DefaultMessagePropertiesConverter implements MessagePropertiesConverter { @@ -92,8 +93,10 @@ public MessageProperties toMessageProperties(final BasicProperties source, final String key = entry.getKey(); if (MessageProperties.X_DELAY.equals(key)) { Object value = entry.getValue(); - if (value instanceof Integer integ) { - target.setReceivedDelay(integ); + if (value instanceof Integer intValue) { + target.setReceivedDelayLong(intValue.longValue()); + } else if (value instanceof Long longVal) { + target.setReceivedDelayLong(longVal); } } else { From 4ff6dc5177f704db53cec76304d3918074523468 Mon Sep 17 00:00:00 2001 From: raylax Date: Thu, 18 Jan 2024 22:44:26 +0800 Subject: [PATCH 3/7] Fix code style --- .../amqp/rabbit/support/DefaultMessagePropertiesConverter.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/support/DefaultMessagePropertiesConverter.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/support/DefaultMessagePropertiesConverter.java index 79ab7dc061..b5633e1156 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/support/DefaultMessagePropertiesConverter.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/support/DefaultMessagePropertiesConverter.java @@ -95,7 +95,8 @@ public MessageProperties toMessageProperties(final BasicProperties source, final Object value = entry.getValue(); if (value instanceof Integer intValue) { target.setReceivedDelayLong(intValue.longValue()); - } else if (value instanceof Long longVal) { + } + else if (value instanceof Long longVal) { target.setReceivedDelayLong(longVal); } } From 677af59ef166eff0eeeff410109f34c110186ebc Mon Sep 17 00:00:00 2001 From: raylax Date: Thu, 18 Jan 2024 23:10:57 +0800 Subject: [PATCH 4/7] Fix code style --- .../java/org/springframework/amqp/core/MessageProperties.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/spring-amqp/src/main/java/org/springframework/amqp/core/MessageProperties.java b/spring-amqp/src/main/java/org/springframework/amqp/core/MessageProperties.java index a9cae9998f..c08f712a9c 100644 --- a/spring-amqp/src/main/java/org/springframework/amqp/core/MessageProperties.java +++ b/spring-amqp/src/main/java/org/springframework/amqp/core/MessageProperties.java @@ -381,7 +381,6 @@ public void setReceivedDelay(Integer receivedDelay) { /** * When a delayed message exchange is used the x-delay header on a * received message contains the delay. - * * @return the received delay. * @since 3.1.2 * @see #getDelayLong() @@ -393,7 +392,6 @@ public Long getReceivedDelayLong() { /** * When a delayed message exchange is used the x-delay header on a * received message contains the delay. - * * @param receivedDelay the received delay. * @since 3.1.2 * @see #setDelayLong(Long) From 408384a970c542784e3bc496aab1d7034fc426f9 Mon Sep 17 00:00:00 2001 From: raylax Date: Thu, 18 Jan 2024 23:41:46 +0800 Subject: [PATCH 5/7] Update deprecated API --- .../support/DefaultMessagePropertiesConverter.java | 5 ++++- .../amqp/rabbit/core/RabbitAdminIntegrationTests.java | 11 ++++++----- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/support/DefaultMessagePropertiesConverter.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/support/DefaultMessagePropertiesConverter.java index b5633e1156..84c29dfeb7 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/support/DefaultMessagePropertiesConverter.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/support/DefaultMessagePropertiesConverter.java @@ -94,10 +94,13 @@ public MessageProperties toMessageProperties(final BasicProperties source, final if (MessageProperties.X_DELAY.equals(key)) { Object value = entry.getValue(); if (value instanceof Integer intValue) { - target.setReceivedDelayLong(intValue.longValue()); + long receivedDelayLongValue = intValue.longValue(); + target.setReceivedDelayLong(receivedDelayLongValue); + target.setHeader(key, receivedDelayLongValue); } else if (value instanceof Long longVal) { target.setReceivedDelayLong(longVal); + target.setHeader(key, longVal); } } else { diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitAdminIntegrationTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitAdminIntegrationTests.java index abf8f76cb3..71d9792a2e 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitAdminIntegrationTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitAdminIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -59,6 +59,7 @@ * @author Gary Russell * @author Gunnar Hillert * @author Artem Bilan + * @author Raylax Grey */ @RabbitAvailable(management = true) public class RabbitAdminIntegrationTests extends NeedsManagementTests { @@ -397,20 +398,20 @@ public void testDeclareDelayedExchange() throws Exception { RabbitTemplate template = new RabbitTemplate(this.connectionFactory); template.setReceiveTimeout(10000); template.convertAndSend(exchangeName, queue.getName(), "foo", message -> { - message.getMessageProperties().setDelay(1000); + message.getMessageProperties().setDelayLong(1000L); return message; }); MessageProperties properties = new MessageProperties(); - properties.setDelay(500); + properties.setDelayLong(500L); template.send(exchangeName, queue.getName(), MessageBuilder.withBody("foo".getBytes()).andProperties(properties).build()); long t1 = System.currentTimeMillis(); Message received = template.receive(queue.getName()); assertThat(received).isNotNull(); - assertThat(received.getMessageProperties().getReceivedDelay()).isEqualTo(Integer.valueOf(500)); + assertThat(received.getMessageProperties().getDelayLong()).isEqualTo(Long.valueOf(500L)); received = template.receive(queue.getName()); assertThat(received).isNotNull(); - assertThat(received.getMessageProperties().getReceivedDelay()).isEqualTo(Integer.valueOf(1000)); + assertThat(received.getMessageProperties().getDelayLong()).isEqualTo(Long.valueOf(1000L)); assertThat(System.currentTimeMillis() - t1).isGreaterThan(950L); Map exchange2 = getExchange(exchangeName); From d841817e7bbb4031588d7ec3ebcec1f8ba7938c9 Mon Sep 17 00:00:00 2001 From: raylax Date: Thu, 18 Jan 2024 23:42:26 +0800 Subject: [PATCH 6/7] Fix code style --- .../java/org/springframework/amqp/core/MessageProperties.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/spring-amqp/src/main/java/org/springframework/amqp/core/MessageProperties.java b/spring-amqp/src/main/java/org/springframework/amqp/core/MessageProperties.java index c08f712a9c..2532600831 100644 --- a/spring-amqp/src/main/java/org/springframework/amqp/core/MessageProperties.java +++ b/spring-amqp/src/main/java/org/springframework/amqp/core/MessageProperties.java @@ -488,7 +488,6 @@ public void setDelay(Integer delay) { /** * Get the x-delay header long value. - * * @return the delay. * @since 3.1.2 */ @@ -504,7 +503,6 @@ public Long getDelayLong() { /** * Set the x-delay header to a long value. - * * @param delay the delay. * @since 3.1.2 */ From 13cd9d625984188f210138259f7b5b8a37b7f2ea Mon Sep 17 00:00:00 2001 From: raylax Date: Sat, 20 Jan 2024 13:21:29 +0800 Subject: [PATCH 7/7] Remove deprecated API usage --- .../org/springframework/amqp/core/MessageProperties.java | 6 +++++- .../springframework/amqp/core/MessagePropertiesTests.java | 8 ++++---- .../amqp/support/SimpleAmqpHeaderMapperTests.java | 4 ++-- 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/spring-amqp/src/main/java/org/springframework/amqp/core/MessageProperties.java b/spring-amqp/src/main/java/org/springframework/amqp/core/MessageProperties.java index 2532600831..f999acc0a3 100644 --- a/spring-amqp/src/main/java/org/springframework/amqp/core/MessageProperties.java +++ b/spring-amqp/src/main/java/org/springframework/amqp/core/MessageProperties.java @@ -69,7 +69,11 @@ public class MessageProperties implements Serializable { public static final Integer DEFAULT_PRIORITY = 0; - public static final long X_DELAY_MAX = 0xffffffffL; // (2 ^ 32) - 1 + /** + * The maximum value of x-delay header. + * @since 3.1.2 + */ + public static final long X_DELAY_MAX = 0xffffffffL; private final Map headers = new HashMap<>(); diff --git a/spring-amqp/src/test/java/org/springframework/amqp/core/MessagePropertiesTests.java b/spring-amqp/src/test/java/org/springframework/amqp/core/MessagePropertiesTests.java index 58ed50f4f5..2d080e8ea4 100644 --- a/spring-amqp/src/test/java/org/springframework/amqp/core/MessagePropertiesTests.java +++ b/spring-amqp/src/test/java/org/springframework/amqp/core/MessagePropertiesTests.java @@ -54,10 +54,10 @@ public void testReplyToNullByDefault() { @Test public void testDelayHeader() { MessageProperties properties = new MessageProperties(); - Integer delay = 100; - properties.setDelay(delay); - assertThat(properties.getHeaders().get(MessageProperties.X_DELAY)).isEqualTo(100L); - properties.setDelay(null); + Long delay = 100L; + properties.setDelayLong(delay); + assertThat(properties.getHeaders().get(MessageProperties.X_DELAY)).isEqualTo(delay); + properties.setDelayLong(null); assertThat(properties.getHeaders().containsKey(MessageProperties.X_DELAY)).isFalse(); } diff --git a/spring-amqp/src/test/java/org/springframework/amqp/support/SimpleAmqpHeaderMapperTests.java b/spring-amqp/src/test/java/org/springframework/amqp/support/SimpleAmqpHeaderMapperTests.java index d99e2d1ad0..20967652f3 100644 --- a/spring-amqp/src/test/java/org/springframework/amqp/support/SimpleAmqpHeaderMapperTests.java +++ b/spring-amqp/src/test/java/org/springframework/amqp/support/SimpleAmqpHeaderMapperTests.java @@ -94,7 +94,7 @@ public void fromHeaders() { assertThat(amqpProperties.getTimestamp()).isEqualTo(testTimestamp); assertThat(amqpProperties.getType()).isEqualTo("test.type"); assertThat(amqpProperties.getUserId()).isEqualTo("test.userId"); - assertThat(amqpProperties.getDelay()).isEqualTo(Integer.valueOf(1234)); + assertThat(amqpProperties.getDelayLong()).isEqualTo(Long.valueOf(1234)); } @Test @@ -154,7 +154,7 @@ public void toHeaders() { amqpProperties.setMessageCount(42); amqpProperties.setMessageId("test.messageId"); amqpProperties.setPriority(22); - amqpProperties.setReceivedDelay(1234); + amqpProperties.setReceivedDelayLong(1234L); amqpProperties.setReceivedExchange("test.receivedExchange"); amqpProperties.setReceivedRoutingKey("test.receivedRoutingKey"); amqpProperties.setRedelivered(true);