From f9466fd609b4b4f8ba962e13cc7ab5ca7fa1fd1c Mon Sep 17 00:00:00 2001 From: Sohaib Iftikhar Date: Tue, 4 Sep 2018 14:56:26 +0200 Subject: [PATCH] HLRC: Add delete by query API (#32782) Adds the delete-by-query API to the High Level REST Client. --- .../client/RequestConverters.java | 27 +++ .../client/RestHighLevelClient.java | 30 ++++ .../java/org/elasticsearch/client/CrudIT.java | 48 ++++++ .../client/RequestConvertersTests.java | 52 +++++- .../client/RestHighLevelClientTests.java | 1 - .../documentation/CRUDDocumentationIT.java | 108 ++++++++++++ .../document/delete-by-query.asciidoc | 163 ++++++++++++++++++ .../high-level/supported-apis.asciidoc | 2 + .../reindex/RestDeleteByQueryAction.java | 3 +- .../index/reindex/RoundTripTests.java | 3 +- .../index/reindex/DeleteByQueryRequest.java | 92 +++++++++- .../ml/job/persistence/JobDataDeleter.java | 16 +- .../persistence/JobStorageDeletionTask.java | 23 +-- .../action/TransportDeleteCalendarAction.java | 9 +- .../retention/ExpiredForecastsRemover.java | 9 +- .../job/retention/ExpiredResultsRemover.java | 11 +- .../security/authc/ExpiredTokenRemover.java | 10 +- 17 files changed, 547 insertions(+), 60 deletions(-) create mode 100644 docs/java-rest/high-level/document/delete-by-query.asciidoc diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java index 054abef1bcdb2..43cb3ddf02235 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java @@ -108,6 +108,7 @@ import org.elasticsearch.index.VersionType; import org.elasticsearch.index.rankeval.RankEvalRequest; import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest; +import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.index.reindex.UpdateByQueryRequest; import org.elasticsearch.protocol.xpack.XPackInfoRequest; @@ -877,6 +878,32 @@ static Request updateByQuery(UpdateByQueryRequest updateByQueryRequest) throws I return request; } + static Request deleteByQuery(DeleteByQueryRequest deleteByQueryRequest) throws IOException { + String endpoint = + endpoint(deleteByQueryRequest.indices(), deleteByQueryRequest.getDocTypes(), "_delete_by_query"); + Request request = new Request(HttpPost.METHOD_NAME, endpoint); + Params params = new Params(request) + .withRouting(deleteByQueryRequest.getRouting()) + .withRefresh(deleteByQueryRequest.isRefresh()) + .withTimeout(deleteByQueryRequest.getTimeout()) + .withWaitForActiveShards(deleteByQueryRequest.getWaitForActiveShards(), ActiveShardCount.DEFAULT) + .withIndicesOptions(deleteByQueryRequest.indicesOptions()); + if (deleteByQueryRequest.isAbortOnVersionConflict() == false) { + params.putParam("conflicts", "proceed"); + } + if (deleteByQueryRequest.getBatchSize() != AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE) { + params.putParam("scroll_size", Integer.toString(deleteByQueryRequest.getBatchSize())); + } + if (deleteByQueryRequest.getScrollTime() != AbstractBulkByScrollRequest.DEFAULT_SCROLL_TIMEOUT) { + params.putParam("scroll", deleteByQueryRequest.getScrollTime()); + } + if (deleteByQueryRequest.getSize() > 0) { + params.putParam("size", Integer.toString(deleteByQueryRequest.getSize())); + } + request.setEntity(createEntity(deleteByQueryRequest, REQUEST_BODY_CONTENT_TYPE)); + return request; + } + static Request rollover(RolloverRequest rolloverRequest) throws IOException { String endpoint = new EndpointBuilder().addPathPart(rolloverRequest.getAlias()).addPathPartAsIs("_rollover") .addPathPart(rolloverRequest.getNewIndexName()).build(); diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java index 158d9506b9d59..afd85f313302e 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java @@ -66,6 +66,7 @@ import org.elasticsearch.index.rankeval.RankEvalRequest; import org.elasticsearch.index.rankeval.RankEvalResponse; import org.elasticsearch.index.reindex.BulkByScrollResponse; +import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.index.reindex.UpdateByQueryRequest; import org.elasticsearch.plugins.spi.NamedXContentProvider; @@ -474,6 +475,35 @@ public final void updateByQueryAsync(UpdateByQueryRequest reindexRequest, Reques ); } + /** + * Executes a delete by query request. + * See + * Delete By Query API on elastic.co + * @param deleteByQueryRequest the request + * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @return the response + * @throws IOException in case there is a problem sending the request or parsing back the response + */ + public final BulkByScrollResponse deleteByQuery(DeleteByQueryRequest deleteByQueryRequest, RequestOptions options) throws IOException { + return performRequestAndParseEntity( + deleteByQueryRequest, RequestConverters::deleteByQuery, options, BulkByScrollResponse::fromXContent, emptySet() + ); + } + + /** + * Asynchronously executes a delete by query request. + * See + * Delete By Query API on elastic.co + * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @param listener the listener to be notified upon request completion + */ + public final void deleteByQueryAsync(DeleteByQueryRequest reindexRequest, RequestOptions options, + ActionListener listener) { + performRequestAsyncAndParseEntity( + reindexRequest, RequestConverters::deleteByQuery, options, BulkByScrollResponse::fromXContent, listener, emptySet() + ); + } + /** * Pings the remote Elasticsearch cluster and returns true if the ping succeeded, false otherwise * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java index 64232d8b17693..eeba9d50fa606 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java @@ -36,6 +36,7 @@ import org.elasticsearch.action.get.MultiGetResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; @@ -50,6 +51,7 @@ import org.elasticsearch.index.get.GetResult; import org.elasticsearch.index.query.IdsQueryBuilder; import org.elasticsearch.index.reindex.BulkByScrollResponse; +import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.index.reindex.UpdateByQueryRequest; import org.elasticsearch.rest.RestStatus; @@ -823,6 +825,52 @@ public void testUpdateByQuery() throws IOException { } } + public void testDeleteByQuery() throws IOException { + final String sourceIndex = "source1"; + { + // Prepare + Settings settings = Settings.builder() + .put("number_of_shards", 1) + .put("number_of_replicas", 0) + .build(); + createIndex(sourceIndex, settings); + assertEquals( + RestStatus.OK, + highLevelClient().bulk( + new BulkRequest() + .add(new IndexRequest(sourceIndex, "type", "1") + .source(Collections.singletonMap("foo", 1), XContentType.JSON)) + .add(new IndexRequest(sourceIndex, "type", "2") + .source(Collections.singletonMap("foo", 2), XContentType.JSON)) + .setRefreshPolicy(RefreshPolicy.IMMEDIATE), + RequestOptions.DEFAULT + ).status() + ); + } + { + // test1: delete one doc + DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(); + deleteByQueryRequest.indices(sourceIndex); + deleteByQueryRequest.setQuery(new IdsQueryBuilder().addIds("1").types("type")); + deleteByQueryRequest.setRefresh(true); + BulkByScrollResponse bulkResponse = + execute(deleteByQueryRequest, highLevelClient()::deleteByQuery, highLevelClient()::deleteByQueryAsync); + assertEquals(1, bulkResponse.getTotal()); + assertEquals(1, bulkResponse.getDeleted()); + assertEquals(0, bulkResponse.getNoops()); + assertEquals(0, bulkResponse.getVersionConflicts()); + assertEquals(1, bulkResponse.getBatches()); + assertTrue(bulkResponse.getTook().getMillis() > 0); + assertEquals(1, bulkResponse.getBatches()); + assertEquals(0, bulkResponse.getBulkFailures().size()); + assertEquals(0, bulkResponse.getSearchFailures().size()); + assertEquals( + 1, + highLevelClient().search(new SearchRequest(sourceIndex), RequestOptions.DEFAULT).getHits().totalHits + ); + } + } + public void testBulkProcessorIntegration() throws IOException { int nbItems = randomIntBetween(10, 100); boolean[] errors = new boolean[nbItems]; diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java index 43ab55c5362b3..2f864b951b223 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java @@ -128,6 +128,7 @@ import org.elasticsearch.index.rankeval.RankEvalSpec; import org.elasticsearch.index.rankeval.RatedRequest; import org.elasticsearch.index.rankeval.RestRankEvalAction; +import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.index.reindex.RemoteInfo; import org.elasticsearch.index.reindex.UpdateByQueryRequest; @@ -536,6 +537,53 @@ public void testUpdateByQuery() throws IOException { assertToXContentBody(updateByQueryRequest, request.getEntity()); } + public void testDeleteByQuery() throws IOException { + DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(); + deleteByQueryRequest.indices(randomIndicesNames(1, 5)); + Map expectedParams = new HashMap<>(); + if (randomBoolean()) { + deleteByQueryRequest.setDocTypes(generateRandomStringArray(5, 5, false, false)); + } + if (randomBoolean()) { + int batchSize = randomInt(100); + deleteByQueryRequest.setBatchSize(batchSize); + expectedParams.put("scroll_size", Integer.toString(batchSize)); + } + if (randomBoolean()) { + deleteByQueryRequest.setRouting("=cat"); + expectedParams.put("routing", "=cat"); + } + if (randomBoolean()) { + int size = randomIntBetween(100, 1000); + deleteByQueryRequest.setSize(size); + expectedParams.put("size", Integer.toString(size)); + } + if (randomBoolean()) { + deleteByQueryRequest.setAbortOnVersionConflict(false); + expectedParams.put("conflicts", "proceed"); + } + if (randomBoolean()) { + String ts = randomTimeValue(); + deleteByQueryRequest.setScroll(TimeValue.parseTimeValue(ts, "scroll")); + expectedParams.put("scroll", ts); + } + if (randomBoolean()) { + deleteByQueryRequest.setQuery(new TermQueryBuilder("foo", "fooval")); + } + setRandomIndicesOptions(deleteByQueryRequest::setIndicesOptions, deleteByQueryRequest::indicesOptions, expectedParams); + setRandomTimeout(deleteByQueryRequest::setTimeout, ReplicationRequest.DEFAULT_TIMEOUT, expectedParams); + Request request = RequestConverters.deleteByQuery(deleteByQueryRequest); + StringJoiner joiner = new StringJoiner("/", "/", ""); + joiner.add(String.join(",", deleteByQueryRequest.indices())); + if (deleteByQueryRequest.getDocTypes().length > 0) + joiner.add(String.join(",", deleteByQueryRequest.getDocTypes())); + joiner.add("_delete_by_query"); + assertEquals(joiner.toString(), request.getEndpoint()); + assertEquals(HttpPost.METHOD_NAME, request.getMethod()); + assertEquals(expectedParams, request.getParameters()); + assertToXContentBody(deleteByQueryRequest, request.getEntity()); + } + public void testPutMapping() throws IOException { PutMappingRequest putMappingRequest = new PutMappingRequest(); @@ -2754,7 +2802,7 @@ public void testXPackPutWatch() throws Exception { request.getEntity().writeTo(bos); assertThat(bos.toString("UTF-8"), is(body)); } - + public void testGraphExplore() throws Exception { Map expectedParams = new HashMap<>(); @@ -2782,7 +2830,7 @@ public void testGraphExplore() throws Exception { assertEquals(expectedParams, request.getParameters()); assertThat(request.getEntity().getContentType().getValue(), is(XContentType.JSON.mediaTypeWithoutParameters())); assertToXContentBody(graphExploreRequest, request.getEntity()); - } + } public void testXPackDeleteWatch() { DeleteWatchRequest deleteWatchRequest = new DeleteWatchRequest(); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java index 7a55c87571d94..0ffc1acd104a3 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java @@ -667,7 +667,6 @@ public void testApiNamingConventions() throws Exception { "cluster.remote_info", "count", "create", - "delete_by_query", "exists_source", "get_source", "indices.delete_alias", diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java index d24828576b59d..ffb0448f5b421 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java @@ -65,6 +65,7 @@ import org.elasticsearch.index.query.MatchAllQueryBuilder; import org.elasticsearch.index.query.TermQueryBuilder; import org.elasticsearch.index.reindex.BulkByScrollResponse; +import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.index.reindex.RemoteInfo; import org.elasticsearch.index.reindex.ScrollableHitSource; @@ -1030,6 +1031,113 @@ public void onFailure(Exception e) { } } + public void testDeleteByQuery() throws Exception { + RestHighLevelClient client = highLevelClient(); + { + String mapping = + "\"doc\": {\n" + + " \"properties\": {\n" + + " \"user\": {\n" + + " \"type\": \"text\"\n" + + " },\n" + + " \"field1\": {\n" + + " \"type\": \"integer\"\n" + + " },\n" + + " \"field2\": {\n" + + " \"type\": \"integer\"\n" + + " }\n" + + " }\n" + + " }"; + createIndex("source1", Settings.EMPTY, mapping); + createIndex("source2", Settings.EMPTY, mapping); + } + { + // tag::delete-by-query-request + DeleteByQueryRequest request = new DeleteByQueryRequest("source1", "source2"); // <1> + // end::delete-by-query-request + // tag::delete-by-query-request-conflicts + request.setConflicts("proceed"); // <1> + // end::delete-by-query-request-conflicts + // tag::delete-by-query-request-typeOrQuery + request.setDocTypes("doc"); // <1> + request.setQuery(new TermQueryBuilder("user", "kimchy")); // <2> + // end::delete-by-query-request-typeOrQuery + // tag::delete-by-query-request-size + request.setSize(10); // <1> + // end::delete-by-query-request-size + // tag::delete-by-query-request-scrollSize + request.setBatchSize(100); // <1> + // end::delete-by-query-request-scrollSize + // tag::delete-by-query-request-timeout + request.setTimeout(TimeValue.timeValueMinutes(2)); // <1> + // end::delete-by-query-request-timeout + // tag::delete-by-query-request-refresh + request.setRefresh(true); // <1> + // end::delete-by-query-request-refresh + // tag::delete-by-query-request-slices + request.setSlices(2); // <1> + // end::delete-by-query-request-slices + // tag::delete-by-query-request-scroll + request.setScroll(TimeValue.timeValueMinutes(10)); // <1> + // end::delete-by-query-request-scroll + // tag::delete-by-query-request-routing + request.setRouting("=cat"); // <1> + // end::delete-by-query-request-routing + // tag::delete-by-query-request-indicesOptions + request.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN); // <1> + // end::delete-by-query-request-indicesOptions + + // tag::delete-by-query-execute + BulkByScrollResponse bulkResponse = client.deleteByQuery(request, RequestOptions.DEFAULT); + // end::delete-by-query-execute + assertSame(0, bulkResponse.getSearchFailures().size()); + assertSame(0, bulkResponse.getBulkFailures().size()); + // tag::delete-by-query-response + TimeValue timeTaken = bulkResponse.getTook(); // <1> + boolean timedOut = bulkResponse.isTimedOut(); // <2> + long totalDocs = bulkResponse.getTotal(); // <3> + long deletedDocs = bulkResponse.getDeleted(); // <4> + long batches = bulkResponse.getBatches(); // <5> + long noops = bulkResponse.getNoops(); // <6> + long versionConflicts = bulkResponse.getVersionConflicts(); // <7> + long bulkRetries = bulkResponse.getBulkRetries(); // <8> + long searchRetries = bulkResponse.getSearchRetries(); // <9> + TimeValue throttledMillis = bulkResponse.getStatus().getThrottled(); // <10> + TimeValue throttledUntilMillis = bulkResponse.getStatus().getThrottledUntil(); // <11> + List searchFailures = bulkResponse.getSearchFailures(); // <12> + List bulkFailures = bulkResponse.getBulkFailures(); // <13> + // end::delete-by-query-response + } + { + DeleteByQueryRequest request = new DeleteByQueryRequest(); + request.indices("source1"); + + // tag::delete-by-query-execute-listener + ActionListener listener = new ActionListener() { + @Override + public void onResponse(BulkByScrollResponse bulkResponse) { + // <1> + } + + @Override + public void onFailure(Exception e) { + // <2> + } + }; + // end::delete-by-query-execute-listener + + // Replace the empty listener by a blocking listener in test + final CountDownLatch latch = new CountDownLatch(1); + listener = new LatchedActionListener<>(listener, latch); + + // tag::delete-by-query-execute-async + client.deleteByQueryAsync(request, RequestOptions.DEFAULT, listener); // <1> + // end::delete-by-query-execute-async + + assertTrue(latch.await(30L, TimeUnit.SECONDS)); + } + } + public void testGet() throws Exception { RestHighLevelClient client = highLevelClient(); { diff --git a/docs/java-rest/high-level/document/delete-by-query.asciidoc b/docs/java-rest/high-level/document/delete-by-query.asciidoc new file mode 100644 index 0000000000000..5ec246a9121ec --- /dev/null +++ b/docs/java-rest/high-level/document/delete-by-query.asciidoc @@ -0,0 +1,163 @@ +[[java-rest-high-document-delete-by-query]] +=== Delete By Query API + +[[java-rest-high-document-delete-by-query-request]] +==== Delete By Query Request + +A `DeleteByQueryRequest` can be used to delete documents from an index. It requires an existing index (or a set of indices) +on which deletion is to be performed. + +The simplest form of a `DeleteByQueryRequest` looks like: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[delete-by-query-request] +-------------------------------------------------- +<1> Creates the `DeleteByQueryRequest` on a set of indices. + +By default version conflicts abort the `DeleteByQueryRequest` process but you can just count them by settings it to +`proceed` in the request body + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[delete-by-query-request-conflicts] +-------------------------------------------------- +<1> Set `proceed` on version conflict + +You can limit the documents by adding a type to the source or by adding a query. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[delete-by-query-request-typeOrQuery] +-------------------------------------------------- +<1> Only copy `doc` type +<2> Only copy documents which have field `user` set to `kimchy` + +It’s also possible to limit the number of processed documents by setting size. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[delete-by-query-request-size] +-------------------------------------------------- +<1> Only copy 10 documents + +By default `DeleteByQueryRequest` uses batches of 1000. You can change the batch size with `setBatchSize`. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[delete-by-query-request-scrollSize] +-------------------------------------------------- +<1> Use batches of 100 documents + +`DeleteByQueryRequest` also helps in automatically parallelizing using `sliced-scroll` to +slice on `_uid`. Use `setSlices` to specify the number of slices to use. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[delete-by-query-request-slices] +-------------------------------------------------- +<1> set number of slices to use + +`DeleteByQueryRequest` uses the `scroll` parameter to control how long it keeps the "search context" alive. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[delete-by-query-request-scroll] +-------------------------------------------------- +<1> set scroll time + +If you provide routing then the routing is copied to the scroll query, limiting the process to the shards that match +that routing value. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[delete-by-query-request-routing] +-------------------------------------------------- +<1> set routing + + +==== Optional arguments +In addition to the options above the following arguments can optionally be also provided: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[delete-by-query-request-timeout] +-------------------------------------------------- +<1> Timeout to wait for the delete by query request to be performed as a `TimeValue` + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[delete-by-query-request-refresh] +-------------------------------------------------- +<1> Refresh index after calling delete by query + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[delete-by-query-request-indicesOptions] +-------------------------------------------------- +<1> Set indices options + + +[[java-rest-high-document-delete-by-query-sync]] +==== Synchronous Execution + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[delete-by-query-execute] +-------------------------------------------------- + +[[java-rest-high-document-delete-by-query-async]] +==== Asynchronous Execution + +The asynchronous execution of an delete by query request requires both the `DeleteByQueryRequest` +instance and an `ActionListener` instance to be passed to the asynchronous +method: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[delete-by-query-execute-async] +-------------------------------------------------- +<1> The `DeleteByQueryRequest` to execute and the `ActionListener` to use when +the execution completes + +The asynchronous method does not block and returns immediately. Once it is +completed the `ActionListener` is called back using the `onResponse` method +if the execution successfully completed or using the `onFailure` method if +it failed. + +A typical listener for `BulkByScrollResponse` looks like: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[delete-by-query-execute-listener] +-------------------------------------------------- +<1> Called when the execution is successfully completed. The response is +provided as an argument and contains a list of individual results for each +operation that was executed. Note that one or more operations might have +failed while the others have been successfully executed. +<2> Called when the whole `DeleteByQueryRequest` fails. In this case the raised +exception is provided as an argument and no operation has been executed. + +[[java-rest-high-document-delete-by-query-execute-listener-response]] +==== Delete By Query Response + +The returned `BulkByScrollResponse` contains information about the executed operations and + allows to iterate over each result as follows: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[delete-by-query-response] +-------------------------------------------------- +<1> Get total time taken +<2> Check if the request timed out +<3> Get total number of docs processed +<4> Number of docs that were deleted +<5> Number of batches that were executed +<6> Number of skipped docs +<7> Number of version conflicts +<8> Number of times request had to retry bulk index operations +<9> Number of times request had to retry search operations +<10> The total time this request has throttled itself not including the current throttle time if it is currently sleeping +<11> Remaining delay of any current throttle sleep or 0 if not sleeping +<12> Failures during search phase +<13> Failures during bulk index operation diff --git a/docs/java-rest/high-level/supported-apis.asciidoc b/docs/java-rest/high-level/supported-apis.asciidoc index b39c83b691318..481a2470aa2d7 100644 --- a/docs/java-rest/high-level/supported-apis.asciidoc +++ b/docs/java-rest/high-level/supported-apis.asciidoc @@ -17,6 +17,7 @@ Multi-document APIs:: * <> * <> * <> +* <> include::document/index.asciidoc[] include::document/get.asciidoc[] @@ -27,6 +28,7 @@ include::document/bulk.asciidoc[] include::document/multi-get.asciidoc[] include::document/reindex.asciidoc[] include::document/update-by-query.asciidoc[] +include::document/delete-by-query.asciidoc[] == Search APIs diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestDeleteByQueryAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestDeleteByQueryAction.java index e0ebaa85193db..be232ca7c402f 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestDeleteByQueryAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestDeleteByQueryAction.java @@ -19,7 +19,6 @@ package org.elasticsearch.index.reindex; -import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.rest.RestController; @@ -56,7 +55,7 @@ protected DeleteByQueryRequest buildRequest(RestRequest request) throws IOExcept * it to set its own defaults which differ from SearchRequest's * defaults. Then the parseInternalRequest can override them. */ - DeleteByQueryRequest internal = new DeleteByQueryRequest(new SearchRequest()); + DeleteByQueryRequest internal = new DeleteByQueryRequest(); Map> consumers = new HashMap<>(); consumers.put("conflicts", o -> internal.setConflicts((String) o)); diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java index 2ce21e4929e02..32bd090f8af0a 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java @@ -20,7 +20,6 @@ package org.elasticsearch.index.reindex; import org.elasticsearch.Version; -import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.BytesStreamOutput; @@ -103,7 +102,7 @@ public void testUpdateByQueryRequest() throws IOException { } public void testDeleteByQueryRequest() throws IOException { - DeleteByQueryRequest delete = new DeleteByQueryRequest(new SearchRequest()); + DeleteByQueryRequest delete = new DeleteByQueryRequest(); randomRequest(delete); DeleteByQueryRequest tripped = new DeleteByQueryRequest(toInputByteStream(delete)); assertRequestEquals(delete, tripped); diff --git a/server/src/main/java/org/elasticsearch/index/reindex/DeleteByQueryRequest.java b/server/src/main/java/org/elasticsearch/index/reindex/DeleteByQueryRequest.java index f848e8722c719..2713e5e2661da 100644 --- a/server/src/main/java/org/elasticsearch/index/reindex/DeleteByQueryRequest.java +++ b/server/src/main/java/org/elasticsearch/index/reindex/DeleteByQueryRequest.java @@ -24,6 +24,9 @@ import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.tasks.TaskId; import java.io.IOException; @@ -47,12 +50,18 @@ *
  • it's results won't be visible until the index is refreshed.
  • * */ -public class DeleteByQueryRequest extends AbstractBulkByScrollRequest implements IndicesRequest.Replaceable { +public class DeleteByQueryRequest extends AbstractBulkByScrollRequest + implements IndicesRequest.Replaceable, ToXContentObject { public DeleteByQueryRequest() { + this(new SearchRequest()); } - public DeleteByQueryRequest(SearchRequest search) { + public DeleteByQueryRequest(String... indices) { + this(new SearchRequest(indices)); + } + + DeleteByQueryRequest(SearchRequest search) { this(search, true); } @@ -68,6 +77,78 @@ private DeleteByQueryRequest(SearchRequest search, boolean setDefaults) { } } + /** + * Set the query for selective delete + */ + public DeleteByQueryRequest setQuery(QueryBuilder query) { + if (query != null) { + getSearchRequest().source().query(query); + } + return this; + } + + /** + * Set the document types for the delete + */ + public DeleteByQueryRequest setDocTypes(String... types) { + if (types != null) { + getSearchRequest().types(types); + } + return this; + } + + /** + * Set routing limiting the process to the shards that match that routing value + */ + public DeleteByQueryRequest setRouting(String routing) { + if (routing != null) { + getSearchRequest().routing(routing); + } + return this; + } + + /** + * The scroll size to control number of documents processed per batch + */ + public DeleteByQueryRequest setBatchSize(int size) { + getSearchRequest().source().size(size); + return this; + } + + /** + * Set the IndicesOptions for controlling unavailable indices + */ + public DeleteByQueryRequest setIndicesOptions(IndicesOptions indicesOptions) { + getSearchRequest().indicesOptions(indicesOptions); + return this; + } + + /** + * Gets the batch size for this request + */ + public int getBatchSize() { + return getSearchRequest().source().size(); + } + + /** + * Gets the routing value used for this request + */ + public String getRouting() { + return getSearchRequest().routing(); + } + + /** + * Gets the document types on which this request would be executed. Returns an empty array if all + * types are to be processed. + */ + public String[] getDocTypes() { + if (getSearchRequest().types() != null) { + return getSearchRequest().types(); + } else { + return new String[0]; + } + } + @Override protected DeleteByQueryRequest self() { return this; @@ -132,4 +213,11 @@ public DeleteByQueryRequest types(String... types) { return this; } + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + getSearchRequest().source().innerToXContent(builder, params); + builder.endObject(); + return builder; + } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/JobDataDeleter.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/JobDataDeleter.java index 0a7d27f7a0ec4..cc86ce17bb904 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/JobDataDeleter.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/JobDataDeleter.java @@ -11,7 +11,6 @@ import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.Client; @@ -22,7 +21,6 @@ import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.reindex.DeleteByQueryAction; import org.elasticsearch.index.reindex.DeleteByQueryRequest; -import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelState; import org.elasticsearch.xpack.core.ml.job.results.Result; @@ -129,8 +127,8 @@ public void deleteResultsFromTime(long cutoffEpochMs, ActionListener li QueryBuilder query = QueryBuilders.boolQuery() .filter(QueryBuilders.existsQuery(Result.RESULT_TYPE.getPreferredName())) .filter(QueryBuilders.rangeQuery(Result.TIMESTAMP.getPreferredName()).gte(cutoffEpochMs)); - deleteByQueryHolder.searchRequest.indicesOptions(IndicesOptions.lenientExpandOpen()); - deleteByQueryHolder.searchRequest.source(new SearchSourceBuilder().query(query)); + deleteByQueryHolder.dbqRequest.setIndicesOptions(IndicesOptions.lenientExpandOpen()); + deleteByQueryHolder.dbqRequest.setQuery(query); executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, deleteByQueryHolder.dbqRequest, ActionListener.wrap(r -> listener.onResponse(true), listener::onFailure)); } @@ -142,9 +140,9 @@ public void deleteInterimResults() { DeleteByQueryHolder deleteByQueryHolder = new DeleteByQueryHolder(AnomalyDetectorsIndex.jobResultsAliasedName(jobId)); deleteByQueryHolder.dbqRequest.setRefresh(false); - deleteByQueryHolder.searchRequest.indicesOptions(IndicesOptions.lenientExpandOpen()); + deleteByQueryHolder.dbqRequest.setIndicesOptions(IndicesOptions.lenientExpandOpen()); QueryBuilder qb = QueryBuilders.termQuery(Result.IS_INTERIM.getPreferredName(), true); - deleteByQueryHolder.searchRequest.source(new SearchSourceBuilder().query(new ConstantScoreQueryBuilder(qb))); + deleteByQueryHolder.dbqRequest.setQuery(new ConstantScoreQueryBuilder(qb)); try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN)) { client.execute(DeleteByQueryAction.INSTANCE, deleteByQueryHolder.dbqRequest).get(); @@ -156,13 +154,11 @@ public void deleteInterimResults() { // Wrapper to ensure safety private static class DeleteByQueryHolder { - private final SearchRequest searchRequest; private final DeleteByQueryRequest dbqRequest; private DeleteByQueryHolder(String index) { - // The search request has to be constructed and passed to the DeleteByQueryRequest before more details are set to it - searchRequest = new SearchRequest(index); - dbqRequest = new DeleteByQueryRequest(searchRequest); + dbqRequest = new DeleteByQueryRequest(); + dbqRequest.indices(index); dbqRequest.setSlices(5); dbqRequest.setAbortOnVersionConflict(false); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/JobStorageDeletionTask.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/JobStorageDeletionTask.java index 19cb42a220ed2..61ed8ed4e118f 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/JobStorageDeletionTask.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/JobStorageDeletionTask.java @@ -13,7 +13,6 @@ import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.Client; @@ -28,7 +27,6 @@ import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryAction; import org.elasticsearch.index.reindex.DeleteByQueryRequest; -import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.xpack.core.ml.action.GetModelSnapshotsAction; @@ -95,12 +93,11 @@ public void delete(String jobId, Client client, ClusterState state, ActionListener deleteCategorizerStateHandler = ActionListener.wrap( response -> { logger.info("Running DBQ on [" + indexName + "," + indexPattern + "] for job [" + jobId + "]"); - SearchRequest searchRequest = new SearchRequest(indexName, indexPattern); - DeleteByQueryRequest request = new DeleteByQueryRequest(searchRequest); + DeleteByQueryRequest request = new DeleteByQueryRequest(indexName, indexPattern); ConstantScoreQueryBuilder query = new ConstantScoreQueryBuilder(new TermQueryBuilder(Job.ID.getPreferredName(), jobId)); - searchRequest.source(new SearchSourceBuilder().query(query)); - searchRequest.indicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen())); + request.setQuery(query); + request.setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen())); request.setSlices(5); request.setAbortOnVersionConflict(false); request.setRefresh(true); @@ -125,14 +122,13 @@ public void delete(String jobId, Client client, ClusterState state, private void deleteQuantiles(String jobId, Client client, ActionListener finishedHandler) { // The quantiles type and doc ID changed in v5.5 so delete both the old and new format - SearchRequest searchRequest = new SearchRequest(AnomalyDetectorsIndex.jobStateIndexName()); - DeleteByQueryRequest request = new DeleteByQueryRequest(searchRequest); + DeleteByQueryRequest request = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexName()); // Just use ID here, not type, as trying to delete different types spams the logs with an exception stack trace IdsQueryBuilder query = new IdsQueryBuilder().addIds(Quantiles.documentId(jobId), // TODO: remove in 7.0 Quantiles.v54DocumentId(jobId)); - searchRequest.source(new SearchSourceBuilder().query(query)); - searchRequest.indicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen())); + request.setQuery(query); + request.setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen())); request.setAbortOnVersionConflict(false); request.setRefresh(true); @@ -162,14 +158,13 @@ private void deleteModelState(String jobId, Client client, ActionListener finishedHandler) { // The categorizer state type and doc ID changed in v5.5 so delete both the old and new format - SearchRequest searchRequest = new SearchRequest(AnomalyDetectorsIndex.jobStateIndexName()); - DeleteByQueryRequest request = new DeleteByQueryRequest(searchRequest); + DeleteByQueryRequest request = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexName()); // Just use ID here, not type, as trying to delete different types spams the logs with an exception stack trace IdsQueryBuilder query = new IdsQueryBuilder().addIds(CategorizerState.documentId(jobId, docNum), // TODO: remove in 7.0 CategorizerState.v54DocumentId(jobId, docNum)); - searchRequest.source(new SearchSourceBuilder().query(query)); - searchRequest.indicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen())); + request.setQuery(query); + request.setIndicesOptions(MlIndicesUtils.addIgnoreUnavailable(IndicesOptions.lenientExpandOpen())); request.setAbortOnVersionConflict(false); request.setRefresh(true); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteCalendarAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteCalendarAction.java index 17ee493246e43..56a02cc847e4b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteCalendarAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteCalendarAction.java @@ -7,7 +7,6 @@ import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.master.AcknowledgedResponse; @@ -19,7 +18,6 @@ import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.reindex.DeleteByQueryAction; import org.elasticsearch.index.reindex.DeleteByQueryRequest; -import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.ml.MlMetaIndex; @@ -76,15 +74,12 @@ protected void doExecute(DeleteCalendarAction.Request request, ActionListener findForecastsToDelete(SearchResponse searchRe } private DeleteByQueryRequest buildDeleteByQuery(List forecastsToDelete) { - SearchRequest searchRequest = new SearchRequest(); - // We need to create the DeleteByQueryRequest before we modify the SearchRequest - // because the constructor of the former wipes the latter - DeleteByQueryRequest request = new DeleteByQueryRequest(searchRequest); + DeleteByQueryRequest request = new DeleteByQueryRequest(); request.setSlices(5); - searchRequest.indices(RESULTS_INDEX_PATTERN); + request.indices(RESULTS_INDEX_PATTERN); BoolQueryBuilder boolQuery = QueryBuilders.boolQuery().minimumShouldMatch(1); boolQuery.must(QueryBuilders.termsQuery(Result.RESULT_TYPE.getPreferredName(), ForecastRequestStats.RESULT_TYPE_VALUE, Forecast.RESULT_TYPE_VALUE)); @@ -157,7 +154,7 @@ private DeleteByQueryRequest buildDeleteByQuery(List forec .must(QueryBuilders.termQuery(Forecast.FORECAST_ID.getPreferredName(), forecastToDelete.getForecastId()))); } QueryBuilder query = QueryBuilders.boolQuery().filter(boolQuery); - searchRequest.source(new SearchSourceBuilder().query(query)); + request.setQuery(query); return request; } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java index f59fdddedecdb..c882c90116880 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java @@ -8,7 +8,6 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.logging.Loggers; @@ -17,7 +16,6 @@ import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryAction; import org.elasticsearch.index.reindex.DeleteByQueryRequest; -import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; @@ -87,19 +85,16 @@ public void onFailure(Exception e) { } private DeleteByQueryRequest createDBQRequest(Job job, long cutoffEpochMs) { - SearchRequest searchRequest = new SearchRequest(); - // We need to create the DeleteByQueryRequest before we modify the SearchRequest - // because the constructor of the former wipes the latter - DeleteByQueryRequest request = new DeleteByQueryRequest(searchRequest); + DeleteByQueryRequest request = new DeleteByQueryRequest(); request.setSlices(5); - searchRequest.indices(AnomalyDetectorsIndex.jobResultsAliasedName(job.getId())); + request.indices(AnomalyDetectorsIndex.jobResultsAliasedName(job.getId())); QueryBuilder excludeFilter = QueryBuilders.termsQuery(Result.RESULT_TYPE.getPreferredName(), ModelSizeStats.RESULT_TYPE_VALUE, ForecastRequestStats.RESULT_TYPE_VALUE, Forecast.RESULT_TYPE_VALUE); QueryBuilder query = createQuery(job.getId(), cutoffEpochMs) .filter(QueryBuilders.existsQuery(Result.RESULT_TYPE.getPreferredName())) .mustNot(excludeFilter); - searchRequest.source(new SearchSourceBuilder().query(query)); + request.setQuery(query); return request; } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/ExpiredTokenRemover.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/ExpiredTokenRemover.java index b8ae5c944419a..78670dd99f6cf 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/ExpiredTokenRemover.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/ExpiredTokenRemover.java @@ -9,7 +9,6 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.bulk.BulkItemResponse; -import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.client.Client; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; @@ -50,15 +49,14 @@ final class ExpiredTokenRemover extends AbstractRunnable { @Override public void doRun() { - SearchRequest searchRequest = new SearchRequest(SecurityIndexManager.SECURITY_INDEX_NAME); - DeleteByQueryRequest expiredDbq = new DeleteByQueryRequest(searchRequest); + DeleteByQueryRequest expiredDbq = new DeleteByQueryRequest(SecurityIndexManager.SECURITY_INDEX_NAME); if (timeout != TimeValue.MINUS_ONE) { expiredDbq.setTimeout(timeout); - searchRequest.source().timeout(timeout); + expiredDbq.getSearchRequest().source().timeout(timeout); } final Instant now = Instant.now(); - searchRequest.source() - .query(QueryBuilders.boolQuery() + expiredDbq + .setQuery(QueryBuilders.boolQuery() .filter(QueryBuilders.termsQuery("doc_type", TokenService.INVALIDATED_TOKEN_DOC_TYPE, "token")) .filter(QueryBuilders.boolQuery() .should(QueryBuilders.rangeQuery("expiration_time").lte(now.toEpochMilli()))