Skip to content

Commit

Permalink
[Fix apache#3524] Use timeout per Event state, not per Event type
Browse files Browse the repository at this point in the history
  • Loading branch information
fjtirado committed May 23, 2024
1 parent edbfc64 commit baa8147
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ protected final CompositeContextNodeFactory<?> makeCompositeNode(RuleFlowNodeCon
} else {
connect(embeddedSubProcess.startNode(parserContext.newId()).name("EmbeddedStart"), embeddedSubProcess.endNode(parserContext.newId()).name("EmbeddedEnd").terminate(true)).done();
}
handleErrors(parserContext.factory(), embeddedSubProcess);
return embeddedSubProcess;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,10 @@
import org.kie.kogito.serverless.workflow.parser.ServerlessWorkflowParser;

import io.serverlessworkflow.api.Workflow;
import io.serverlessworkflow.api.actions.Action;
import io.serverlessworkflow.api.events.OnEvents;
import io.serverlessworkflow.api.states.EventState;

import static org.kie.kogito.serverless.workflow.parser.handlers.NodeFactoryUtils.eventBasedSplitNode;
import static org.kie.kogito.serverless.workflow.parser.handlers.NodeFactoryUtils.joinExclusiveNode;
import static org.kie.kogito.serverless.workflow.parser.handlers.NodeFactoryUtils.startMessageNode;
import static org.kie.kogito.serverless.workflow.utils.TimeoutsConfigResolver.resolveEventTimeout;

public class EventHandler extends CompositeContextNodeHandler<EventState> {

Expand All @@ -53,47 +49,27 @@ public void handleStart() {
// disable standard procedure
}

@Override
public MakeNodeResult makeNode(RuleFlowNodeContainerFactory<?, ?> factory) {
return joinNodes(factory, state.getOnEvents(), this::processOnEvent);
}

private MakeNodeResult processOnEvent(RuleFlowNodeContainerFactory<?, ?> factory, OnEvents onEvent) {
if (isStartState) {
MakeNodeResult result = joinNodes(factory,
onEvent.getEventRefs(), (fact, onEventRef) -> filterAndMergeNode(fact, onEvent.getEventDataFilter(), ServerlessWorkflowParser.DEFAULT_WORKFLOW_VAR,
(f, inputVar, outputVar) -> buildEventNode(f, onEventRef, inputVar, outputVar)));
CompositeContextNodeFactory<?> embeddedSubProcess = handleActions(makeCompositeNode(factory), onEvent.getActions());
connect(result.getOutgoingNode(), embeddedSubProcess);
return new MakeNodeResult(result.getIncomingNode(), embeddedSubProcess);
return joinNodes(factory, state.getOnEvents(), this::processOnEvent);
} else {
String varName = getVarName();
CompositeContextNodeFactory<?> embeddedSubProcess = makeCompositeNode(factory);
NodeFactory<?, ?> startNode = embeddedSubProcess.startNode(parserContext.newId()).name("EmbeddedStart");
JoinFactory<?> joinNode = null;
String eventTimeout = resolveEventTimeout(state, workflow);
if (eventTimeout != null) {
// creating a split-join branch for the timer
SplitFactory<?> splitNode = eventBasedSplitNode(embeddedSubProcess.splitNode(parserContext.newId()), Split.TYPE_XAND);
joinNode = joinExclusiveNode(embeddedSubProcess.joinNode(parserContext.newId()));
startNode = connect(startNode, splitNode);
createTimerNode(embeddedSubProcess, splitNode, joinNode, eventTimeout);
}
MakeNodeResult result = joinNodes(embeddedSubProcess,
onEvent.getEventRefs(), (fact, onEventRef) -> filterAndMergeNode(fact, onEvent.getEventDataFilter(), varName,
(f, inputVar, outputVar) -> buildEventNode(f, onEventRef, inputVar, outputVar)));
connect(startNode, result.getIncomingNode());
NodeFactory<?, ?> currentNode = result.getOutgoingNode();
for (Action action : onEvent.getActions()) {
currentNode = connect(currentNode, getActionNode(embeddedSubProcess, action, varName, true));
}
if (joinNode != null) {
currentNode = connect(currentNode, joinNode);
}
connect(currentNode, embeddedSubProcess.endNode(parserContext.newId()).name("EmbeddedEnd").terminate(true)).done();
handleErrors(parserContext.factory(), embeddedSubProcess);
return new MakeNodeResult(embeddedSubProcess);
CompositeContextNodeFactory<?> embeddedContainer = makeCompositeNode(factory);
connect(connect(embeddedContainer.startNode(parserContext.newId()).name("EmbeddedStart"),
makeTimeoutNode(embeddedContainer, joinNodes(embeddedContainer, state.getOnEvents(), this::processOnEvent))),
embeddedContainer.endNode(parserContext.newId()).name("EmbeddedEnd").terminate(true)).done();
handleErrors(factory, embeddedContainer);
return new MakeNodeResult(embeddedContainer);
}

}

private MakeNodeResult processOnEvent(RuleFlowNodeContainerFactory<?, ?> factory, OnEvents onEvent) {
MakeNodeResult result = joinNodes(factory,
onEvent.getEventRefs(), (fact, onEventRef) -> filterAndMergeNode(fact, onEvent.getEventDataFilter(), isStartState ? ServerlessWorkflowParser.DEFAULT_WORKFLOW_VAR : getVarName(),
(f, inputVar, outputVar) -> buildEventNode(f, onEventRef, inputVar, outputVar)));
CompositeContextNodeFactory<?> embeddedSubProcess = handleActions(makeCompositeNode(factory), onEvent.getActions());
connect(result.getOutgoingNode(), embeddedSubProcess);
return new MakeNodeResult(result.getIncomingNode(), embeddedSubProcess);
}

private <T> MakeNodeResult joinNodes(RuleFlowNodeContainerFactory<?, ?> factory, List<T> events, BiFunction<RuleFlowNodeContainerFactory<?, ?>, T, MakeNodeResult> function) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ protected MakeNodeResult makeNode(RuleFlowNodeContainerFactory<?, ?> factory) {
result.completionAction(new CollectorActionSupplier(workflow.getExpressionLang(), state.getOutputCollection(), DEFAULT_WORKFLOW_VAR, TEMP_OUTPUT_VAR));
}
handleActions(result, state.getActions(), FOR_EACH_OUTPUT_VARIABLE, false);
handleErrors(factory, result);
return new MakeNodeResult(result);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.kie.kogito.serverless.workflow.parser.handlers;

import org.jbpm.ruleflow.core.RuleFlowNodeContainerFactory;
import org.jbpm.ruleflow.core.factory.CompositeContextNodeFactory;
import org.kie.kogito.serverless.workflow.parser.ParserContext;

import io.serverlessworkflow.api.Workflow;
Expand All @@ -37,6 +38,8 @@ public boolean usedForCompensation() {

@Override
public MakeNodeResult makeNode(RuleFlowNodeContainerFactory<?, ?> factory) {
return new MakeNodeResult(handleActions(makeCompositeNode(factory), state.getActions()));
CompositeContextNodeFactory<?> embeddedContainer = handleActions(makeCompositeNode(factory), state.getActions());
handleErrors(parserContext.factory(), embeddedContainer);
return new MakeNodeResult(embeddedContainer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public MakeNodeResult makeNode(RuleFlowNodeContainerFactory<?, ?> factory) {
for (Branch branch : state.getBranches()) {
currentBranch = branch;
CompositeContextNodeFactory<?> embeddedSubProcess = handleActions(makeCompositeNode(factory, getName(branch)), branch.getActions());
handleErrors(factory, embeddedSubProcess);
WorkflowElementIdentifier branchId = embeddedSubProcess.getNode().getId();
embeddedSubProcess.done().connection(nodeFactory.getNode().getId(), branchId).connection(branchId, connectionNode.getNode().getId());
}
Expand Down

0 comments on commit baa8147

Please sign in to comment.