Skip to content

Commit

Permalink
Fix IntegrationFlowContext concurrency issue
Browse files Browse the repository at this point in the history
When we register `IntegrationFlow` s concurrently at runtime, we may
end up with the problem when we register the same object with the same
bean name, but in different places.
Or when we turn off bean overriding, we end up with the exception that
bean with the name already registered

* Wrap `IntegrationFlow` bean registration in the
`StandardIntegrationFlowContext` into the `Lock` when its bean name
is generating
* Make `StandardIntegrationFlowContext.registry` as `ConcurrentHashMap`
to avoid `ConcurrentModificationException` during `put()` and `remove()`
* Fix concurrency for beans registration with the generation names in
the `IntegrationFlowBeanPostProcessor` using an `IntegrationFlow` id
as a prefix for uniqueness.

**Cherry-pick to 5.0.x**

Fix generated bean name in the WebFluxDslTests

Use only single `Lock` in the `StandardIntegrationFlowContext`:
we don't need a fully blown `LockRegistry` there anymore since we have
only one synchronization block there and it is always around the same
type
* Add `What's New` note, and mention changes in the `dsl.adoc`

Minor doc polishing.

# Conflicts:
#	spring-integration-core/src/main/java/org/springframework/integration/dsl/context/StandardIntegrationFlowContext.java
#	spring-integration-core/src/test/java/org/springframework/integration/dsl/manualflow/ManualFlowTests.java
#	src/reference/asciidoc/dsl.adoc
#	src/reference/asciidoc/whats-new.adoc
  • Loading branch information
artembilan committed May 21, 2018
1 parent 566b4b8 commit d0603a8
Show file tree
Hide file tree
Showing 6 changed files with 162 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -128,21 +128,18 @@ private Object processStandardIntegrationFlow(StandardIntegrationFlow flow, Stri
String id = endpointSpec.getId();

if (id == null) {
id = generateBeanName(endpoint, entry.getValue());
id = generateBeanName(endpoint, flowNamePrefix, entry.getValue());
}

Collection<?> messageHandlers =
this.beanFactory.getBeansOfType(messageHandler.getClass(), false, false)
.values();

if (!messageHandlers.contains(messageHandler)) {
String handlerBeanName = generateBeanName(messageHandler);
String[] handlerAlias = new String[] { id + IntegrationConfigUtils.HANDLER_ALIAS_SUFFIX };
String handlerBeanName = generateBeanName(messageHandler, flowNamePrefix);

registerComponent(messageHandler, handlerBeanName, flowBeanName);
for (String alias : handlerAlias) {
this.beanFactory.registerAlias(handlerBeanName, alias);
}
this.beanFactory.registerAlias(handlerBeanName, id + IntegrationConfigUtils.HANDLER_ALIAS_SUFFIX);
}

registerComponent(endpoint, id, flowBeanName);
Expand Down Expand Up @@ -192,12 +189,13 @@ else if (component instanceof SourcePollingChannelAdapterSpec) {
.values()
.contains(o.getKey()))
.forEach(o ->
registerComponent(o.getKey(), generateBeanName(o.getKey(), o.getValue())));
registerComponent(o.getKey(),
generateBeanName(o.getKey(), flowNamePrefix, o.getValue())));
}
SourcePollingChannelAdapterFactoryBean pollingChannelAdapterFactoryBean = spec.get().getT1();
String id = spec.getId();
if (!StringUtils.hasText(id)) {
id = generateBeanName(pollingChannelAdapterFactoryBean, entry.getValue());
id = generateBeanName(pollingChannelAdapterFactoryBean, flowNamePrefix, entry.getValue());
}
registerComponent(pollingChannelAdapterFactoryBean, id, flowBeanName);
targetIntegrationComponents.put(pollingChannelAdapterFactoryBean, id);
Expand Down Expand Up @@ -243,7 +241,7 @@ else if (component instanceof AnnotationGatewayProxyFactoryBean) {
targetIntegrationComponents.put(component, gatewayId);
}
else {
String generatedBeanName = generateBeanName(component, entry.getValue());
String generatedBeanName = generateBeanName(component, flowNamePrefix, entry.getValue());
registerComponent(component, generatedBeanName, flowBeanName);
targetIntegrationComponents.put(component, generatedBeanName);
}
Expand Down Expand Up @@ -307,19 +305,19 @@ private void registerComponent(Object component, String beanName, String parentN
this.beanFactory.getBean(beanName);
}

