From 67a6857a454ead585639ae15afaac31feb587e5a Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Thu, 21 Jun 2018 14:12:25 -0400 Subject: [PATCH] AMQP-821: Repub Recoverer limit stack trace header JIRA: https://jira.spring.io/browse/AMQP-821 Since headers are not fragmented, limit the stack trace header length to prevent failures. **cherry-pick to 2.0.x** --- .../connection/CachingConnectionFactory.java | 16 +++- .../amqp/rabbit/connection/Connection.java | 14 +++- .../amqp/rabbit/connection/RabbitUtils.java | 18 +++++ .../rabbit/connection/SimpleConnection.java | 3 +- .../retry/RepublishMessageRecoverer.java | 49 +++++++++++- ...blishMessageRecovererIntegrationTests.java | 79 +++++++++++++++++++ 6 files changed, 168 insertions(+), 11 deletions(-) create mode 100644 spring-rabbit/src/test/java/org/springframework/amqp/rabbit/retry/RepublishMessageRecovererIntegrationTests.java diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactory.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactory.java index 1c706d5136..f6db5742d5 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactory.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactory.java @@ -889,10 +889,13 @@ public Properties getPublisherConnectionFactoryCacheProperties() { private void putConnectionName(Properties props, ConnectionProxy connection, String keySuffix) { Connection targetConnection = connection.getTargetConnection(); // NOSONAR (close()) - if (targetConnection instanceof SimpleConnection) { - String name = ((SimpleConnection) targetConnection).getDelegate().getClientProvidedName(); - if (name != null) { - props.put("connectionName" + keySuffix, name); + if (targetConnection != null) { + com.rabbitmq.client.Connection delegate = targetConnection.getDelegate(); + if (delegate != null) { + String name = delegate.getClientProvidedName(); + if (name != null) { + props.put("connectionName" + keySuffix, name); + } } } } @@ -1330,6 +1333,11 @@ public Connection getTargetConnection() { return this.target; } + @Override + public com.rabbitmq.client.Connection getDelegate() { + return this.target.getDelegate(); + } + @Override public int getLocalPort() { Connection target = this.target; // NOSONAR (close) diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/Connection.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/Connection.java index 789e4c9ec1..cb471b1b91 100755 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/Connection.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/Connection.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2017 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +17,7 @@ package org.springframework.amqp.rabbit.connection; import org.springframework.amqp.AmqpException; +import org.springframework.lang.Nullable; import com.rabbitmq.client.BlockedListener; import com.rabbitmq.client.Channel; @@ -26,7 +27,7 @@ * @author Gary Russell * @author Artem Bilan */ -public interface Connection { +public interface Connection extends AutoCloseable { /** * Create a new channel, using an internally allocated channel number. @@ -45,6 +46,7 @@ public interface Connection { * * @throws AmqpException if an I/O problem is encountered */ + @Override void close() throws AmqpException; /** @@ -78,4 +80,12 @@ public interface Connection { */ boolean removeBlockedListener(BlockedListener listener); + /** + * Return the underlying RabbitMQ connection. + * @return the connection. + */ + default @Nullable com.rabbitmq.client.Connection getDelegate() { + return null; + } + } diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/RabbitUtils.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/RabbitUtils.java index 288da43fdd..2a779b4874 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/RabbitUtils.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/RabbitUtils.java @@ -311,4 +311,22 @@ public static boolean isExchangeDeclarationFailure(Exception e) { } } + /** + * Return the negotiated frame_max. + * @param connectionFactory the connection factory. + * @return the size or -1 if it cannot be determined. + */ + public static int getMaxFrame(ConnectionFactory connectionFactory) { + try (Connection connection = connectionFactory.createConnection()) { + com.rabbitmq.client.Connection rcon = connection.getDelegate(); + if (rcon != null) { + return rcon.getFrameMax(); + } + } + catch (RuntimeException e) { + // NOSONAR + } + return -1; + } + } diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/SimpleConnection.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/SimpleConnection.java index edfa437e1d..ad89e2aceb 100755 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/SimpleConnection.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/SimpleConnection.java @@ -139,7 +139,8 @@ public int getPort() { return this.delegate.getPort(); } - com.rabbitmq.client.Connection getDelegate() { + @Override + public com.rabbitmq.client.Connection getDelegate() { return this.delegate; } diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/retry/RepublishMessageRecoverer.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/retry/RepublishMessageRecoverer.java index 1b872816fa..3def20aadf 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/retry/RepublishMessageRecoverer.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/retry/RepublishMessageRecoverer.java @@ -27,6 +27,8 @@ import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageDeliveryMode; import org.springframework.amqp.core.MessageProperties; +import org.springframework.amqp.rabbit.connection.RabbitUtils; +import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.util.Assert; /** @@ -55,6 +57,8 @@ public class RepublishMessageRecoverer implements MessageRecoverer { public static final String X_ORIGINAL_ROUTING_KEY = "x-original-routingKey"; + public static final int DEFAULT_FRAME_MAX_HEADROOM = 20_000; + protected final Log logger = LogFactory.getLog(getClass()); protected final AmqpTemplate errorTemplate; @@ -65,6 +69,10 @@ public class RepublishMessageRecoverer implements MessageRecoverer { private String errorRoutingKeyPrefix = "error."; + private int frameMaxHeadroom = DEFAULT_FRAME_MAX_HEADROOM; + + private volatile Integer maxStackTraceLength = -1; + private MessageDeliveryMode deliveryMode = MessageDeliveryMode.PERSISTENT; public RepublishMessageRecoverer(AmqpTemplate errorTemplate) { @@ -80,6 +88,9 @@ public RepublishMessageRecoverer(AmqpTemplate errorTemplate, String errorExchang this.errorTemplate = errorTemplate; this.errorExchangeName = errorExchange; this.errorRoutingKey = errorRoutingKey; + if (!(this.errorTemplate instanceof RabbitTemplate)) { + this.maxStackTraceLength = Integer.MAX_VALUE; + } } /** @@ -94,6 +105,19 @@ public RepublishMessageRecoverer errorRoutingKeyPrefix(String errorRoutingKeyPre return this; } + /** + * Set the amount by which the negotiated frame_max is to be reduced when considering + * truncating the stack trace header. Defaults to + * {@value #DEFAULT_FRAME_MAX_HEADROOM}. + * @param headroom the headroom + * @return this. + * @since 2.0.5 + */ + public RepublishMessageRecoverer frameMaxHeadroom(int headroom) { + this.frameMaxHeadroom = headroom; + return this; + } + /** * @param errorRoutingKeyPrefix The prefix (default "error."). * @see #errorRoutingKeyPrefix(String) @@ -126,7 +150,21 @@ protected MessageDeliveryMode getDeliveryMode() { public void recover(Message message, Throwable cause) { MessageProperties messageProperties = message.getMessageProperties(); Map headers = messageProperties.getHeaders(); - headers.put(X_EXCEPTION_STACKTRACE, getStackTraceAsString(cause)); + String stackTraceAsString = getStackTraceAsString(cause); + if (this.maxStackTraceLength < 0) { + int maxStackTraceLength = RabbitUtils + .getMaxFrame(((RabbitTemplate) this.errorTemplate).getConnectionFactory()); + if (maxStackTraceLength > 0) { + maxStackTraceLength -= this.frameMaxHeadroom; + this.maxStackTraceLength = maxStackTraceLength; + } + } + if (this.maxStackTraceLength > 0 && stackTraceAsString.length() > this.maxStackTraceLength) { + stackTraceAsString = stackTraceAsString.substring(0, this.maxStackTraceLength); + this.logger.warn("Stack trace in republished message header truncated due to frame_max limitations; " + + "consider increasing frame_max on the broker or reduce the stack trace depth", cause); + } + headers.put(X_EXCEPTION_STACKTRACE, stackTraceAsString); headers.put(X_EXCEPTION_MESSAGE, cause.getCause() != null ? cause.getCause().getMessage() : cause.getMessage()); headers.put(X_ORIGINAL_EXCHANGE, messageProperties.getReceivedExchange()); headers.put(X_ORIGINAL_ROUTING_KEY, messageProperties.getReceivedRoutingKey()); @@ -140,17 +178,20 @@ public void recover(Message message, Throwable cause) { } if (null != this.errorExchangeName) { - String routingKey = this.errorRoutingKey != null ? this.errorRoutingKey : this.prefixedOriginalRoutingKey(message); + String routingKey = this.errorRoutingKey != null ? this.errorRoutingKey + : this.prefixedOriginalRoutingKey(message); this.errorTemplate.send(this.errorExchangeName, routingKey, message); if (this.logger.isWarnEnabled()) { - this.logger.warn("Republishing failed message to exchange " + this.errorExchangeName); + this.logger.warn("Republishing failed message to exchange '" + this.errorExchangeName + + "' with routing key " + routingKey); } } else { final String routingKey = this.prefixedOriginalRoutingKey(message); this.errorTemplate.send(routingKey, message); if (this.logger.isWarnEnabled()) { - this.logger.warn("Republishing failed message to the template's default exchange with routing key " + routingKey); + this.logger.warn("Republishing failed message to the template's default exchange with routing key " + + routingKey); } } } diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/retry/RepublishMessageRecovererIntegrationTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/retry/RepublishMessageRecovererIntegrationTests.java new file mode 100644 index 0000000000..b5d061423a --- /dev/null +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/retry/RepublishMessageRecovererIntegrationTests.java @@ -0,0 +1,79 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.amqp.rabbit.retry; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.PrintWriter; +import java.io.StringWriter; + +import org.junit.jupiter.api.Test; + +import org.springframework.amqp.core.Message; +import org.springframework.amqp.core.MessageProperties; +import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; +import org.springframework.amqp.rabbit.connection.RabbitUtils; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.rabbit.junit.RabbitAvailable; +import org.springframework.amqp.rabbit.junit.RabbitAvailableCondition; + +import com.rabbitmq.client.LongString; + +/** + * @author Gary Russell + * @since 2.0.5 + * + */ +@RabbitAvailable(queues = RepublishMessageRecovererIntegrationTests.BIG_HEADER_QUEUE) +public class RepublishMessageRecovererIntegrationTests { + + public static final String BIG_HEADER_QUEUE = "big.header.queue"; + + private static final String BIG_EXCEPTION_MESSAGE = new String(new byte[10_000]).replaceAll("\u0000", "x"); + + private int maxHeaderSize; + + @Test + public void testBigHeader() { + RabbitTemplate template = new RabbitTemplate( + new CachingConnectionFactory(RabbitAvailableCondition.getBrokerRunning().getConnectionFactory())); + this.maxHeaderSize = RabbitUtils.getMaxFrame(template.getConnectionFactory()) - 20_000; + assertThat(this.maxHeaderSize).isGreaterThan(0); + RepublishMessageRecoverer recoverer = new RepublishMessageRecoverer(template, "", BIG_HEADER_QUEUE); + recoverer.recover(new Message("foo".getBytes(), new MessageProperties()), + bigCause(new RuntimeException(BIG_EXCEPTION_MESSAGE))); + Message received = template.receive(BIG_HEADER_QUEUE, 10_000); + assertThat(received).isNotNull(); + assertThat(((LongString) received.getMessageProperties().getHeaders() + .get(RepublishMessageRecoverer.X_EXCEPTION_STACKTRACE)).length()).isEqualTo(this.maxHeaderSize); + } + + private Throwable bigCause(Throwable cause) { + if (getStackTraceAsString(cause).length() > this.maxHeaderSize) { + return cause; + } + return bigCause(new RuntimeException(BIG_EXCEPTION_MESSAGE, cause)); + } + + private String getStackTraceAsString(Throwable cause) { + StringWriter stringWriter = new StringWriter(); + PrintWriter printWriter = new PrintWriter(stringWriter, true); + cause.printStackTrace(printWriter); + return stringWriter.getBuffer().toString(); + } + +}