Skip to content

Commit

Permalink
[Fix apache#3524] Use timeout per Event state, not per Event type
Browse files Browse the repository at this point in the history
  • Loading branch information
fjtirado committed May 23, 2024
1 parent edbfc64 commit b5fec23
Showing 1 changed file with 17 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,10 @@
import org.kie.kogito.serverless.workflow.parser.ServerlessWorkflowParser;

import io.serverlessworkflow.api.Workflow;
import io.serverlessworkflow.api.actions.Action;
import io.serverlessworkflow.api.events.OnEvents;
import io.serverlessworkflow.api.states.EventState;

import static org.kie.kogito.serverless.workflow.parser.handlers.NodeFactoryUtils.eventBasedSplitNode;
import static org.kie.kogito.serverless.workflow.parser.handlers.NodeFactoryUtils.joinExclusiveNode;
import static org.kie.kogito.serverless.workflow.parser.handlers.NodeFactoryUtils.startMessageNode;
import static org.kie.kogito.serverless.workflow.utils.TimeoutsConfigResolver.resolveEventTimeout;

public class EventHandler extends CompositeContextNodeHandler<EventState> {

Expand All @@ -53,47 +49,27 @@ public void handleStart() {
// disable standard procedure
}

@Override
public MakeNodeResult makeNode(RuleFlowNodeContainerFactory<?, ?> factory) {
return joinNodes(factory, state.getOnEvents(), this::processOnEvent);
}

private MakeNodeResult processOnEvent(RuleFlowNodeContainerFactory<?, ?> factory, OnEvents onEvent) {
if (isStartState) {
MakeNodeResult result = joinNodes(factory,
onEvent.getEventRefs(), (fact, onEventRef) -> filterAndMergeNode(fact, onEvent.getEventDataFilter(), ServerlessWorkflowParser.DEFAULT_WORKFLOW_VAR,
(f, inputVar, outputVar) -> buildEventNode(f, onEventRef, inputVar, outputVar)));
CompositeContextNodeFactory<?> embeddedSubProcess = handleActions(makeCompositeNode(factory), onEvent.getActions());
connect(result.getOutgoingNode(), embeddedSubProcess);
return new MakeNodeResult(result.getIncomingNode(), embeddedSubProcess);
return joinNodes(factory, state.getOnEvents(), this::processOnEvent);
} else {
String varName = getVarName();
CompositeContextNodeFactory<?> embeddedSubProcess = makeCompositeNode(factory);
NodeFactory<?, ?> startNode = embeddedSubProcess.startNode(parserContext.newId()).name("EmbeddedStart");
JoinFactory<?> joinNode = null;
String eventTimeout = resolveEventTimeout(state, workflow);
if (eventTimeout != null) {
// creating a split-join branch for the timer
SplitFactory<?> splitNode = eventBasedSplitNode(embeddedSubProcess.splitNode(parserContext.newId()), Split.TYPE_XAND);
joinNode = joinExclusiveNode(embeddedSubProcess.joinNode(parserContext.newId()));
startNode = connect(startNode, splitNode);
createTimerNode(embeddedSubProcess, splitNode, joinNode, eventTimeout);
}
MakeNodeResult result = joinNodes(embeddedSubProcess,
onEvent.getEventRefs(), (fact, onEventRef) -> filterAndMergeNode(fact, onEvent.getEventDataFilter(), varName,
(f, inputVar, outputVar) -> buildEventNode(f, onEventRef, inputVar, outputVar)));
connect(startNode, result.getIncomingNode());
NodeFactory<?, ?> currentNode = result.getOutgoingNode();
for (Action action : onEvent.getActions()) {
currentNode = connect(currentNode, getActionNode(embeddedSubProcess, action, varName, true));
}
if (joinNode != null) {
currentNode = connect(currentNode, joinNode);
}
connect(currentNode, embeddedSubProcess.endNode(parserContext.newId()).name("EmbeddedEnd").terminate(true)).done();
handleErrors(parserContext.factory(), embeddedSubProcess);
return new MakeNodeResult(embeddedSubProcess);
CompositeContextNodeFactory<?> embeddedContainer = makeCompositeNode(factory);
connect(connect(embeddedContainer.startNode(parserContext.newId()).name("EmbeddedStart"),
makeTimeoutNode(embeddedContainer, joinNodes(embeddedContainer, state.getOnEvents(), this::processOnEvent))),
embeddedContainer.endNode(parserContext.newId()).name("EmbeddedEnd").terminate(true)).done();
handleErrors(factory, embeddedContainer);
return new MakeNodeResult(embeddedContainer);
}

}

private MakeNodeResult processOnEvent(RuleFlowNodeContainerFactory<?, ?> factory, OnEvents onEvent) {
MakeNodeResult result = joinNodes(factory,
onEvent.getEventRefs(), (fact, onEventRef) -> filterAndMergeNode(fact, onEvent.getEventDataFilter(), isStartState ? ServerlessWorkflowParser.DEFAULT_WORKFLOW_VAR : getVarName(),
(f, inputVar, outputVar) -> buildEventNode(f, onEventRef, inputVar, outputVar)));
CompositeContextNodeFactory<?> embeddedSubProcess = handleActions(makeCompositeNode(factory), onEvent.getActions());
connect(result.getOutgoingNode(), embeddedSubProcess);
return new MakeNodeResult(result.getIncomingNode(), embeddedSubProcess);
}

private <T> MakeNodeResult joinNodes(RuleFlowNodeContainerFactory<?, ?> factory, List<T> events, BiFunction<RuleFlowNodeContainerFactory<?, ?>, T, MakeNodeResult> function) {
Expand Down

0 comments on commit b5fec23

Please sign in to comment.