From 116f78e2ea6c550eab0426741774da133124b375 Mon Sep 17 00:00:00 2001 From: Owais Kazi Date: Sun, 3 Dec 2023 16:41:33 -0800 Subject: [PATCH] Fixed timeout and exception issues Signed-off-by: Owais Kazi --- .../flowframework/model/WorkflowNode.java | 2 +- .../workflow/AbstractRetryableWorkflowStep.java | 2 ++ .../flowframework/workflow/DeployModelStep.java | 3 +++ .../flowframework/workflow/ModelGroupStep.java | 10 +++++++--- .../workflow/DeployModelStepTests.java | 14 +++++++++++--- .../workflow/WorkflowProcessSorterTests.java | 2 +- 6 files changed, 25 insertions(+), 8 deletions(-) diff --git a/src/main/java/org/opensearch/flowframework/model/WorkflowNode.java b/src/main/java/org/opensearch/flowframework/model/WorkflowNode.java index b942ccb16..42d59e07f 100644 --- a/src/main/java/org/opensearch/flowframework/model/WorkflowNode.java +++ b/src/main/java/org/opensearch/flowframework/model/WorkflowNode.java @@ -47,7 +47,7 @@ public class WorkflowNode implements ToXContentObject { /** The field defining the timeout value for this node */ public static final String NODE_TIMEOUT_FIELD = "node_timeout"; /** The default timeout value if the template doesn't override it */ - public static final String NODE_TIMEOUT_DEFAULT_VALUE = "10s"; + public static final String NODE_TIMEOUT_DEFAULT_VALUE = "15s"; private final String id; // unique id private final String type; // maps to a WorkflowStep diff --git a/src/main/java/org/opensearch/flowframework/workflow/AbstractRetryableWorkflowStep.java b/src/main/java/org/opensearch/flowframework/workflow/AbstractRetryableWorkflowStep.java index a3b66ffd0..b981a272e 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/AbstractRetryableWorkflowStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/AbstractRetryableWorkflowStep.java @@ -43,6 +43,8 @@ public abstract class AbstractRetryableWorkflowStep implements WorkflowStep { * Instantiates a new Retryable workflow step * @param settings Environment settings * @param clusterService the cluster service + * @param mlClient machine learning client + * @param flowFrameworkIndicesHandler FlowFrameworkIndicesHandler class to update system indices */ public AbstractRetryableWorkflowStep( Settings settings, diff --git a/src/main/java/org/opensearch/flowframework/workflow/DeployModelStep.java b/src/main/java/org/opensearch/flowframework/workflow/DeployModelStep.java index e4646fa5f..b9307b046 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/DeployModelStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/DeployModelStep.java @@ -39,7 +39,10 @@ public class DeployModelStep extends AbstractRetryableWorkflowStep { /** * Instantiate this class + * @param settings The OpenSearch settings + * @param clusterService The cluster service * @param mlClient client to instantiate MLClient + * @param flowFrameworkIndicesHandler FlowFrameworkIndicesHandler class to update system indices */ public DeployModelStep( Settings settings, diff --git a/src/main/java/org/opensearch/flowframework/workflow/ModelGroupStep.java b/src/main/java/org/opensearch/flowframework/workflow/ModelGroupStep.java index e2aea19df..fbf907776 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/ModelGroupStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/ModelGroupStep.java @@ -12,6 +12,7 @@ import org.apache.logging.log4j.Logger; import org.opensearch.ExceptionsHelper; import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.util.CollectionUtils; import org.opensearch.flowframework.common.WorkflowResources; import org.opensearch.flowframework.exception.FlowFrameworkException; import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler; @@ -72,7 +73,7 @@ public CompletableFuture execute( @Override public void onResponse(MLRegisterModelGroupResponse mlRegisterModelGroupResponse) { try { - logger.info("Remote Model registration successful"); + logger.info("Model group registration successful"); String resourceName = WorkflowResources.getResourceByWorkflowStep(getName()); flowFrameworkIndicesHandler.updateResourceInStateIndex( currentNodeInputs.getWorkflowId(), @@ -134,7 +135,7 @@ public void onFailure(Exception e) { if (description != null) { builder.description(description); } - if (!backendRoles.isEmpty()) { + if (!CollectionUtils.isEmpty(backendRoles)) { builder.backendRoles(backendRoles); } if (modelAccessMode != null) { @@ -160,6 +161,9 @@ public String getName() { @SuppressWarnings("unchecked") private List getBackendRoles(Map content) { - return (List) content.get(BACKEND_ROLES_FIELD); + if (content.containsKey(BACKEND_ROLES_FIELD)) { + return (List) content.get(BACKEND_ROLES_FIELD); + } + return null; } } diff --git a/src/test/java/org/opensearch/flowframework/workflow/DeployModelStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/DeployModelStepTests.java index d52072523..fa27142f1 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/DeployModelStepTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/DeployModelStepTests.java @@ -41,10 +41,18 @@ import org.mockito.MockitoAnnotations; import static org.opensearch.action.DocWriteResponse.Result.UPDATED; -import static org.opensearch.flowframework.common.CommonValue.*; +import static org.opensearch.flowframework.common.CommonValue.MODEL_ID; +import static org.opensearch.flowframework.common.CommonValue.REGISTER_MODEL_STATUS; +import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STATE_INDEX; import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_GET_TASK_REQUEST_RETRY; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; @ThreadLeakScope(ThreadLeakScope.Scope.NONE) public class DeployModelStepTests extends OpenSearchTestCase { diff --git a/src/test/java/org/opensearch/flowframework/workflow/WorkflowProcessSorterTests.java b/src/test/java/org/opensearch/flowframework/workflow/WorkflowProcessSorterTests.java index e9e792add..8103f4fbf 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/WorkflowProcessSorterTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/WorkflowProcessSorterTests.java @@ -118,7 +118,7 @@ public void testNodeDetails() throws IOException { ProcessNode node = workflow.get(0); assertEquals("default_timeout", node.id()); assertEquals(CreateIngestPipelineStep.class, node.workflowStep().getClass()); - assertEquals(10, node.nodeTimeout().seconds()); + assertEquals(15, node.nodeTimeout().seconds()); node = workflow.get(1); assertEquals("custom_timeout", node.id()); assertEquals(CreateIndexStep.class, node.workflowStep().getClass());