Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Delete Workflow API #294

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.rest.RestCreateWorkflowAction;
import org.opensearch.flowframework.rest.RestDeleteWorkflowAction;
import org.opensearch.flowframework.rest.RestDeprovisionWorkflowAction;
import org.opensearch.flowframework.rest.RestGetWorkflowAction;
import org.opensearch.flowframework.rest.RestGetWorkflowStateAction;
Expand All @@ -36,6 +37,8 @@
import org.opensearch.flowframework.rest.RestSearchWorkflowStateAction;
import org.opensearch.flowframework.transport.CreateWorkflowAction;
import org.opensearch.flowframework.transport.CreateWorkflowTransportAction;
import org.opensearch.flowframework.transport.DeleteWorkflowAction;
import org.opensearch.flowframework.transport.DeleteWorkflowTransportAction;
import org.opensearch.flowframework.transport.DeprovisionWorkflowAction;
import org.opensearch.flowframework.transport.DeprovisionWorkflowTransportAction;
import org.opensearch.flowframework.transport.GetWorkflowAction;
Expand Down Expand Up @@ -139,6 +142,7 @@ public List<RestHandler> getRestHandlers(
) {
return ImmutableList.of(
new RestCreateWorkflowAction(flowFrameworkFeatureEnabledSetting, settings, clusterService),
new RestDeleteWorkflowAction(flowFrameworkFeatureEnabledSetting),
new RestProvisionWorkflowAction(flowFrameworkFeatureEnabledSetting),
new RestDeprovisionWorkflowAction(flowFrameworkFeatureEnabledSetting),
new RestSearchWorkflowAction(flowFrameworkFeatureEnabledSetting),
Expand All @@ -152,6 +156,7 @@ public List<RestHandler> getRestHandlers(
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
return ImmutableList.of(
new ActionHandler<>(CreateWorkflowAction.INSTANCE, CreateWorkflowTransportAction.class),
new ActionHandler<>(DeleteWorkflowAction.INSTANCE, DeleteWorkflowTransportAction.class),
new ActionHandler<>(ProvisionWorkflowAction.INSTANCE, ProvisionWorkflowTransportAction.class),
new ActionHandler<>(DeprovisionWorkflowAction.INSTANCE, DeprovisionWorkflowTransportAction.class),
new ActionHandler<>(SearchWorkflowAction.INSTANCE, SearchWorkflowTransportAction.class),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.rest;

import com.google.common.collect.ImmutableList;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.client.node.NodeClient;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.transport.DeleteWorkflowAction;
import org.opensearch.flowframework.transport.WorkflowRequest;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestRequest;

import java.io.IOException;
import java.util.List;
import java.util.Locale;

import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_ID;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_URI;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED;

/**
* Rest Action to facilitate requests to delete a stored template
*/
public class RestDeleteWorkflowAction extends BaseRestHandler {

private static final String DELETE_WORKFLOW_ACTION = "delete_workflow";
private static final Logger logger = LogManager.getLogger(RestDeleteWorkflowAction.class);
private FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting;

/**
* Instantiates a new RestDeleteWorkflowAction
* @param flowFrameworkFeatureEnabledSetting Whether this API is enabled
*/
public RestDeleteWorkflowAction(FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting) {
this.flowFrameworkFeatureEnabledSetting = flowFrameworkFeatureEnabledSetting;
}

@Override
public String getName() {
return DELETE_WORKFLOW_ACTION;
}

@Override
public List<Route> routes() {
return ImmutableList.of(new Route(RestRequest.Method.DELETE, String.format(Locale.ROOT, "%s/{%s}", WORKFLOW_URI, WORKFLOW_ID)));
}

@Override
protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
String workflowId = request.param(WORKFLOW_ID);
try {
if (!flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled()) {
throw new FlowFrameworkException(
"This API is disabled. To enable it, update the setting [" + FLOW_FRAMEWORK_ENABLED.getKey() + "] to true.",
RestStatus.FORBIDDEN
);
}
// Validate content
if (request.hasContent()) {
throw new FlowFrameworkException("Invalid request format", RestStatus.BAD_REQUEST);
}
// Validate params
if (workflowId == null) {
throw new FlowFrameworkException("workflow_id cannot be null", RestStatus.BAD_REQUEST);
}
WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, null);
return channel -> client.execute(DeleteWorkflowAction.INSTANCE, workflowRequest, ActionListener.wrap(response -> {
XContentBuilder builder = response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder));
}, exception -> {
try {
FlowFrameworkException ex = exception instanceof FlowFrameworkException
? (FlowFrameworkException) exception
: new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception));
XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder));

} catch (IOException e) {
logger.error("Failed to send back delete workflow exception", e);
channel.sendResponse(new BytesRestResponse(ExceptionsHelper.status(e), e.getMessage()));
}
}));

} catch (FlowFrameworkException ex) {
return channel -> channel.sendResponse(
new BytesRestResponse(ex.getRestStatus(), ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS))
);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.transport;

