diff --git a/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java b/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java index 14df7e17e..60e094763 100644 --- a/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java +++ b/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java @@ -28,11 +28,14 @@ import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting; import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler; import org.opensearch.flowframework.rest.RestCreateWorkflowAction; +import org.opensearch.flowframework.rest.RestDeprovisionWorkflowAction; import org.opensearch.flowframework.rest.RestGetWorkflowAction; import org.opensearch.flowframework.rest.RestProvisionWorkflowAction; import org.opensearch.flowframework.rest.RestSearchWorkflowAction; import org.opensearch.flowframework.transport.CreateWorkflowAction; import org.opensearch.flowframework.transport.CreateWorkflowTransportAction; +import org.opensearch.flowframework.transport.DeprovisionWorkflowAction; +import org.opensearch.flowframework.transport.DeprovisionWorkflowTransportAction; import org.opensearch.flowframework.transport.GetWorkflowAction; import org.opensearch.flowframework.transport.GetWorkflowTransportAction; import org.opensearch.flowframework.transport.ProvisionWorkflowAction; @@ -124,6 +127,7 @@ public List getRestHandlers( return ImmutableList.of( new RestCreateWorkflowAction(flowFrameworkFeatureEnabledSetting, settings, clusterService), new RestProvisionWorkflowAction(flowFrameworkFeatureEnabledSetting), + new RestDeprovisionWorkflowAction(flowFrameworkFeatureEnabledSetting), new RestSearchWorkflowAction(flowFrameworkFeatureEnabledSetting), new RestGetWorkflowAction(flowFrameworkFeatureEnabledSetting) ); @@ -134,6 +138,7 @@ public List getRestHandlers( return ImmutableList.of( new ActionHandler<>(CreateWorkflowAction.INSTANCE, CreateWorkflowTransportAction.class), new ActionHandler<>(ProvisionWorkflowAction.INSTANCE, ProvisionWorkflowTransportAction.class), + new ActionHandler<>(DeprovisionWorkflowAction.INSTANCE, DeprovisionWorkflowTransportAction.class), new ActionHandler<>(SearchWorkflowAction.INSTANCE, SearchWorkflowTransportAction.class), new ActionHandler<>(GetWorkflowAction.INSTANCE, GetWorkflowTransportAction.class) ); diff --git a/src/main/java/org/opensearch/flowframework/common/WorkflowResources.java b/src/main/java/org/opensearch/flowframework/common/WorkflowResources.java index d43a9e0f9..05ee4813d 100644 --- a/src/main/java/org/opensearch/flowframework/common/WorkflowResources.java +++ b/src/main/java/org/opensearch/flowframework/common/WorkflowResources.java @@ -23,32 +23,34 @@ public enum WorkflowResources { /** official workflow step name for creating a connector and associated created resource */ - CREATE_CONNECTOR("create_connector", "connector_id"), + CREATE_CONNECTOR("create_connector", "connector_id", "delete_connetor"), /** official workflow step name for registering a remote model and associated created resource */ - REGISTER_REMOTE_MODEL("register_remote_model", "model_id"), + REGISTER_REMOTE_MODEL("register_remote_model", "model_id", "delete_model"), /** official workflow step name for registering a local model and associated created resource */ - REGISTER_LOCAL_MODEL("register_local_model", "model_id"), + REGISTER_LOCAL_MODEL("register_local_model", "model_id", "delete_model"), /** official workflow step name for registering a model group and associated created resource */ - REGISTER_MODEL_GROUP("register_model_group", "model_group_id"), + REGISTER_MODEL_GROUP("register_model_group", "model_group_id", null), // TODO /** official workflow step name for deploying a model and associated created resource */ - DEPLOY_MODEL("deploy_model", "model_id"), + DEPLOY_MODEL("deploy_model", "model_id", "undeploy_model"), /** official workflow step name for creating an ingest-pipeline and associated created resource */ - CREATE_INGEST_PIPELINE("create_ingest_pipeline", "pipeline_id"), + CREATE_INGEST_PIPELINE("create_ingest_pipeline", "pipeline_id", null), // TODO /** official workflow step name for creating an index and associated created resource */ - CREATE_INDEX("create_index", "index_name"), + CREATE_INDEX("create_index", "index_name", null), // TODO /** official workflow step name for register an agent and the associated created resource */ - REGISTER_AGENT("register_agent", "agent_id"); + REGISTER_AGENT("register_agent", "agent_id", "delete_agent"); private final String workflowStep; private final String resourceCreated; + private final String deprovisionStep; private static final Logger logger = LogManager.getLogger(WorkflowResources.class); private static final Set allResources = Stream.of(values()) .map(WorkflowResources::getResourceCreated) .collect(Collectors.toSet()); - WorkflowResources(String workflowStep, String resourceCreated) { + WorkflowResources(String workflowStep, String resourceCreated, String deprovisionStep) { this.workflowStep = workflowStep; this.resourceCreated = resourceCreated; + this.deprovisionStep = deprovisionStep; } /** @@ -68,7 +70,15 @@ public String getResourceCreated() { } /** - * gets the resources created type based on the workflowStep + * Returns the deprovisionStep for the given enum Constant + * @return the deprovisionStep of this data. + */ + public String getDeprovisionStep() { + return deprovisionStep; + } + + /** + * Gets the resources created type based on the workflowStep. * @param workflowStep workflow step name * @return the resource that will be created * @throws FlowFrameworkException if workflow step doesn't exist in enum @@ -85,6 +95,24 @@ public static String getResourceByWorkflowStep(String workflowStep) throws FlowF throw new FlowFrameworkException("Unable to find resource type for step: " + workflowStep, RestStatus.BAD_REQUEST); } + /** + * Gets the deprovision step type based on the workflowStep. + * @param workflowStep workflow step name + * @return the corresponding step to deprovision + * @throws FlowFrameworkException if workflow step doesn't exist in enum + */ + public static String getDeprovisionStepByWorkflowStep(String workflowStep) throws FlowFrameworkException { + if (workflowStep != null && !workflowStep.isEmpty()) { + for (WorkflowResources mapping : values()) { + if (mapping.getWorkflowStep().equals(workflowStep)) { + return mapping.getDeprovisionStep(); + } + } + } + logger.error("Unable to find deprovision step for step: " + workflowStep); + throw new FlowFrameworkException("Unable to find deprovision step for step: " + workflowStep, RestStatus.BAD_REQUEST); + } + /** * Returns all the possible resource created types in enum * @return a set of all the resource created types diff --git a/src/main/java/org/opensearch/flowframework/rest/RestDeprovisionWorkflowAction.java b/src/main/java/org/opensearch/flowframework/rest/RestDeprovisionWorkflowAction.java new file mode 100644 index 000000000..834901b90 --- /dev/null +++ b/src/main/java/org/opensearch/flowframework/rest/RestDeprovisionWorkflowAction.java @@ -0,0 +1,108 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.rest; + +import com.google.common.collect.ImmutableList; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.ExceptionsHelper; +import org.opensearch.client.node.NodeClient; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting; +import org.opensearch.flowframework.exception.FlowFrameworkException; +import org.opensearch.flowframework.transport.DeprovisionWorkflowAction; +import org.opensearch.flowframework.transport.WorkflowRequest; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.BytesRestResponse; +import org.opensearch.rest.RestRequest; + +import java.io.IOException; +import java.util.List; +import java.util.Locale; + +import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_ID; +import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_URI; +import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED; + +/** + * Rest Action to facilitate requests to de-provision a workflow + */ +public class RestDeprovisionWorkflowAction extends BaseRestHandler { + + private static final String DEPROVISION_WORKFLOW_ACTION = "deprovision_workflow"; + private static final Logger logger = LogManager.getLogger(RestDeprovisionWorkflowAction.class); + private final FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting; + + /** + * Instantiates a new RestDeprovisionWorkflowAction + * @param flowFrameworkFeatureEnabledSetting Whether this API is enabled + */ + public RestDeprovisionWorkflowAction(FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting) { + this.flowFrameworkFeatureEnabledSetting = flowFrameworkFeatureEnabledSetting; + } + + @Override + public String getName() { + return DEPROVISION_WORKFLOW_ACTION; + } + + @Override + protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + + try { + if (!flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled()) { + throw new FlowFrameworkException( + "This API is disabled. To enable it, update the setting [" + FLOW_FRAMEWORK_ENABLED.getKey() + "] to true.", + RestStatus.FORBIDDEN + ); + } + + // Validate content + if (request.hasContent()) { + throw new FlowFrameworkException("No request body is required", RestStatus.BAD_REQUEST); + } + // Validate params + String workflowId = request.param(WORKFLOW_ID); + if (workflowId == null) { + throw new FlowFrameworkException("workflow_id cannot be null", RestStatus.BAD_REQUEST); + } + WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, null); + + return channel -> client.execute(DeprovisionWorkflowAction.INSTANCE, workflowRequest, ActionListener.wrap(response -> { + XContentBuilder builder = response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS); + channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder)); + }, exception -> { + try { + FlowFrameworkException ex = new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception)); + XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS); + channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder)); + + } catch (IOException e) { + logger.error("Failed to send back provision workflow exception", e); + channel.sendResponse(new BytesRestResponse(ExceptionsHelper.status(e), e.getMessage())); + } + })); + + } catch (FlowFrameworkException ex) { + return channel -> channel.sendResponse( + new BytesRestResponse(ex.getRestStatus(), ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS)) + ); + } + } + + @Override + public List routes() { + return ImmutableList.of( + new Route(RestRequest.Method.GET, String.format(Locale.ROOT, "%s/{%s}/%s", WORKFLOW_URI, WORKFLOW_ID, "_deprovision")) + ); + } +} diff --git a/src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowAction.java b/src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowAction.java index 6d9d5e3b5..3710c2142 100644 --- a/src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowAction.java +++ b/src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowAction.java @@ -68,7 +68,7 @@ protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request // Validate content if (request.hasContent()) { - throw new FlowFrameworkException("No request body present", RestStatus.BAD_REQUEST); + throw new FlowFrameworkException("No request body is required", RestStatus.BAD_REQUEST); } // Validate params String workflowId = request.param(WORKFLOW_ID); diff --git a/src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowAction.java b/src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowAction.java new file mode 100644 index 000000000..8efcfbbc3 --- /dev/null +++ b/src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowAction.java @@ -0,0 +1,27 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.transport; + +import org.opensearch.action.ActionType; + +import static org.opensearch.flowframework.common.CommonValue.TRANSPORT_ACTION_NAME_PREFIX; + +/** + * External Action for public facing RestDeprovisionWorkflowAction + */ +public class DeprovisionWorkflowAction extends ActionType { + /** The name of this action */ + public static final String NAME = TRANSPORT_ACTION_NAME_PREFIX + "workflow/deprovision"; + /** An instance of this action */ + public static final DeprovisionWorkflowAction INSTANCE = new DeprovisionWorkflowAction(); + + private DeprovisionWorkflowAction() { + super(NAME, WorkflowResponse::new); + } +} diff --git a/src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java new file mode 100644 index 000000000..29b1f2a7a --- /dev/null +++ b/src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java @@ -0,0 +1,258 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.transport; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.ExceptionsHelper; +import org.opensearch.action.get.GetRequest; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.client.Client; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.flowframework.exception.FlowFrameworkException; +import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler; +import org.opensearch.flowframework.model.ResourceCreated; +import org.opensearch.flowframework.model.Template; +import org.opensearch.flowframework.model.Workflow; +import org.opensearch.flowframework.util.EncryptorUtils; +import org.opensearch.flowframework.workflow.ProcessNode; +import org.opensearch.flowframework.workflow.WorkflowData; +import org.opensearch.flowframework.workflow.WorkflowProcessSorter; +import org.opensearch.flowframework.workflow.WorkflowStepFactory; +import org.opensearch.tasks.Task; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; + +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX; +import static org.opensearch.flowframework.common.CommonValue.PROVISION_WORKFLOW; +import static org.opensearch.flowframework.common.WorkflowResources.getDeprovisionStepByWorkflowStep; +import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep; + +/** + * Transport Action to deprovision a workflow from a stored use case template + */ +public class DeprovisionWorkflowTransportAction extends HandledTransportAction { + + private static final String DEPROVISION_SUFFIX = "_deprovision"; + + private final Logger logger = LogManager.getLogger(DeprovisionWorkflowTransportAction.class); + + private final ThreadPool threadPool; + private final Client client; + private final WorkflowProcessSorter workflowProcessSorter; + private final WorkflowStepFactory workflowStepFactory; + private final FlowFrameworkIndicesHandler flowFrameworkIndicesHandler; + private final EncryptorUtils encryptorUtils; + + /** + * Instantiates a new ProvisionWorkflowTransportAction + * @param transportService The TransportService + * @param actionFilters action filters + * @param threadPool The OpenSearch thread pool + * @param client The node client to retrieve a stored use case template + * @param workflowProcessSorter Utility class to generate a togologically sorted list of Process nodes + * @param workflowStepFactory The factory instantiating workflow steps + * @param flowFrameworkIndicesHandler Class to handle all internal system indices actions + * @param encryptorUtils Utility class to handle encryption/decryption + */ + @Inject + public DeprovisionWorkflowTransportAction( + TransportService transportService, + ActionFilters actionFilters, + ThreadPool threadPool, + Client client, + WorkflowProcessSorter workflowProcessSorter, + WorkflowStepFactory workflowStepFactory, + FlowFrameworkIndicesHandler flowFrameworkIndicesHandler, + EncryptorUtils encryptorUtils + ) { + super(DeprovisionWorkflowAction.NAME, transportService, actionFilters, WorkflowRequest::new); + this.threadPool = threadPool; + this.client = client; + this.workflowProcessSorter = workflowProcessSorter; + this.workflowStepFactory = workflowStepFactory; + this.flowFrameworkIndicesHandler = flowFrameworkIndicesHandler; + this.encryptorUtils = encryptorUtils; + } + + @Override + protected void doExecute(Task task, WorkflowRequest request, ActionListener listener) { + // Retrieve use case template from global context + String workflowId = request.getWorkflowId(); + GetRequest getRequest = new GetRequest(GLOBAL_CONTEXT_INDEX, workflowId); + + // Stash thread context to interact with system index + try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) { + client.get(getRequest, ActionListener.wrap(response -> { + context.restore(); + + if (!response.isExists()) { + listener.onFailure( + new FlowFrameworkException( + "Failed to retrieve template (" + workflowId + ") from global context.", + RestStatus.NOT_FOUND + ) + ); + return; + } + + // Parse template from document source + Template template = Template.parse(response.getSourceAsString()); + + // Decrypt template + template = encryptorUtils.decryptTemplateCredentials(template); + + // Sort and validate graph + Workflow provisionWorkflow = template.workflows().get(PROVISION_WORKFLOW); + List provisionProcessSequence = workflowProcessSorter.sortProcessNodes(provisionWorkflow, workflowId); + workflowProcessSorter.validateGraph(provisionProcessSequence); + + // We have a valid template and sorted nodes, get the created resources + getResourcesAndExecute(request, provisionProcessSequence, listener); + }, exception -> { + if (exception instanceof FlowFrameworkException) { + logger.error("Workflow validation failed for workflow : " + workflowId); + listener.onFailure(exception); + } else { + logger.error("Failed to retrieve template from global context.", exception); + listener.onFailure(new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception))); + } + })); + } catch (Exception e) { + logger.error("Failed to retrieve template from global context.", e); + listener.onFailure(new FlowFrameworkException(e.getMessage(), ExceptionsHelper.status(e))); + } + } + + private void getResourcesAndExecute( + WorkflowRequest request, + List provisionProcessSequence, + ActionListener listener + ) { + client.execute(GetWorkflowAction.INSTANCE, request, ActionListener.wrap(response -> { + // Get a map of step id to created resources + final Map resourceMap = response.getWorkflowState() + .resourcesCreated() + .stream() + .collect(Collectors.toMap(ResourceCreated::workflowStepId, Function.identity())); + + // Now finally do the deprovision + executeDeprovisionSequence(request.getWorkflowId(), resourceMap, provisionProcessSequence, listener); + }, exception -> { + logger.error("Failed to get workflow state for workflow " + request.getWorkflowId()); + listener.onFailure(new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception))); + })); + } + + private void executeDeprovisionSequence( + String workflowId, + Map resourceMap, + List provisionProcessSequence, + ActionListener listener + ) { + // Create a list of ProcessNodes with ta corresponding deprovision workflow steps + List deprovisionProcessSequence = provisionProcessSequence.stream() + // Only include nodes that created a resource + .filter(pn -> resourceMap.containsKey(pn.id())) + // Create a new ProcessNode with a deprovision step + .map(pn -> { + String stepName = pn.workflowStep().getName(); + String deprovisionStep = getDeprovisionStepByWorkflowStep(stepName); + // Unimplemented steps presently return null, so skip + if (deprovisionStep == null) { + return null; + } + // New ID is old ID with deprovision added + String deprovisionStepId = pn.id() + DEPROVISION_SUFFIX; + return new ProcessNode( + deprovisionStepId, + workflowStepFactory.createStep(deprovisionStep), + Collections.emptyMap(), + new WorkflowData( + Map.of(getResourceByWorkflowStep(stepName), resourceMap.get(pn.id()).resourceId()), + workflowId, + deprovisionStepId + ), + Collections.emptyList(), + this.threadPool, + pn.nodeTimeout() + ); + }) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + // Deprovision in reverse order of provisioning to minimize risk of dependencies + Collections.reverse(deprovisionProcessSequence); + + // Repeat attempting to delete resources as long as at least one is successful + int resourceCount = deprovisionProcessSequence.size(); + while (resourceCount > 0) { + Iterator iter = deprovisionProcessSequence.iterator(); + while (iter.hasNext()) { + ProcessNode deprovisionNode = iter.next(); + String resourceNameAndId = getResourceFromProcessNode(deprovisionNode, resourceMap); + CompletableFuture deprovisionFuture = deprovisionNode.execute(); + deprovisionFuture.join(); + if (deprovisionFuture.isCompletedExceptionally()) { + // Occasional failures with dependent resources may occur, log and continue + logger.info("Failed {} for {}", deprovisionNode.id(), resourceNameAndId); + } else { + // Remove from list so we don't try again + iter.remove(); + logger.info("Successfully deprovisioned {}", resourceNameAndId); + } + } + if (deprovisionProcessSequence.size() < resourceCount) { + // If we've deleted something, decrement and try again if not zero + resourceCount = deprovisionProcessSequence.size(); + } else { + // If nothing was deleted, exit loop + break; + } + } + if (deprovisionProcessSequence.isEmpty()) { + // Successful deprovision, return workflow ID + listener.onResponse(new WorkflowResponse(workflowId)); + } else { + // Failed deprovision, give user list of remaining resources + listener.onFailure( + new FlowFrameworkException( + "Failed to deprovision some resources: [" + + deprovisionProcessSequence.stream() + .map(pn -> getResourceFromProcessNode(pn, resourceMap)) + .collect(Collectors.joining(", ")) + + "].", + RestStatus.ACCEPTED + ) + ); + } + } + + private String getResourceFromProcessNode(ProcessNode deprovisionNode, Map resourceMap) { + String deprovisionId = deprovisionNode.id(); + int pos = deprovisionId.indexOf(DEPROVISION_SUFFIX); + if (pos > 0) { + String stepName = deprovisionId.substring(0, pos); + return getResourceByWorkflowStep(stepName) + " " + resourceMap.get(stepName); + } + return null; + } +} diff --git a/src/main/java/org/opensearch/flowframework/transport/GetWorkflowResponse.java b/src/main/java/org/opensearch/flowframework/transport/GetWorkflowResponse.java index 922a8a3f5..cd983c0d0 100644 --- a/src/main/java/org/opensearch/flowframework/transport/GetWorkflowResponse.java +++ b/src/main/java/org/opensearch/flowframework/transport/GetWorkflowResponse.java @@ -23,9 +23,9 @@ public class GetWorkflowResponse extends ActionResponse implements ToXContentObject { /** The workflow state */ - public WorkflowState workflowState; + private final WorkflowState workflowState; /** Flag to indicate if the entire state should be returned */ - public boolean allStatus; + private final boolean allStatus; /** * Instantiates a new GetWorkflowResponse from an input stream @@ -44,6 +44,7 @@ public GetWorkflowResponse(StreamInput in) throws IOException { * @param allStatus whether to return all fields in state index */ public GetWorkflowResponse(WorkflowState workflowState, boolean allStatus) { + this.allStatus = allStatus; if (allStatus) { this.workflowState = workflowState; } else { @@ -58,10 +59,27 @@ public GetWorkflowResponse(WorkflowState workflowState, boolean allStatus) { @Override public void writeTo(StreamOutput out) throws IOException { workflowState.writeTo(out); + out.writeBoolean(allStatus); } @Override public XContentBuilder toXContent(XContentBuilder xContentBuilder, Params params) throws IOException { return workflowState.toXContent(xContentBuilder, params); } + + /** + * Gets the workflow state. + * @return the workflow state + */ + public WorkflowState getWorkflowState() { + return workflowState; + } + + /** + * Gets whether to return the entire state. + * @return true if the entire state should be returned + */ + public boolean isAllStatus() { + return allStatus; + } } diff --git a/src/main/java/org/opensearch/flowframework/transport/WorkflowRequest.java b/src/main/java/org/opensearch/flowframework/transport/WorkflowRequest.java index d049be8f6..10e9f5473 100644 --- a/src/main/java/org/opensearch/flowframework/transport/WorkflowRequest.java +++ b/src/main/java/org/opensearch/flowframework/transport/WorkflowRequest.java @@ -19,7 +19,7 @@ import java.io.IOException; /** - * Transport Request to create and provision a workflow + * Transport Request to create, provision, and deprovision a workflow */ public class WorkflowRequest extends ActionRequest { diff --git a/src/test/java/org/opensearch/flowframework/FlowFrameworkPluginTests.java b/src/test/java/org/opensearch/flowframework/FlowFrameworkPluginTests.java index e3827e0b3..c958e70e5 100644 --- a/src/test/java/org/opensearch/flowframework/FlowFrameworkPluginTests.java +++ b/src/test/java/org/opensearch/flowframework/FlowFrameworkPluginTests.java @@ -81,8 +81,8 @@ public void testPlugin() throws IOException { 4, ffp.createComponents(client, clusterService, threadPool, null, null, null, environment, null, null, null, null).size() ); - assertEquals(4, ffp.getRestHandlers(settings, null, null, null, null, null, null).size()); - assertEquals(4, ffp.getActions().size()); + assertEquals(5, ffp.getRestHandlers(settings, null, null, null, null, null, null).size()); + assertEquals(5, ffp.getActions().size()); assertEquals(1, ffp.getExecutorBuilders(settings).size()); assertEquals(4, ffp.getSettings().size()); }