Skip to content

Commit

Permalink
[Fix apache/incubator-kie-issues#987] New onError event
Browse files Browse the repository at this point in the history
  • Loading branch information
fjtirado committed Mar 5, 2024
1 parent cfd18f3 commit 283d96d
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -158,12 +158,11 @@ void fireOnUserTaskCommentAdded(
KieRuntime kruntime,
Comment addedComment);

//

void reset();

void addEventListener(KogitoProcessEventListener listener);

void removeEventListener(KogitoProcessEventListener listener);

}
void fireOnError(KogitoProcessInstance instance, KogitoNodeInstance nodeInstance, KieRuntime kruntime, Exception exception);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.kie.kogito.uow.events;

import org.kie.api.event.process.ErrorEvent;
import org.kie.api.event.process.MessageEvent;
import org.kie.api.event.process.ProcessCompletedEvent;
import org.kie.api.event.process.ProcessEvent;
Expand Down Expand Up @@ -59,12 +60,11 @@ private void intercept(UserTaskEvent event) {

@Override
public void beforeProcessStarted(ProcessStartedEvent event) {

intercept(event);
}

@Override
public void afterProcessStarted(ProcessStartedEvent event) {
intercept(event);
}

@Override
Expand Down Expand Up @@ -193,4 +193,9 @@ public void onUserTaskOutputVariable(UserTaskVariableEvent event) {
intercept(event);
}

@Override
public void onError(ErrorEvent event) {
intercept(event);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@
package org.kie.kogito.event.impl;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.TreeSet;

import org.kie.api.event.process.ErrorEvent;
import org.kie.api.event.process.ProcessCompletedEvent;
import org.kie.api.event.process.ProcessEvent;
import org.kie.api.event.process.ProcessNodeEvent;
Expand All @@ -38,7 +40,6 @@
import org.kie.api.event.usertask.UserTaskAttachmentEvent;
import org.kie.api.event.usertask.UserTaskCommentEvent;
import org.kie.api.event.usertask.UserTaskDeadlineEvent;
import org.kie.api.event.usertask.UserTaskEvent;
import org.kie.api.event.usertask.UserTaskStateEvent;
import org.kie.api.event.usertask.UserTaskVariableEvent;
import org.kie.kogito.Addons;
Expand Down Expand Up @@ -70,7 +71,6 @@
import org.kie.kogito.event.usertask.UserTaskInstanceVariableEventBody;
import org.kie.kogito.internal.process.event.KogitoProcessVariableChangedEvent;
import org.kie.kogito.internal.process.runtime.KogitoNodeInstance;
import org.kie.kogito.internal.process.runtime.KogitoProcessInstance;
import org.kie.kogito.internal.process.runtime.KogitoWorkItem;
import org.kie.kogito.internal.process.runtime.KogitoWorkItemNodeInstance;
import org.kie.kogito.internal.process.runtime.KogitoWorkflowProcessInstance;
Expand All @@ -86,25 +86,19 @@ public class ProcessInstanceEventBatch implements EventBatch {
public ProcessInstanceEventBatch(String service, Addons addons) {
this.service = service;
this.addons = addons != null ? addons : Addons.EMTPY;
this.processedEvents = new ArrayList<>();
}

@Override
public void append(Object rawEvent) {
if (rawEvent instanceof ProcessEvent) {
addDataEvent((ProcessEvent) rawEvent);
} else if (rawEvent instanceof UserTaskEvent) {
addDataEvent((UserTaskEvent) rawEvent);
}
this.processedEvents = new TreeSet<>(new Comparator<DataEvent<?>>() {
@Override
public int compare(DataEvent<?> event1, DataEvent<?> event2) {
return event2 instanceof ProcessInstanceStateDataEvent &&
((ProcessInstanceStateDataEvent) event2).getData().getEventType() == ProcessInstanceStateEventBody.EVENT_TYPE_ENDED
|| event1 instanceof ProcessInstanceStateDataEvent &&
((ProcessInstanceStateDataEvent) event1).getData().getEventType() == ProcessInstanceStateEventBody.EVENT_TYPE_STARTED ? -1 : 1;
}
});
}

@Override
public Collection<DataEvent<?>> events() {
return processedEvents;
}

private void addDataEvent(ProcessEvent event) {
// process events
public void append(Object event) {
if (event instanceof ProcessStartedEvent) {
handleProcessStateEvent((ProcessStartedEvent) event);
} else if (event instanceof ProcessCompletedEvent) {
Expand All @@ -117,9 +111,28 @@ private void addDataEvent(ProcessEvent event) {
handleProcesssNodeEvent((SLAViolatedEvent) event);
} else if (event instanceof ProcessVariableChangedEvent) {
handleProcessVariableEvent((ProcessVariableChangedEvent) event);
} else if (event instanceof ErrorEvent) {
handleErrorEvent((ErrorEvent) event);
} else if (event instanceof UserTaskStateEvent) {
handleUserTaskStateEvent((UserTaskStateEvent) event);
} else if (event instanceof UserTaskDeadlineEvent) {
handleUserTaskDeadlineEvent((UserTaskDeadlineEvent) event);
} else if (event instanceof UserTaskAssignmentEvent) {
handleUserTaskAssignmentEvent((UserTaskAssignmentEvent) event);
} else if (event instanceof UserTaskVariableEvent) {
handleUserTaskVariableEvent((UserTaskVariableEvent) event);
} else if (event instanceof UserTaskAttachmentEvent) {
handleUserTaskAttachmentEvent((UserTaskAttachmentEvent) event);
} else if (event instanceof UserTaskCommentEvent) {
handleUserTaskCommentEvent((UserTaskCommentEvent) event);
}
}

@Override
public Collection<DataEvent<?>> events() {
return processedEvents;
}

private void handleProcessVariableEvent(ProcessVariableChangedEvent event) {
if (event.getTags().contains(KogitoTags.INTERNAL_TAG)) {
return;
Expand Down Expand Up @@ -245,52 +258,31 @@ private ProcessInstanceNodeDataEvent toProcessInstanceNodeEvent(ProcessNodeEvent
return piEvent;
}

private void handleProcessStateEvent(ProcessCompletedEvent event) {
private void handleErrorEvent(ErrorEvent event) {
KogitoWorkflowProcessInstance pi = (KogitoWorkflowProcessInstance) event.getProcessInstance();
ProcessInstanceErrorEventBody errorBody = ProcessInstanceErrorEventBody.create()
.eventDate(new Date())
.eventUser(event.getEventIdentity())
.processInstanceId(pi.getId())
.processId(pi.getProcessId())
.processVersion(pi.getProcessVersion())
.nodeDefinitionId(pi.getNodeIdInError())
.nodeInstanceId(pi.getNodeInstanceIdInError())
.errorMessage(pi.getErrorMessage())
.build();
Map<String, Object> metadata = buildProcessMetadata((KogitoWorkflowProcessInstance) event.getProcessInstance());
ProcessInstanceErrorDataEvent piEvent =
new ProcessInstanceErrorDataEvent(buildSource(event.getProcessInstance().getProcessId()), addons.toString(), event.getEventIdentity(), metadata, errorBody);
piEvent.setKogitoBusinessKey(pi.getBusinessKey());
processedEvents.add(piEvent);
}

private void handleProcessStateEvent(ProcessCompletedEvent event) {
processedEvents.add(toProcessInstanceStateEvent(event, ProcessInstanceStateEventBody.EVENT_TYPE_ENDED));

KogitoWorkflowProcessInstance pi = (KogitoWorkflowProcessInstance) event.getProcessInstance();
if (pi.getState() == KogitoProcessInstance.STATE_ERROR) {
ProcessInstanceErrorEventBody errorBody = ProcessInstanceErrorEventBody.create()
.eventDate(new Date())
.eventUser(event.getEventIdentity())
.processInstanceId(pi.getId())
.processId(pi.getProcessId())
.processVersion(pi.getProcessVersion())
.nodeDefinitionId(pi.getNodeIdInError())
.nodeInstanceId(pi.getNodeInstanceIdInError())
.errorMessage(pi.getErrorMessage())
.build();
Map<String, Object> metadata = buildProcessMetadata((KogitoWorkflowProcessInstance) event.getProcessInstance());
ProcessInstanceErrorDataEvent piEvent =
new ProcessInstanceErrorDataEvent(buildSource(event.getProcessInstance().getProcessId()), addons.toString(), event.getEventIdentity(), metadata, errorBody);
piEvent.setKogitoBusinessKey(pi.getBusinessKey());
processedEvents.add(piEvent);
}
}

private void handleProcessStateEvent(ProcessStartedEvent event) {
processedEvents.add(toProcessInstanceStateEvent(event, ProcessInstanceStateEventBody.EVENT_TYPE_STARTED));

KogitoWorkflowProcessInstance pi = (KogitoWorkflowProcessInstance) event.getProcessInstance();
if (pi.getState() == KogitoProcessInstance.STATE_ERROR) {
ProcessInstanceErrorEventBody errorBody = ProcessInstanceErrorEventBody.create()
.eventDate(new Date())
.eventUser(event.getEventIdentity())
.processInstanceId(pi.getId())
.processId(pi.getProcessId())
.processVersion(pi.getProcessVersion())
.nodeDefinitionId(pi.getNodeIdInError())
.nodeInstanceId(pi.getNodeInstanceIdInError())
.errorMessage(pi.getErrorMessage())
.build();
Map<String, Object> metadata = buildProcessMetadata((KogitoWorkflowProcessInstance) event.getProcessInstance());
ProcessInstanceErrorDataEvent piEvent =
new ProcessInstanceErrorDataEvent(buildSource(event.getProcessInstance().getProcessId()), addons.toString(), event.getEventIdentity(), metadata, errorBody);
piEvent.setKogitoBusinessKey(pi.getBusinessKey());
processedEvents.add(piEvent);
}

}

private ProcessInstanceStateDataEvent toProcessInstanceStateEvent(ProcessEvent event, int eventType) {
Expand Down Expand Up @@ -339,23 +331,6 @@ private Map<String, Object> buildProcessMetadata(KogitoWorkflowProcessInstance p
return metadata;
}

private void addDataEvent(UserTaskEvent event) {
// this should go in another event types
if (event instanceof UserTaskStateEvent) {
handleUserTaskStateEvent((UserTaskStateEvent) event);
} else if (event instanceof UserTaskDeadlineEvent) {
handleUserTaskDeadlineEvent((UserTaskDeadlineEvent) event);
} else if (event instanceof UserTaskAssignmentEvent) {
handleUserTaskAssignmentEvent((UserTaskAssignmentEvent) event);
} else if (event instanceof UserTaskVariableEvent) {
handleUserTaskVariableEvent((UserTaskVariableEvent) event);
} else if (event instanceof UserTaskAttachmentEvent) {
handleUserTaskAttachmentEvent((UserTaskAttachmentEvent) event);
} else if (event instanceof UserTaskCommentEvent) {
handleUserTaskCommentEvent((UserTaskCommentEvent) event);
}
}

private void handleUserTaskCommentEvent(UserTaskCommentEvent event) {
Map<String, Object> metadata = buildUserTaskMetadata((HumanTaskWorkItem) event.getWorkItem());
metadata.putAll(buildProcessMetadata((KogitoWorkflowProcessInstance) event.getProcessInstance()));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jbpm.process.instance.event;

import org.kie.api.event.process.ErrorEvent;
import org.kie.api.runtime.KieRuntime;
import org.kie.api.runtime.process.NodeInstance;
import org.kie.api.runtime.process.ProcessInstance;

public class ErrorEventImpl extends ProcessEvent implements ErrorEvent {

private static final long serialVersionUID = 1L;
private final NodeInstance nodeInstance;
private final Exception exception;

public ErrorEventImpl(ProcessInstance instance, KieRuntime kruntime, NodeInstance nodeInstance,
Exception exception) {
super(instance, kruntime);
this.nodeInstance = nodeInstance;
this.exception = exception;
}

@Override
public NodeInstance getNodeInstance() {
return nodeInstance;
}

@Override
public Exception getException() {
return exception;
}

@Override
public String toString() {
return "ErrorEventImpl [nodeInstance=" + nodeInstance + ", exception=" + exception + "]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -367,4 +367,10 @@ public void reset() {
this.clear();
}

@Override
public void fireOnError(KogitoProcessInstance instance, KogitoNodeInstance nodeInstance, KieRuntime kruntime, Exception exception) {
ErrorEventImpl event = new ErrorEventImpl(instance, kruntime, nodeInstance, exception);
notifyAllListeners(l -> l.onError(event));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -1149,6 +1149,7 @@ public void setErrorState(NodeInstance nodeInstanceInError, Exception e) {
this.errorMessage = rootException.getClass().getCanonicalName() + " - " + rootException.getMessage();
setState(STATE_ERROR);
logger.error("Unexpected error while executing node {} in process instance {}", nodeInstanceInError.getNode().getName(), this.getStringId(), e);
((InternalProcessRuntime) getKnowledgeRuntime().getProcessRuntime()).getProcessEventSupport().fireOnError(this, nodeInstanceInError, getKnowledgeRuntime(), e);
// remove node instance that caused an error
((org.jbpm.workflow.instance.NodeInstanceContainer) nodeInstanceInError.getNodeInstanceContainer()).removeNodeInstance(nodeInstanceInError);
}
Expand Down

0 comments on commit 283d96d

Please sign in to comment.