Skip to content

Commit

Permalink
[Feature/agent_framework] Deprovision API (opensearch-project#271)
Browse files Browse the repository at this point in the history
* Deprovision REST and Transport Actions

Signed-off-by: Daniel Widdis <widdis@gmail.com>

* Fix errors you find actually running the code

Signed-off-by: Daniel Widdis <widdis@gmail.com>

* Add test for Rest deprovision action

Signed-off-by: Daniel Widdis <widdis@gmail.com>

* Initial copypaste of Deprovision Transport Action Test

Signed-off-by: Daniel Widdis <widdis@gmail.com>

* Add some delays to let deletions propagate, reset workflow state

Signed-off-by: Daniel Widdis <widdis@gmail.com>

* Improved deprovisioning results and status updates

Signed-off-by: Daniel Widdis <widdis@gmail.com>

* Fix bug in resource created parsing

Signed-off-by: Daniel Widdis <widdis@gmail.com>

* Completed test implementations

Signed-off-by: Daniel Widdis <widdis@gmail.com>

* Fixes after rebase

Signed-off-by: Daniel Widdis <widdis@gmail.com>

---------

Signed-off-by: Daniel Widdis <widdis@gmail.com>
  • Loading branch information
dbwiddis committed Dec 15, 2023
1 parent 531ef4c commit 8069ea8
Show file tree
Hide file tree
Showing 10 changed files with 857 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,16 @@
import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.rest.RestCreateWorkflowAction;
import org.opensearch.flowframework.rest.RestDeprovisionWorkflowAction;
import org.opensearch.flowframework.rest.RestGetWorkflowAction;
import org.opensearch.flowframework.rest.RestGetWorkflowStateAction;
import org.opensearch.flowframework.rest.RestProvisionWorkflowAction;
import org.opensearch.flowframework.rest.RestSearchWorkflowAction;
import org.opensearch.flowframework.rest.RestSearchWorkflowStateAction;
import org.opensearch.flowframework.transport.CreateWorkflowAction;
import org.opensearch.flowframework.transport.CreateWorkflowTransportAction;
import org.opensearch.flowframework.transport.DeprovisionWorkflowAction;
import org.opensearch.flowframework.transport.DeprovisionWorkflowTransportAction;
import org.opensearch.flowframework.transport.GetWorkflowAction;
import org.opensearch.flowframework.transport.GetWorkflowStateAction;
import org.opensearch.flowframework.transport.GetWorkflowStateTransportAction;
Expand Down Expand Up @@ -131,6 +134,7 @@ public List<RestHandler> getRestHandlers(
return ImmutableList.of(
new RestCreateWorkflowAction(flowFrameworkFeatureEnabledSetting, settings, clusterService),
new RestProvisionWorkflowAction(flowFrameworkFeatureEnabledSetting),
new RestDeprovisionWorkflowAction(flowFrameworkFeatureEnabledSetting),
new RestSearchWorkflowAction(flowFrameworkFeatureEnabledSetting),
new RestGetWorkflowStateAction(flowFrameworkFeatureEnabledSetting),
new RestGetWorkflowAction(flowFrameworkFeatureEnabledSetting),
Expand All @@ -143,6 +147,7 @@ public List<RestHandler> getRestHandlers(
return ImmutableList.of(
new ActionHandler<>(CreateWorkflowAction.INSTANCE, CreateWorkflowTransportAction.class),
new ActionHandler<>(ProvisionWorkflowAction.INSTANCE, ProvisionWorkflowTransportAction.class),
new ActionHandler<>(DeprovisionWorkflowAction.INSTANCE, DeprovisionWorkflowTransportAction.class),
new ActionHandler<>(SearchWorkflowAction.INSTANCE, SearchWorkflowTransportAction.class),
new ActionHandler<>(GetWorkflowStateAction.INSTANCE, GetWorkflowStateTransportAction.class),
new ActionHandler<>(GetWorkflowAction.INSTANCE, GetWorkflowTransportAction.class),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,32 +23,34 @@
public enum WorkflowResources {

/** official workflow step name for creating a connector and associated created resource */
CREATE_CONNECTOR("create_connector", "connector_id"),
CREATE_CONNECTOR("create_connector", "connector_id", "delete_connector"),
/** official workflow step name for registering a remote model and associated created resource */
REGISTER_REMOTE_MODEL("register_remote_model", "model_id"),
REGISTER_REMOTE_MODEL("register_remote_model", "model_id", "delete_model"),
/** official workflow step name for registering a local model and associated created resource */
REGISTER_LOCAL_MODEL("register_local_model", "model_id"),
REGISTER_LOCAL_MODEL("register_local_model", "model_id", "delete_model"),
/** official workflow step name for registering a model group and associated created resource */
REGISTER_MODEL_GROUP("register_model_group", "model_group_id"),
REGISTER_MODEL_GROUP("register_model_group", "model_group_id", null), // TODO
/** official workflow step name for deploying a model and associated created resource */
DEPLOY_MODEL("deploy_model", "model_id"),
DEPLOY_MODEL("deploy_model", "model_id", "undeploy_model"),
/** official workflow step name for creating an ingest-pipeline and associated created resource */
CREATE_INGEST_PIPELINE("create_ingest_pipeline", "pipeline_id"),
CREATE_INGEST_PIPELINE("create_ingest_pipeline", "pipeline_id", null), // TODO
/** official workflow step name for creating an index and associated created resource */
CREATE_INDEX("create_index", "index_name"),
CREATE_INDEX("create_index", "index_name", null), // TODO
/** official workflow step name for register an agent and the associated created resource */
REGISTER_AGENT("register_agent", "agent_id");
REGISTER_AGENT("register_agent", "agent_id", "delete_agent");

private final String workflowStep;
private final String resourceCreated;
private final String deprovisionStep;
private static final Logger logger = LogManager.getLogger(WorkflowResources.class);
private static final Set<String> allResources = Stream.of(values())
.map(WorkflowResources::getResourceCreated)
.collect(Collectors.toSet());

WorkflowResources(String workflowStep, String resourceCreated) {
WorkflowResources(String workflowStep, String resourceCreated, String deprovisionStep) {
this.workflowStep = workflowStep;
this.resourceCreated = resourceCreated;
this.deprovisionStep = deprovisionStep;
}

/**
Expand All @@ -68,15 +70,23 @@ public String getResourceCreated() {
}

/**
* gets the resources created type based on the workflowStep
* Returns the deprovisionStep for the given enum Constant
* @return the deprovisionStep of this data.
*/
public String getDeprovisionStep() {
return deprovisionStep;
}

/**
* Gets the resources created type based on the workflowStep.
* @param workflowStep workflow step name
* @return the resource that will be created
* @throws FlowFrameworkException if workflow step doesn't exist in enum
*/
public static String getResourceByWorkflowStep(String workflowStep) throws FlowFrameworkException {
if (workflowStep != null && !workflowStep.isEmpty()) {
for (WorkflowResources mapping : values()) {
if (mapping.getWorkflowStep().equals(workflowStep)) {
if (workflowStep.equals(mapping.getWorkflowStep()) || workflowStep.equals(mapping.getDeprovisionStep())) {
return mapping.getResourceCreated();
}
}
Expand All @@ -85,6 +95,24 @@ public static String getResourceByWorkflowStep(String workflowStep) throws FlowF
throw new FlowFrameworkException("Unable to find resource type for step: " + workflowStep, RestStatus.BAD_REQUEST);
}

/**
* Gets the deprovision step type based on the workflowStep.
* @param workflowStep workflow step name
* @return the corresponding step to deprovision
* @throws FlowFrameworkException if workflow step doesn't exist in enum
*/
public static String getDeprovisionStepByWorkflowStep(String workflowStep) throws FlowFrameworkException {
if (workflowStep != null && !workflowStep.isEmpty()) {
for (WorkflowResources mapping : values()) {
if (mapping.getWorkflowStep().equals(workflowStep)) {
return mapping.getDeprovisionStep();
}
}
}
logger.error("Unable to find deprovision step for step: " + workflowStep);
throw new FlowFrameworkException("Unable to find deprovision step for step: " + workflowStep, RestStatus.BAD_REQUEST);
}

/**
* Returns all the possible resource created types in enum
* @return a set of all the resource created types
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,15 +176,14 @@ public static ResourceCreated parse(XContentParser parser) throws IOException {

@Override
public String toString() {
return "resources_Created [workflow_step_name= "
return "resources_Created [workflow_step_name="
+ workflowStepName
+ ", workflow_step_id= "
+ ", workflow_step_id="
+ workflowStepId
+ ", resource_type= "
+ ", resource_type="
+ resourceType
+ ", resource_id= "
+ ", resource_id="
+ resourceId
+ "]";
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.rest;

import com.google.common.collect.ImmutableList;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.client.node.NodeClient;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.transport.DeprovisionWorkflowAction;
import org.opensearch.flowframework.transport.WorkflowRequest;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestRequest;

import java.io.IOException;
import java.util.List;
import java.util.Locale;

import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_ID;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_URI;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED;

/**
* Rest Action to facilitate requests to de-provision a workflow
*/
public class RestDeprovisionWorkflowAction extends BaseRestHandler {

private static final String DEPROVISION_WORKFLOW_ACTION = "deprovision_workflow";
private static final Logger logger = LogManager.getLogger(RestDeprovisionWorkflowAction.class);
private final FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting;

/**
* Instantiates a new RestDeprovisionWorkflowAction
* @param flowFrameworkFeatureEnabledSetting Whether this API is enabled
*/
public RestDeprovisionWorkflowAction(FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting) {
this.flowFrameworkFeatureEnabledSetting = flowFrameworkFeatureEnabledSetting;
}

@Override
public String getName() {
return DEPROVISION_WORKFLOW_ACTION;
}

@Override
protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {

try {
if (!flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled()) {
throw new FlowFrameworkException(
"This API is disabled. To enable it, update the setting [" + FLOW_FRAMEWORK_ENABLED.getKey() + "] to true.",
RestStatus.FORBIDDEN
);
}
// Validate content
if (request.hasContent()) {
throw new FlowFrameworkException("No request body is required", RestStatus.BAD_REQUEST);
}
// Validate params
String workflowId = request.param(WORKFLOW_ID);
if (workflowId == null) {
throw new FlowFrameworkException("workflow_id cannot be null", RestStatus.BAD_REQUEST);
}
WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, null);

return channel -> client.execute(DeprovisionWorkflowAction.INSTANCE, workflowRequest, ActionListener.wrap(response -> {
XContentBuilder builder = response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder));
}, exception -> {
try {
FlowFrameworkException ex = exception instanceof FlowFrameworkException
? (FlowFrameworkException) exception
: new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception));
XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder));
} catch (IOException e) {
logger.error("Failed to send back provision workflow exception", e);
channel.sendResponse(new BytesRestResponse(ExceptionsHelper.status(e), e.getMessage()));
}
}));

} catch (FlowFrameworkException ex) {
return channel -> channel.sendResponse(
new BytesRestResponse(ex.getRestStatus(), ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS))
);
}
}

@Override
public List<Route> routes() {
return ImmutableList.of(
new Route(RestRequest.Method.POST, String.format(Locale.ROOT, "%s/{%s}/%s", WORKFLOW_URI, WORKFLOW_ID, "_deprovision"))
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.transport;

import org.opensearch.action.ActionType;

import static org.opensearch.flowframework.common.CommonValue.TRANSPORT_ACTION_NAME_PREFIX;

/**
* External Action for public facing RestDeprovisionWorkflowAction
*/
public class DeprovisionWorkflowAction extends ActionType<WorkflowResponse> {
/** The name of this action */
public static final String NAME = TRANSPORT_ACTION_NAME_PREFIX + "workflow/deprovision";
/** An instance of this action */
public static final DeprovisionWorkflowAction INSTANCE = new DeprovisionWorkflowAction();

private DeprovisionWorkflowAction() {
super(NAME, WorkflowResponse::new);
}
}
Loading

0 comments on commit 8069ea8

Please sign in to comment.