Skip to content

Commit

Permalink
INT-4421: Fix failedMessage in some exceptions
Browse files Browse the repository at this point in the history
JIRA: https://jira.spring.io/browse/INT-4421

If a component invoked by an SPCA threw a `MessagingException` with no
`failedMessage`, the resulting ErrorMessage payload had no `failedMessage`.

The `UnicastingDispatcher` had a check for this so it wasn't an issue
as long as at least one `DirectChannel` was between the poller and the
component.

Promote the wrapping code to `IntgrationUtils` and invoke it from
places that blindly rethrew `MessagingException`s.
  • Loading branch information
garyrussell authored and artembilan committed Mar 5, 2018
1 parent 1c4db65 commit 6065745
Show file tree
Hide file tree
Showing 8 changed files with 96 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@
import org.springframework.integration.support.management.MetricsContext;
import org.springframework.integration.support.management.Statistics;
import org.springframework.integration.support.management.TrackableComponent;
import org.springframework.integration.support.utils.IntegrationUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.util.Assert;
Expand Down Expand Up @@ -473,11 +473,8 @@ public boolean send(Message<?> message, long timeout) {
if (interceptorStack != null) {
interceptors.afterSendCompletion(message, this, sent, e, interceptorStack);
}
if (e instanceof MessagingException) {
throw (MessagingException) e;
}
throw new MessageDeliveryException(message,
"failed to send Message to channel '" + this.getComponentName() + "'", e);
throw IntegrationUtils.wrapInDeliveryExceptionIfNecessary(message,
() -> "failed to send Message to channel '" + this.getComponentName() + "'", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -123,6 +123,23 @@ protected boolean tryOptimizedDispatch(Message<?> message) {
return false;
}

/**
* If the exception is not a {@link MessagingException} or does not have a
* {@link MessagingException#getFailedMessage() failedMessage}, wrap it in a new
* {@link MessagingException} with the message. There is some inconsistency here in
* that {@link MessagingException}s are wrapped in a {@link MessagingException} whereas
* {@link Exception}s are wrapped in {@link MessageDeliveryException}. It is retained
* for backwards compatibility and will be resolved in 5.1.
* It also does not wrap other {@link RuntimeException}s.
* TODO: Remove this in favor of
* {@code #wrapInDeliveryExceptionIfNecessary(Message, Supplier, Exception)} in 5.1.
* @deprecated in favor of
* {@code IntegrationUtils#wrapInDeliveryExceptionIfNecessary(Message, Supplier, Exception)}
* @param message the message.
* @param e the exception.
* @return the wrapper, if necessary, or the original exception.
*/
@Deprecated
protected RuntimeException wrapExceptionIfNecessary(Message<?> message, Exception e) {
RuntimeException runtimeException = (e instanceof RuntimeException)
? (RuntimeException) e
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -145,7 +145,8 @@ private boolean doDispatch(Message<?> message) {
success = true; // we have a winner.
}
catch (Exception e) {
RuntimeException runtimeException = this.wrapExceptionIfNecessary(message, e);
@SuppressWarnings("deprecation")
RuntimeException runtimeException = wrapExceptionIfNecessary(message, e);
exceptions.add(runtimeException);
this.handleExceptions(exceptions, message, !handlerIterator.hasNext());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@
import org.springframework.integration.channel.ExecutorChannelInterceptorAware;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.router.MessageRouter;
import org.springframework.integration.support.utils.IntegrationUtils;
import org.springframework.integration.transaction.IntegrationResourceHolder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.ExecutorChannelInterceptor;
Expand Down Expand Up @@ -135,11 +135,10 @@ protected void handleMessage(Message<?> message) {
if (!CollectionUtils.isEmpty(interceptorStack)) {
triggerAfterMessageHandled(theMessage, ex, interceptorStack);
}
if (ex instanceof MessagingException) {
throw (MessagingException) ex;
}
String description = "Failed to handle " + theMessage + " to " + this + " in " + this.handler;
throw new MessageDeliveryException(theMessage, description, ex);
// TODO: In 5.1 remove this; adding the failed message to the text is redundant
final Message<?> messageForText = theMessage;
throw IntegrationUtils.wrapInDeliveryExceptionIfNecessary(theMessage,
() -> "Failed to handle " + messageForText + " to " + this + " in " + this.handler, ex);
}
catch (Error ex) { //NOSONAR - ok, we re-throw below
if (!CollectionUtils.isEmpty(interceptorStack)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@
import org.springframework.integration.support.management.MetricsContext;
import org.springframework.integration.support.management.Statistics;
import org.springframework.integration.support.management.TrackableComponent;
import org.springframework.integration.support.utils.IntegrationUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.messaging.MessagingException;
import org.springframework.util.Assert;

Expand Down Expand Up @@ -171,10 +171,8 @@ public void handleMessage(Message<?> message) {
if (countsEnabled) {
handlerMetrics.afterHandle(start, false);
}
if (e instanceof MessagingException) {
throw (MessagingException) e;
}
throw new MessageHandlingException(message, "error occurred in message handler [" + this + "]", e);
throw IntegrationUtils.wrapInHandlingExceptionIfNecessary(message,
() -> "error occurred in message handler [" + this + "]", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.springframework.integration.support.utils;

import java.io.UnsupportedEncodingException;
import java.util.function.Supplier;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand All @@ -25,6 +26,10 @@
import org.springframework.core.convert.ConversionService;
import org.springframework.integration.support.DefaultMessageBuilderFactory;
import org.springframework.integration.support.MessageBuilderFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.messaging.MessagingException;
import org.springframework.util.Assert;

/**
Expand Down Expand Up @@ -139,4 +144,46 @@ public static String bytesToString(byte[] bytes, String encoding) {
}
}

/**
* If the exception is not a {@link MessagingException} or does not have
* a {@link MessagingException#getFailedMessage() failedMessage}, wrap it
* in a new {@link MessageDeliveryException} with the message.
* @param message the message.
* @param text a Supplier for the new exception's message text.
* @param e the exception.
* @return the wrapper, if necessary, or the original exception.
* @since 5.0.4
*/
public static RuntimeException wrapInDeliveryExceptionIfNecessary(Message<?> message, Supplier<String> text, Exception e) {
RuntimeException runtimeException = (e instanceof RuntimeException)
? (RuntimeException) e
: new MessageDeliveryException(message, text.get(), e);
if (!(e instanceof MessagingException) ||
((MessagingException) e).getFailedMessage() == null) {
runtimeException = new MessageDeliveryException(message, text.get(), e);
}
return runtimeException;
}

/**
* If the exception is not a {@link MessagingException} or does not have
* a {@link MessagingException#getFailedMessage() failedMessage}, wrap it
* in a new {@link MessageHandlingException} with the message.
* @param message the message.
* @param text a Supplier for the new exception's message text.
* @param e the exception.
* @return the wrapper, if necessary, or the original exception.
* @since 5.0.4
*/
public static RuntimeException wrapInHandlingExceptionIfNecessary(Message<?> message, Supplier<String> text, Exception e) {
RuntimeException runtimeException = (e instanceof RuntimeException)
? (RuntimeException) e
: new MessageHandlingException(message, text.get(), e);
if (!(e instanceof MessagingException) ||
((MessagingException) e).getFailedMessage() == null) {
runtimeException = new MessageHandlingException(message, text.get(), e);
}
return runtimeException;
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,8 +16,10 @@

package org.springframework.integration.handler;

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;

import org.hamcrest.Factory;
import org.hamcrest.Matcher;
Expand All @@ -27,17 +29,19 @@
import org.springframework.integration.message.MessageMatcher;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.core.DestinationResolutionException;
import org.springframework.messaging.support.GenericMessage;

/**
* @author Mark Fisher
* @author Iwein Fuld
* @author Gary Russell
*/
public class BridgeHandlerTests {

private BridgeHandler handler = new BridgeHandler();
private final BridgeHandler handler = new BridgeHandler();

@Factory
public static Matcher<Message<?>> sameExceptImmutableHeaders(Message<?> expected) {
Expand All @@ -55,10 +59,16 @@ public void simpleBridge() {
assertThat(reply, sameExceptImmutableHeaders(request));
}

@Test(expected = DestinationResolutionException.class)
@Test
public void missingOutputChannelVerifiedAtRuntime() {
Message<?> request = new GenericMessage<String>("test");
handler.handleMessage(request);
try {
handler.handleMessage(request);
fail("Expected exception");
}
catch (MessageHandlingException e) {
assertThat(e.getCause(), instanceOf(DestinationResolutionException.class));
}
}

@Test(timeout = 1000)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public class TcpOutboundGatewayTests {

private static final Log logger = LogFactory.getLog(TcpOutboundGatewayTests.class);

private AsyncTaskExecutor executor = new SimpleAsyncTaskExecutor();
private final AsyncTaskExecutor executor = new SimpleAsyncTaskExecutor();

@ClassRule
public static LongRunningIntegrationTest longTests = new LongRunningIntegrationTest();
Expand Down Expand Up @@ -730,7 +730,7 @@ private void testGWPropagatesSocketCloseGuts(final int port, AbstractClientConne
fail("expected failure");
}
catch (Exception e) {
assertThat(e.getCause(), instanceOf(EOFException.class));
assertThat(e.getCause().getCause(), instanceOf(EOFException.class));
}
assertEquals(0, TestUtils.getPropertyValue(gateway, "pendingReplies", Map.class).size());
Message<?> reply = replyChannel.receive(0);
Expand Down Expand Up @@ -839,7 +839,7 @@ private void testGWPropagatesSocketTimeoutGuts(final int port, AbstractClientCon
fail("expected failure");
}
catch (Exception e) {
assertThat(e.getCause(), instanceOf(SocketTimeoutException.class));
assertThat(e.getCause().getCause(), instanceOf(SocketTimeoutException.class));
}
assertEquals(0, TestUtils.getPropertyValue(gateway, "pendingReplies", Map.class).size());
Message<?> reply = replyChannel.receive(0);
Expand Down

0 comments on commit 6065745

Please sign in to comment.