From 9c230923c850cb6eb394b22b387f409bf6315dcd Mon Sep 17 00:00:00 2001 From: Amit Galitzky Date: Mon, 11 Dec 2023 17:46:56 -0800 Subject: [PATCH] Update resources_created with deploy model: (#275) add deploy model resource Signed-off-by: Amit Galitzky --- .../AbstractRetryableWorkflowStep.java | 62 ++++++++----------- 1 file changed, 27 insertions(+), 35 deletions(-) diff --git a/src/main/java/org/opensearch/flowframework/workflow/AbstractRetryableWorkflowStep.java b/src/main/java/org/opensearch/flowframework/workflow/AbstractRetryableWorkflowStep.java index f807c752a..121f477bb 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/AbstractRetryableWorkflowStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/AbstractRetryableWorkflowStep.java @@ -20,7 +20,6 @@ import org.opensearch.flowframework.exception.FlowFrameworkException; import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler; import org.opensearch.ml.client.MachineLearningNodeClient; -import org.opensearch.ml.common.MLTask; import org.opensearch.ml.common.MLTaskState; import java.util.Map; @@ -59,24 +58,6 @@ public AbstractRetryableWorkflowStep( this.flowFrameworkIndicesHandler = flowFrameworkIndicesHandler; } - /** - * Completes the future for either deploy or register local model step - * @param resourceName resource name for the given step - * @param nodeId node ID of the given step - * @param workflowId workflow ID of the given workflow - * @param response Response from ml commons get Task API - * @param future CompletableFuture of the given step - */ - public void completeFuture(String resourceName, String nodeId, String workflowId, MLTask response, CompletableFuture future) { - future.complete( - new WorkflowData( - Map.ofEntries(Map.entry(resourceName, response.getModelId()), Map.entry(REGISTER_MODEL_STATUS, response.getState().name())), - workflowId, - nodeId - ) - ); - } - /** * Retryable get ml task * @param workflowId the workflow id @@ -110,25 +91,36 @@ void retryableGetMlTask( try { logger.info(workflowStep + " successful for {} and modelId {}", workflowId, response.getModelId()); String resourceName = WorkflowResources.getResourceByWorkflowStep(getName()); + String id; if (getName().equals(WorkflowResources.DEPLOY_MODEL.getWorkflowStep())) { - completeFuture(resourceName, nodeId, workflowId, response, future); + id = response.getModelId(); } else { - flowFrameworkIndicesHandler.updateResourceInStateIndex( - workflowId, - nodeId, - getName(), - response.getTaskId(), - ActionListener.wrap(updateResponse -> { - logger.info("successfully updated resources created in state index: {}", updateResponse.getIndex()); - completeFuture(resourceName, nodeId, workflowId, response, future); - }, exception -> { - logger.error("Failed to update new created resource", exception); - future.completeExceptionally( - new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception)) - ); - }) - ); + id = response.getTaskId(); } + flowFrameworkIndicesHandler.updateResourceInStateIndex( + workflowId, + nodeId, + getName(), + id, + ActionListener.wrap(updateResponse -> { + logger.info("successfully updated resources created in state index: {}", updateResponse.getIndex()); + future.complete( + new WorkflowData( + Map.ofEntries( + Map.entry(resourceName, response.getModelId()), + Map.entry(REGISTER_MODEL_STATUS, response.getState().name()) + ), + workflowId, + nodeId + ) + ); + }, exception -> { + logger.error("Failed to update new created resource", exception); + future.completeExceptionally( + new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception)) + ); + }) + ); } catch (Exception e) { logger.error("Failed to parse and update new created resource", e); future.completeExceptionally(new FlowFrameworkException(e.getMessage(), ExceptionsHelper.status(e)));