Skip to content

Commit

Permalink
Improve Delayer docs around messageGroupId
Browse files Browse the repository at this point in the history
* Mention in the Javadocs of the `DelayHandler`, `DelayerEndpointSpec`,
`BaseIntegrationFlowDefinition.delay()`, `GroovyIntegrationFlowDefinition.delay()`,
`KotlinIntegrationFlowDefinition.delay()` that `messageGroupId` is required option
* Explain in the docs why `messageGroupId` is required and why it cannot rely on a bean name
  • Loading branch information
artembilan committed Jun 13, 2023
1 parent 81af20a commit b576748
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1208,6 +1208,7 @@ public B delay(String groupId, @Nullable Consumer<DelayerEndpointSpec> endpointC

/**
* Populate a {@link DelayHandler} to the current integration flow position.
* The {@link DelayerEndpointSpec#messageGroupId(String)} is required option.
* @param endpointConfigurer the {@link Consumer} to provide integration endpoint options.
* @return the current {@link BaseIntegrationFlowDefinition}.
* @since 6.2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,14 @@

/**
* A {@link ConsumerEndpointSpec} for a {@link DelayHandler}.
* The {@link #messageGroupId(String)} is required option.
*
* @author Artem Bilan
* @author Gary Russell
*
* @since 5.0
*
* @see DelayHandler
*/
public class DelayerEndpointSpec extends ConsumerEndpointSpec<DelayerEndpointSpec, DelayHandler> {

Expand Down Expand Up @@ -96,13 +99,12 @@ public DelayerEndpointSpec delayedAdvice(Advice... advice) {
return _this();
}

public DelayerEndpointSpec delayExpression(Expression delayExpression) {
this.handler.setDelayExpression(delayExpression);
return this;
public DelayerEndpointSpec delayExpression(String delayExpression) {
return delayExpression(PARSER.parseExpression(delayExpression));
}

public DelayerEndpointSpec delayExpression(String delayExpression) {
this.handler.setDelayExpression(PARSER.parseExpression(delayExpression));
public DelayerEndpointSpec delayExpression(Expression delayExpression) {
this.handler.setDelayExpression(delayExpression);
return this;
}

Expand Down Expand Up @@ -225,12 +227,12 @@ public DelayerEndpointSpec transactionalRelease(TransactionManager transactionMa
* @return the endpoint spec.
*/
public <P> DelayerEndpointSpec delayFunction(Function<Message<P>, Object> delayFunction) {
this.handler.setDelayExpression(new FunctionExpression<>(delayFunction));
return this;
return delayExpression(new FunctionExpression<>(delayFunction));
}

/**
* Set a group id to manage delayed messages by this handler.
* Required.
* @param messageGroupId the group id for delayed messages.
* @return the endpoint spec.
* @since 6.2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,24 @@
* Message can be released as soon as five seconds from the current time). If the value is
* a Date, it will be delayed at least until that Date occurs (i.e. the delay in that case
* is equivalent to {@code headerDate.getTime() - new Date().getTime()}).
* <p>
* Delayed messages are stored in the {@link MessageGroupStore} as a dedicated group.
* If an external persistent store is provided, those delayed messages will be rescheduled
* after application startup.
* The {@link #messageGroupId} is required option and must be unique for each delayer
* configuration to avoid work-stealing from the store and unexpected releases.
* Different instances of the same delayer can point to the same message group in the store.
* The {@link #messageGroupId} cannot rely on a bean name which might be generated.
* After application restart the bean may get a different generated name and its delayed
* messages might be lost from reschedule since its group is not managed
* by the application anymore.
*
* @author Mark Fisher
* @author Artem Bilan
* @author Gary Russell
*
* @since 1.0.3
*/

@ManagedResource
@IntegrationManagedResource
public class DelayHandler extends AbstractReplyProducingMessageHandler implements DelayHandlerManagement,
Expand Down Expand Up @@ -132,7 +142,7 @@ public class DelayHandler extends AbstractReplyProducingMessageHandler implement

/**
* Construct an instance with default options.
* The {@link #messageGroupId}must then be provided via the setter.
* The {@link #messageGroupId} must then be provided via the setter.
* @since 6.2
*/
public DelayHandler() {
Expand Down Expand Up @@ -165,6 +175,7 @@ public DelayHandler(String messageGroupId, TaskScheduler taskScheduler) {

/**
* Set a group id to manage delayed messages by this handler.
* Required.
* @param messageGroupId the group id for delayed messages.
* @since 6.2
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,7 @@ class KotlinIntegrationFlowDefinition(@PublishedApi internal val delegate: Integ

/**
* Populate a [DelayHandler] to the current integration flow position.
* The [DelayerEndpointSpec#messageGroupId(String)] is required option.
* @since 6.2
*/
fun delay(endpointConfigurer: DelayerEndpointSpec.() -> Unit) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -633,9 +633,10 @@ class GroovyIntegrationFlowDefinition {

/**
* Populate a {@link org.springframework.integration.handler.DelayHandler} to the current integration flow position.
* The {@link DelayerEndpointSpec#messageGroupId(String)} is required option.
* @param endpointConfigurer the {@link Consumer} to provide integration endpoint options.
* @see org.springframework.integration.dsl.DelayerEndpointSpec
* @since 6.2
* @see DelayerEndpointSpec
*/
GroovyIntegrationFlowDefinition delay(
@DelegatesTo(value = DelayerEndpointSpec, strategy = Closure.DELEGATE_FIRST)
Expand Down
6 changes: 5 additions & 1 deletion src/reference/asciidoc/delayer.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -149,12 +149,16 @@ See <<delayer-release-failures>>.
==== Delayer and a Message Store

The `DelayHandler` persists delayed messages into the message group in the provided `MessageStore`.
(The 'groupId' is based on the required 'id' attribute of the `<delayer>` element.)
(The 'groupId' is based on the required 'id' attribute of the `<delayer>` element.
See also `DelayHandler.setMessageGroupId(String)`.)
A delayed message is removed from the `MessageStore` by the scheduled task immediately before the `DelayHandler` sends the message to the `output-channel`.
If the provided `MessageStore` is persistent (such as `JdbcMessageStore`), it provides the ability to not lose messages on the application shutdown.
After application startup, the `DelayHandler` reads messages from its message group in the `MessageStore` and reschedules them with a delay based on the original arrival time of the message (if the delay is numeric).
For messages where the delay header was a `Date`, that `Date` is used when rescheduling.
If a delayed message remains in the `MessageStore` more than its 'delay', it is sent immediately after startup.
The `messageGroupId` is required and cannot rely on a `DelayHandler` bean name which can be generated.
That way, after application restart, a `DelayHandler` may get a new generated bean name.
Therefore, delayed messages might be lost from rescheduling since their group is not managed by the application anymore.

The `<delayer>` can be enriched with either of two mutually exclusive elements: `<transactional>` and `<advice-chain>`.
The `List` of these AOP advices is applied to the proxied internal `DelayHandler.ReleaseMessageHandler`, which has the responsibility to release the message, after the delay, on a `Thread` of the scheduled task.
Expand Down

0 comments on commit b576748

Please sign in to comment.