Skip to content

Commit

Permalink
[Extensions] Migrates Delete Detector Results (#881)
Browse files Browse the repository at this point in the history
* Migrates Delete Detector Results

Signed-off-by: Owais Kazi <owaiskazi19@gmail.com>

* Integrated the correct response for the results

Signed-off-by: Owais Kazi <owaiskazi19@gmail.com>

* Added test

Signed-off-by: Owais Kazi <owaiskazi19@gmail.com>

* Updated with new method name

Signed-off-by: Owais Kazi <owaiskazi19@gmail.com>

* Addressed PR Comments

Signed-off-by: Owais Kazi <owaiskazi19@gmail.com>

---------

Signed-off-by: Owais Kazi <owaiskazi19@gmail.com>
  • Loading branch information
owaiskazi19 authored May 2, 2023
1 parent 5dc8edb commit 697b9b7
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 41 deletions.
5 changes: 5 additions & 0 deletions src/main/java/org/opensearch/ad/AnomalyDetectorExtension.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.opensearch.ad.ratelimit.ResultWriteWorker;
import org.opensearch.ad.rest.RestAnomalyDetectorJobAction;
import org.opensearch.ad.rest.RestDeleteAnomalyDetectorAction;
import org.opensearch.ad.rest.RestDeleteAnomalyResultsAction;
import org.opensearch.ad.rest.RestGetAnomalyDetectorAction;
import org.opensearch.ad.rest.RestIndexAnomalyDetectorAction;
import org.opensearch.ad.rest.RestPreviewAnomalyDetectorAction;
Expand Down Expand Up @@ -100,6 +101,8 @@
import org.opensearch.ad.transport.AnomalyResultTransportAction;
import org.opensearch.ad.transport.DeleteAnomalyDetectorAction;
import org.opensearch.ad.transport.DeleteAnomalyDetectorTransportAction;
import org.opensearch.ad.transport.DeleteAnomalyResultsAction;
import org.opensearch.ad.transport.DeleteAnomalyResultsTransportAction;
import org.opensearch.ad.transport.DeleteModelAction;
import org.opensearch.ad.transport.DeleteModelTransportAction;
import org.opensearch.ad.transport.EntityResultAction;
Expand Down Expand Up @@ -217,6 +220,7 @@ public List<ExtensionRestHandler> getExtensionRestHandlers() {
new RestGetAnomalyDetectorAction(extensionsRunner(), restClient()),
new RestAnomalyDetectorJobAction(extensionsRunner(), restClient()),
new RestDeleteAnomalyDetectorAction(extensionsRunner(), restClient()),
new RestDeleteAnomalyResultsAction(extensionsRunner(), restClient()),
new RestStatsAnomalyDetectorAction(extensionsRunner(), restClient(), adStats, nodeFilter),
new RestSearchAnomalyDetectorAction(extensionsRunner(), restClient()),
new RestSearchAnomalyResultAction(extensionsRunner(), restClient()),
Expand Down Expand Up @@ -794,6 +798,7 @@ public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
new ActionHandler<>(EntityResultAction.INSTANCE, EntityResultTransportAction.class),
new ActionHandler<>(ProfileAction.INSTANCE, ProfileTransportAction.class),
new ActionHandler<>(DeleteAnomalyDetectorAction.INSTANCE, DeleteAnomalyDetectorTransportAction.class),
new ActionHandler<>(DeleteAnomalyResultsAction.INSTANCE, DeleteAnomalyResultsTransportAction.class),
new ActionHandler<>(StatsAnomalyDetectorAction.INSTANCE, StatsAnomalyDetectorTransportAction.class),
new ActionHandler<>(ADStatsNodesAction.INSTANCE, ADStatsNodesTransportAction.class),
new ActionHandler<>(SearchAnomalyDetectorAction.INSTANCE, SearchAnomalyDetectorTransportAction.class),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,30 @@

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.ActionListener;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.ad.AnomalyDetectorPlugin;
import org.opensearch.ad.AnomalyDetectorExtension;
import org.opensearch.ad.constant.CommonErrorMessages;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.ad.settings.EnabledSetting;
import org.opensearch.ad.transport.DeleteAnomalyResultsAction;
import org.opensearch.client.node.NodeClient;
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.extensions.rest.ExtensionRestResponse;
import org.opensearch.index.reindex.BulkByScrollResponse;
import org.opensearch.index.reindex.DeleteByQueryRequest;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.RestStatus;
import org.opensearch.sdk.ExtensionsRunner;
import org.opensearch.sdk.SDKClient.SDKRestClient;
import org.opensearch.sdk.rest.BaseExtensionRestHandler;
import org.opensearch.search.builder.SearchSourceBuilder;

import com.google.common.collect.ImmutableList;
Expand All @@ -48,20 +55,31 @@
*
* TODO: build better user experience to reduce user's effort to maintain custom result index.
*/
public class RestDeleteAnomalyResultsAction extends BaseRestHandler {
public class RestDeleteAnomalyResultsAction extends BaseExtensionRestHandler {

private static final String DELETE_AD_RESULTS_ACTION = "delete_anomaly_results";
private static final Logger logger = LogManager.getLogger(RestDeleteAnomalyResultsAction.class);
private ExtensionsRunner extensionsRunner;
private SDKRestClient sdkRestClient;

public RestDeleteAnomalyResultsAction() {}
public RestDeleteAnomalyResultsAction(ExtensionsRunner extensionsRunner, SDKRestClient client) {
this.extensionsRunner = extensionsRunner;
this.sdkRestClient = client;
}

@Override
public String getName() {
return DELETE_AD_RESULTS_ACTION;
}

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
private Function<RestRequest, ExtensionRestResponse> handleRequest = (request) -> {
try {
return prepareRequest(request);
} catch (Exception e) {
return exceptionalRequest(request, e);
}
};

protected ExtensionRestResponse prepareRequest(RestRequest request) throws IOException {
if (!EnabledSetting.isADPluginEnabled()) {
throw new IllegalStateException(CommonErrorMessages.DISABLED_ERR_MSG);
}
Expand All @@ -70,21 +88,32 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
DeleteByQueryRequest deleteRequest = new DeleteByQueryRequest(ALL_AD_RESULTS_INDEX_PATTERN)
.setQuery(searchSourceBuilder.query())
.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN);
return channel -> client.execute(DeleteAnomalyResultsAction.INSTANCE, deleteRequest, ActionListener.wrap(r -> {
XContentBuilder contentBuilder = r.toXContent(channel.newBuilder().startObject(), ToXContent.EMPTY_PARAMS);
contentBuilder.endObject();
channel.sendResponse(new BytesRestResponse(RestStatus.OK, contentBuilder));
}, e -> {
try {
channel.sendResponse(new BytesRestResponse(channel, e));
} catch (IOException exception) {
logger.error("Failed to send back delete anomaly result exception result", exception);
}
}));
CompletableFuture<BulkByScrollResponse> futureResponse = new CompletableFuture<>();
sdkRestClient
.execute(
DeleteAnomalyResultsAction.INSTANCE,
deleteRequest,
ActionListener
.wrap(deleteResponse -> futureResponse.complete(deleteResponse), ex -> futureResponse.completeExceptionally(ex))
);
BulkByScrollResponse bulkByScrollResponse = futureResponse
.orTimeout(
AnomalyDetectorSettings.REQUEST_TIMEOUT.get(extensionsRunner.getEnvironmentSettings()).getMillis(),
TimeUnit.MILLISECONDS
)
.join();

XContentBuilder contentBuilder = bulkByScrollResponse
.toXContent(JsonXContent.contentBuilder().startObject(), ToXContent.EMPTY_PARAMS);
contentBuilder.endObject();

ExtensionRestResponse extensionRestResponse = new ExtensionRestResponse(request, RestStatus.OK, contentBuilder);
return extensionRestResponse;
}

@Override
public List<Route> routes() {
return ImmutableList.of(new Route(RestRequest.Method.DELETE, AnomalyDetectorPlugin.AD_BASE_DETECTORS_URI + "/results"));
public List<RouteHandler> routeHandlers() {
return ImmutableList
.of(new RouteHandler(RestRequest.Method.DELETE, AnomalyDetectorExtension.AD_BASE_DETECTORS_URI + "/results", handleRequest));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,36 +21,39 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.action.ActionListener;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.action.support.TransportAction;
import org.opensearch.ad.auth.UserIdentity;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.reindex.BulkByScrollResponse;
import org.opensearch.index.reindex.DeleteByQueryAction;
import org.opensearch.index.reindex.DeleteByQueryRequest;
import org.opensearch.sdk.ExtensionsRunner;
import org.opensearch.sdk.SDKClient.SDKRestClient;
import org.opensearch.sdk.SDKClusterService;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;
import org.opensearch.tasks.TaskManager;

public class DeleteAnomalyResultsTransportAction extends HandledTransportAction<DeleteByQueryRequest, BulkByScrollResponse> {
import com.google.inject.Inject;

private final Client client;
public class DeleteAnomalyResultsTransportAction extends TransportAction<DeleteByQueryRequest, BulkByScrollResponse> {

private final SDKRestClient sdkRestClient;
private volatile Boolean filterEnabled;
private static final Logger logger = LogManager.getLogger(DeleteAnomalyResultsTransportAction.class);
private final Settings settings;

@Inject
public DeleteAnomalyResultsTransportAction(
TransportService transportService,
ExtensionsRunner extensionsRunner,
TaskManager taskManager,
ActionFilters actionFilters,
Settings settings,
ClusterService clusterService,
Client client
SDKClusterService sdkClusterService,
SDKRestClient sdkRestClient
) {
super(DeleteAnomalyResultsAction.NAME, transportService, actionFilters, DeleteByQueryRequest::new);
this.client = client;
super(DeleteAnomalyResultsAction.NAME, actionFilters, taskManager);
this.sdkRestClient = sdkRestClient;
this.settings = extensionsRunner.getEnvironmentSettings();
filterEnabled = FILTER_BY_BACKEND_ROLES.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(FILTER_BY_BACKEND_ROLES, it -> filterEnabled = it);
sdkClusterService.getClusterSettings().addSettingsUpdateConsumer(FILTER_BY_BACKEND_ROLES, it -> filterEnabled = it);
}

@Override
Expand All @@ -75,12 +78,15 @@ private void validateRole(DeleteByQueryRequest request, UserIdentity user, Actio
// Case 1: user == null when 1. Security is disabled. 2. When user is super-admin
// Case 2: If Security is enabled and filter is disabled, proceed with search as
// user is already authenticated to hit this API.
client.execute(DeleteByQueryAction.INSTANCE, request, listener);
// client.execute(DeleteByQueryAction.INSTANCE, request, listener);
// client.execute(DeleteByQueryAction.INSTANCE, request, listener);
sdkRestClient.deleteByQuery(request, listener);
} else {
// Security is enabled and backend role filter is enabled
try {
addUserBackendRolesFilter(user, request.getSearchRequest().source());
client.execute(DeleteByQueryAction.INSTANCE, request, listener);
// client.execute(DeleteByQueryAction.INSTANCE, request, listener);
sdkRestClient.deleteByQuery(request, listener);
} catch (Exception e) {
listener.onFailure(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,22 @@
* GitHub history for details.
*/

/*package org.opensearch.ad.transport;
package org.opensearch.ad.transport;

import static org.opensearch.ad.TestHelpers.matchAllRequest;
import static org.opensearch.ad.constant.CommonName.ANOMALY_RESULT_INDEX_ALIAS;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

import org.junit.Ignore;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.ad.HistoricalAnalysisIntegTestCase;
import org.opensearch.ad.TestHelpers;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.MatchAllQueryBuilder;
import org.opensearch.index.reindex.BulkByScrollResponse;
import org.opensearch.index.reindex.DeleteByQueryRequest;

public class DeleteAnomalyResultsTransportActionTests extends HistoricalAnalysisIntegTestCase {

Expand All @@ -35,4 +49,4 @@ public void testDeleteADResultAction() throws IOException, InterruptedException
}, 90, TimeUnit.SECONDS);
assertEquals(1, deleteADResultResponse.getDeleted());
}
}*/
}

0 comments on commit 697b9b7

Please sign in to comment.