Skip to content

Commit

Permalink
Merge pull request #7635 from GlobalDataverseCommunityConsortium/DANS…
Browse files Browse the repository at this point in the history
…/7564_messaging_from_workflows

DANS/7564 messaging from workflows
  • Loading branch information
kcondon authored Mar 23, 2021
2 parents c86dae5 + 99d47bc commit dd36c7e
Show file tree
Hide file tree
Showing 18 changed files with 290 additions and 49 deletions.
38 changes: 33 additions & 5 deletions doc/sphinx-guides/source/developers/workflows.rst
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ Steps can be internal (say, writing some data to the log) or external. External

The external system reports the step result back to the Dataverse installation, by sending a HTTP ``POST`` command to ``api/workflows/{invocation-id}`` with Content-Type: text/plain. The body of the request is passed to the paused step for further processing.

Steps can define messages to send to the log and to users. If defined, the message to users is sent as a user notification (creating an email and showing in the user notification tab) and will show once for the given user if/when they view the relevant dataset page. The latter provides a means for the asynchronous workflow execution to report success or failure analogous to the way the publication and other processes report on the page.

If a step in a workflow fails, the Dataverse installation makes an effort to roll back all the steps that preceded it. Some actions, such as writing to the log, cannot be rolled back. If such an action has a public external effect (e.g. send an EMail to a mailing list) it is advisable to put it in the post-release workflow.

.. tip::
For invoking external systems using a REST api, the Dataverse Software's internal step
provider offers two steps for sending and receiving customizable HTTP requests.
*http/sr* and *http/authExt*, detailed below, with the latter able to use the API to make changes to the dataset being processed. Both lock the dataset to prevent other processes from changing the dataset between the time the step is launched to when the external process responds to the Dataverse instance.
*http/sr* and *http/authExt*, detailed below, with the latter able to use the API to make changes to the dataset being processed. (Both lock the dataset to prevent other processes from changing the dataset between the time the step is launched to when the external process responds to the Dataverse instance.)

Administration
~~~~~~~~~~~~~~
Expand Down Expand Up @@ -70,6 +72,23 @@ The pause step is intended for testing - the invocationId required to end the pa
"stepType":"pause"
}
pause/message
+++++++++++++

A variant of the pause step that pauses the workflow and allows the external process to send a success/failure message. The workflow is paused until a POST request is sent to ``/api/workflows/{invocation-id}``.
The response in the POST body (Content-type:application/json) should be a json object (the same as for the http/extauth step) containing:
- "status" - can be "success" or "failure"
- "reason" - a message that will be logged
- "message" - a message to send to the user that will be sent as a notification and as a banner on the relevant dataset page.
An unparsable reponse will be considered a Failure that will be logged with no user message. (See the http/authext step for an example POST call)

.. code:: json
{
"provider":":internal",
"stepType":"pause/message"
}
http/sr
+++++++
Expand Down Expand Up @@ -113,11 +132,20 @@ The invocationId must be sent as an 'X-Dataverse-invocationId' HTTP Header or as
Once this step completes and responds, the invocationId is invalidated and will not allow further access.

The url, content type, and message body can use data from the workflow context, using a simple markup language. This step has specific parameters for rollback.
The workflow is restarted when the external system replies with a POST request to ``/api/workflows/{invocation-id}``.
The workflow is restarted when the external system replies with a POST request to ``/api/workflows/{invocation-id}`` (Content-Type: application/json).

The response has is expected to be a json object with three keys:
- "Status" - can be "Success" or "Failure"
- "Reason" - a message that will be logged
- "Message" - a message to send to the user (message sending is not yet implemented).
- "status" - can be "success" or "failure"
- "reason" - a message that will be logged
- "message" - a message to send to the user that will be sent as a notification and as a banner on the relevant dataset page.

.. code-block:: bash
export INVOCATION_ID=xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
export SERVER_URL=https://demo.dataverse.org
export MESSAGE={"status":"success", "reason":"Workflow completed in 10 seconds", "message":"An external workflow to virus check your data was successfully run prior to publication of your data"}
curl -H 'Content-Type:application/json' -X POST -d $MESSAGE "$SERVER_URL/api/workflows/$INVOCATION_ID"
.. code:: json
Expand Down
24 changes: 23 additions & 1 deletion src/main/java/edu/harvard/iq/dataverse/DatasetPage.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@

