Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature/agent_framework] Deprovision API #271

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,16 @@
import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.rest.RestCreateWorkflowAction;
import org.opensearch.flowframework.rest.RestDeprovisionWorkflowAction;
import org.opensearch.flowframework.rest.RestGetWorkflowAction;
import org.opensearch.flowframework.rest.RestGetWorkflowStateAction;
import org.opensearch.flowframework.rest.RestProvisionWorkflowAction;
import org.opensearch.flowframework.rest.RestSearchWorkflowAction;
import org.opensearch.flowframework.rest.RestSearchWorkflowStateAction;
import org.opensearch.flowframework.transport.CreateWorkflowAction;
import org.opensearch.flowframework.transport.CreateWorkflowTransportAction;
import org.opensearch.flowframework.transport.DeprovisionWorkflowAction;
import org.opensearch.flowframework.transport.DeprovisionWorkflowTransportAction;
import org.opensearch.flowframework.transport.GetWorkflowAction;
import org.opensearch.flowframework.transport.GetWorkflowStateAction;
import org.opensearch.flowframework.transport.GetWorkflowStateTransportAction;
Expand Down Expand Up @@ -131,6 +134,7 @@ public List<RestHandler> getRestHandlers(
return ImmutableList.of(
new RestCreateWorkflowAction(flowFrameworkFeatureEnabledSetting, settings, clusterService),
new RestProvisionWorkflowAction(flowFrameworkFeatureEnabledSetting),
new RestDeprovisionWorkflowAction(flowFrameworkFeatureEnabledSetting),
new RestSearchWorkflowAction(flowFrameworkFeatureEnabledSetting),
new RestGetWorkflowStateAction(flowFrameworkFeatureEnabledSetting),
new RestGetWorkflowAction(flowFrameworkFeatureEnabledSetting),
Expand All @@ -143,6 +147,7 @@ public List<RestHandler> getRestHandlers(
return ImmutableList.of(
new ActionHandler<>(CreateWorkflowAction.INSTANCE, CreateWorkflowTransportAction.class),
new ActionHandler<>(ProvisionWorkflowAction.INSTANCE, ProvisionWorkflowTransportAction.class),
new ActionHandler<>(DeprovisionWorkflowAction.INSTANCE, DeprovisionWorkflowTransportAction.class),
new ActionHandler<>(SearchWorkflowAction.INSTANCE, SearchWorkflowTransportAction.class),
new ActionHandler<>(GetWorkflowStateAction.INSTANCE, GetWorkflowStateTransportAction.class),
new ActionHandler<>(GetWorkflowAction.INSTANCE, GetWorkflowTransportAction.class),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,32 +23,34 @@
public enum WorkflowResources {

/** official workflow step name for creating a connector and associated created resource */
CREATE_CONNECTOR("create_connector", "connector_id"),
CREATE_CONNECTOR("create_connector", "connector_id", "delete_connector"),
/** official workflow step name for registering a remote model and associated created resource */
REGISTER_REMOTE_MODEL("register_remote_model", "model_id"),
REGISTER_REMOTE_MODEL("register_remote_model", "model_id", "delete_model"),
/** official workflow step name for registering a local model and associated created resource */
REGISTER_LOCAL_MODEL("register_local_model", "model_id"),
REGISTER_LOCAL_MODEL("register_local_model", "model_id", "delete_model"),
/** official workflow step name for registering a model group and associated created resource */
REGISTER_MODEL_GROUP("register_model_group", "model_group_id"),
REGISTER_MODEL_GROUP("register_model_group", "model_group_id", null), // TODO
/** official workflow step name for deploying a model and associated created resource */
DEPLOY_MODEL("deploy_model", "model_id"),
DEPLOY_MODEL("deploy_model", "model_id", "undeploy_model"),
/** official workflow step name for creating an ingest-pipeline and associated created resource */
CREATE_INGEST_PIPELINE("create_ingest_pipeline", "pipeline_id"),
CREATE_INGEST_PIPELINE("create_ingest_pipeline", "pipeline_id", null), // TODO
/** official workflow step name for creating an index and associated created resource */
CREATE_INDEX("create_index", "index_name"),
CREATE_INDEX("create_index", "index_name", null), // TODO
/** official workflow step name for register an agent and the associated created resource */
REGISTER_AGENT("register_agent", "agent_id");
REGISTER_AGENT("register_agent", "agent_id", "delete_agent");

private final String workflowStep;
private final String resourceCreated;
private final String deprovisionStep;
private static final Logger logger = LogManager.getLogger(WorkflowResources.class);
private static final Set<String> allResources = Stream.of(values())
.map(WorkflowResources::getResourceCreated)
.collect(Collectors.toSet());

WorkflowResources(String workflowStep, String resourceCreated) {
WorkflowResources(String workflowStep, String resourceCreated, String deprovisionStep) {
this.workflowStep = workflowStep;
this.resourceCreated = resourceCreated;
this.deprovisionStep = deprovisionStep;
}

/**
Expand All @@ -68,15 +70,23 @@ public String getResourceCreated() {
}

/**
* gets the resources created type based on the workflowStep
* Returns the deprovisionStep for the given enum Constant
* @return the deprovisionStep of this data.
*/
public String getDeprovisionStep() {
return deprovisionStep;
}

/**
* Gets the resources created type based on the workflowStep.
* @param workflowStep workflow step name
* @return the resource that will be created
* @throws FlowFrameworkException if workflow step doesn't exist in enum
*/
public static String getResourceByWorkflowStep(String workflowStep) throws FlowFrameworkException {
if (workflowStep != null && !workflowStep.isEmpty()) {
for (WorkflowResources mapping : values()) {
if (mapping.getWorkflowStep().equals(workflowStep)) {
if (workflowStep.equals(mapping.getWorkflowStep()) || workflowStep.equals(mapping.getDeprovisionStep())) {
return mapping.getResourceCreated();
}
}
Expand All @@ -85,6 +95,24 @@ public static String getResourceByWorkflowStep(String workflowStep) throws FlowF
throw new FlowFrameworkException("Unable to find resource type for step: " + workflowStep, RestStatus.BAD_REQUEST);
}

/**
* Gets the deprovision step type based on the workflowStep.
* @param workflowStep workflow step name
* @return the corresponding step to deprovision
* @throws FlowFrameworkException if workflow step doesn't exist in enum
*/
public static String getDeprovisionStepByWorkflowStep(String workflowStep) throws FlowFrameworkException {
if (workflowStep != null && !workflowStep.isEmpty()) {
for (WorkflowResources mapping : values()) {
if (mapping.getWorkflowStep().equals(workflowStep)) {
return mapping.getDeprovisionStep();
}
}
}
logger.error("Unable to find deprovision step for step: " + workflowStep);
throw new FlowFrameworkException("Unable to find deprovision step for step: " + workflowStep, RestStatus.BAD_REQUEST);
}

/**
* Returns all the possible resource created types in enum
* @return a set of all the resource created types
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,15 +176,14 @@ public static ResourceCreated parse(XContentParser parser) throws IOException {

@Override
public String toString() {
return "resources_Created [workflow_step_name= "
return "resources_Created [workflow_step_name="
+ workflowStepName
+ ", workflow_step_id= "
+ ", workflow_step_id="
+ workflowStepId
+ ", resource_type= "
+ ", resource_type="
+ resourceType
+ ", resource_id= "
+ ", resource_id="
+ resourceId
+ "]";
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.rest;

import com.google.common.collect.ImmutableList;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.client.node.NodeClient;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.transport.DeprovisionWorkflowAction;
import org.opensearch.flowframework.transport.WorkflowRequest;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestRequest;

import java.io.IOException;
import java.util.List;
import java.util.Locale;

import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_ID;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_URI;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED;

/**
* Rest Action to facilitate requests to de-provision a workflow
*/
public class RestDeprovisionWorkflowAction extends BaseRestHandler {

private static final String DEPROVISION_WORKFLOW_ACTION = "deprovision_workflow";
private static final Logger logger = LogManager.getLogger(RestDeprovisionWorkflowAction.class);
private final FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting;

/**
* Instantiates a new RestDeprovisionWorkflowAction
* @param flowFrameworkFeatureEnabledSetting Whether this API is enabled
*/
public RestDeprovisionWorkflowAction(FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting) {
this.flowFrameworkFeatureEnabledSetting = flowFrameworkFeatureEnabledSetting;
}

@Override
public String getName() {
return DEPROVISION_WORKFLOW_ACTION;
}

@Override
protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {

try {
if (!flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled()) {
throw new FlowFrameworkException(
"This API is disabled. To enable it, update the setting [" + FLOW_FRAMEWORK_ENABLED.getKey() + "] to true.",
RestStatus.FORBIDDEN
);
}
// Validate content
if (request.hasContent()) {
throw new FlowFrameworkException("No request body is required", RestStatus.BAD_REQUEST);
}
// Validate params
String workflowId = request.param(WORKFLOW_ID);
if (workflowId == null) {
throw new FlowFrameworkException("workflow_id cannot be null", RestStatus.BAD_REQUEST);
}
WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, null);

return channel -> client.execute(DeprovisionWorkflowAction.INSTANCE, workflowRequest, ActionListener.wrap(response -> {
XContentBuilder builder = response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder));
}, exception -> {
try {
FlowFrameworkException ex = exception instanceof FlowFrameworkException
? (FlowFrameworkException) exception
: new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception));
XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder));
} catch (IOException e) {
logger.error("Failed to send back provision workflow exception", e);
channel.sendResponse(new BytesRestResponse(ExceptionsHelper.status(e), e.getMessage()));
}
}));

} catch (FlowFrameworkException ex) {
return channel -> channel.sendResponse(
new BytesRestResponse(ex.getRestStatus(), ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS))
);
}
}

@Override
public List<Route> routes() {
return ImmutableList.of(
new Route(RestRequest.Method.POST, String.format(Locale.ROOT, "%s/{%s}/%s", WORKFLOW_URI, WORKFLOW_ID, "_deprovision"))
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.transport;

import org.opensearch.action.ActionType;

import static org.opensearch.flowframework.common.CommonValue.TRANSPORT_ACTION_NAME_PREFIX;

/**
* External Action for public facing RestDeprovisionWorkflowAction
*/
public class DeprovisionWorkflowAction extends ActionType<WorkflowResponse> {
/** The name of this action */
public static final String NAME = TRANSPORT_ACTION_NAME_PREFIX + "workflow/deprovision";
/** An instance of this action */
public static final DeprovisionWorkflowAction INSTANCE = new DeprovisionWorkflowAction();

private DeprovisionWorkflowAction() {
super(NAME, WorkflowResponse::new);
}
}
Loading
Loading