Skip to content

Commit

Permalink
Add some delays to let deletions propagate, reset workflow state
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Widdis <widdis@gmail.com>
  • Loading branch information
dbwiddis committed Dec 12, 2023
1 parent 7add348 commit 3c76dc9
Showing 1 changed file with 33 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.client.Client;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.commons.authuser.User;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.flowframework.exception.FlowFrameworkException;
Expand Down Expand Up @@ -46,6 +47,7 @@
import static org.opensearch.flowframework.common.CommonValue.PROVISION_WORKFLOW;
import static org.opensearch.flowframework.common.WorkflowResources.getDeprovisionStepByWorkflowStep;
import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep;
import static org.opensearch.flowframework.util.ParseUtils.getUserContext;

/**
* Transport Action to deprovision a workflow from a stored use case template
Expand Down Expand Up @@ -97,6 +99,7 @@ public DeprovisionWorkflowTransportAction(
@Override
protected void doExecute(Task task, WorkflowRequest request, ActionListener<WorkflowResponse> listener) {
// Retrieve use case template from global context
User user = getUserContext(client);
String workflowId = request.getWorkflowId();
GetRequest getRequest = new GetRequest(GLOBAL_CONTEXT_INDEX, workflowId);

Expand Down Expand Up @@ -127,7 +130,7 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work
workflowProcessSorter.validateGraph(provisionProcessSequence);

// We have a valid template and sorted nodes, get the created resources
getResourcesAndExecute(request.getWorkflowId(), provisionProcessSequence, listener);
getResourcesAndExecute(user, request.getWorkflowId(), provisionProcessSequence, listener);
}, exception -> {
if (exception instanceof FlowFrameworkException) {
logger.error("Workflow validation failed for workflow : " + workflowId);
Expand All @@ -144,6 +147,7 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work
}

private void getResourcesAndExecute(
User user,
String workflowId,
List<ProcessNode> provisionProcessSequence,
ActionListener<WorkflowResponse> listener
Expand All @@ -157,14 +161,15 @@ private void getResourcesAndExecute(
.collect(Collectors.toMap(ResourceCreated::workflowStepId, Function.identity()));

// Now finally do the deprovision
executeDeprovisionSequence(workflowId, resourceMap, provisionProcessSequence, listener);
executeDeprovisionSequence(user, workflowId, resourceMap, provisionProcessSequence, listener);
}, exception -> {
logger.error("Failed to get workflow state for workflow " + workflowId);
listener.onFailure(new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception)));
}));
}

private void executeDeprovisionSequence(
User user,
String workflowId,
Map<String, ResourceCreated> resourceMap,
List<ProcessNode> provisionProcessSequence,
Expand Down Expand Up @@ -217,6 +222,8 @@ private void executeDeprovisionSequence(
logger.info("Successful {} for {}", deprovisionNode.id(), resourceNameAndId);
// Remove from list so we don't try again
iter.remove();
// Pause briefly before next step
Thread.sleep(100);
} catch (Throwable t) {
logger.info(
"Failed {} for {}: {}",
Expand All @@ -225,7 +232,6 @@ private void executeDeprovisionSequence(
t.getCause() == null ? t.getMessage() : t.getCause().getMessage()
);
}
if (deprovisionFuture.isCompletedExceptionally()) {} else {}
}
if (deprovisionProcessSequence.size() < resourceCount) {
// If we've deleted something, decrement and try again if not zero
Expand All @@ -241,15 +247,20 @@ private void executeDeprovisionSequence(
pn.nodeTimeout()
);
}).collect(Collectors.toList());
// Pause briefly before next loop
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
break;
}
} else {
// If nothing was deleted, exit loop
break;
}
}
if (deprovisionProcessSequence.isEmpty()) {
// Successful deprovision, return workflow ID
// TODO: Reset state for this workflow ID to NOT STARTED
listener.onResponse(new WorkflowResponse(workflowId));
// Successful deprovision
resetWorkflowState(user, workflowId, listener);
} else {
// Failed deprovision, give user list of remaining resources
listener.onFailure(
Expand All @@ -258,6 +269,8 @@ private void executeDeprovisionSequence(
"Failed to deprovision some resources: ["
+ deprovisionProcessSequence.stream()
.map(pn -> getResourceFromProcessNode(pn, resourceMap))
.filter(Objects::nonNull)
.distinct()
.collect(Collectors.joining(", "))
+ "].",
RestStatus.ACCEPTED
Expand All @@ -266,6 +279,20 @@ private void executeDeprovisionSequence(
}
}

private void resetWorkflowState(User user, String workflowId, ActionListener<WorkflowResponse> listener) {
flowFrameworkIndicesHandler.putInitialStateToWorkflowState(workflowId, user, ActionListener.wrap(stateResponse -> {
logger.info("create state workflow doc");
listener.onResponse(new WorkflowResponse(workflowId));
}, exception -> {
logger.error("Failed to save workflow state : {}", exception.getMessage());
if (exception instanceof FlowFrameworkException) {
listener.onFailure(exception);
} else {
listener.onFailure(new FlowFrameworkException(exception.getMessage(), RestStatus.BAD_REQUEST));
}
}));
}

private String getResourceFromProcessNode(ProcessNode deprovisionNode, Map<String, ResourceCreated> resourceMap) {
String deprovisionId = deprovisionNode.id();
int pos = deprovisionId.indexOf(DEPROVISION_SUFFIX);
Expand Down

0 comments on commit 3c76dc9

Please sign in to comment.