import org.opensearch.action.ActionType;
import org.opensearch.action.delete.DeleteResponse;

import static org.opensearch.flowframework.common.CommonValue.TRANSPORT_ACTION_NAME_PREFIX;

/**
* External Action for public facing RestGetWorkflowAction
*/
public class DeleteWorkflowAction extends ActionType<DeleteResponse> {
/** The name of this action */
public static final String NAME = TRANSPORT_ACTION_NAME_PREFIX + "workflow/delete";
/** An instance of this action */
public static final DeleteWorkflowAction INSTANCE = new DeleteWorkflowAction();

private DeleteWorkflowAction() {
super(NAME, DeleteResponse::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.transport;

import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.client.Client;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;

import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX;

/**
* Transport action to retrieve a use case template within the Global Context
*/
public class DeleteWorkflowTransportAction extends HandledTransportAction<WorkflowRequest, DeleteResponse> {

private final FlowFrameworkIndicesHandler flowFrameworkIndicesHandler;
private final Client client;

/**
* Instantiates a new DeleteWorkflowTransportAction instance
* @param transportService the transport service
* @param actionFilters action filters
* @param flowFrameworkIndicesHandler The Flow Framework indices handler
* @param client the OpenSearch Client
*/
@Inject
public DeleteWorkflowTransportAction(
TransportService transportService,
ActionFilters actionFilters,
FlowFrameworkIndicesHandler flowFrameworkIndicesHandler,
Client client
) {
super(DeleteWorkflowAction.NAME, transportService, actionFilters, WorkflowRequest::new);
this.flowFrameworkIndicesHandler = flowFrameworkIndicesHandler;
this.client = client;
}

@Override
protected void doExecute(Task task, WorkflowRequest request, ActionListener<DeleteResponse> listener) {
if (flowFrameworkIndicesHandler.doesIndexExist(GLOBAL_CONTEXT_INDEX)) {
String workflowId = request.getWorkflowId();
DeleteRequest deleteRequest = new DeleteRequest(GLOBAL_CONTEXT_INDEX, workflowId);

ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext();
client.delete(deleteRequest, ActionListener.runBefore(listener, () -> context.restore()));
} else {
listener.onFailure(new FlowFrameworkException("There are no templates in the global context.", RestStatus.NOT_FOUND));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work
// Sort and validate graph
Workflow provisionWorkflow = template.workflows().get(PROVISION_WORKFLOW);
List<ProcessNode> provisionProcessSequence = workflowProcessSorter.sortProcessNodes(provisionWorkflow, workflowId);
workflowProcessSorter.validateGraph(provisionProcessSequence);
workflowProcessSorter.validate(provisionProcessSequence);

// We have a valid template and sorted nodes, get the created resources
getResourcesAndExecute(request.getWorkflowId(), provisionProcessSequence, listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ public void testPlugin() throws IOException {
4,
ffp.createComponents(client, clusterService, threadPool, null, null, null, environment, null, null, null, null).size()
);
assertEquals(7, ffp.getRestHandlers(settings, null, null, null, null, null, null).size());
assertEquals(7, ffp.getActions().size());
assertEquals(8, ffp.getRestHandlers(settings, null, null, null, null, null, null).size());
assertEquals(8, ffp.getActions().size());
assertEquals(1, ffp.getExecutorBuilders(settings).size());
assertEquals(5, ffp.getSettings().size());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.rest;

import org.opensearch.client.node.NodeClient;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.MediaTypeRegistry;
import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting;
import org.opensearch.rest.RestHandler;
import org.opensearch.rest.RestRequest;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.rest.FakeRestChannel;
import org.opensearch.test.rest.FakeRestRequest;

import java.util.List;
import java.util.Locale;

import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_URI;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class RestDeleteWorkflowActionTests extends OpenSearchTestCase {
private RestDeleteWorkflowAction restDeleteWorkflowAction;
private String getPath;
private FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting;
private NodeClient nodeClient;

@Override
public void setUp() throws Exception {
super.setUp();

this.getPath = String.format(Locale.ROOT, "%s/{%s}", WORKFLOW_URI, "workflow_id");
flowFrameworkFeatureEnabledSetting = mock(FlowFrameworkFeatureEnabledSetting.class);
when(flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled()).thenReturn(true);
this.restDeleteWorkflowAction = new RestDeleteWorkflowAction(flowFrameworkFeatureEnabledSetting);
this.nodeClient = mock(NodeClient.class);
}

public void testRestDeleteWorkflowActionName() {
String name = restDeleteWorkflowAction.getName();
assertEquals("delete_workflow", name);
}

public void testRestDeleteWorkflowActionRoutes() {
List<RestHandler.Route> routes = restDeleteWorkflowAction.routes();
assertEquals(1, routes.size());
assertEquals(RestRequest.Method.DELETE, routes.get(0).getMethod());
assertEquals(this.getPath, routes.get(0).getPath());
}

public void testInvalidRequestWithContent() {
RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.DELETE)
.withPath(this.getPath)
.withContent(new BytesArray("request body"), MediaTypeRegistry.JSON)
.build();

FakeRestChannel channel = new FakeRestChannel(request, false, 1);
IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> {
restDeleteWorkflowAction.handleRequest(request, channel, nodeClient);
});
assertEquals("request [DELETE /_plugins/_flow_framework/workflow/{workflow_id}] does not support having a body", ex.getMessage());
}

public void testNullWorkflowId() throws Exception {

// Request with no params
RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.DELETE)
.withPath(this.getPath)
.build();

FakeRestChannel channel = new FakeRestChannel(request, true, 1);
restDeleteWorkflowAction.handleRequest(request, channel, nodeClient);

assertEquals(1, channel.errors().get());
assertEquals(RestStatus.BAD_REQUEST, channel.capturedResponse().status());
assertTrue(channel.capturedResponse().content().utf8ToString().contains("workflow_id cannot be null"));
}

public void testFeatureFlagNotEnabled() throws Exception {
when(flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled()).thenReturn(false);
RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.DELETE)
.withPath(this.getPath)
.build();
FakeRestChannel channel = new FakeRestChannel(request, false, 1);
restDeleteWorkflowAction.handleRequest(request, channel, nodeClient);
assertEquals(RestStatus.FORBIDDEN, channel.capturedResponse().status());
assertTrue(channel.capturedResponse().content().utf8ToString().contains("This API is disabled."));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public void testRestGetWorkflowActionRoutes() {
}

public void testInvalidRequestWithContent() {
RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.POST)
RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.GET)
.withPath(this.getPath)
.withContent(new BytesArray("request body"), MediaTypeRegistry.JSON)
.build();
Expand All @@ -65,13 +65,13 @@ public void testInvalidRequestWithContent() {
IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> {
restGetWorkflowAction.handleRequest(request, channel, nodeClient);
});
assertEquals("request [POST /_plugins/_flow_framework/workflow/{workflow_id}] does not support having a body", ex.getMessage());
assertEquals("request [GET /_plugins/_flow_framework/workflow/{workflow_id}] does not support having a body", ex.getMessage());
}

public void testNullWorkflowId() throws Exception {

// Request with no params
RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.POST)
RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.GET)
.withPath(this.getPath)
.build();

Expand All @@ -85,7 +85,7 @@ public void testNullWorkflowId() throws Exception {

public void testFeatureFlagNotEnabled() throws Exception {
when(flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled()).thenReturn(false);
RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.POST)
RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.GET)
.withPath(this.getPath)
.build();
FakeRestChannel channel = new FakeRestChannel(request, false, 1);
Expand Down
Loading
Loading