Skip to content

Commit

Permalink
Deprovision REST and Transport Actions
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Widdis <widdis@gmail.com>
  • Loading branch information
dbwiddis committed Dec 11, 2023
1 parent 2c1401c commit 34724d5
Show file tree
Hide file tree
Showing 9 changed files with 460 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,14 @@
import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.rest.RestCreateWorkflowAction;
import org.opensearch.flowframework.rest.RestDeprovisionWorkflowAction;
import org.opensearch.flowframework.rest.RestGetWorkflowAction;
import org.opensearch.flowframework.rest.RestProvisionWorkflowAction;
import org.opensearch.flowframework.rest.RestSearchWorkflowAction;
import org.opensearch.flowframework.transport.CreateWorkflowAction;
import org.opensearch.flowframework.transport.CreateWorkflowTransportAction;
import org.opensearch.flowframework.transport.DeprovisionWorkflowAction;
import org.opensearch.flowframework.transport.DeprovisionWorkflowTransportAction;
import org.opensearch.flowframework.transport.GetWorkflowAction;
import org.opensearch.flowframework.transport.GetWorkflowTransportAction;
import org.opensearch.flowframework.transport.ProvisionWorkflowAction;
Expand Down Expand Up @@ -124,6 +127,7 @@ public List<RestHandler> getRestHandlers(
return ImmutableList.of(
new RestCreateWorkflowAction(flowFrameworkFeatureEnabledSetting, settings, clusterService),
new RestProvisionWorkflowAction(flowFrameworkFeatureEnabledSetting),
new RestDeprovisionWorkflowAction(flowFrameworkFeatureEnabledSetting),
new RestSearchWorkflowAction(flowFrameworkFeatureEnabledSetting),
new RestGetWorkflowAction(flowFrameworkFeatureEnabledSetting)
);
Expand All @@ -134,6 +138,7 @@ public List<RestHandler> getRestHandlers(
return ImmutableList.of(
new ActionHandler<>(CreateWorkflowAction.INSTANCE, CreateWorkflowTransportAction.class),
new ActionHandler<>(ProvisionWorkflowAction.INSTANCE, ProvisionWorkflowTransportAction.class),
new ActionHandler<>(DeprovisionWorkflowAction.INSTANCE, DeprovisionWorkflowTransportAction.class),
new ActionHandler<>(SearchWorkflowAction.INSTANCE, SearchWorkflowTransportAction.class),
new ActionHandler<>(GetWorkflowAction.INSTANCE, GetWorkflowTransportAction.class)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,32 +23,34 @@
public enum WorkflowResources {

/** official workflow step name for creating a connector and associated created resource */
CREATE_CONNECTOR("create_connector", "connector_id"),
CREATE_CONNECTOR("create_connector", "connector_id", "delete_connetor"),
/** official workflow step name for registering a remote model and associated created resource */
REGISTER_REMOTE_MODEL("register_remote_model", "model_id"),
REGISTER_REMOTE_MODEL("register_remote_model", "model_id", "delete_model"),
/** official workflow step name for registering a local model and associated created resource */
REGISTER_LOCAL_MODEL("register_local_model", "model_id"),
REGISTER_LOCAL_MODEL("register_local_model", "model_id", "delete_model"),
/** official workflow step name for registering a model group and associated created resource */
REGISTER_MODEL_GROUP("register_model_group", "model_group_id"),
REGISTER_MODEL_GROUP("register_model_group", "model_group_id", null), // TODO
/** official workflow step name for deploying a model and associated created resource */
DEPLOY_MODEL("deploy_model", "model_id"),
DEPLOY_MODEL("deploy_model", "model_id", "undeploy_model"),
/** official workflow step name for creating an ingest-pipeline and associated created resource */
CREATE_INGEST_PIPELINE("create_ingest_pipeline", "pipeline_id"),
CREATE_INGEST_PIPELINE("create_ingest_pipeline", "pipeline_id", null), // TODO
/** official workflow step name for creating an index and associated created resource */
CREATE_INDEX("create_index", "index_name"),
CREATE_INDEX("create_index", "index_name", null), // TODO
/** official workflow step name for register an agent and the associated created resource */
REGISTER_AGENT("register_agent", "agent_id");
REGISTER_AGENT("register_agent", "agent_id", "delete_agent");

private final String workflowStep;
private final String resourceCreated;
private final String deprovisionStep;
private static final Logger logger = LogManager.getLogger(WorkflowResources.class);
private static final Set<String> allResources = Stream.of(values())
.map(WorkflowResources::getResourceCreated)
.collect(Collectors.toSet());

WorkflowResources(String workflowStep, String resourceCreated) {
WorkflowResources(String workflowStep, String resourceCreated, String deprovisionStep) {
this.workflowStep = workflowStep;
this.resourceCreated = resourceCreated;
this.deprovisionStep = deprovisionStep;
}

/**
Expand All @@ -68,7 +70,15 @@ public String getResourceCreated() {
}

/**
* gets the resources created type based on the workflowStep
* Returns the deprovisionStep for the given enum Constant
* @return the deprovisionStep of this data.
*/
public String getDeprovisionStep() {
return deprovisionStep;
}

/**
* Gets the resources created type based on the workflowStep.
* @param workflowStep workflow step name
* @return the resource that will be created
* @throws FlowFrameworkException if workflow step doesn't exist in enum
Expand All @@ -85,6 +95,24 @@ public static String getResourceByWorkflowStep(String workflowStep) throws FlowF
throw new FlowFrameworkException("Unable to find resource type for step: " + workflowStep, RestStatus.BAD_REQUEST);
}

/**
* Gets the deprovision step type based on the workflowStep.
* @param workflowStep workflow step name
* @return the corresponding step to deprovision
* @throws FlowFrameworkException if workflow step doesn't exist in enum
*/
public static String getDeprovisionStepByWorkflowStep(String workflowStep) throws FlowFrameworkException {
if (workflowStep != null && !workflowStep.isEmpty()) {
for (WorkflowResources mapping : values()) {
if (mapping.getWorkflowStep().equals(workflowStep)) {
return mapping.getDeprovisionStep();
}
}
}
logger.error("Unable to find deprovision step for step: " + workflowStep);
throw new FlowFrameworkException("Unable to find deprovision step for step: " + workflowStep, RestStatus.BAD_REQUEST);
}

/**
* Returns all the possible resource created types in enum
* @return a set of all the resource created types
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.rest;

import com.google.common.collect.ImmutableList;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.client.node.NodeClient;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.transport.DeprovisionWorkflowAction;
import org.opensearch.flowframework.transport.WorkflowRequest;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestRequest;

import java.io.IOException;
import java.util.List;
import java.util.Locale;

import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_ID;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_URI;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED;

/**
* Rest Action to facilitate requests to de-provision a workflow
*/
public class RestDeprovisionWorkflowAction extends BaseRestHandler {

private static final String DEPROVISION_WORKFLOW_ACTION = "deprovision_workflow";
private static final Logger logger = LogManager.getLogger(RestDeprovisionWorkflowAction.class);
private final FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting;

/**
* Instantiates a new RestDeprovisionWorkflowAction
* @param flowFrameworkFeatureEnabledSetting Whether this API is enabled
*/
public RestDeprovisionWorkflowAction(FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting) {
this.flowFrameworkFeatureEnabledSetting = flowFrameworkFeatureEnabledSetting;
}

@Override
public String getName() {
return DEPROVISION_WORKFLOW_ACTION;
}

@Override
protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {

try {
if (!flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled()) {
throw new FlowFrameworkException(
"This API is disabled. To enable it, update the setting [" + FLOW_FRAMEWORK_ENABLED.getKey() + "] to true.",
RestStatus.FORBIDDEN
);
}

// Validate content
if (request.hasContent()) {
throw new FlowFrameworkException("No request body is required", RestStatus.BAD_REQUEST);
}
// Validate params
String workflowId = request.param(WORKFLOW_ID);
if (workflowId == null) {
throw new FlowFrameworkException("workflow_id cannot be null", RestStatus.BAD_REQUEST);
}
WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, null);

return channel -> client.execute(DeprovisionWorkflowAction.INSTANCE, workflowRequest, ActionListener.wrap(response -> {
XContentBuilder builder = response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder));
}, exception -> {
try {
FlowFrameworkException ex = new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception));
XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder));

} catch (IOException e) {
logger.error("Failed to send back provision workflow exception", e);
channel.sendResponse(new BytesRestResponse(ExceptionsHelper.status(e), e.getMessage()));
}
}));

} catch (FlowFrameworkException ex) {
return channel -> channel.sendResponse(
new BytesRestResponse(ex.getRestStatus(), ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS))
);
}
}

