Skip to content

Commit

Permalink
INT-4473: Support prefix bean names with flow id
Browse files Browse the repository at this point in the history
JIRA: https://jira.spring.io/browse/INT-4473

Previously, dynamic registration of integration flows with components configured
with the same `id` would fail with duplicate bean names.

Add `useFlowIdAsPrefix()` to the registration builder to enable the option.
Then, in the BPP, check the flag before naming the beans.

**cherry-pick to 5.0.x**

* Polishing according PR comments
* Widen `flowNamePrefix` responsibility in the
`IntegrationFlowBeanPostProcessor` since we may have many other
components in the dynamic flow with the same id, not only consumer
endpoints

# Conflicts:
#	spring-integration-core/src/main/java/org/springframework/integration/config/dsl/IntegrationFlowBeanPostProcessor.java
#	spring-integration-core/src/main/java/org/springframework/integration/dsl/context/IntegrationFlowContext.java
#	spring-integration-core/src/main/java/org/springframework/integration/dsl/context/StandardIntegrationFlowContext.java
#	src/reference/asciidoc/dsl.adoc
  • Loading branch information
garyrussell authored and artembilan committed May 22, 2018
1 parent d0603a8 commit 48df4f2
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.SourcePollingChannelAdapterSpec;
import org.springframework.integration.dsl.StandardIntegrationFlow;
import org.springframework.integration.dsl.context.IntegrationFlowContext;
import org.springframework.integration.dsl.support.MessageChannelReference;
import org.springframework.integration.gateway.AnnotationGatewayProxyFactoryBean;
import org.springframework.integration.support.context.NamedComponent;
Expand All @@ -72,6 +73,8 @@ public class IntegrationFlowBeanPostProcessor

private ConfigurableListableBeanFactory beanFactory;

private IntegrationFlowContext flowContext;

@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
Assert.isInstanceOf(ConfigurableListableBeanFactory.class, beanFactory,
Expand All @@ -80,6 +83,8 @@ public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
);

this.beanFactory = (ConfigurableListableBeanFactory) beanFactory;
this.flowContext = this.beanFactory.getBean(IntegrationFlowContext.class);
Assert.notNull(this.flowContext, "There must be an IntegrationFlowContext in the application context");
}

@Override
Expand All @@ -105,14 +110,15 @@ public void afterSingletonsInstantiated() {
throw new BeanCreationNotAllowedException(beanName, "IntegrationFlows can not be scoped beans. " +
"Any dependant beans are registered as singletons, meanwhile IntegrationFlow is just a " +
"logical container for them. \n" +
"Consider to use [IntegrationFlowContext] for manual registration of IntegrationFlows.");
"Consider using [IntegrationFlowContext] for manual registration of IntegrationFlows.");
}
}
}
}

