Skip to content

Commit

Permalink
GH-1258: Add OOMHandler to the listener containers
Browse files Browse the repository at this point in the history
Resolves #1258 (comment)

**Backported to 2.2.x with a no-op default**
  • Loading branch information
garyrussell committed Oct 20, 2020
1 parent d73e6ae commit 3879496
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -242,10 +242,12 @@ public abstract class AbstractMessageListenerContainer extends RabbitAccessor

private boolean micrometerEnabled = true;

private volatile boolean lazyLoad;

private boolean isBatchListener;

private OOMHandler oOMHandler = error -> System.exit(99);

private volatile boolean lazyLoad;

@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher;
Expand Down Expand Up @@ -1123,6 +1125,21 @@ public void setMicrometerEnabled(boolean micrometerEnabled) {
this.micrometerEnabled = micrometerEnabled;
}

protected OOMHandler getOOMHandler() {
return this.oOMHandler;
}

/**
* Provide an OOMHandler implementation; by default, {@code System.exit(99)} is
* called.
* @param oOMHandler the handler.
* @since 2.2.12
*/
public void setOOMHandler(OOMHandler oOMHandler) {
Assert.notNull(oOMHandler, "'oOMHandler' cannot be null");
this.oOMHandler = oOMHandler;
}

/**
* Delegates to {@link #validateConfiguration()} and {@link #initialize()}.
*/
Expand Down Expand Up @@ -1950,6 +1967,22 @@ private interface ContainerDelegate {

}

/**
* A handler for {@link OutOfMemoryError} on the container thread(s).
* @since 2.2.12
*
*/
@FunctionalInterface
public interface OOMHandler {

/**
* Handle the error; typically, the JVM will be terminated.
* @param error the error.
*/
void handle(OutOfMemoryError error);

}

/**
* Exception that indicates that the initial setup of this container's shared Rabbit Connection failed. This is
* indicating to invokers that they need to establish the shared Connection themselves on first access.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1085,6 +1085,10 @@ private void callExecuteListener(Object data, long deliveryTag) {
}
}
}
catch (OutOfMemoryError e) { // NOSONAR
getOOMHandler().handle(e);
throw e;
}
}

private void handleAck(long deliveryTag, boolean channelLocallyTransacted) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1266,6 +1266,9 @@ public void run() { // NOSONAR - line count
catch (Error e) { //NOSONAR
// ok to catch Error - we're aborting so will stop
logger.error("Consumer thread error, thread abort.", e);
if (e instanceof OutOfMemoryError) { // NOSONAR
getOOMHandler().handle((OutOfMemoryError) e);
}
publishConsumerFailedEvent("Consumer threw an Error", true, e);
aborted = true;
}
Expand Down
10 changes: 10 additions & 0 deletions src/reference/asciidoc/amqp.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -5430,6 +5430,16 @@ Default is a `SimpleAsyncTaskExecutor`, using internally managed threads.
a| image::images/tickmark.png[]
a| image::images/tickmark.png[]

| oOMHandler
(N/A)

| An `AbstractMessageListenerContainer.OOMHandler` implementation that is called when a container thread catches an `OutOfMemoryException`.
The default implementation does nothing, for backwards compatibility.
Normally, the JVM should be terminated; this will be the default action in a future release.

a| image::images/tickmark.png[]
a| image::images/tickmark.png[]

| errorHandler
(error-handler)

Expand Down
7 changes: 4 additions & 3 deletions src/reference/asciidoc/whats-new.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ See <<exception-handling>> for more information.
Listener performance can now be monitored using Micrometer `Timer` s.
See <<micrometer>> for more information.

You can now configure an `OOMHandler` (out of memory handler); the default implementation does nothing, for backwards compatibility.
Normally, the JVM should be terminated; this will be the default action in a future release.
See <<containerAttributes>> for more information.

==== @RabbitListener Changes

You can now configure an `executor` on each listener, overriding the factory configuration, to more easily identify threads associated with the listener.
Expand Down Expand Up @@ -128,9 +132,6 @@ See <<message-properties-converters>> for more information.
Recovery of failed producer-created batches is now supported.
See <<batch-retry>> for more information.

A new listener container property `consumeDelay` is now available; it is helpful when using the https://github.com/rabbitmq/rabbitmq-sharding[RabbitMQ Sharding Plugin].
See <<containerAttributes>> for more information.

==== MessagePostProcessor Changes

The compressing `MessagePostProcessor` s now can use a comma to separate multiple content encodings instead of a colon.
Expand Down

0 comments on commit 3879496

Please sign in to comment.