Skip to content

Commit

Permalink
Add Delete Workflow API (opensearch-project#294)
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Widdis <widdis@gmail.com>
  • Loading branch information
dbwiddis committed Dec 15, 2023
1 parent 4cbf9dc commit 856631d
Show file tree
Hide file tree
Showing 9 changed files with 430 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.rest.RestCreateWorkflowAction;
import org.opensearch.flowframework.rest.RestDeleteWorkflowAction;
import org.opensearch.flowframework.rest.RestDeprovisionWorkflowAction;
import org.opensearch.flowframework.rest.RestGetWorkflowAction;
import org.opensearch.flowframework.rest.RestGetWorkflowStateAction;
Expand All @@ -36,6 +37,8 @@
import org.opensearch.flowframework.rest.RestSearchWorkflowStateAction;
import org.opensearch.flowframework.transport.CreateWorkflowAction;
import org.opensearch.flowframework.transport.CreateWorkflowTransportAction;
import org.opensearch.flowframework.transport.DeleteWorkflowAction;
import org.opensearch.flowframework.transport.DeleteWorkflowTransportAction;
import org.opensearch.flowframework.transport.DeprovisionWorkflowAction;
import org.opensearch.flowframework.transport.DeprovisionWorkflowTransportAction;
import org.opensearch.flowframework.transport.GetWorkflowAction;
Expand Down Expand Up @@ -139,6 +142,7 @@ public List<RestHandler> getRestHandlers(
) {
return ImmutableList.of(
new RestCreateWorkflowAction(flowFrameworkFeatureEnabledSetting, settings, clusterService),
new RestDeleteWorkflowAction(flowFrameworkFeatureEnabledSetting),
new RestProvisionWorkflowAction(flowFrameworkFeatureEnabledSetting),
new RestDeprovisionWorkflowAction(flowFrameworkFeatureEnabledSetting),
new RestSearchWorkflowAction(flowFrameworkFeatureEnabledSetting),
Expand All @@ -152,6 +156,7 @@ public List<RestHandler> getRestHandlers(
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
return ImmutableList.of(
new ActionHandler<>(CreateWorkflowAction.INSTANCE, CreateWorkflowTransportAction.class),
new ActionHandler<>(DeleteWorkflowAction.INSTANCE, DeleteWorkflowTransportAction.class),
new ActionHandler<>(ProvisionWorkflowAction.INSTANCE, ProvisionWorkflowTransportAction.class),
new ActionHandler<>(DeprovisionWorkflowAction.INSTANCE, DeprovisionWorkflowTransportAction.class),
new ActionHandler<>(SearchWorkflowAction.INSTANCE, SearchWorkflowTransportAction.class),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.rest;

import com.google.common.collect.ImmutableList;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.client.node.NodeClient;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.transport.DeleteWorkflowAction;
import org.opensearch.flowframework.transport.WorkflowRequest;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestRequest;

import java.io.IOException;
import java.util.List;
import java.util.Locale;

import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_ID;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_URI;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED;

/**
* Rest Action to facilitate requests to delete a stored template
*/
public class RestDeleteWorkflowAction extends BaseRestHandler {

private static final String DELETE_WORKFLOW_ACTION = "delete_workflow";
private static final Logger logger = LogManager.getLogger(RestDeleteWorkflowAction.class);
private FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting;

/**
* Instantiates a new RestDeleteWorkflowAction
* @param flowFrameworkFeatureEnabledSetting Whether this API is enabled
*/
public RestDeleteWorkflowAction(FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting) {
this.flowFrameworkFeatureEnabledSetting = flowFrameworkFeatureEnabledSetting;
}

@Override
public String getName() {
return DELETE_WORKFLOW_ACTION;
}

@Override
public List<Route> routes() {
return ImmutableList.of(new Route(RestRequest.Method.DELETE, String.format(Locale.ROOT, "%s/{%s}", WORKFLOW_URI, WORKFLOW_ID)));
}

@Override
protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
String workflowId = request.param(WORKFLOW_ID);
try {
if (!flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled()) {
throw new FlowFrameworkException(
"This API is disabled. To enable it, update the setting [" + FLOW_FRAMEWORK_ENABLED.getKey() + "] to true.",
RestStatus.FORBIDDEN
);
}
// Validate content
if (request.hasContent()) {
throw new FlowFrameworkException("Invalid request format", RestStatus.BAD_REQUEST);
}
// Validate params
if (workflowId == null) {
throw new FlowFrameworkException("workflow_id cannot be null", RestStatus.BAD_REQUEST);
}
WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, null);
return channel -> client.execute(DeleteWorkflowAction.INSTANCE, workflowRequest, ActionListener.wrap(response -> {
XContentBuilder builder = response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder));
}, exception -> {
try {
FlowFrameworkException ex = exception instanceof FlowFrameworkException
? (FlowFrameworkException) exception
: new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception));
XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder));

} catch (IOException e) {
logger.error("Failed to send back delete workflow exception", e);
channel.sendResponse(new BytesRestResponse(ExceptionsHelper.status(e), e.getMessage()));
}
}));

} catch (FlowFrameworkException ex) {
return channel -> channel.sendResponse(
new BytesRestResponse(ex.getRestStatus(), ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS))
);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.transport;