private Object processStandardIntegrationFlow(StandardIntegrationFlow flow, String flowBeanName) {
String flowNamePrefix = flowBeanName + ".";
boolean useFlowIdAsPrefix = this.flowContext.isUseIdAsPrefix(flowBeanName);
int subFlowNameIndex = 0;
int channelNameIndex = 0;

Expand All @@ -128,7 +134,10 @@ private Object processStandardIntegrationFlow(StandardIntegrationFlow flow, Stri
String id = endpointSpec.getId();

if (id == null) {
id = generateBeanName(endpoint, flowNamePrefix, entry.getValue());
id = generateBeanName(endpoint, flowNamePrefix, entry.getValue(), useFlowIdAsPrefix);
}
else if (useFlowIdAsPrefix) {
id = flowNamePrefix + id;
}

Collection<?> messageHandlers =
Expand Down Expand Up @@ -190,13 +199,19 @@ else if (component instanceof SourcePollingChannelAdapterSpec) {
.contains(o.getKey()))
.forEach(o ->
registerComponent(o.getKey(),
generateBeanName(o.getKey(), flowNamePrefix, o.getValue())));
generateBeanName(o.getKey(), flowNamePrefix, o.getValue(),
useFlowIdAsPrefix)));
}
SourcePollingChannelAdapterFactoryBean pollingChannelAdapterFactoryBean = spec.get().getT1();
String id = spec.getId();
if (!StringUtils.hasText(id)) {
id = generateBeanName(pollingChannelAdapterFactoryBean, flowNamePrefix, entry.getValue());
if (id == null) {
id = generateBeanName(pollingChannelAdapterFactoryBean, flowNamePrefix, entry.getValue(),
useFlowIdAsPrefix);
}
else if (useFlowIdAsPrefix) {
id = flowNamePrefix + id;
}

registerComponent(pollingChannelAdapterFactoryBean, id, flowBeanName);
targetIntegrationComponents.put(pollingChannelAdapterFactoryBean, id);

Expand Down Expand Up @@ -241,7 +256,9 @@ else if (component instanceof AnnotationGatewayProxyFactoryBean) {
targetIntegrationComponents.put(component, gatewayId);
}
else {
String generatedBeanName = generateBeanName(component, flowNamePrefix, entry.getValue());
String generatedBeanName =
generateBeanName(component, flowNamePrefix, entry.getValue(), useFlowIdAsPrefix);

registerComponent(component, generatedBeanName, flowBeanName);
targetIntegrationComponents.put(component, generatedBeanName);
}
Expand Down Expand Up @@ -306,15 +323,19 @@ private void registerComponent(Object component, String beanName, String parentN
}

private String generateBeanName(Object instance, String prefix) {
return generateBeanName(instance, prefix, null);
return generateBeanName(instance, prefix, null, false);
}

private String generateBeanName(Object instance, String prefix, String fallbackId) {
private String generateBeanName(Object instance, String prefix, String fallbackId, boolean useFlowIdAsPrefix) {
if (instance instanceof NamedComponent && ((NamedComponent) instance).getComponentName() != null) {
return ((NamedComponent) instance).getComponentName();
return useFlowIdAsPrefix
? prefix + ((NamedComponent) instance).getComponentName()
: ((NamedComponent) instance).getComponentName();
}
else if (fallbackId != null) {
return fallbackId;
return useFlowIdAsPrefix
? prefix + fallbackId
: fallbackId;
}

String generatedBeanName = prefix + instance.getClass().getName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.springframework.integration.support.context.NamedComponent;
import org.springframework.messaging.MessageChannel;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

/**
* A public API for dynamic (manual) registration of {@link IntegrationFlow}s,
Expand Down Expand Up @@ -70,6 +71,8 @@ public final class IntegrationFlowContext implements BeanFactoryAware {

private final Map<String, IntegrationFlowRegistration> registry = new ConcurrentHashMap<>();

private final Map<String, Boolean> useFlowIdAsPrefix = new ConcurrentHashMap<>();

private final Lock registerFlowsLock = new ReentrantLock();

private ConfigurableListableBeanFactory beanFactory;
Expand All @@ -96,6 +99,15 @@ public IntegrationFlowRegistrationBuilder registration(IntegrationFlow integrati
return new IntegrationFlowRegistrationBuilder(integrationFlow);
}

/**
* Return true to prefix flow bean names with the flow id and a period.
* @param flowId the flow id.
* @return true to use as a prefix.
* @since 5.0.6
*/
public boolean isUseIdAsPrefix(String flowId) {
return Boolean.TRUE.equals(this.useFlowIdAsPrefix.get(flowId));
}

private void register(IntegrationFlowRegistrationBuilder builder) {
IntegrationFlow integrationFlow = builder.integrationFlowRegistration.getIntegrationFlow();
Expand Down Expand Up @@ -228,8 +240,12 @@ private String generateBeanName(Object instance, String parentName) {
}

/**
* @author Gary Russell
* @since 5.1
*
* A Builder pattern implementation for the options to register {@link IntegrationFlow}
* in the application context.
*/
public final class IntegrationFlowRegistrationBuilder {

Expand All @@ -239,6 +255,8 @@ public final class IntegrationFlowRegistrationBuilder {

private boolean autoStartup = true;

private boolean idAsPrefix;

IntegrationFlowRegistrationBuilder(IntegrationFlow integrationFlow) {
this.integrationFlowRegistration = new IntegrationFlowRegistration(integrationFlow);
this.integrationFlowRegistration.setBeanFactory(IntegrationFlowContext.this.beanFactory);
Expand Down Expand Up @@ -293,13 +311,33 @@ public IntegrationFlowRegistrationBuilder addBean(String name, Object bean) {
return this;
}

/**
* Invoke this method to prefix bean names in the flow with the (required) flow id
* and a period. This is useful if you wish to register the same flow multiple times
* while retaining the ability to reference beans within the flow; adding the unique
* flow id to the bean name makes the name unique.
* @return the current builder instance.
* @see #id(String)
* @since 5.0.6
*/
public IntegrationFlowRegistrationBuilder useFlowIdAsPrefix() {
this.idAsPrefix = true;
return this;
}

/**
* Register an {@link IntegrationFlow} and all the dependant and support components
* in the application context and return an associated {@link IntegrationFlowRegistration}
* control object.
* @return the {@link IntegrationFlowRegistration} instance.
*/
public IntegrationFlowRegistration register() {
String id = this.integrationFlowRegistration.getId();
Assert.state(!this.idAsPrefix || StringUtils.hasText(id),
"An 'id' must be present to use 'useFlowIdAsPrefix'");
if (this.idAsPrefix) {
IntegrationFlowContext.this.useFlowIdAsPrefix.put(id, this.idAsPrefix);
}
IntegrationFlowContext.this.register(this);
return this.integrationFlowRegistration;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,13 @@ public void testManualFlowRegistration() throws InterruptedException {
.fixedDelay(10)
.maxMessagesPerPoll(1)
.receiveTimeout(10)))
.handle(new BeanFactoryHandler());
.handle(new BeanFactoryHandler(), e -> e.id("anId"));

BeanFactoryHandler additionalBean = new BeanFactoryHandler();
IntegrationFlowRegistration flowRegistration =
this.integrationFlowContext.registration(myFlow)
.id(flowId)
.useFlowIdAsPrefix()
.addBean(additionalBean)
.register();

Expand All @@ -184,6 +185,7 @@ public void testManualFlowRegistration() throws InterruptedException {
BeanFactoryHandler.class);
assertSame(additionalBean, bean);
assertSame(this.beanFactory, bean.beanFactory);
bean = this.beanFactory.getBean(flowRegistration.getId() + "." + "anId.handler", BeanFactoryHandler.class);

MessagingTemplate messagingTemplate = flowRegistration.getMessagingTemplate();
messagingTemplate.setReceiveTimeout(10000);
Expand Down
40 changes: 38 additions & 2 deletions src/reference/asciidoc/dsl.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,7 @@ The result of this definition is the same bunch of Integration components wired
Only limitation is here, that this flow is started with named direct channel - `lambdaFlow.input`.
And Lambda flow can't start from `MessageSource` or `MessageProducer`.

Starting with _version 5.0.5_, the generated bean names for the components in an `IntegrationFlow` include the flow bean followed by a dot as a prefix.
Starting with _version 5.0.6_, the generated bean names for the components in an `IntegrationFlow` include the flow bean followed by a dot as a prefix.
For example the `ConsumerEndpointFactoryBean` for the `.transform("Hello "::concat)` in the sample above, will end up with te bean name like `lambdaFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0`.
The `Transformer` implementation bean for that endpoint will have a bean name such as `lambdaFlow.org.springframework.integration.transformer.MethodInvokingTransformer#0`.
These generated bean names are prepended with the flow id prefix for purposes such as parsing logs or grouping components together in some analysis tool, as well as to avoid a race condition when we concurrently register integration flows at runtime.
Expand Down Expand Up @@ -895,10 +895,46 @@ Usually those additional beans are connection factories (AMQP, JMS, (S)FTP, TCP/
Such a dynamically registered `IntegrationFlow` and all its dependant beans can be removed afterwards using `IntegrationFlowRegistration.destroy()` callback.
See `IntegrationFlowContext` JavaDocs for more information.

NOTE: Starting with _version 5.0.5_, all generated bean names in an `IntegrationFlow` definition are prepended with flow id as a prefix.
NOTE: Starting with _version 5.0.6_, all generated bean names in an `IntegrationFlow` definition are prepended with flow id as a prefix.
It is recommended to always specify an explicit flow id, otherwise a synchronization barrier is initiated in the `IntegrationFlowContext` to generate the bean name for the `IntegrationFlow` and register its beans.
We synchronize on these two operations to avoid a race condition when the same generated bean name may be used for different `IntegrationFlow` instances.

Also, starting with _version 5.0.6_, the registration builder API has a new method `useFlowIdAsPrefix()`.
This is useful if you wish to declare multiple instances of the same flow and avoid bean name collisions if components in the flows have the same id.

For example:

[source, java]
----
private void registerFlows() {
IntegrationFlowRegistration flow1 =
this.flowContext.registration(buildFlow(1234))
.id("tcp1")
.useFlowIdAsPrefix()
.register();
IntegrationFlowRegistration flow2 =
this.flowContext.registration(buildFlow(1235))
.id("tcp2")
.useFlowIdAsPrefix()
.register();
}
private IntegrationFlow buildFlow(int port) {
return f -> f
.handle(Tcp.outboundGateway(Tcp.netClient("localhost", port)
.serializer(TcpCodecs.crlf())
.deserializer(TcpCodecs.lengthHeader1())
.id("client"))
.remoteTimeout(m -> 5000))
.transform(Transformers.objectToString());
}
----

In this case, the message handler for the first flow can be referenced with bean name `tcp1.client.handler`.

NOTE: an `id` is required when using `useFlowIdAsPrefix()`.

[[java-dsl-gateway]]
=== IntegrationFlow as Gateway

Expand Down

0 comments on commit 48df4f2

Please sign in to comment.