From 697b9b7a50b5d1f22911135641180817fd06bf1d Mon Sep 17 00:00:00 2001 From: Owais Kazi Date: Tue, 2 May 2023 13:07:16 -0400 Subject: [PATCH] [Extensions] Migrates Delete Detector Results (#881) * Migrates Delete Detector Results Signed-off-by: Owais Kazi * Integrated the correct response for the results Signed-off-by: Owais Kazi * Added test Signed-off-by: Owais Kazi * Updated with new method name Signed-off-by: Owais Kazi * Addressed PR Comments Signed-off-by: Owais Kazi --------- Signed-off-by: Owais Kazi --- .../ad/AnomalyDetectorExtension.java | 5 ++ .../rest/RestDeleteAnomalyResultsAction.java | 73 +++++++++++++------ .../DeleteAnomalyResultsTransportAction.java | 40 +++++----- ...eteAnomalyResultsTransportActionTests.java | 18 ++++- 4 files changed, 95 insertions(+), 41 deletions(-) diff --git a/src/main/java/org/opensearch/ad/AnomalyDetectorExtension.java b/src/main/java/org/opensearch/ad/AnomalyDetectorExtension.java index e69827bcc..fed3688bb 100644 --- a/src/main/java/org/opensearch/ad/AnomalyDetectorExtension.java +++ b/src/main/java/org/opensearch/ad/AnomalyDetectorExtension.java @@ -57,6 +57,7 @@ import org.opensearch.ad.ratelimit.ResultWriteWorker; import org.opensearch.ad.rest.RestAnomalyDetectorJobAction; import org.opensearch.ad.rest.RestDeleteAnomalyDetectorAction; +import org.opensearch.ad.rest.RestDeleteAnomalyResultsAction; import org.opensearch.ad.rest.RestGetAnomalyDetectorAction; import org.opensearch.ad.rest.RestIndexAnomalyDetectorAction; import org.opensearch.ad.rest.RestPreviewAnomalyDetectorAction; @@ -100,6 +101,8 @@ import org.opensearch.ad.transport.AnomalyResultTransportAction; import org.opensearch.ad.transport.DeleteAnomalyDetectorAction; import org.opensearch.ad.transport.DeleteAnomalyDetectorTransportAction; +import org.opensearch.ad.transport.DeleteAnomalyResultsAction; +import org.opensearch.ad.transport.DeleteAnomalyResultsTransportAction; import org.opensearch.ad.transport.DeleteModelAction; import org.opensearch.ad.transport.DeleteModelTransportAction; import org.opensearch.ad.transport.EntityResultAction; @@ -217,6 +220,7 @@ public List getExtensionRestHandlers() { new RestGetAnomalyDetectorAction(extensionsRunner(), restClient()), new RestAnomalyDetectorJobAction(extensionsRunner(), restClient()), new RestDeleteAnomalyDetectorAction(extensionsRunner(), restClient()), + new RestDeleteAnomalyResultsAction(extensionsRunner(), restClient()), new RestStatsAnomalyDetectorAction(extensionsRunner(), restClient(), adStats, nodeFilter), new RestSearchAnomalyDetectorAction(extensionsRunner(), restClient()), new RestSearchAnomalyResultAction(extensionsRunner(), restClient()), @@ -794,6 +798,7 @@ public List> getExecutorBuilders(Settings settings) { new ActionHandler<>(EntityResultAction.INSTANCE, EntityResultTransportAction.class), new ActionHandler<>(ProfileAction.INSTANCE, ProfileTransportAction.class), new ActionHandler<>(DeleteAnomalyDetectorAction.INSTANCE, DeleteAnomalyDetectorTransportAction.class), + new ActionHandler<>(DeleteAnomalyResultsAction.INSTANCE, DeleteAnomalyResultsTransportAction.class), new ActionHandler<>(StatsAnomalyDetectorAction.INSTANCE, StatsAnomalyDetectorTransportAction.class), new ActionHandler<>(ADStatsNodesAction.INSTANCE, ADStatsNodesTransportAction.class), new ActionHandler<>(SearchAnomalyDetectorAction.INSTANCE, SearchAnomalyDetectorTransportAction.class), diff --git a/src/main/java/org/opensearch/ad/rest/RestDeleteAnomalyResultsAction.java b/src/main/java/org/opensearch/ad/rest/RestDeleteAnomalyResultsAction.java index 4d81ea225..9bd0ce8d0 100644 --- a/src/main/java/org/opensearch/ad/rest/RestDeleteAnomalyResultsAction.java +++ b/src/main/java/org/opensearch/ad/rest/RestDeleteAnomalyResultsAction.java @@ -15,23 +15,30 @@ import java.io.IOException; import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.action.ActionListener; import org.opensearch.action.support.IndicesOptions; -import org.opensearch.ad.AnomalyDetectorPlugin; +import org.opensearch.ad.AnomalyDetectorExtension; import org.opensearch.ad.constant.CommonErrorMessages; +import org.opensearch.ad.settings.AnomalyDetectorSettings; import org.opensearch.ad.settings.EnabledSetting; import org.opensearch.ad.transport.DeleteAnomalyResultsAction; -import org.opensearch.client.node.NodeClient; +import org.opensearch.common.xcontent.json.JsonXContent; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.extensions.rest.ExtensionRestResponse; +import org.opensearch.index.reindex.BulkByScrollResponse; import org.opensearch.index.reindex.DeleteByQueryRequest; -import org.opensearch.rest.BaseRestHandler; -import org.opensearch.rest.BytesRestResponse; import org.opensearch.rest.RestRequest; import org.opensearch.rest.RestStatus; +import org.opensearch.sdk.ExtensionsRunner; +import org.opensearch.sdk.SDKClient.SDKRestClient; +import org.opensearch.sdk.rest.BaseExtensionRestHandler; import org.opensearch.search.builder.SearchSourceBuilder; import com.google.common.collect.ImmutableList; @@ -48,20 +55,31 @@ * * TODO: build better user experience to reduce user's effort to maintain custom result index. */ -public class RestDeleteAnomalyResultsAction extends BaseRestHandler { +public class RestDeleteAnomalyResultsAction extends BaseExtensionRestHandler { private static final String DELETE_AD_RESULTS_ACTION = "delete_anomaly_results"; private static final Logger logger = LogManager.getLogger(RestDeleteAnomalyResultsAction.class); + private ExtensionsRunner extensionsRunner; + private SDKRestClient sdkRestClient; - public RestDeleteAnomalyResultsAction() {} + public RestDeleteAnomalyResultsAction(ExtensionsRunner extensionsRunner, SDKRestClient client) { + this.extensionsRunner = extensionsRunner; + this.sdkRestClient = client; + } - @Override public String getName() { return DELETE_AD_RESULTS_ACTION; } - @Override - protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + private Function handleRequest = (request) -> { + try { + return prepareRequest(request); + } catch (Exception e) { + return exceptionalRequest(request, e); + } + }; + + protected ExtensionRestResponse prepareRequest(RestRequest request) throws IOException { if (!EnabledSetting.isADPluginEnabled()) { throw new IllegalStateException(CommonErrorMessages.DISABLED_ERR_MSG); } @@ -70,21 +88,32 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli DeleteByQueryRequest deleteRequest = new DeleteByQueryRequest(ALL_AD_RESULTS_INDEX_PATTERN) .setQuery(searchSourceBuilder.query()) .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN); - return channel -> client.execute(DeleteAnomalyResultsAction.INSTANCE, deleteRequest, ActionListener.wrap(r -> { - XContentBuilder contentBuilder = r.toXContent(channel.newBuilder().startObject(), ToXContent.EMPTY_PARAMS); - contentBuilder.endObject(); - channel.sendResponse(new BytesRestResponse(RestStatus.OK, contentBuilder)); - }, e -> { - try { - channel.sendResponse(new BytesRestResponse(channel, e)); - } catch (IOException exception) { - logger.error("Failed to send back delete anomaly result exception result", exception); - } - })); + CompletableFuture futureResponse = new CompletableFuture<>(); + sdkRestClient + .execute( + DeleteAnomalyResultsAction.INSTANCE, + deleteRequest, + ActionListener + .wrap(deleteResponse -> futureResponse.complete(deleteResponse), ex -> futureResponse.completeExceptionally(ex)) + ); + BulkByScrollResponse bulkByScrollResponse = futureResponse + .orTimeout( + AnomalyDetectorSettings.REQUEST_TIMEOUT.get(extensionsRunner.getEnvironmentSettings()).getMillis(), + TimeUnit.MILLISECONDS + ) + .join(); + + XContentBuilder contentBuilder = bulkByScrollResponse + .toXContent(JsonXContent.contentBuilder().startObject(), ToXContent.EMPTY_PARAMS); + contentBuilder.endObject(); + + ExtensionRestResponse extensionRestResponse = new ExtensionRestResponse(request, RestStatus.OK, contentBuilder); + return extensionRestResponse; } @Override - public List routes() { - return ImmutableList.of(new Route(RestRequest.Method.DELETE, AnomalyDetectorPlugin.AD_BASE_DETECTORS_URI + "/results")); + public List routeHandlers() { + return ImmutableList + .of(new RouteHandler(RestRequest.Method.DELETE, AnomalyDetectorExtension.AD_BASE_DETECTORS_URI + "/results", handleRequest)); } } diff --git a/src/main/java/org/opensearch/ad/transport/DeleteAnomalyResultsTransportAction.java b/src/main/java/org/opensearch/ad/transport/DeleteAnomalyResultsTransportAction.java index 2ee2881c1..d04384855 100644 --- a/src/main/java/org/opensearch/ad/transport/DeleteAnomalyResultsTransportAction.java +++ b/src/main/java/org/opensearch/ad/transport/DeleteAnomalyResultsTransportAction.java @@ -21,36 +21,39 @@ import org.apache.logging.log4j.Logger; import org.opensearch.action.ActionListener; import org.opensearch.action.support.ActionFilters; -import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.action.support.TransportAction; import org.opensearch.ad.auth.UserIdentity; -import org.opensearch.client.Client; -import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.inject.Inject; import org.opensearch.common.settings.Settings; import org.opensearch.index.reindex.BulkByScrollResponse; -import org.opensearch.index.reindex.DeleteByQueryAction; import org.opensearch.index.reindex.DeleteByQueryRequest; +import org.opensearch.sdk.ExtensionsRunner; +import org.opensearch.sdk.SDKClient.SDKRestClient; +import org.opensearch.sdk.SDKClusterService; import org.opensearch.tasks.Task; -import org.opensearch.transport.TransportService; +import org.opensearch.tasks.TaskManager; -public class DeleteAnomalyResultsTransportAction extends HandledTransportAction { +import com.google.inject.Inject; - private final Client client; +public class DeleteAnomalyResultsTransportAction extends TransportAction { + + private final SDKRestClient sdkRestClient; private volatile Boolean filterEnabled; private static final Logger logger = LogManager.getLogger(DeleteAnomalyResultsTransportAction.class); + private final Settings settings; @Inject public DeleteAnomalyResultsTransportAction( - TransportService transportService, + ExtensionsRunner extensionsRunner, + TaskManager taskManager, ActionFilters actionFilters, - Settings settings, - ClusterService clusterService, - Client client + SDKClusterService sdkClusterService, + SDKRestClient sdkRestClient ) { - super(DeleteAnomalyResultsAction.NAME, transportService, actionFilters, DeleteByQueryRequest::new); - this.client = client; + super(DeleteAnomalyResultsAction.NAME, actionFilters, taskManager); + this.sdkRestClient = sdkRestClient; + this.settings = extensionsRunner.getEnvironmentSettings(); filterEnabled = FILTER_BY_BACKEND_ROLES.get(settings); - clusterService.getClusterSettings().addSettingsUpdateConsumer(FILTER_BY_BACKEND_ROLES, it -> filterEnabled = it); + sdkClusterService.getClusterSettings().addSettingsUpdateConsumer(FILTER_BY_BACKEND_ROLES, it -> filterEnabled = it); } @Override @@ -75,12 +78,15 @@ private void validateRole(DeleteByQueryRequest request, UserIdentity user, Actio // Case 1: user == null when 1. Security is disabled. 2. When user is super-admin // Case 2: If Security is enabled and filter is disabled, proceed with search as // user is already authenticated to hit this API. - client.execute(DeleteByQueryAction.INSTANCE, request, listener); + // client.execute(DeleteByQueryAction.INSTANCE, request, listener); + // client.execute(DeleteByQueryAction.INSTANCE, request, listener); + sdkRestClient.deleteByQuery(request, listener); } else { // Security is enabled and backend role filter is enabled try { addUserBackendRolesFilter(user, request.getSearchRequest().source()); - client.execute(DeleteByQueryAction.INSTANCE, request, listener); + // client.execute(DeleteByQueryAction.INSTANCE, request, listener); + sdkRestClient.deleteByQuery(request, listener); } catch (Exception e) { listener.onFailure(e); } diff --git a/src/test/java/org/opensearch/ad/transport/DeleteAnomalyResultsTransportActionTests.java b/src/test/java/org/opensearch/ad/transport/DeleteAnomalyResultsTransportActionTests.java index db83e861a..76e5eb0dc 100644 --- a/src/test/java/org/opensearch/ad/transport/DeleteAnomalyResultsTransportActionTests.java +++ b/src/test/java/org/opensearch/ad/transport/DeleteAnomalyResultsTransportActionTests.java @@ -9,8 +9,22 @@ * GitHub history for details. */ -/*package org.opensearch.ad.transport; +package org.opensearch.ad.transport; +import static org.opensearch.ad.TestHelpers.matchAllRequest; +import static org.opensearch.ad.constant.CommonName.ANOMALY_RESULT_INDEX_ALIAS; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +import org.junit.Ignore; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.ad.HistoricalAnalysisIntegTestCase; +import org.opensearch.ad.TestHelpers; +import org.opensearch.index.query.BoolQueryBuilder; +import org.opensearch.index.query.MatchAllQueryBuilder; +import org.opensearch.index.reindex.BulkByScrollResponse; +import org.opensearch.index.reindex.DeleteByQueryRequest; public class DeleteAnomalyResultsTransportActionTests extends HistoricalAnalysisIntegTestCase { @@ -35,4 +49,4 @@ public void testDeleteADResultAction() throws IOException, InterruptedException }, 90, TimeUnit.SECONDS); assertEquals(1, deleteADResultResponse.getDeleted()); } -}*/ +}