@Override
public List<Route> routes() {
return ImmutableList.of(
new Route(RestRequest.Method.GET, String.format(Locale.ROOT, "%s/{%s}/%s", WORKFLOW_URI, WORKFLOW_ID, "_deprovision"))
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request

// Validate content
if (request.hasContent()) {
throw new FlowFrameworkException("No request body present", RestStatus.BAD_REQUEST);
throw new FlowFrameworkException("No request body is required", RestStatus.BAD_REQUEST);
}
// Validate params
String workflowId = request.param(WORKFLOW_ID);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.transport;

import org.opensearch.action.ActionType;

import static org.opensearch.flowframework.common.CommonValue.TRANSPORT_ACTION_NAME_PREFIX;

/**
* External Action for public facing RestDeprovisionWorkflowAction
*/
public class DeprovisionWorkflowAction extends ActionType<WorkflowResponse> {
/** The name of this action */
public static final String NAME = TRANSPORT_ACTION_NAME_PREFIX + "workflow/deprovision";
/** An instance of this action */
public static final DeprovisionWorkflowAction INSTANCE = new DeprovisionWorkflowAction();

private DeprovisionWorkflowAction() {
super(NAME, WorkflowResponse::new);
}
}
Loading

0 comments on commit 34724d5

Please sign in to comment.