From 1d2c8617d015433cba2aa0a293c3da031d3da2e4 Mon Sep 17 00:00:00 2001 From: Joshua Palis Date: Wed, 13 Dec 2023 12:01:54 -0800 Subject: [PATCH] [Feature/agent_framework] Adds a Search Workflow State API (#284) * Modifying workflow state index mapping and resources created Signed-off-by: Joshua Palis * Adding Search workflow state API Signed-off-by: Joshua Palis * Adding rest unit tests Signed-off-by: Joshua Palis * Transport unit tests Signed-off-by: Joshua Palis * Moving resourceType determination outside of the resources created class Signed-off-by: Joshua Palis --------- Signed-off-by: Joshua Palis --- .../flowframework/FlowFrameworkPlugin.java | 9 +- .../flowframework/common/CommonValue.java | 4 + .../indices/FlowFrameworkIndicesHandler.java | 8 +- .../flowframework/model/ResourceCreated.java | 54 +++++++----- .../rest/RestSearchWorkflowStateAction.java | 47 ++++++++++ .../transport/GetWorkflowStateResponse.java | 22 ++++- .../transport/SearchWorkflowStateAction.java | 29 +++++++ .../SearchWorkflowStateTransportAction.java | 50 +++++++++++ .../resources/mappings/workflow-state.json | 16 +++- .../FlowFrameworkPluginTests.java | 4 +- .../model/ResourceCreatedTests.java | 37 +++++--- .../RestSearchWorkflowStateActionTests.java | 85 +++++++++++++++++++ ...archWorkflowStateTransportActionTests.java | 79 +++++++++++++++++ 13 files changed, 403 insertions(+), 41 deletions(-) create mode 100644 src/main/java/org/opensearch/flowframework/rest/RestSearchWorkflowStateAction.java create mode 100644 src/main/java/org/opensearch/flowframework/transport/SearchWorkflowStateAction.java create mode 100644 src/main/java/org/opensearch/flowframework/transport/SearchWorkflowStateTransportAction.java create mode 100644 src/test/java/org/opensearch/flowframework/rest/RestSearchWorkflowStateActionTests.java create mode 100644 src/test/java/org/opensearch/flowframework/transport/SearchWorkflowStateTransportActionTests.java diff --git a/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java b/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java index ec9eb40da..40ddee2fa 100644 --- a/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java +++ b/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java @@ -32,6 +32,7 @@ import org.opensearch.flowframework.rest.RestGetWorkflowStateAction; import org.opensearch.flowframework.rest.RestProvisionWorkflowAction; import org.opensearch.flowframework.rest.RestSearchWorkflowAction; +import org.opensearch.flowframework.rest.RestSearchWorkflowStateAction; import org.opensearch.flowframework.transport.CreateWorkflowAction; import org.opensearch.flowframework.transport.CreateWorkflowTransportAction; import org.opensearch.flowframework.transport.GetWorkflowAction; @@ -41,6 +42,8 @@ import org.opensearch.flowframework.transport.ProvisionWorkflowAction; import org.opensearch.flowframework.transport.ProvisionWorkflowTransportAction; import org.opensearch.flowframework.transport.SearchWorkflowAction; +import org.opensearch.flowframework.transport.SearchWorkflowStateAction; +import org.opensearch.flowframework.transport.SearchWorkflowStateTransportAction; import org.opensearch.flowframework.transport.SearchWorkflowTransportAction; import org.opensearch.flowframework.util.EncryptorUtils; import org.opensearch.flowframework.workflow.WorkflowProcessSorter; @@ -130,7 +133,8 @@ public List getRestHandlers( new RestProvisionWorkflowAction(flowFrameworkFeatureEnabledSetting), new RestSearchWorkflowAction(flowFrameworkFeatureEnabledSetting), new RestGetWorkflowStateAction(flowFrameworkFeatureEnabledSetting), - new RestGetWorkflowAction(flowFrameworkFeatureEnabledSetting) + new RestGetWorkflowAction(flowFrameworkFeatureEnabledSetting), + new RestSearchWorkflowStateAction(flowFrameworkFeatureEnabledSetting) ); } @@ -141,7 +145,8 @@ public List getRestHandlers( new ActionHandler<>(ProvisionWorkflowAction.INSTANCE, ProvisionWorkflowTransportAction.class), new ActionHandler<>(SearchWorkflowAction.INSTANCE, SearchWorkflowTransportAction.class), new ActionHandler<>(GetWorkflowStateAction.INSTANCE, GetWorkflowStateTransportAction.class), - new ActionHandler<>(GetWorkflowAction.INSTANCE, GetWorkflowTransportAction.class) + new ActionHandler<>(GetWorkflowAction.INSTANCE, GetWorkflowTransportAction.class), + new ActionHandler<>(SearchWorkflowStateAction.INSTANCE, SearchWorkflowStateTransportAction.class) ); } diff --git a/src/main/java/org/opensearch/flowframework/common/CommonValue.java b/src/main/java/org/opensearch/flowframework/common/CommonValue.java index 0863565c0..8b8f9deae 100644 --- a/src/main/java/org/opensearch/flowframework/common/CommonValue.java +++ b/src/main/java/org/opensearch/flowframework/common/CommonValue.java @@ -168,6 +168,10 @@ private CommonValue() {} public static final String WORKFLOW_STEP_NAME = "workflow_step_name"; /** The field name for the step ID where a resource is created */ public static final String WORKFLOW_STEP_ID = "workflow_step_id"; + /** The field name for the resource type */ + public static final String RESOURCE_TYPE = "resource_type"; + /** The field name for the resource id */ + public static final String RESOURCE_ID = "resource_id"; /** The tools' field for an agent */ public static final String TOOLS_FIELD = "tools"; /** The memory field for an agent */ diff --git a/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java b/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java index 63df7824c..2449297b6 100644 --- a/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java +++ b/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java @@ -34,6 +34,7 @@ import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.ToXContentObject; import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.flowframework.common.WorkflowResources; import org.opensearch.flowframework.exception.FlowFrameworkException; import org.opensearch.flowframework.model.ProvisioningProgress; import org.opensearch.flowframework.model.ResourceCreated; @@ -500,7 +501,12 @@ public void updateResourceInStateIndex( String resourceId, ActionListener listener ) throws IOException { - ResourceCreated newResource = new ResourceCreated(workflowStepName, nodeId, resourceId); + ResourceCreated newResource = new ResourceCreated( + workflowStepName, + nodeId, + WorkflowResources.getResourceByWorkflowStep(workflowStepName), + resourceId + ); XContentBuilder builder = XContentBuilder.builder(XContentType.JSON.xContent()); newResource.toXContent(builder, ToXContentObject.EMPTY_PARAMS); diff --git a/src/main/java/org/opensearch/flowframework/model/ResourceCreated.java b/src/main/java/org/opensearch/flowframework/model/ResourceCreated.java index d039e2f8c..b12f4d044 100644 --- a/src/main/java/org/opensearch/flowframework/model/ResourceCreated.java +++ b/src/main/java/org/opensearch/flowframework/model/ResourceCreated.java @@ -17,12 +17,13 @@ import org.opensearch.core.xcontent.ToXContentObject; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; -import org.opensearch.flowframework.common.WorkflowResources; import org.opensearch.flowframework.exception.FlowFrameworkException; import java.io.IOException; import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; +import static org.opensearch.flowframework.common.CommonValue.RESOURCE_ID; +import static org.opensearch.flowframework.common.CommonValue.RESOURCE_TYPE; import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STEP_ID; import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STEP_NAME; @@ -36,17 +37,20 @@ public class ResourceCreated implements ToXContentObject, Writeable { private final String workflowStepName; private final String workflowStepId; + private final String resourceType; private final String resourceId; /** * Create this resources created object with given workflow step name, ID and resource ID. * @param workflowStepName The workflow step name associating to the step where it was created * @param workflowStepId The workflow step ID associating to the step where it was created + * @param resourceType The resource type * @param resourceId The resources ID for relating to the created resource */ - public ResourceCreated(String workflowStepName, String workflowStepId, String resourceId) { + public ResourceCreated(String workflowStepName, String workflowStepId, String resourceType, String resourceId) { this.workflowStepName = workflowStepName; this.workflowStepId = workflowStepId; + this.resourceType = resourceType; this.resourceId = resourceId; } @@ -58,6 +62,7 @@ public ResourceCreated(String workflowStepName, String workflowStepId, String re public ResourceCreated(StreamInput input) throws IOException { this.workflowStepName = input.readString(); this.workflowStepId = input.readString(); + this.resourceType = input.readString(); this.resourceId = input.readString(); } @@ -66,7 +71,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws XContentBuilder xContentBuilder = builder.startObject() .field(WORKFLOW_STEP_NAME, workflowStepName) .field(WORKFLOW_STEP_ID, workflowStepId) - .field(WorkflowResources.getResourceByWorkflowStep(workflowStepName), resourceId); + .field(RESOURCE_TYPE, resourceType) + .field(RESOURCE_ID, resourceId); return xContentBuilder.endObject(); } @@ -74,6 +80,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws public void writeTo(StreamOutput out) throws IOException { out.writeString(workflowStepName); out.writeString(workflowStepId); + out.writeString(resourceType); out.writeString(resourceId); } @@ -86,6 +93,15 @@ public String resourceId() { return resourceId; } + /** + * Gets the resource type. + * + * @return the resource type. + */ + public String resourceType() { + return resourceType; + } + /** * Gets the workflow step name associated to the created resource * @@ -114,6 +130,7 @@ public String workflowStepId() { public static ResourceCreated parse(XContentParser parser) throws IOException { String workflowStepName = null; String workflowStepId = null; + String resourceType = null; String resourceId = null; ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser); @@ -128,15 +145,14 @@ public static ResourceCreated parse(XContentParser parser) throws IOException { case WORKFLOW_STEP_ID: workflowStepId = parser.text(); break; + case RESOURCE_TYPE: + resourceType = parser.text(); + break; + case RESOURCE_ID: + resourceId = parser.text(); + break; default: - if (!isValidFieldName(fieldName)) { - throw new IOException("Unable to parse field [" + fieldName + "] in a resources_created object."); - } else { - if (fieldName.equals(WorkflowResources.getResourceByWorkflowStep(workflowStepName))) { - resourceId = parser.text(); - } - break; - } + throw new IOException("Unable to parse field [" + fieldName + "] in a resources_created object."); } } if (workflowStepName == null) { @@ -147,17 +163,15 @@ public static ResourceCreated parse(XContentParser parser) throws IOException { logger.error("Resource created object failed parsing: workflowStepId: {}", workflowStepId); throw new FlowFrameworkException("A ResourceCreated object requires workflowStepId", RestStatus.BAD_REQUEST); } + if (resourceType == null) { + logger.error("Resource created object failed parsing: resourceType: {}", resourceType); + throw new FlowFrameworkException("A ResourceCreated object requires resourceType", RestStatus.BAD_REQUEST); + } if (resourceId == null) { logger.error("Resource created object failed parsing: resourceId: {}", resourceId); throw new FlowFrameworkException("A ResourceCreated object requires resourceId", RestStatus.BAD_REQUEST); } - return new ResourceCreated(workflowStepName, workflowStepId, resourceId); - } - - private static boolean isValidFieldName(String fieldName) { - return (WORKFLOW_STEP_NAME.equals(fieldName) - || WORKFLOW_STEP_ID.equals(fieldName) - || WorkflowResources.getAllResourcesCreated().contains(fieldName)); + return new ResourceCreated(workflowStepName, workflowStepId, resourceType, resourceId); } @Override @@ -165,7 +179,9 @@ public String toString() { return "resources_Created [workflow_step_name= " + workflowStepName + ", workflow_step_id= " - + workflowStepName + + workflowStepId + + ", resource_type= " + + resourceType + ", resource_id= " + resourceId + "]"; diff --git a/src/main/java/org/opensearch/flowframework/rest/RestSearchWorkflowStateAction.java b/src/main/java/org/opensearch/flowframework/rest/RestSearchWorkflowStateAction.java new file mode 100644 index 000000000..dfbcc0eb2 --- /dev/null +++ b/src/main/java/org/opensearch/flowframework/rest/RestSearchWorkflowStateAction.java @@ -0,0 +1,47 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.rest; + +import com.google.common.collect.ImmutableList; +import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting; +import org.opensearch.flowframework.model.WorkflowState; +import org.opensearch.flowframework.transport.SearchWorkflowStateAction; + +import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STATE_INDEX; +import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_URI; + +/** + * Rest Action to facilitate requests to search workflow states + */ +public class RestSearchWorkflowStateAction extends AbstractSearchWorkflowAction { + + private static final String SEARCH_WORKFLOW_STATE_ACTION = "search_workflow_state_action"; + private static final String SEARCH_WORKFLOW_STATE_PATH = WORKFLOW_URI + "/state/_search"; + + /** + * Instantiates a new RestSearchWorkflowStateAction + * + * @param flowFrameworkFeatureEnabledSetting Whether this API is enabled + */ + public RestSearchWorkflowStateAction(FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting) { + super( + ImmutableList.of(SEARCH_WORKFLOW_STATE_PATH), + WORKFLOW_STATE_INDEX, + WorkflowState.class, + SearchWorkflowStateAction.INSTANCE, + flowFrameworkFeatureEnabledSetting + ); + } + + @Override + public String getName() { + return SEARCH_WORKFLOW_STATE_ACTION; + } + +} diff --git a/src/main/java/org/opensearch/flowframework/transport/GetWorkflowStateResponse.java b/src/main/java/org/opensearch/flowframework/transport/GetWorkflowStateResponse.java index fe155237e..6f8b9e14b 100644 --- a/src/main/java/org/opensearch/flowframework/transport/GetWorkflowStateResponse.java +++ b/src/main/java/org/opensearch/flowframework/transport/GetWorkflowStateResponse.java @@ -23,9 +23,9 @@ public class GetWorkflowStateResponse extends ActionResponse implements ToXContentObject { /** The workflow state */ - public WorkflowState workflowState; + private final WorkflowState workflowState; /** Flag to indicate if the entire state should be returned */ - public boolean allStatus; + private final boolean allStatus; /** * Instantiates a new GetWorkflowStateResponse from an input stream @@ -44,6 +44,7 @@ public GetWorkflowStateResponse(StreamInput in) throws IOException { * @param allStatus whether to return all fields in state index */ public GetWorkflowStateResponse(WorkflowState workflowState, boolean allStatus) { + this.allStatus = allStatus; if (allStatus) { this.workflowState = workflowState; } else { @@ -58,10 +59,27 @@ public GetWorkflowStateResponse(WorkflowState workflowState, boolean allStatus) @Override public void writeTo(StreamOutput out) throws IOException { workflowState.writeTo(out); + out.writeBoolean(allStatus); } @Override public XContentBuilder toXContent(XContentBuilder xContentBuilder, Params params) throws IOException { return workflowState.toXContent(xContentBuilder, params); } + + /** + * Gets the workflow state. + * @return the workflow state + */ + public WorkflowState getWorkflowState() { + return workflowState; + } + + /** + * Gets whether to return the entire state. + * @return true if the entire state should be returned + */ + public boolean isAllStatus() { + return allStatus; + } } diff --git a/src/main/java/org/opensearch/flowframework/transport/SearchWorkflowStateAction.java b/src/main/java/org/opensearch/flowframework/transport/SearchWorkflowStateAction.java new file mode 100644 index 000000000..6b331ef02 --- /dev/null +++ b/src/main/java/org/opensearch/flowframework/transport/SearchWorkflowStateAction.java @@ -0,0 +1,29 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.transport; + +import org.opensearch.action.ActionType; +import org.opensearch.action.search.SearchResponse; + +import static org.opensearch.flowframework.common.CommonValue.TRANSPORT_ACTION_NAME_PREFIX; + +/** + * External Action for public facing RestSearchWorkflowStateAction + */ +public class SearchWorkflowStateAction extends ActionType { + + /** The name of this action */ + public static final String NAME = TRANSPORT_ACTION_NAME_PREFIX + "workflow_state/search"; + /** An instance of this action */ + public static final SearchWorkflowStateAction INSTANCE = new SearchWorkflowStateAction(); + + private SearchWorkflowStateAction() { + super(NAME, SearchResponse::new); + } +} diff --git a/src/main/java/org/opensearch/flowframework/transport/SearchWorkflowStateTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/SearchWorkflowStateTransportAction.java new file mode 100644 index 000000000..b10bdaeb6 --- /dev/null +++ b/src/main/java/org/opensearch/flowframework/transport/SearchWorkflowStateTransportAction.java @@ -0,0 +1,50 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.transport; + +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.client.Client; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.core.action.ActionListener; +import org.opensearch.tasks.Task; +import org.opensearch.transport.TransportService; + +/** + * Transport Action to search workflow states + */ +public class SearchWorkflowStateTransportAction extends HandledTransportAction { + + private Client client; + + /** + * Intantiates a new SearchWorkflowStateTransportAction + * @param transportService the TransportService + * @param actionFilters action filters + * @param client The client used to make the request to OS + */ + @Inject + public SearchWorkflowStateTransportAction(TransportService transportService, ActionFilters actionFilters, Client client) { + super(SearchWorkflowStateAction.NAME, transportService, actionFilters, SearchRequest::new); + this.client = client; + } + + @Override + protected void doExecute(Task task, SearchRequest request, ActionListener actionListener) { + // TODO: AccessController should take care of letting the user with right permission to view the workflow + try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) { + client.search(request, ActionListener.runBefore(actionListener, () -> context.restore())); + } catch (Exception e) { + actionListener.onFailure(e); + } + } +} diff --git a/src/main/resources/mappings/workflow-state.json b/src/main/resources/mappings/workflow-state.json index 86fbeef6e..fedce568c 100644 --- a/src/main/resources/mappings/workflow-state.json +++ b/src/main/resources/mappings/workflow-state.json @@ -31,7 +31,21 @@ "type": "object" }, "resources_created": { - "type": "object" + "type": "nested", + "properties": { + "workflow_step_name": { + "type": "keyword" + }, + "workflow_step_id": { + "type": "keyword" + }, + "resource_type": { + "type": "keyword" + }, + "resource_id": { + "type": "keyword" + } + } }, "ui_metadata": { "type": "object", diff --git a/src/test/java/org/opensearch/flowframework/FlowFrameworkPluginTests.java b/src/test/java/org/opensearch/flowframework/FlowFrameworkPluginTests.java index 6370d2312..cbf988eee 100644 --- a/src/test/java/org/opensearch/flowframework/FlowFrameworkPluginTests.java +++ b/src/test/java/org/opensearch/flowframework/FlowFrameworkPluginTests.java @@ -82,8 +82,8 @@ public void testPlugin() throws IOException { 4, ffp.createComponents(client, clusterService, threadPool, null, null, null, environment, null, null, null, null).size() ); - assertEquals(5, ffp.getRestHandlers(settings, null, null, null, null, null, null).size()); - assertEquals(5, ffp.getActions().size()); + assertEquals(6, ffp.getRestHandlers(settings, null, null, null, null, null, null).size()); + assertEquals(6, ffp.getActions().size()); assertEquals(1, ffp.getExecutorBuilders(settings).size()); assertEquals(5, ffp.getSettings().size()); } diff --git a/src/test/java/org/opensearch/flowframework/model/ResourceCreatedTests.java b/src/test/java/org/opensearch/flowframework/model/ResourceCreatedTests.java index 216c18c9e..4f0bf5163 100644 --- a/src/test/java/org/opensearch/flowframework/model/ResourceCreatedTests.java +++ b/src/test/java/org/opensearch/flowframework/model/ResourceCreatedTests.java @@ -9,6 +9,7 @@ package org.opensearch.flowframework.model; import org.opensearch.flowframework.common.WorkflowResources; +import org.opensearch.flowframework.exception.FlowFrameworkException; import org.opensearch.test.OpenSearchTestCase; import java.io.IOException; @@ -22,30 +23,38 @@ public void setUp() throws Exception { public void testParseFeature() throws IOException { String workflowStepName = WorkflowResources.CREATE_CONNECTOR.getWorkflowStep(); - ResourceCreated ResourceCreated = new ResourceCreated(workflowStepName, "workflow_step_1", "L85p1IsBbfF"); - assertEquals(ResourceCreated.workflowStepName(), workflowStepName); - assertEquals(ResourceCreated.workflowStepId(), "workflow_step_1"); - assertEquals(ResourceCreated.resourceId(), "L85p1IsBbfF"); + String resourceType = WorkflowResources.getResourceByWorkflowStep(workflowStepName); + ResourceCreated resourceCreated = new ResourceCreated(workflowStepName, "workflow_step_1", resourceType, "L85p1IsBbfF"); + assertEquals(workflowStepName, resourceCreated.workflowStepName()); + assertEquals("workflow_step_1", resourceCreated.workflowStepId()); + assertEquals("connector_id", resourceCreated.resourceType()); + assertEquals("L85p1IsBbfF", resourceCreated.resourceId()); String expectedJson = - "{\"workflow_step_name\":\"create_connector\",\"workflow_step_id\":\"workflow_step_1\",\"connector_id\":\"L85p1IsBbfF\"}"; - String json = TemplateTestJsonUtil.parseToJson(ResourceCreated); + "{\"workflow_step_name\":\"create_connector\",\"workflow_step_id\":\"workflow_step_1\",\"resource_type\":\"connector_id\",\"resource_id\":\"L85p1IsBbfF\"}"; + String json = TemplateTestJsonUtil.parseToJson(resourceCreated); assertEquals(expectedJson, json); - ResourceCreated ResourceCreatedTwo = ResourceCreated.parse(TemplateTestJsonUtil.jsonToParser(json)); - assertEquals(workflowStepName, ResourceCreatedTwo.workflowStepName()); - assertEquals("workflow_step_1", ResourceCreatedTwo.workflowStepId()); - assertEquals("L85p1IsBbfF", ResourceCreatedTwo.resourceId()); + ResourceCreated resourceCreatedTwo = ResourceCreated.parse(TemplateTestJsonUtil.jsonToParser(json)); + assertEquals(workflowStepName, resourceCreatedTwo.workflowStepName()); + assertEquals("workflow_step_1", resourceCreatedTwo.workflowStepId()); + assertEquals("L85p1IsBbfF", resourceCreatedTwo.resourceId()); } public void testExceptions() throws IOException { String badJson = "{\"wrong\":\"A\",\"resource_id\":\"B\"}"; - IOException e = assertThrows(IOException.class, () -> ResourceCreated.parse(TemplateTestJsonUtil.jsonToParser(badJson))); - assertEquals("Unable to parse field [wrong] in a resources_created object.", e.getMessage()); + IOException badJsonException = assertThrows( + IOException.class, + () -> ResourceCreated.parse(TemplateTestJsonUtil.jsonToParser(badJson)) + ); + assertEquals("Unable to parse field [wrong] in a resources_created object.", badJsonException.getMessage()); String missingJson = "{\"resource_id\":\"B\"}"; - e = assertThrows(IOException.class, () -> ResourceCreated.parse(TemplateTestJsonUtil.jsonToParser(missingJson))); - assertEquals("Unable to parse field [resource_id] in a resources_created object.", e.getMessage()); + FlowFrameworkException missingJsonException = assertThrows( + FlowFrameworkException.class, + () -> ResourceCreated.parse(TemplateTestJsonUtil.jsonToParser(missingJson)) + ); + assertEquals("A ResourceCreated object requires workflowStepName", missingJsonException.getMessage()); } } diff --git a/src/test/java/org/opensearch/flowframework/rest/RestSearchWorkflowStateActionTests.java b/src/test/java/org/opensearch/flowframework/rest/RestSearchWorkflowStateActionTests.java new file mode 100644 index 000000000..028860831 --- /dev/null +++ b/src/test/java/org/opensearch/flowframework/rest/RestSearchWorkflowStateActionTests.java @@ -0,0 +1,85 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.rest; + +import org.opensearch.client.node.NodeClient; +import org.opensearch.core.common.bytes.BytesArray; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.core.xcontent.MediaTypeRegistry; +import org.opensearch.core.xcontent.XContentParseException; +import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting; +import org.opensearch.rest.RestHandler.Route; +import org.opensearch.rest.RestRequest; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.test.rest.FakeRestChannel; +import org.opensearch.test.rest.FakeRestRequest; + +import java.util.List; +import java.util.Locale; + +import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_URI; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class RestSearchWorkflowStateActionTests extends OpenSearchTestCase { + private RestSearchWorkflowStateAction restSearchWorkflowStateAction; + private String searchPath; + private NodeClient nodeClient; + private FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting; + + @Override + public void setUp() throws Exception { + super.setUp(); + + this.searchPath = String.format(Locale.ROOT, "%s/%s", WORKFLOW_URI, "state/_search"); + flowFrameworkFeatureEnabledSetting = mock(FlowFrameworkFeatureEnabledSetting.class); + when(flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled()).thenReturn(true); + this.restSearchWorkflowStateAction = new RestSearchWorkflowStateAction(flowFrameworkFeatureEnabledSetting); + this.nodeClient = mock(NodeClient.class); + } + + public void testRestSearchWorkflowStateActionName() { + String name = restSearchWorkflowStateAction.getName(); + assertEquals("search_workflow_state_action", name); + } + + public void testRestSearchWorkflowStateActionRoutes() { + List routes = restSearchWorkflowStateAction.routes(); + assertNotNull(routes); + assertEquals(2, routes.size()); + assertEquals(RestRequest.Method.POST, routes.get(0).getMethod()); + assertEquals(RestRequest.Method.GET, routes.get(1).getMethod()); + assertEquals(this.searchPath, routes.get(0).getPath()); + assertEquals(this.searchPath, routes.get(1).getPath()); + } + + public void testInvalidSearchRequest() { + final String requestContent = "{\"query\":{\"bool\":{\"filter\":[{\"term\":{\"template\":\"1.0.0\"}}]}}}"; + RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.GET) + .withPath(this.searchPath) + .withContent(new BytesArray(requestContent), MediaTypeRegistry.JSON) + .build(); + + XContentParseException ex = expectThrows(XContentParseException.class, () -> { + restSearchWorkflowStateAction.prepareRequest(request, nodeClient); + }); + assertEquals("unknown named object category [org.opensearch.index.query.QueryBuilder]", ex.getMessage()); + } + + public void testFeatureFlagNotEnabled() throws Exception { + when(flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled()).thenReturn(false); + RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.POST) + .withPath(this.searchPath) + .build(); + FakeRestChannel channel = new FakeRestChannel(request, false, 1); + restSearchWorkflowStateAction.handleRequest(request, channel, nodeClient); + assertEquals(RestStatus.FORBIDDEN, channel.capturedResponse().status()); + assertTrue(channel.capturedResponse().content().utf8ToString().contains("This API is disabled.")); + } +} diff --git a/src/test/java/org/opensearch/flowframework/transport/SearchWorkflowStateTransportActionTests.java b/src/test/java/org/opensearch/flowframework/transport/SearchWorkflowStateTransportActionTests.java new file mode 100644 index 000000000..d5dcddb8e --- /dev/null +++ b/src/test/java/org/opensearch/flowframework/transport/SearchWorkflowStateTransportActionTests.java @@ -0,0 +1,79 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.transport; + +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.client.Client; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.core.action.ActionListener; +import org.opensearch.tasks.Task; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class SearchWorkflowStateTransportActionTests extends OpenSearchTestCase { + + private SearchWorkflowStateTransportAction searchWorkflowStateTransportAction; + private Client client; + private ThreadPool threadPool; + private ThreadContext threadContext; + + @Override + public void setUp() throws Exception { + super.setUp(); + this.client = mock(Client.class); + this.threadPool = mock(ThreadPool.class); + this.threadContext = new ThreadContext(Settings.EMPTY); + + when(client.threadPool()).thenReturn(threadPool); + when(threadPool.getThreadContext()).thenReturn(threadContext); + + this.searchWorkflowStateTransportAction = new SearchWorkflowStateTransportAction( + mock(TransportService.class), + mock(ActionFilters.class), + client + ); + + } + + public void testFailedSearchWorkflow() { + @SuppressWarnings("unchecked") + ActionListener listener = mock(ActionListener.class); + SearchRequest searchRequest = new SearchRequest(); + + doAnswer(invocation -> { + ActionListener responseListener = invocation.getArgument(1); + responseListener.onFailure(new Exception("Search failed")); + return null; + }).when(client).search(any(), any()); + + searchWorkflowStateTransportAction.doExecute(mock(Task.class), searchRequest, listener); + verify(listener, times(1)).onFailure(any()); + } + + public void testSearchWorkflow() { + @SuppressWarnings("unchecked") + ActionListener listener = mock(ActionListener.class); + SearchRequest searchRequest = new SearchRequest(); + + searchWorkflowStateTransportAction.doExecute(mock(Task.class), searchRequest, listener); + verify(client, times(1)).search(any(SearchRequest.class), any()); + } + +}