Skip to content

Commit

Permalink
INT-4509: Fix memory leak for static beans stores
Browse files Browse the repository at this point in the history
JIRA: https://jira.spring.io/browse/INT-4509

To avoid a re-usage of the `AbstractReplyProducingMessageHandler`,
the `IntegrationFlowDefinition` and `AbstractStandardMessageHandlerFactoryBean`
have a `static Set<>` to store already used producers and check it for
newly provided.
These stores are not cleaned when beans are destroyed leading to memory
leaks

* Implements a `DisposableBean` for the `AbstractStandardMessageHandlerFactoryBean`
to remove its `replyHandler` from the `referencedReplyProducers` on
`destroy()`
* Introduce a `IntegrationFlowDefinition.ReplyProducerCleaner` - an
`DestructionAwareBeanPostProcessor` to clean up removing `MessageProducer`
from the `IntegrationFlowDefinition.REFERENCED_REPLY_PRODUCERS`

**Cherry-pick to 5.0.x**
  • Loading branch information
artembilan authored and garyrussell committed Jul 26, 2018
1 parent d4cf440 commit 663984a
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 43 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,6 +21,7 @@

import org.springframework.aop.TargetSource;
import org.springframework.aop.framework.Advised;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.expression.Expression;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.standard.SpelExpressionParser;
Expand All @@ -41,21 +42,23 @@
* @author David Liu
*/
public abstract class AbstractStandardMessageHandlerFactoryBean
extends AbstractSimpleMessageHandlerFactoryBean<MessageHandler> {
extends AbstractSimpleMessageHandlerFactoryBean<MessageHandler> implements DisposableBean {

private static final ExpressionParser expressionParser = new SpelExpressionParser();

private static final Set<MessageHandler> referencedReplyProducers = new HashSet<>();

private volatile Boolean requiresReply;
private Boolean requiresReply;

private volatile Object targetObject;
private Object targetObject;

private volatile String targetMethodName;
private String targetMethodName;

private volatile Expression expression;
private Expression expression;

private volatile Long sendTimeout;
private Long sendTimeout;

private MessageHandler replyHandler;

/**
* Set the target POJO for the message handler.
Expand Down Expand Up @@ -101,6 +104,13 @@ public Long getSendTimeout() {
return this.sendTimeout;
}

@Override
public void destroy() {
if (this.replyHandler != null) {
referencedReplyProducers.remove(this.replyHandler);
}
}

@Override
protected MessageHandler createHandler() {
MessageHandler handler;
Expand Down Expand Up @@ -158,6 +168,7 @@ private void checkReuse(AbstractMessageProducingHandler replyHandler) {
"An AbstractMessageProducingMessageHandler may only be referenced once (" +
replyHandler.getComponentName() + ") - use scope=\"prototype\"");
referencedReplyProducers.add(replyHandler);
this.replyHandler = replyHandler;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.integration.config.IntegrationConfigurationInitializer;
import org.springframework.integration.dsl.IntegrationComponentSpec;
import org.springframework.integration.dsl.IntegrationFlowDefinition;
import org.springframework.integration.dsl.context.IntegrationFlowContext;
import org.springframework.util.Assert;

Expand All @@ -46,6 +47,9 @@ public class DslIntegrationConfigurationInitializer implements IntegrationConfig
private static final String INTEGRATION_FLOW_CONTEXT_BEAN_NAME =
Introspector.decapitalize(IntegrationFlowContext.class.getName());

private static final String INTEGRATION_FLOW_REPLY_PRODUCER_CLEANER_BEAN_NAME =
Introspector.decapitalize(IntegrationFlowDefinition.ReplyProducerCleaner.class.getName());

@Override
public void initialize(ConfigurableListableBeanFactory configurableListableBeanFactory) throws BeansException {
Assert.isInstanceOf(BeanDefinitionRegistry.class, configurableListableBeanFactory,
Expand All @@ -59,6 +63,8 @@ public void initialize(ConfigurableListableBeanFactory configurableListableBeanF
new RootBeanDefinition(IntegrationFlowBeanPostProcessor.class));
registry.registerBeanDefinition(INTEGRATION_FLOW_CONTEXT_BEAN_NAME,
new RootBeanDefinition(IntegrationFlowContext.class));
registry.registerBeanDefinition(INTEGRATION_FLOW_REPLY_PRODUCER_CLEANER_BEAN_NAME,
new RootBeanDefinition(IntegrationFlowDefinition.ReplyProducerCleaner.class));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@

import org.springframework.aop.framework.Advised;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanCreationException;
import org.springframework.beans.factory.config.DestructionAwareBeanPostProcessor;
import org.springframework.expression.Expression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.integration.aggregator.AggregatingMessageHandler;
Expand Down Expand Up @@ -122,9 +124,9 @@ public abstract class IntegrationFlowDefinition<B extends IntegrationFlowDefinit

protected final Map<Object, String> integrationComponents = new LinkedHashMap<>();

protected MessageChannel currentMessageChannel;
private MessageChannel currentMessageChannel;

protected Object currentComponent;
private Object currentComponent;

private StandardIntegrationFlow integrationFlow;

Expand Down Expand Up @@ -481,7 +483,9 @@ public B transform(String expression) {
* @return the current {@link IntegrationFlowDefinition}.
* @see ExpressionEvaluatingTransformer
*/
public B transform(String expression, Consumer<GenericEndpointSpec<MessageTransformingHandler>> endpointConfigurer) {
public B transform(String expression,
Consumer<GenericEndpointSpec<MessageTransformingHandler>> endpointConfigurer) {

Assert.hasText(expression, "'expression' must not be empty");
return transform(new ExpressionEvaluatingTransformer(PARSER.parseExpression(expression)),
endpointConfigurer);
Expand Down Expand Up @@ -2773,4 +2777,21 @@ private void checkReuse(MessageProducer replyHandler) {
REFERENCED_REPLY_PRODUCERS.add(replyHandler);
}

public static final class ReplyProducerCleaner implements DestructionAwareBeanPostProcessor {

private ReplyProducerCleaner() {
}

@Override
public boolean requiresDestruction(Object bean) {
return IntegrationFlowDefinition.REFERENCED_REPLY_PRODUCERS.contains(bean);
}

@Override
public void postProcessBeforeDestruction(Object bean, String beanName) throws BeansException {
IntegrationFlowDefinition.REFERENCED_REPLY_PRODUCERS.remove(bean);
}

}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2013-2017 the original author or authors.
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,6 +17,7 @@
package org.springframework.integration.config.xml;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
Expand All @@ -25,6 +26,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;

import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -52,68 +54,89 @@

/**
* @author Gary Russell
* @author Artem Bilan
*
* @since 3.0
*
*/
@ContextConfiguration
@RunWith(SpringJUnit4ClassRunner.class)
public class DelegatingConsumerParserTests {

@Autowired @Qualifier("directFilter.handler")
@Autowired
@Qualifier("directFilter.handler")
private MessageHandler directFilter;

@Autowired @Qualifier("refFilter.handler")
@Autowired
@Qualifier("refFilter.handler")
private MessageHandler refFilter;

@Autowired @Qualifier("filterWithMessageSelectorThatsAlsoAnARPMH.handler")
@Autowired
@Qualifier("filterWithMessageSelectorThatsAlsoAnARPMH.handler")
private MessageHandler filterWithMessageSelectorThatsAlsoAnARPMH;

@Autowired @Qualifier("directRouter.handler")
@Autowired
@Qualifier("directRouter.handler")
private MessageHandler directRouter;

@Autowired @Qualifier("refRouter.handler")
@Autowired
@Qualifier("refRouter.handler")
private MessageHandler refRouter;

@Autowired @Qualifier("directRouterMH.handler")
@Autowired
@Qualifier("directRouterMH.handler")
private MessageHandler directRouterMH;

@Autowired @Qualifier("refRouterMH.handler")
@Autowired
@Qualifier("refRouterMH.handler")
private MessageHandler refRouterMH;

@Autowired @Qualifier("directRouterARPMH.handler")
@Autowired
@Qualifier("directRouterARPMH.handler")
private MessageHandler directRouterARPMH;

@Autowired @Qualifier("refRouterARPMH.handler")
@Autowired
@Qualifier("refRouterARPMH.handler")
private MessageHandler refRouterARPMH;

@Autowired @Qualifier("directServiceARPMH.handler")
@Autowired
@Qualifier("directServiceARPMH.handler")
private MessageHandler directServiceARPMH;

@Autowired @Qualifier("refServiceARPMH.handler")
@Autowired
@Qualifier("refServiceARPMH.handler")
private MessageHandler refServiceARPMH;

@Autowired @Qualifier("directSplitter.handler")
@Autowired
@Qualifier("directSplitter.handler")
private MessageHandler directSplitter;

@Autowired @Qualifier("refSplitter.handler")
@Autowired
@Qualifier("refSplitter.handler")
private MessageHandler refSplitter;

@Autowired @Qualifier("splitterWithARPMH.handler")
@Autowired
@Qualifier("splitterWithARPMH.handler")
private MessageHandler splitterWithARPMH;

@Autowired @Qualifier("splitterWithARPMHWithAtts.handler")
@Autowired
@Qualifier("splitterWithARPMHWithAtts.handler")
private MessageHandler splitterWithARPMHWithAtts;

@Autowired @Qualifier("directTransformer.handler")
@Autowired
@Qualifier("directTransformer.handler")
private MessageHandler directTransformer;

@Autowired @Qualifier("refTransformer.handler")
@Autowired
@Qualifier("refTransformer.handler")
private MessageHandler refTransformer;

@Autowired @Qualifier("directTransformerARPMH.handler")
@Autowired
@Qualifier("directTransformerARPMH.handler")
private MessageHandler directTransformerARPMH;

@Autowired @Qualifier("refTransformerARPMH.handler")
@Autowired
@Qualifier("refTransformerARPMH.handler")
private MessageHandler refTransformerARPMH;

private static QueueChannel replyChannel = new QueueChannel();
Expand Down Expand Up @@ -153,7 +176,8 @@ public void testDelegates() {
assertTrue(splitterWithARPMH instanceof MySplitterThatsAnARPMH);
testHandler(splitterWithARPMH);
assertTrue(splitterWithARPMHWithAtts instanceof MySplitterThatsAnARPMH);
assertEquals(Long.valueOf(123), TestUtils.getPropertyValue(splitterWithARPMHWithAtts, "messagingTemplate.sendTimeout", Long.class));
assertEquals(Long.valueOf(123),
TestUtils.getPropertyValue(splitterWithARPMHWithAtts, "messagingTemplate.sendTimeout", Long.class));
testHandler(splitterWithARPMHWithAtts);

assertTrue(directTransformer instanceof MessageTransformingHandler);
Expand All @@ -170,24 +194,32 @@ public void testDelegates() {
}

@Test
@SuppressWarnings("unchecked")
public void testOneRefOnly() throws Exception {
ServiceActivatorFactoryBean fb = new ServiceActivatorFactoryBean();
fb.setBeanFactory(mock(BeanFactory.class));
MyServiceARPMH service = new MyServiceARPMH();
service.setBeanName("foo");
fb.setTargetObject(service);
fb.getObject();
fb = new ServiceActivatorFactoryBean();
fb.setBeanFactory(mock(BeanFactory.class));
fb.setTargetObject(service);

assertTrue(TestUtils.getPropertyValue(fb, "referencedReplyProducers", Set.class).contains(service));

ServiceActivatorFactoryBean fb2 = new ServiceActivatorFactoryBean();
fb2.setBeanFactory(mock(BeanFactory.class));
fb2.setTargetObject(service);
try {
fb.getObject();
fb2.getObject();
fail("expected exception");
}
catch (Exception e) {
assertEquals("An AbstractMessageProducingMessageHandler may only be referenced once (foo) - "
+ "use scope=\"prototype\"", e.getMessage());
}

fb.destroy();

assertFalse(TestUtils.getPropertyValue(fb, "referencedReplyProducers", Set.class).contains(service));
}

private void testHandler(MessageHandler handler) {
Expand Down
Loading

0 comments on commit 663984a

Please sign in to comment.