Skip to content

Commit

Permalink
Update resources_created with deploy model: (opensearch-project#275)
Browse files Browse the repository at this point in the history
add deploy model resource

Signed-off-by: Amit Galitzky <amgalitz@amazon.com>
  • Loading branch information
amitgalitz authored and dbwiddis committed Dec 15, 2023
1 parent 1b37073 commit 9c23092
Showing 1 changed file with 27 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.ml.client.MachineLearningNodeClient;
import org.opensearch.ml.common.MLTask;
import org.opensearch.ml.common.MLTaskState;

import java.util.Map;
Expand Down Expand Up @@ -59,24 +58,6 @@ public AbstractRetryableWorkflowStep(
this.flowFrameworkIndicesHandler = flowFrameworkIndicesHandler;
}

/**
* Completes the future for either deploy or register local model step
* @param resourceName resource name for the given step
* @param nodeId node ID of the given step
* @param workflowId workflow ID of the given workflow
* @param response Response from ml commons get Task API
* @param future CompletableFuture of the given step
*/
public void completeFuture(String resourceName, String nodeId, String workflowId, MLTask response, CompletableFuture future) {
future.complete(
new WorkflowData(
Map.ofEntries(Map.entry(resourceName, response.getModelId()), Map.entry(REGISTER_MODEL_STATUS, response.getState().name())),
workflowId,
nodeId
)
);
}

/**
* Retryable get ml task
* @param workflowId the workflow id
Expand Down Expand Up @@ -110,25 +91,36 @@ void retryableGetMlTask(
try {
logger.info(workflowStep + " successful for {} and modelId {}", workflowId, response.getModelId());
String resourceName = WorkflowResources.getResourceByWorkflowStep(getName());
String id;
if (getName().equals(WorkflowResources.DEPLOY_MODEL.getWorkflowStep())) {
completeFuture(resourceName, nodeId, workflowId, response, future);
id = response.getModelId();
} else {
flowFrameworkIndicesHandler.updateResourceInStateIndex(
workflowId,
nodeId,
getName(),
response.getTaskId(),
ActionListener.wrap(updateResponse -> {
logger.info("successfully updated resources created in state index: {}", updateResponse.getIndex());
completeFuture(resourceName, nodeId, workflowId, response, future);
}, exception -> {
logger.error("Failed to update new created resource", exception);
future.completeExceptionally(
new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception))
);
})
);
id = response.getTaskId();
}
flowFrameworkIndicesHandler.updateResourceInStateIndex(
workflowId,
nodeId,
getName(),
id,
ActionListener.wrap(updateResponse -> {
logger.info("successfully updated resources created in state index: {}", updateResponse.getIndex());
future.complete(
new WorkflowData(
Map.ofEntries(
Map.entry(resourceName, response.getModelId()),
Map.entry(REGISTER_MODEL_STATUS, response.getState().name())
),
workflowId,
nodeId
)
);
}, exception -> {
logger.error("Failed to update new created resource", exception);
future.completeExceptionally(
new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception))
);
})
);
} catch (Exception e) {
logger.error("Failed to parse and update new created resource", e);
future.completeExceptionally(new FlowFrameworkException(e.getMessage(), ExceptionsHelper.status(e)));
Expand Down

0 comments on commit 9c23092

Please sign in to comment.