import edu.harvard.iq.dataverse.util.StringUtil;
import edu.harvard.iq.dataverse.util.SystemConfig;
import edu.harvard.iq.dataverse.workflows.WorkflowComment;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
Expand Down Expand Up @@ -1883,7 +1885,9 @@ private String init(boolean initFull) {
MakeDataCountEntry entry = new MakeDataCountEntry(FacesContext.getCurrentInstance(), dvRequestService, workingVersion);
mdcLogService.logEntry(entry);
}

displayWorkflowComments();


if (initFull) {
// init the list of FileMetadatas
if (workingVersion.isDraft() && canUpdateDataset()) {
Expand Down Expand Up @@ -2043,6 +2047,22 @@ private String init(boolean initFull) {
return null;
}

private void displayWorkflowComments() {
List<WorkflowComment> comments = workingVersion.getWorkflowComments();
for (WorkflowComment wfc : comments) {
if (wfc.isToBeShown() && wfc.getDatasetVersion().equals(workingVersion)
&& wfc.getAuthenticatedUser().equals(session.getUser())) {
if (wfc.getType() == WorkflowComment.Type.WORKFLOW_SUCCESS) {
JsfHelper.addSuccessMessage(wfc.getMessage());

} else if (wfc.getType() == WorkflowComment.Type.WORKFLOW_FAILURE) {
JsfHelper.addWarningMessage(wfc.getMessage());
}
datasetService.markWorkflowCommentAsRead(wfc);
}
}
}

private void displayLockInfo(Dataset dataset) {
// Various info messages, when the dataset is locked (for various reasons):
if (dataset.isLocked() && canUpdateDataset()) {
Expand Down Expand Up @@ -2729,6 +2749,8 @@ public String refresh() {
}
}

displayWorkflowComments();

return "";
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -787,6 +787,12 @@ public WorkflowComment addWorkflowComment(WorkflowComment workflowComment) {
return workflowComment;
}

public void markWorkflowCommentAsRead(WorkflowComment workflowComment) {
workflowComment.setToBeShown(false);
em.merge(workflowComment);
}


/**
* This method used to throw CommandException, which was pretty pointless
* seeing how it's called asynchronously. As of v5.0 any CommanExceptiom
Expand Down
22 changes: 22 additions & 0 deletions src/main/java/edu/harvard/iq/dataverse/MailServiceBean.java
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,26 @@ public String getMessageTextBasedOnNotification(UserNotification userNotificatio
version.getDataset().getOwner().getDisplayName(), getDataverseLink(version.getDataset().getOwner()), optionalReturnReason};
messageText += MessageFormat.format(pattern, paramArrayReturnedDataset);
return messageText;

case WORKFLOW_SUCCESS:
version = (DatasetVersion) targetObject;
pattern = BundleUtil.getStringFromBundle("notification.email.workflow.success");

if (comment == null) {
comment = BundleUtil.getStringFromBundle("notification.email.workflow.nullMessage");
}
String[] paramArrayWorkflowSuccess = {version.getDataset().getDisplayName(), getDatasetLink(version.getDataset()), comment};
messageText += MessageFormat.format(pattern, paramArrayWorkflowSuccess);
return messageText;
case WORKFLOW_FAILURE:
version = (DatasetVersion) targetObject;
pattern = BundleUtil.getStringFromBundle("notification.email.workflow.failure");
if (comment == null) {
comment = BundleUtil.getStringFromBundle("notification.email.workflow.nullMessage");
}
String[] paramArrayWorkflowFailure = {version.getDataset().getDisplayName(), getDatasetLink(version.getDataset()), comment};
messageText += MessageFormat.format(pattern, paramArrayWorkflowFailure);
return messageText;
case CREATEACC:
InternetAddress systemAddress = getSystemAddress();
String accountCreatedMessage = BundleUtil.getStringFromBundle("notification.email.welcome", Arrays.asList(
Expand Down Expand Up @@ -591,6 +611,8 @@ private Object getObjectOfNotification (UserNotification userNotification){
case PUBLISHEDDS:
case PUBLISHFAILED_PIDREG:
case RETURNEDDS:
case WORKFLOW_SUCCESS:
case WORKFLOW_FAILURE:
return versionService.find(userNotification.getObjectId());
case CREATEACC:
return userNotification.getUser();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public enum Type {
ASSIGNROLE, REVOKEROLE, CREATEDV, CREATEDS, CREATEACC, SUBMITTEDDS, RETURNEDDS,
PUBLISHEDDS, REQUESTFILEACCESS, GRANTFILEACCESS, REJECTFILEACCESS, FILESYSTEMIMPORT,
CHECKSUMIMPORT, CHECKSUMFAIL, CONFIRMEMAIL, APIGENERATED, INGESTCOMPLETED, INGESTCOMPLETEDWITHERRORS,
PUBLISHFAILED_PIDREG
PUBLISHFAILED_PIDREG, WORKFLOW_SUCCESS, WORKFLOW_FAILURE
};

private static final long serialVersionUID = 1L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,8 @@ public void displayNotification() {
case PUBLISHEDDS:
case PUBLISHFAILED_PIDREG:
case RETURNEDDS:
case WORKFLOW_SUCCESS:
case WORKFLOW_FAILURE:
userNotification.setTheObject(datasetVersionService.find(userNotification.getObjectId()));
break;

Expand Down
4 changes: 4 additions & 0 deletions src/main/java/edu/harvard/iq/dataverse/util/MailUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ public static String getSubjectTextBasedOnNotification(UserNotification userNoti
return BundleUtil.getStringFromBundle("notification.email.publishFailure.dataset.subject", rootDvNameAsList);
case RETURNEDDS:
return BundleUtil.getStringFromBundle("notification.email.returned.dataset.subject", rootDvNameAsList);
case WORKFLOW_SUCCESS:
return BundleUtil.getStringFromBundle("notification.email.workflow.success.subject", rootDvNameAsList);
case WORKFLOW_FAILURE:
return BundleUtil.getStringFromBundle("notification.email.workflow.failure.subject", rootDvNameAsList);
case CREATEACC:
return BundleUtil.getStringFromBundle("notification.email.create.account.subject", rootDvNameAsList);
case CHECKSUMFAIL:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
import edu.harvard.iq.dataverse.DataverseRequestServiceBean;
import edu.harvard.iq.dataverse.EjbDataverseEngine;
import edu.harvard.iq.dataverse.RoleAssigneeServiceBean;
import edu.harvard.iq.dataverse.UserNotification;
import edu.harvard.iq.dataverse.UserNotification.Type;
import edu.harvard.iq.dataverse.UserNotificationServiceBean;
import edu.harvard.iq.dataverse.authorization.users.ApiToken;
import edu.harvard.iq.dataverse.authorization.users.AuthenticatedUser;
import edu.harvard.iq.dataverse.engine.command.CommandContext;
Expand All @@ -16,11 +19,14 @@
import edu.harvard.iq.dataverse.workflow.internalspi.InternalWorkflowStepSP;
import edu.harvard.iq.dataverse.workflow.step.Failure;
import edu.harvard.iq.dataverse.workflow.step.Pending;
import edu.harvard.iq.dataverse.workflow.step.Success;
import edu.harvard.iq.dataverse.workflow.step.WorkflowStep;
import edu.harvard.iq.dataverse.workflow.step.WorkflowStepData;
import edu.harvard.iq.dataverse.workflow.step.WorkflowStepResult;
import edu.harvard.iq.dataverse.workflows.WorkflowComment;

import java.util.Date;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -63,6 +69,9 @@ public class WorkflowServiceBean {
@EJB
SystemConfig systemConfig;

@EJB
UserNotificationServiceBean userNotificationService;

@EJB
EjbDataverseEngine engine;

Expand Down Expand Up @@ -209,10 +218,22 @@ private void doResume(PendingWorkflowInvocation pending, String body) {
final WorkflowContext ctxt = refresh(newCtxt,retrieveRequestedSettings( wf.getRequiredSettings()), getCurrentApiToken(newCtxt.getRequest().getAuthenticatedUser()));
WorkflowStepResult res = pendingStep.resume(ctxt, pending.getLocalData(), body);
if (res instanceof Failure) {
logger.warning(((Failure) res).getReason());
userNotificationService.sendNotification(ctxt.getRequest().getAuthenticatedUser(), Timestamp.from(Instant.now()), UserNotification.Type.WORKFLOW_FAILURE, ctxt.getDataset().getLatestVersion().getId(), ((Failure) res).getMessage());
//UserNotification isn't meant to be a long-term record and doesn't store the comment, so we'll also keep it as a workflow comment
WorkflowComment wfc = new WorkflowComment(ctxt.getDataset().getLatestVersion(), WorkflowComment.Type.WORKFLOW_FAILURE, ((Failure) res).getMessage(), ctxt.getRequest().getAuthenticatedUser());
datasets.addWorkflowComment(wfc);
rollback(wf, ctxt, (Failure) res, pending.getPendingStepIdx() - 1);
} else if (res instanceof Pending) {
pauseAndAwait(wf, ctxt, (Pending) res, pending.getPendingStepIdx());
} else {
if (res instanceof Success) {
logger.info(((Success) res).getReason());
userNotificationService.sendNotification(ctxt.getRequest().getAuthenticatedUser(), Timestamp.from(Instant.now()), UserNotification.Type.WORKFLOW_SUCCESS, ctxt.getDataset().getLatestVersion().getId(), ((Success) res).getMessage());
//UserNotification isn't meant to be a long-term record and doesn't store the comment, so we'll also keep it as a workflow comment
WorkflowComment wfc = new WorkflowComment(ctxt.getDataset().getLatestVersion(), WorkflowComment.Type.WORKFLOW_SUCCESS, ((Success) res).getMessage(), ctxt.getRequest().getAuthenticatedUser());
datasets.addWorkflowComment(wfc);
}
executeSteps(wf, ctxt, pending.getPendingStepIdx() + 1);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import edu.harvard.iq.dataverse.workflow.step.Success;
import edu.harvard.iq.dataverse.workflow.step.WorkflowStep;
import edu.harvard.iq.dataverse.workflow.step.WorkflowStepResult;
import edu.harvard.iq.dataverse.workflows.WorkflowUtil;

import static edu.harvard.iq.dataverse.workflow.step.WorkflowStepResult.OK;

import java.io.StringReader;
Expand Down Expand Up @@ -77,29 +79,7 @@ public WorkflowStepResult run(WorkflowContext context) {

@Override
public WorkflowStepResult resume(WorkflowContext context, Map<String, String> internalData, String externalData) {
try (StringReader reader = new StringReader(externalData)) {
JsonObject response = Json.createReader(reader).readObject();
String status = response.getString("Status");
String reason = null;
String message = null;
if (response.containsKey("Reason")) {
reason = response.getString("Reason");
}
if (response.containsKey("Message")) {
message = response.getString("Message");
}
switch (status) {
case "Success":
logger.log(Level.FINE, "AuthExt Worfklow Step Succeeded: " + reason);
return new Success(reason, message);
case "Failure":
logger.log(Level.WARNING, "Remote system indicates workflow failed: {0}", reason);
return new Failure(reason, message);
}
} catch (Exception e) {
logger.log(Level.WARNING, "Remote system returned a bad reposonse: {0}", externalData);
}
return new Failure("Workflow failure: Response from remote server could not be parsed:" + externalData, null);
return WorkflowUtil.parseResponse(externalData);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ public WorkflowStep getStep(String stepType, Map<String, String> stepParameters)
return new LoggingWorkflowStep(stepParameters);
case "pause":
return new PauseStep(stepParameters);
case "pause/message":
return new PauseWithMessageStep(stepParameters);
case "http/sr":
return new HttpSendReceiveClientStep(stepParameters);
case "http/authExt":
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package edu.harvard.iq.dataverse.workflow.internalspi;

import edu.harvard.iq.dataverse.workflow.WorkflowContext;
import edu.harvard.iq.dataverse.workflow.step.Failure;
import edu.harvard.iq.dataverse.workflow.step.Pending;
import edu.harvard.iq.dataverse.workflow.step.WorkflowStep;
import edu.harvard.iq.dataverse.workflow.step.WorkflowStepResult;
import edu.harvard.iq.dataverse.workflows.WorkflowUtil;

import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;


/**
* A sample step that pauses the workflow.
*
* @author michael
*/
public class PauseWithMessageStep implements WorkflowStep {

/** Constant used by testing to simulate a failed step. */
public static final String FAILURE_RESPONSE="fail";

private final Map<String,String> params = new HashMap<>();

public PauseWithMessageStep( Map<String,String> paramSet ) {
params.putAll(paramSet);
}

@Override
public WorkflowStepResult run(WorkflowContext context) {
final Pending result = new Pending();
result.getData().putAll(params);
return result;
}

@Override
public WorkflowStepResult resume(WorkflowContext context, Map<String, String> internalData, String externalData) {
return WorkflowUtil.parseResponse(externalData);
}

@Override
public void rollback(WorkflowContext context, Failure reason) {
// nothing to roll back
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public class Failure implements WorkflowStepResult {
private final String message;

public Failure( String reason ) {
this(reason, reason);
this(reason, null);
}

/**
Expand Down
Loading

0 comments on commit dd36c7e

Please sign in to comment.