Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update resources_created with deploy model: #275

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.ml.client.MachineLearningNodeClient;
import org.opensearch.ml.common.MLTask;
import org.opensearch.ml.common.MLTaskState;

import java.util.Map;
Expand Down Expand Up @@ -59,24 +58,6 @@ public AbstractRetryableWorkflowStep(
this.flowFrameworkIndicesHandler = flowFrameworkIndicesHandler;
}

/**
* Completes the future for either deploy or register local model step
* @param resourceName resource name for the given step
* @param nodeId node ID of the given step
* @param workflowId workflow ID of the given workflow
* @param response Response from ml commons get Task API
* @param future CompletableFuture of the given step
*/
public void completeFuture(String resourceName, String nodeId, String workflowId, MLTask response, CompletableFuture future) {
future.complete(
new WorkflowData(
Map.ofEntries(Map.entry(resourceName, response.getModelId()), Map.entry(REGISTER_MODEL_STATUS, response.getState().name())),
workflowId,
nodeId
)
);
}

/**
* Retryable get ml task
* @param workflowId the workflow id
Expand Down Expand Up @@ -110,25 +91,36 @@ void retryableGetMlTask(
try {
logger.info(workflowStep + " successful for {} and modelId {}", workflowId, response.getModelId());
String resourceName = WorkflowResources.getResourceByWorkflowStep(getName());
String id;
amitgalitz marked this conversation as resolved.
Show resolved Hide resolved
if (getName().equals(WorkflowResources.DEPLOY_MODEL.getWorkflowStep())) {
completeFuture(resourceName, nodeId, workflowId, response, future);
id = response.getModelId();
amitgalitz marked this conversation as resolved.
Show resolved Hide resolved
} else {
flowFrameworkIndicesHandler.updateResourceInStateIndex(
workflowId,
nodeId,
getName(),
response.getTaskId(),
ActionListener.wrap(updateResponse -> {
logger.info("successfully updated resources created in state index: {}", updateResponse.getIndex());
completeFuture(resourceName, nodeId, workflowId, response, future);
}, exception -> {
logger.error("Failed to update new created resource", exception);
future.completeExceptionally(
new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception))
);
})
);
id = response.getTaskId();
}
flowFrameworkIndicesHandler.updateResourceInStateIndex(
workflowId,
nodeId,
getName(),
id,
ActionListener.wrap(updateResponse -> {
logger.info("successfully updated resources created in state index: {}", updateResponse.getIndex());
future.complete(
new WorkflowData(
Map.ofEntries(
Map.entry(resourceName, response.getModelId()),
Map.entry(REGISTER_MODEL_STATUS, response.getState().name())
amitgalitz marked this conversation as resolved.
Show resolved Hide resolved
),
workflowId,
nodeId
)
);
}, exception -> {
logger.error("Failed to update new created resource", exception);
future.completeExceptionally(
new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception))
);
})
);
} catch (Exception e) {
logger.error("Failed to parse and update new created resource", e);
future.completeExceptionally(new FlowFrameworkException(e.getMessage(), ExceptionsHelper.status(e)));
Expand Down
Loading