private String generateBeanName(Object instance) {
return generateBeanName(instance, null);
private String generateBeanName(Object instance, String prefix) {
return generateBeanName(instance, prefix, null);
}

private String generateBeanName(Object instance, String fallbackId) {
private String generateBeanName(Object instance, String prefix, String fallbackId) {
if (instance instanceof NamedComponent && ((NamedComponent) instance).getComponentName() != null) {
return ((NamedComponent) instance).getComponentName();
}
else if (fallbackId != null) {
return fallbackId;
}

String generatedBeanName = instance.getClass().getName();
String generatedBeanName = prefix + instance.getClass().getName();
String id = generatedBeanName;
int counter = -1;
while (counter == -1 || this.beanFactory.containsBean(id)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
Expand Down Expand Up @@ -65,7 +68,9 @@
*/
public final class IntegrationFlowContext implements BeanFactoryAware {

private final Map<String, IntegrationFlowRegistration> registry = new HashMap<>();
private final Map<String, IntegrationFlowRegistration> registry = new ConcurrentHashMap<>();

private final Lock registerFlowsLock = new ReentrantLock();

private ConfigurableListableBeanFactory beanFactory;

Expand All @@ -91,20 +96,33 @@ public IntegrationFlowRegistrationBuilder registration(IntegrationFlow integrati
return new IntegrationFlowRegistrationBuilder(integrationFlow);
}


private void register(IntegrationFlowRegistrationBuilder builder) {
IntegrationFlow integrationFlow = builder.integrationFlowRegistration.getIntegrationFlow();
String flowId = builder.integrationFlowRegistration.getId();
if (flowId == null) {
flowId = generateBeanName(integrationFlow, null);
builder.id(flowId);
Lock registerBeanLock = null;
try {
if (flowId == null) {
registerBeanLock = this.registerFlowsLock;
registerBeanLock.lock();
flowId = generateBeanName(integrationFlow, null);
builder.id(flowId);
}
else if (this.registry.containsKey(flowId)) {
throw new IllegalArgumentException("An IntegrationFlow '" + this.registry.get(flowId) +
"' with flowId '" + flowId + "' is already registered.\n" +
"An existing IntegrationFlowRegistration must be destroyed before overriding.");
}

integrationFlow = (IntegrationFlow) registerBean(integrationFlow, flowId, null);
}
else if (this.registry.containsKey(flowId)) {
throw new IllegalArgumentException("An IntegrationFlow '" + this.registry.get(flowId) +
"' with flowId '" + flowId + "' is already registered.\n" +
"An existing IntegrationFlowRegistration must be destroyed before overriding.");
finally {
if (registerBeanLock != null) {
registerBeanLock.unlock();
}
}
IntegrationFlow theFlow = (IntegrationFlow) registerBean(integrationFlow, flowId, null);
builder.integrationFlowRegistration.setIntegrationFlow(theFlow);

builder.integrationFlowRegistration.setIntegrationFlow(integrationFlow);

final String theFlowId = flowId;
builder.additionalBeans.forEach((key, value) -> registerBean(key, value, theFlowId));
Expand Down Expand Up @@ -149,19 +167,26 @@ public IntegrationFlowRegistration getRegistrationById(String flowId) {
* for provided {@code flowId} and clean up all the local cache for it.
* @param flowId the bean name to destroy from
*/
public synchronized void remove(String flowId) {
public void remove(String flowId) {
if (this.registry.containsKey(flowId)) {
IntegrationFlowRegistration flowRegistration = this.registry.remove(flowId);
flowRegistration.stop();

Arrays.stream(this.beanFactory.getDependentBeans(flowId))
.forEach(((BeanDefinitionRegistry) this.beanFactory)::removeBeanDefinition);
BeanDefinitionRegistry beanDefinitionRegistry = (BeanDefinitionRegistry) this.beanFactory;

((BeanDefinitionRegistry) this.beanFactory).removeBeanDefinition(flowId);
Arrays.stream(this.beanFactory.getDependentBeans(flowId))
.forEach(beanName -> {
beanDefinitionRegistry.removeBeanDefinition(beanName);
// TODO until https://jira.spring.io/browse/SPR-16837
Arrays.asList(beanDefinitionRegistry.getAliases(beanName))
.forEach(beanDefinitionRegistry::removeAlias);
});

beanDefinitionRegistry.removeBeanDefinition(flowId);
}
else {
throw new IllegalStateException("Only manually registered IntegrationFlows can be removed. "
+ "But [" + flowId + "] ins't one of them.");
throw new IllegalStateException("An IntegrationFlow with the id "
+ "[" + flowId + "] doesn't exist in the registry.");
}
}

Expand Down
Loading

0 comments on commit d0603a8

Please sign in to comment.