Skip to content

Commit

Permalink
AMQP-821: Repub Recoverer limit stack trace header
Browse files Browse the repository at this point in the history
JIRA: https://jira.spring.io/browse/AMQP-821

Since headers are not fragmented, limit the stack trace header length to
prevent failures.

**cherry-pick to 2.0.x**
  • Loading branch information
garyrussell authored and artembilan committed Jun 21, 2018
1 parent 4fdb1e4 commit 67a6857
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -889,10 +889,13 @@ public Properties getPublisherConnectionFactoryCacheProperties() {

private void putConnectionName(Properties props, ConnectionProxy connection, String keySuffix) {
Connection targetConnection = connection.getTargetConnection(); // NOSONAR (close())
if (targetConnection instanceof SimpleConnection) {
String name = ((SimpleConnection) targetConnection).getDelegate().getClientProvidedName();
if (name != null) {
props.put("connectionName" + keySuffix, name);
if (targetConnection != null) {
com.rabbitmq.client.Connection delegate = targetConnection.getDelegate();
if (delegate != null) {
String name = delegate.getClientProvidedName();
if (name != null) {
props.put("connectionName" + keySuffix, name);
}
}
}
}
Expand Down Expand Up @@ -1330,6 +1333,11 @@ public Connection getTargetConnection() {
return this.target;
}

@Override
public com.rabbitmq.client.Connection getDelegate() {
return this.target.getDelegate();
}

@Override
public int getLocalPort() {
Connection target = this.target; // NOSONAR (close)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,6 +17,7 @@
package org.springframework.amqp.rabbit.connection;

import org.springframework.amqp.AmqpException;
import org.springframework.lang.Nullable;

import com.rabbitmq.client.BlockedListener;
import com.rabbitmq.client.Channel;
Expand All @@ -26,7 +27,7 @@
* @author Gary Russell
* @author Artem Bilan
*/
public interface Connection {
public interface Connection extends AutoCloseable {

/**
* Create a new channel, using an internally allocated channel number.
Expand All @@ -45,6 +46,7 @@ public interface Connection {
*
* @throws AmqpException if an I/O problem is encountered
*/
@Override
void close() throws AmqpException;

/**
Expand Down Expand Up @@ -78,4 +80,12 @@ public interface Connection {
*/
boolean removeBlockedListener(BlockedListener listener);

/**
* Return the underlying RabbitMQ connection.
* @return the connection.
*/
default @Nullable com.rabbitmq.client.Connection getDelegate() {
return null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -311,4 +311,22 @@ public static boolean isExchangeDeclarationFailure(Exception e) {
}
}

/**
* Return the negotiated frame_max.
* @param connectionFactory the connection factory.
* @return the size or -1 if it cannot be determined.
*/
public static int getMaxFrame(ConnectionFactory connectionFactory) {
try (Connection connection = connectionFactory.createConnection()) {
com.rabbitmq.client.Connection rcon = connection.getDelegate();
if (rcon != null) {
return rcon.getFrameMax();
}
}
catch (RuntimeException e) {
// NOSONAR
}
return -1;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ public int getPort() {
return this.delegate.getPort();
}

com.rabbitmq.client.Connection getDelegate() {
@Override
public com.rabbitmq.client.Connection getDelegate() {
return this.delegate;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.RabbitUtils;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.util.Assert;

/**
Expand Down Expand Up @@ -55,6 +57,8 @@ public class RepublishMessageRecoverer implements MessageRecoverer {

public static final String X_ORIGINAL_ROUTING_KEY = "x-original-routingKey";

public static final int DEFAULT_FRAME_MAX_HEADROOM = 20_000;

protected final Log logger = LogFactory.getLog(getClass());

protected final AmqpTemplate errorTemplate;
Expand All @@ -65,6 +69,10 @@ public class RepublishMessageRecoverer implements MessageRecoverer {

private String errorRoutingKeyPrefix = "error.";

private int frameMaxHeadroom = DEFAULT_FRAME_MAX_HEADROOM;

private volatile Integer maxStackTraceLength = -1;

private MessageDeliveryMode deliveryMode = MessageDeliveryMode.PERSISTENT;

public RepublishMessageRecoverer(AmqpTemplate errorTemplate) {
Expand All @@ -80,6 +88,9 @@ public RepublishMessageRecoverer(AmqpTemplate errorTemplate, String errorExchang
this.errorTemplate = errorTemplate;
this.errorExchangeName = errorExchange;
this.errorRoutingKey = errorRoutingKey;
if (!(this.errorTemplate instanceof RabbitTemplate)) {
this.maxStackTraceLength = Integer.MAX_VALUE;
}
}

/**
Expand All @@ -94,6 +105,19 @@ public RepublishMessageRecoverer errorRoutingKeyPrefix(String errorRoutingKeyPre
return this;
}

/**
* Set the amount by which the negotiated frame_max is to be reduced when considering
* truncating the stack trace header. Defaults to
* {@value #DEFAULT_FRAME_MAX_HEADROOM}.
* @param headroom the headroom
* @return this.
* @since 2.0.5
*/
public RepublishMessageRecoverer frameMaxHeadroom(int headroom) {
this.frameMaxHeadroom = headroom;
return this;
}

/**
* @param errorRoutingKeyPrefix The prefix (default "error.").
* @see #errorRoutingKeyPrefix(String)
Expand Down Expand Up @@ -126,7 +150,21 @@ protected MessageDeliveryMode getDeliveryMode() {
public void recover(Message message, Throwable cause) {
MessageProperties messageProperties = message.getMessageProperties();
Map<String, Object> headers = messageProperties.getHeaders();
headers.put(X_EXCEPTION_STACKTRACE, getStackTraceAsString(cause));
String stackTraceAsString = getStackTraceAsString(cause);
if (this.maxStackTraceLength < 0) {
int maxStackTraceLength = RabbitUtils
.getMaxFrame(((RabbitTemplate) this.errorTemplate).getConnectionFactory());
if (maxStackTraceLength > 0) {
maxStackTraceLength -= this.frameMaxHeadroom;
this.maxStackTraceLength = maxStackTraceLength;
}
}
if (this.maxStackTraceLength > 0 && stackTraceAsString.length() > this.maxStackTraceLength) {
stackTraceAsString = stackTraceAsString.substring(0, this.maxStackTraceLength);
this.logger.warn("Stack trace in republished message header truncated due to frame_max limitations; "
+ "consider increasing frame_max on the broker or reduce the stack trace depth", cause);
}
headers.put(X_EXCEPTION_STACKTRACE, stackTraceAsString);
headers.put(X_EXCEPTION_MESSAGE, cause.getCause() != null ? cause.getCause().getMessage() : cause.getMessage());
headers.put(X_ORIGINAL_EXCHANGE, messageProperties.getReceivedExchange());
headers.put(X_ORIGINAL_ROUTING_KEY, messageProperties.getReceivedRoutingKey());
Expand All @@ -140,17 +178,20 @@ public void recover(Message message, Throwable cause) {
}

if (null != this.errorExchangeName) {
String routingKey = this.errorRoutingKey != null ? this.errorRoutingKey : this.prefixedOriginalRoutingKey(message);
String routingKey = this.errorRoutingKey != null ? this.errorRoutingKey
: this.prefixedOriginalRoutingKey(message);
this.errorTemplate.send(this.errorExchangeName, routingKey, message);
if (this.logger.isWarnEnabled()) {
this.logger.warn("Republishing failed message to exchange " + this.errorExchangeName);
this.logger.warn("Republishing failed message to exchange '" + this.errorExchangeName
+ "' with routing key " + routingKey);
}
}
else {
final String routingKey = this.prefixedOriginalRoutingKey(message);
this.errorTemplate.send(routingKey, message);
if (this.logger.isWarnEnabled()) {
this.logger.warn("Republishing failed message to the template's default exchange with routing key " + routingKey);
this.logger.warn("Republishing failed message to the template's default exchange with routing key "
+ routingKey);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.amqp.rabbit.retry;

import static org.assertj.core.api.Assertions.assertThat;

import java.io.PrintWriter;
import java.io.StringWriter;

import org.junit.jupiter.api.Test;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.RabbitUtils;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.junit.RabbitAvailable;
import org.springframework.amqp.rabbit.junit.RabbitAvailableCondition;

import com.rabbitmq.client.LongString;

/**
* @author Gary Russell
* @since 2.0.5
*
*/
@RabbitAvailable(queues = RepublishMessageRecovererIntegrationTests.BIG_HEADER_QUEUE)
public class RepublishMessageRecovererIntegrationTests {

public static final String BIG_HEADER_QUEUE = "big.header.queue";

private static final String BIG_EXCEPTION_MESSAGE = new String(new byte[10_000]).replaceAll("\u0000", "x");

private int maxHeaderSize;

@Test
public void testBigHeader() {
RabbitTemplate template = new RabbitTemplate(
new CachingConnectionFactory(RabbitAvailableCondition.getBrokerRunning().getConnectionFactory()));
this.maxHeaderSize = RabbitUtils.getMaxFrame(template.getConnectionFactory()) - 20_000;
assertThat(this.maxHeaderSize).isGreaterThan(0);
RepublishMessageRecoverer recoverer = new RepublishMessageRecoverer(template, "", BIG_HEADER_QUEUE);
recoverer.recover(new Message("foo".getBytes(), new MessageProperties()),
bigCause(new RuntimeException(BIG_EXCEPTION_MESSAGE)));
Message received = template.receive(BIG_HEADER_QUEUE, 10_000);
assertThat(received).isNotNull();
assertThat(((LongString) received.getMessageProperties().getHeaders()
.get(RepublishMessageRecoverer.X_EXCEPTION_STACKTRACE)).length()).isEqualTo(this.maxHeaderSize);
}

private Throwable bigCause(Throwable cause) {
if (getStackTraceAsString(cause).length() > this.maxHeaderSize) {
return cause;
}
return bigCause(new RuntimeException(BIG_EXCEPTION_MESSAGE, cause));
}

private String getStackTraceAsString(Throwable cause) {
StringWriter stringWriter = new StringWriter();
PrintWriter printWriter = new PrintWriter(stringWriter, true);
cause.printStackTrace(printWriter);
return stringWriter.getBuffer().toString();
}

}

0 comments on commit 67a6857

Please sign in to comment.