Skip to content

Commit

Permalink
GH-2602: Fix x-delay header to Long
Browse files Browse the repository at this point in the history
Fixes: #2602

* Update deprecated API
* Fix code style
* Remove deprecated API usage
* Some code clean of the affected classes
  • Loading branch information
raylax authored and artembilan committed Jan 22, 2024
1 parent cb15cf5 commit 966338e
Show file tree
Hide file tree
Showing 6 changed files with 149 additions and 57 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,6 +24,8 @@
import java.util.List;
import java.util.Map;

import org.springframework.util.Assert;

/**
* Message Properties for an AMQP message.
*
Expand All @@ -33,6 +35,7 @@
* @author Dmitry Chernyshov
* @author Artem Bilan
* @author Csaba Soti
* @author Raylax Grey
*/
public class MessageProperties implements Serializable {

Expand Down Expand Up @@ -66,6 +69,12 @@ public class MessageProperties implements Serializable {

public static final Integer DEFAULT_PRIORITY = 0;

/**
* The maximum value of x-delay header.
* @since 3.1.2
*/
public static final long X_DELAY_MAX = 0xffffffffL;

private final Map<String, Object> headers = new HashMap<>();

private Date timestamp;
Expand Down Expand Up @@ -118,7 +127,7 @@ public class MessageProperties implements Serializable {

private String consumerQueue;

private Integer receivedDelay;
private Long receivedDelay;

private MessageDeliveryMode receivedDeliveryMode;

Expand Down Expand Up @@ -352,19 +361,46 @@ public String getReceivedRoutingKey() {
* received message contains the delay.
* @return the received delay.
* @since 1.6
* @deprecated in favor of {@link #getReceivedDelayLong()}
* @see #getDelay()
*/
@Deprecated(since = "3.1.2", forRemoval = true)
public Integer getReceivedDelay() {
return this.receivedDelay;
Long receivedDelay = getReceivedDelayLong();
return receivedDelay != null ? Math.toIntExact(receivedDelay) : null;
}

/**
* When a delayed message exchange is used the x-delay header on a
* received message contains the delay.
* @param receivedDelay the received delay.
* @since 1.6
* @deprecated in favor of {@link #setReceivedDelayLong(Long)}
*/
@Deprecated(since = "3.1.2", forRemoval = true)
public void setReceivedDelay(Integer receivedDelay) {
setReceivedDelayLong(receivedDelay != null ? receivedDelay.longValue() : null);
}

/**
* When a delayed message exchange is used the x-delay header on a
* received message contains the delay.
* @return the received delay.
* @since 3.1.2
* @see #getDelayLong()
*/
public Long getReceivedDelayLong() {
return this.receivedDelay;
}

/**
* When a delayed message exchange is used the x-delay header on a
* received message contains the delay.
* @param receivedDelay the received delay.
* @since 3.1.2
* @see #setDelayLong(Long)
*/
public void setReceivedDelayLong(Long receivedDelay) {
this.receivedDelay = receivedDelay;
}

Expand Down Expand Up @@ -434,30 +470,54 @@ public void setConsumerQueue(String consumerQueue) {
* The x-delay header (outbound).
* @return the delay.
* @since 1.6
* @deprecated in favor of {@link #getDelayLong()}
* @see #getReceivedDelay()
*/
@Deprecated(since = "3.1.2", forRemoval = true)
public Integer getDelay() {
Long delay = getDelayLong();
return delay != null ? Math.toIntExact(delay) : null;
}

/**
* Set the x-delay header.
* @param delay the delay.
* @since 1.6
* @deprecated in favor of {@link #setDelayLong(Long)}
*/
@Deprecated(since = "3.1.2", forRemoval = true)
public void setDelay(Integer delay) {
setDelayLong(delay != null ? delay.longValue() : null);
}

/**
* Get the x-delay header long value.
* @return the delay.
* @since 3.1.2
*/
public Long getDelayLong() {
Object delay = this.headers.get(X_DELAY);
if (delay instanceof Integer) {
return (Integer) delay;
if (delay instanceof Long) {
return (Long) delay;
}
else {
return null;
}
}

/**
* Set the x-delay header.
* Set the x-delay header to a long value.
* @param delay the delay.
* @since 1.6
* @since 3.1.2
*/
public void setDelay(Integer delay) {
public void setDelayLong(Long delay) {
if (delay == null || delay < 0) {
this.headers.remove(X_DELAY);
return;
}
else {
this.headers.put(X_DELAY, delay);
}

Assert.isTrue(delay <= X_DELAY_MAX, "Delay cannot exceed " + X_DELAY_MAX);
this.headers.put(X_DELAY, delay);
}

public boolean isFinalRetryForMessageWithNoId() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -48,6 +48,7 @@
* @author Gary Russell
* @author Artem Bilan
* @author Stephane Nicoll
* @author Raylax Grey
* @since 1.4
*/
public class SimpleAmqpHeaderMapper extends AbstractHeaderMapper<MessageProperties> implements AmqpHeaderMapper {
Expand All @@ -69,8 +70,8 @@ public void fromHeaders(MessageHeaders headers, MessageProperties amqpMessagePro
amqpMessageProperties.setCorrelationId((String) correlationId);
}
javaUtils
.acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.DELAY, Integer.class),
amqpMessageProperties::setDelay)
.acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.DELAY, Long.class),
amqpMessageProperties::setDelayLong)
.acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.DELIVERY_MODE, MessageDeliveryMode.class),
amqpMessageProperties::setDeliveryMode)
.acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.DELIVERY_TAG, Long.class),
Expand Down Expand Up @@ -150,7 +151,7 @@ public MessageHeaders toHeaders(MessageProperties amqpMessageProperties) {
javaUtils
.acceptIfCondition(priority != null && priority > 0, AmqpMessageHeaderAccessor.PRIORITY, priority,
putObject)
.acceptIfNotNull(AmqpHeaders.RECEIVED_DELAY, amqpMessageProperties.getReceivedDelay(), putObject)
.acceptIfNotNull(AmqpHeaders.RECEIVED_DELAY, amqpMessageProperties.getReceivedDelayLong(), putObject)
.acceptIfHasText(AmqpHeaders.RECEIVED_EXCHANGE, amqpMessageProperties.getReceivedExchange(),
putString)
.acceptIfHasText(AmqpHeaders.RECEIVED_ROUTING_KEY, amqpMessageProperties.getReceivedRoutingKey(),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -30,6 +30,7 @@
* @author Artem Bilan
* @author Gary Russell
* @author Csaba Soti
* @author Raylax Grey
*
*/
public class MessagePropertiesTests {
Expand All @@ -53,10 +54,10 @@ public void testReplyToNullByDefault() {
@Test
public void testDelayHeader() {
MessageProperties properties = new MessageProperties();
Integer delay = 100;
properties.setDelay(delay);
Long delay = 100L;
properties.setDelayLong(delay);
assertThat(properties.getHeaders().get(MessageProperties.X_DELAY)).isEqualTo(delay);
properties.setDelay(null);
properties.setDelayLong(null);
assertThat(properties.getHeaders().containsKey(MessageProperties.X_DELAY)).isFalse();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,6 +17,7 @@
package org.springframework.amqp.support;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.fail;

import java.util.Date;
Expand All @@ -37,21 +38,22 @@
* @author Mark Fisher
* @author Gary Russell
* @author Oleg Zhurakousky
* @author Raylax Grey
*/
public class SimpleAmqpHeaderMapperTests {

@Test
public void fromHeaders() {
SimpleAmqpHeaderMapper headerMapper = new SimpleAmqpHeaderMapper();
Map<String, Object> headerMap = new HashMap<String, Object>();
Map<String, Object> headerMap = new HashMap<>();
headerMap.put(AmqpHeaders.APP_ID, "test.appId");
headerMap.put(AmqpHeaders.CLUSTER_ID, "test.clusterId");
headerMap.put(AmqpHeaders.CONTENT_ENCODING, "test.contentEncoding");
headerMap.put(AmqpHeaders.CONTENT_LENGTH, 99L);
headerMap.put(AmqpHeaders.CONTENT_TYPE, "test.contentType");
String testCorrelationId = "foo";
headerMap.put(AmqpHeaders.CORRELATION_ID, testCorrelationId);
headerMap.put(AmqpHeaders.DELAY, 1234);
headerMap.put(AmqpHeaders.DELAY, 1234L);
headerMap.put(AmqpHeaders.DELIVERY_MODE, MessageDeliveryMode.NON_PERSISTENT);
headerMap.put(AmqpHeaders.DELIVERY_TAG, 1234L);
headerMap.put(AmqpHeaders.EXPIRATION, "test.expiration");
Expand Down Expand Up @@ -92,13 +94,39 @@ public void fromHeaders() {
assertThat(amqpProperties.getTimestamp()).isEqualTo(testTimestamp);
assertThat(amqpProperties.getType()).isEqualTo("test.type");
assertThat(amqpProperties.getUserId()).isEqualTo("test.userId");
assertThat(amqpProperties.getDelay()).isEqualTo(Integer.valueOf(1234));
assertThat(amqpProperties.getDelayLong()).isEqualTo(Long.valueOf(1234));
}

@Test
public void fromHeadersWithLongDelay() {
SimpleAmqpHeaderMapper headerMapper = new SimpleAmqpHeaderMapper();
Map<String, Object> headerMap = new HashMap<>();
headerMap.put(AmqpHeaders.DELAY, 1234L);
MessageHeaders messageHeaders = new MessageHeaders(headerMap);
MessageProperties amqpProperties = new MessageProperties();
headerMapper.fromHeaders(messageHeaders, amqpProperties);
assertThat(amqpProperties.getDelayLong()).isEqualTo(Long.valueOf(1234));

amqpProperties.setDelayLong(5678L);
assertThat(amqpProperties.getDelayLong()).isEqualTo(Long.valueOf(5678));

amqpProperties.setDelayLong(null);
assertThat(amqpProperties.getHeaders().containsKey(AmqpHeaders.DELAY)).isFalse();

amqpProperties.setDelayLong(MessageProperties.X_DELAY_MAX);
assertThat(amqpProperties.getDelayLong()).isEqualTo(Long.valueOf(MessageProperties.X_DELAY_MAX));

assertThatThrownBy(() -> amqpProperties.setDelayLong(MessageProperties.X_DELAY_MAX + 1))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Delay cannot exceed");

}


@Test
public void fromHeadersWithContentTypeAsMediaType() {
SimpleAmqpHeaderMapper headerMapper = new SimpleAmqpHeaderMapper();
Map<String, Object> headerMap = new HashMap<String, Object>();
Map<String, Object> headerMap = new HashMap<>();

headerMap.put(AmqpHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_HTML);

Expand Down Expand Up @@ -126,7 +154,7 @@ public void toHeaders() {
amqpProperties.setMessageCount(42);
amqpProperties.setMessageId("test.messageId");
amqpProperties.setPriority(22);
amqpProperties.setReceivedDelay(1234);
amqpProperties.setReceivedDelayLong(1234L);
amqpProperties.setReceivedExchange("test.receivedExchange");
amqpProperties.setReceivedRoutingKey("test.receivedRoutingKey");
amqpProperties.setRedelivered(true);
Expand All @@ -151,7 +179,7 @@ public void toHeaders() {
assertThat(headerMap.get(AmqpHeaders.EXPIRATION)).isEqualTo("test.expiration");
assertThat(headerMap.get(AmqpHeaders.MESSAGE_COUNT)).isEqualTo(42);
assertThat(headerMap.get(AmqpHeaders.MESSAGE_ID)).isEqualTo("test.messageId");
assertThat(headerMap.get(AmqpHeaders.RECEIVED_DELAY)).isEqualTo(1234);
assertThat(headerMap.get(AmqpHeaders.RECEIVED_DELAY)).isEqualTo(1234L);
assertThat(headerMap.get(AmqpHeaders.RECEIVED_EXCHANGE)).isEqualTo("test.receivedExchange");
assertThat(headerMap.get(AmqpHeaders.RECEIVED_ROUTING_KEY)).isEqualTo("test.receivedRoutingKey");
assertThat(headerMap.get(AmqpHeaders.REPLY_TO)).isEqualTo("test.replyTo");
Expand All @@ -170,7 +198,7 @@ public void jsonTypeIdNotOverwritten() {
Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
MessageProperties amqpProperties = new MessageProperties();
converter.toMessage("123", amqpProperties);
Map<String, Object> headerMap = new HashMap<String, Object>();
Map<String, Object> headerMap = new HashMap<>();
headerMap.put("__TypeId__", "java.lang.Integer");
MessageHeaders messageHeaders = new MessageHeaders(headerMap);
headerMapper.fromHeaders(messageHeaders, amqpProperties);
Expand Down
Loading

0 comments on commit 966338e

Please sign in to comment.