import org.opensearch.action.ActionType;
import org.opensearch.action.delete.DeleteResponse;

import static org.opensearch.flowframework.common.CommonValue.TRANSPORT_ACTION_NAME_PREFIX;

/**
* External Action for public facing RestGetWorkflowAction
*/
public class DeleteWorkflowAction extends ActionType<DeleteResponse> {
/** The name of this action */
public static final String NAME = TRANSPORT_ACTION_NAME_PREFIX + "workflow/delete";
/** An instance of this action */
public static final DeleteWorkflowAction INSTANCE = new DeleteWorkflowAction();

private DeleteWorkflowAction() {
super(NAME, DeleteResponse::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.transport;

import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.client.Client;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;

import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX;

/**
* Transport action to retrieve a use case template within the Global Context
*/
public class DeleteWorkflowTransportAction extends HandledTransportAction<WorkflowRequest, DeleteResponse> {

private final FlowFrameworkIndicesHandler flowFrameworkIndicesHandler;
private final Client client;

/**
* Instantiates a new DeleteWorkflowTransportAction instance
* @param transportService the transport service
* @param actionFilters action filters
* @param flowFrameworkIndicesHandler The Flow Framework indices handler
* @param client the OpenSearch Client
*/
@Inject
public DeleteWorkflowTransportAction(
TransportService transportService,
ActionFilters actionFilters,
FlowFrameworkIndicesHandler flowFrameworkIndicesHandler,
Client client
) {
super(DeleteWorkflowAction.NAME, transportService, actionFilters, WorkflowRequest::new);
this.flowFrameworkIndicesHandler = flowFrameworkIndicesHandler;
this.client = client;
}

@Override
protected void doExecute(Task task, WorkflowRequest request, ActionListener<DeleteResponse> listener) {
if (flowFrameworkIndicesHandler.doesIndexExist(GLOBAL_CONTEXT_INDEX)) {
String workflowId = request.getWorkflowId();
DeleteRequest deleteRequest = new DeleteRequest(GLOBAL_CONTEXT_INDEX, workflowId);

ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext();
client.delete(deleteRequest, ActionListener.runBefore(listener, () -> context.restore()));
} else {
listener.onFailure(new FlowFrameworkException("There are no templates in the global context.", RestStatus.NOT_FOUND));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work
// Sort and validate graph
Workflow provisionWorkflow = template.workflows().get(PROVISION_WORKFLOW);
List<ProcessNode> provisionProcessSequence = workflowProcessSorter.sortProcessNodes(provisionWorkflow, workflowId);
workflowProcessSorter.validateGraph(provisionProcessSequence);
workflowProcessSorter.validate(provisionProcessSequence);

// We have a valid template and sorted nodes, get the created resources
getResourcesAndExecute(request.getWorkflowId(), provisionProcessSequence, listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ public void testPlugin() throws IOException {
4,
ffp.createComponents(client, clusterService, threadPool, null, null, null, environment, null, null, null, null).size()
);
assertEquals(7, ffp.getRestHandlers(settings, null, null, null, null, null, null).size());
assertEquals(7, ffp.getActions().size());
assertEquals(8, ffp.getRestHandlers(settings, null, null, null, null, null, null).size());
assertEquals(8, ffp.getActions().size());
assertEquals(1, ffp.getExecutorBuilders(settings).size());
assertEquals(5, ffp.getSettings().size());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.rest;

import org.opensearch.client.node.NodeClient;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.MediaTypeRegistry;
import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting;
import org.opensearch.rest.RestHandler;
import org.opensearch.rest.RestRequest;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.rest.FakeRestChannel;
import org.opensearch.test.rest.FakeRestRequest;

import java.util.List;
import java.util.Locale;

import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_URI;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class RestDeleteWorkflowActionTests extends OpenSearchTestCase {
private RestDeleteWorkflowAction restDeleteWorkflowAction;
private String getPath;
private FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting;
private NodeClient nodeClient;

@Override
public void setUp() throws Exception {
super.setUp();

this.getPath = String.format(Locale.ROOT, "%s/{%s}", WORKFLOW_URI, "workflow_id");
flowFrameworkFeatureEnabledSetting = mock(FlowFrameworkFeatureEnabledSetting.class);
when(flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled()).thenReturn(true);
this.restDeleteWorkflowAction = new RestDeleteWorkflowAction(flowFrameworkFeatureEnabledSetting);
this.nodeClient = mock(NodeClient.class);
}

public void testRestDeleteWorkflowActionName() {
String name = restDeleteWorkflowAction.getName();
assertEquals("delete_workflow", name);
}

public void testRestDeleteWorkflowActionRoutes() {
List<RestHandler.Route> routes = restDeleteWorkflowAction.routes();
assertEquals(1, routes.size());
assertEquals(RestRequest.Method.DELETE, routes.get(0).getMethod());
assertEquals(this.getPath, routes.get(0).getPath());
}

public void testInvalidRequestWithContent() {
RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.DELETE)
.withPath(this.getPath)
.withContent(new BytesArray("request body"), MediaTypeRegistry.JSON)
.build();

FakeRestChannel channel = new FakeRestChannel(request, false, 1);
IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> {
restDeleteWorkflowAction.handleRequest(request, channel, nodeClient);
});
assertEquals("request [DELETE /_plugins/_flow_framework/workflow/{workflow_id}] does not support having a body", ex.getMessage());
}

public void testNullWorkflowId() throws Exception {

// Request with no params
RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.DELETE)
.withPath(this.getPath)
.build();

FakeRestChannel channel = new FakeRestChannel(request, true, 1);
restDeleteWorkflowAction.handleRequest(request, channel, nodeClient);

assertEquals(1, channel.errors().get());
assertEquals(RestStatus.BAD_REQUEST, channel.capturedResponse().status());
assertTrue(channel.capturedResponse().content().utf8ToString().contains("workflow_id cannot be null"));
}

public void testFeatureFlagNotEnabled() throws Exception {
when(flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled()).thenReturn(false);
RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.DELETE)
.withPath(this.getPath)
.build();
FakeRestChannel channel = new FakeRestChannel(request, false, 1);
restDeleteWorkflowAction.handleRequest(request, channel, nodeClient);
assertEquals(RestStatus.FORBIDDEN, channel.capturedResponse().status());
assertTrue(channel.capturedResponse().content().utf8ToString().contains("This API is disabled."));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public void testRestGetWorkflowActionRoutes() {
}

public void testInvalidRequestWithContent() {
RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.POST)
RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.GET)
.withPath(this.getPath)
.withContent(new BytesArray("request body"), MediaTypeRegistry.JSON)
.build();
Expand All @@ -65,13 +65,13 @@ public void testInvalidRequestWithContent() {
IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> {
restGetWorkflowAction.handleRequest(request, channel, nodeClient);
});
assertEquals("request [POST /_plugins/_flow_framework/workflow/{workflow_id}] does not support having a body", ex.getMessage());
assertEquals("request [GET /_plugins/_flow_framework/workflow/{workflow_id}] does not support having a body", ex.getMessage());
}

public void testNullWorkflowId() throws Exception {

// Request with no params
RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.POST)
RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.GET)
.withPath(this.getPath)
.build();

Expand All @@ -85,7 +85,7 @@ public void testNullWorkflowId() throws Exception {

public void testFeatureFlagNotEnabled() throws Exception {
when(flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled()).thenReturn(false);
RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.POST)
RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.GET)
.withPath(this.getPath)
.build();
FakeRestChannel channel = new FakeRestChannel(request, false, 1);
Expand Down
Loading

0 comments on commit 856631d

Please sign in to comment.