Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-2602: Fix x-delay header to Long #2603

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,6 +24,8 @@
import java.util.List;
import java.util.Map;

import org.springframework.util.Assert;

/**
* Message Properties for an AMQP message.
*
Expand All @@ -33,6 +35,7 @@
* @author Dmitry Chernyshov
* @author Artem Bilan
* @author Csaba Soti
* @author Raylax Grey
*/
public class MessageProperties implements Serializable {

Expand Down Expand Up @@ -66,6 +69,12 @@ public class MessageProperties implements Serializable {

public static final Integer DEFAULT_PRIORITY = 0;

/**
* The maximum value of x-delay header.
* @since 3.1.2
*/
public static final long X_DELAY_MAX = 0xffffffffL;

private final Map<String, Object> headers = new HashMap<>();

private Date timestamp;
Expand Down Expand Up @@ -118,7 +127,7 @@ public class MessageProperties implements Serializable {

private String consumerQueue;

private Integer receivedDelay;
private Long receivedDelay;

private MessageDeliveryMode receivedDeliveryMode;

Expand Down Expand Up @@ -352,19 +361,46 @@ public String getReceivedRoutingKey() {
* received message contains the delay.
* @return the received delay.
* @since 1.6
* @deprecated in favor of {@link #getReceivedDelayLong()}
* @see #getDelay()
*/
@Deprecated(since = "3.1.2", forRemoval = true)
public Integer getReceivedDelay() {
return this.receivedDelay;
Long receivedDelay = getReceivedDelayLong();
return receivedDelay != null ? Math.toIntExact(receivedDelay) : null;
}

/**
* When a delayed message exchange is used the x-delay header on a
* received message contains the delay.
* @param receivedDelay the received delay.
* @since 1.6
* @deprecated in favor of {@link #setReceivedDelayLong(Long)}
*/
@Deprecated(since = "3.1.2", forRemoval = true)
public void setReceivedDelay(Integer receivedDelay) {
setReceivedDelayLong(receivedDelay != null ? receivedDelay.longValue() : null);
}

/**
* When a delayed message exchange is used the x-delay header on a
* received message contains the delay.
* @return the received delay.
* @since 3.1.2
* @see #getDelayLong()
*/
public Long getReceivedDelayLong() {
return this.receivedDelay;
}

/**
* When a delayed message exchange is used the x-delay header on a
* received message contains the delay.
* @param receivedDelay the received delay.
* @since 3.1.2
* @see #setDelayLong(Long)
*/
public void setReceivedDelayLong(Long receivedDelay) {
this.receivedDelay = receivedDelay;
}

Expand Down Expand Up @@ -434,30 +470,54 @@ public void setConsumerQueue(String consumerQueue) {
* The x-delay header (outbound).
* @return the delay.
* @since 1.6
* @deprecated in favor of {@link #getDelayLong()}
raylax marked this conversation as resolved.
Show resolved Hide resolved
* @see #getReceivedDelay()
*/
@Deprecated(since = "3.1.2", forRemoval = true)
public Integer getDelay() {
Long delay = getDelayLong();
return delay != null ? Math.toIntExact(delay) : null;
}

/**
* Set the x-delay header.
* @param delay the delay.
* @since 1.6
* @deprecated in favor of {@link #setDelayLong(Long)}
raylax marked this conversation as resolved.
Show resolved Hide resolved
*/
@Deprecated(since = "3.1.2", forRemoval = true)
public void setDelay(Integer delay) {
setDelayLong(delay != null ? delay.longValue() : null);
}

/**
* Get the x-delay header long value.
* @return the delay.
* @since 3.1.2
*/
public Long getDelayLong() {
Object delay = this.headers.get(X_DELAY);
if (delay instanceof Integer) {
return (Integer) delay;
if (delay instanceof Long) {
return (Long) delay;
}
else {
return null;
}
}

/**
* Set the x-delay header.
* Set the x-delay header to a long value.
* @param delay the delay.
* @since 1.6
* @since 3.1.2
*/
public void setDelay(Integer delay) {
public void setDelayLong(Long delay) {
if (delay == null || delay < 0) {
this.headers.remove(X_DELAY);
return;
}
else {
this.headers.put(X_DELAY, delay);
}

Assert.isTrue(delay <= X_DELAY_MAX, "Delay cannot exceed " + X_DELAY_MAX);
this.headers.put(X_DELAY, delay);
}

public boolean isFinalRetryForMessageWithNoId() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -48,6 +48,7 @@
* @author Gary Russell
* @author Artem Bilan
* @author Stephane Nicoll
* @author Raylax Grey
* @since 1.4
*/
public class SimpleAmqpHeaderMapper extends AbstractHeaderMapper<MessageProperties> implements AmqpHeaderMapper {
Expand All @@ -69,8 +70,8 @@ public void fromHeaders(MessageHeaders headers, MessageProperties amqpMessagePro
amqpMessageProperties.setCorrelationId((String) correlationId);
}
javaUtils
.acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.DELAY, Integer.class),
amqpMessageProperties::setDelay)
.acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.DELAY, Long.class),
amqpMessageProperties::setDelayLong)
.acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.DELIVERY_MODE, MessageDeliveryMode.class),
amqpMessageProperties::setDeliveryMode)
.acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.DELIVERY_TAG, Long.class),
Expand Down Expand Up @@ -150,7 +151,7 @@ public MessageHeaders toHeaders(MessageProperties amqpMessageProperties) {
javaUtils
.acceptIfCondition(priority != null && priority > 0, AmqpMessageHeaderAccessor.PRIORITY, priority,
putObject)
.acceptIfNotNull(AmqpHeaders.RECEIVED_DELAY, amqpMessageProperties.getReceivedDelay(), putObject)
.acceptIfNotNull(AmqpHeaders.RECEIVED_DELAY, amqpMessageProperties.getReceivedDelayLong(), putObject)
.acceptIfHasText(AmqpHeaders.RECEIVED_EXCHANGE, amqpMessageProperties.getReceivedExchange(),
putString)
.acceptIfHasText(AmqpHeaders.RECEIVED_ROUTING_KEY, amqpMessageProperties.getReceivedRoutingKey(),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -30,6 +30,7 @@
* @author Artem Bilan
* @author Gary Russell
* @author Csaba Soti
* @author Raylax Grey
*
*/
public class MessagePropertiesTests {
Expand All @@ -53,10 +54,10 @@ public void testReplyToNullByDefault() {
@Test
public void testDelayHeader() {
MessageProperties properties = new MessageProperties();
Integer delay = 100;
properties.setDelay(delay);
Long delay = 100L;
properties.setDelayLong(delay);
assertThat(properties.getHeaders().get(MessageProperties.X_DELAY)).isEqualTo(delay);
properties.setDelay(null);
properties.setDelayLong(null);
assertThat(properties.getHeaders().containsKey(MessageProperties.X_DELAY)).isFalse();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,6 +17,7 @@
package org.springframework.amqp.support;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.fail;

import java.util.Date;
Expand All @@ -37,6 +38,7 @@
* @author Mark Fisher
* @author Gary Russell
* @author Oleg Zhurakousky
* @author Raylax Grey
*/
public class SimpleAmqpHeaderMapperTests {

Expand All @@ -51,7 +53,7 @@ public void fromHeaders() {
headerMap.put(AmqpHeaders.CONTENT_TYPE, "test.contentType");
String testCorrelationId = "foo";
headerMap.put(AmqpHeaders.CORRELATION_ID, testCorrelationId);
headerMap.put(AmqpHeaders.DELAY, 1234);
headerMap.put(AmqpHeaders.DELAY, 1234L);
headerMap.put(AmqpHeaders.DELIVERY_MODE, MessageDeliveryMode.NON_PERSISTENT);
headerMap.put(AmqpHeaders.DELIVERY_TAG, 1234L);
headerMap.put(AmqpHeaders.EXPIRATION, "test.expiration");
Expand Down Expand Up @@ -92,9 +94,35 @@ public void fromHeaders() {
assertThat(amqpProperties.getTimestamp()).isEqualTo(testTimestamp);
assertThat(amqpProperties.getType()).isEqualTo("test.type");
assertThat(amqpProperties.getUserId()).isEqualTo("test.userId");
assertThat(amqpProperties.getDelay()).isEqualTo(Integer.valueOf(1234));
assertThat(amqpProperties.getDelayLong()).isEqualTo(Long.valueOf(1234));
}

@Test
public void fromHeadersWithLongDelay() {
SimpleAmqpHeaderMapper headerMapper = new SimpleAmqpHeaderMapper();
Map<String, Object> headerMap = new HashMap<>();
headerMap.put(AmqpHeaders.DELAY, 1234L);
MessageHeaders messageHeaders = new MessageHeaders(headerMap);
MessageProperties amqpProperties = new MessageProperties();
headerMapper.fromHeaders(messageHeaders, amqpProperties);
assertThat(amqpProperties.getDelayLong()).isEqualTo(Long.valueOf(1234));

amqpProperties.setDelayLong(5678L);
assertThat(amqpProperties.getDelayLong()).isEqualTo(Long.valueOf(5678));

amqpProperties.setDelayLong(null);
assertThat(amqpProperties.getHeaders().containsKey(AmqpHeaders.DELAY)).isFalse();

amqpProperties.setDelayLong(MessageProperties.X_DELAY_MAX);
assertThat(amqpProperties.getDelayLong()).isEqualTo(Long.valueOf(MessageProperties.X_DELAY_MAX));

assertThatThrownBy(() -> amqpProperties.setDelayLong(MessageProperties.X_DELAY_MAX + 1))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Delay cannot exceed");

}


@Test
public void fromHeadersWithContentTypeAsMediaType() {
SimpleAmqpHeaderMapper headerMapper = new SimpleAmqpHeaderMapper();
Expand Down Expand Up @@ -126,7 +154,7 @@ public void toHeaders() {
amqpProperties.setMessageCount(42);
amqpProperties.setMessageId("test.messageId");
amqpProperties.setPriority(22);
amqpProperties.setReceivedDelay(1234);
amqpProperties.setReceivedDelayLong(1234L);
amqpProperties.setReceivedExchange("test.receivedExchange");
amqpProperties.setReceivedRoutingKey("test.receivedRoutingKey");
amqpProperties.setRedelivered(true);
Expand All @@ -151,7 +179,7 @@ public void toHeaders() {
assertThat(headerMap.get(AmqpHeaders.EXPIRATION)).isEqualTo("test.expiration");
assertThat(headerMap.get(AmqpHeaders.MESSAGE_COUNT)).isEqualTo(42);
assertThat(headerMap.get(AmqpHeaders.MESSAGE_ID)).isEqualTo("test.messageId");
assertThat(headerMap.get(AmqpHeaders.RECEIVED_DELAY)).isEqualTo(1234);
assertThat(headerMap.get(AmqpHeaders.RECEIVED_DELAY)).isEqualTo(1234L);
assertThat(headerMap.get(AmqpHeaders.RECEIVED_EXCHANGE)).isEqualTo("test.receivedExchange");
assertThat(headerMap.get(AmqpHeaders.RECEIVED_ROUTING_KEY)).isEqualTo("test.receivedRoutingKey");
assertThat(headerMap.get(AmqpHeaders.REPLY_TO)).isEqualTo("test.replyTo");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -40,6 +40,7 @@
* @author Mark Fisher
* @author Gary Russell
* @author Soeren Unruh
* @author Raylax Grey
* @since 1.0
*/
public class DefaultMessagePropertiesConverter implements MessagePropertiesConverter {
Expand Down Expand Up @@ -92,8 +93,14 @@ public MessageProperties toMessageProperties(final BasicProperties source, final
String key = entry.getKey();
if (MessageProperties.X_DELAY.equals(key)) {
Object value = entry.getValue();
if (value instanceof Integer integ) {
target.setReceivedDelay(integ);
if (value instanceof Integer intValue) {
long receivedDelayLongValue = intValue.longValue();
target.setReceivedDelayLong(receivedDelayLongValue);
target.setHeader(key, receivedDelayLongValue);
}
else if (value instanceof Long longVal) {
target.setReceivedDelayLong(longVal);
target.setHeader(key, longVal);
}
}
else {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -59,6 +59,7 @@
* @author Gary Russell
* @author Gunnar Hillert
* @author Artem Bilan
* @author Raylax Grey
*/
@RabbitAvailable(management = true)
public class RabbitAdminIntegrationTests extends NeedsManagementTests {
Expand Down Expand Up @@ -397,20 +398,20 @@ public void testDeclareDelayedExchange() throws Exception {
RabbitTemplate template = new RabbitTemplate(this.connectionFactory);
template.setReceiveTimeout(10000);
template.convertAndSend(exchangeName, queue.getName(), "foo", message -> {
message.getMessageProperties().setDelay(1000);
message.getMessageProperties().setDelayLong(1000L);
return message;
});
MessageProperties properties = new MessageProperties();
properties.setDelay(500);
properties.setDelayLong(500L);
template.send(exchangeName, queue.getName(),
MessageBuilder.withBody("foo".getBytes()).andProperties(properties).build());
long t1 = System.currentTimeMillis();
Message received = template.receive(queue.getName());
assertThat(received).isNotNull();
assertThat(received.getMessageProperties().getReceivedDelay()).isEqualTo(Integer.valueOf(500));
assertThat(received.getMessageProperties().getDelayLong()).isEqualTo(Long.valueOf(500L));
received = template.receive(queue.getName());
assertThat(received).isNotNull();
assertThat(received.getMessageProperties().getReceivedDelay()).isEqualTo(Integer.valueOf(1000));
assertThat(received.getMessageProperties().getDelayLong()).isEqualTo(Long.valueOf(1000L));
assertThat(System.currentTimeMillis() - t1).isGreaterThan(950L);

Map<String, Object> exchange2 = getExchange(exchangeName);
Expand Down