From 384757deffb1bd88781c2d5c3459ca25f15792a3 Mon Sep 17 00:00:00 2001 From: Jake Landis Date: Mon, 17 Dec 2018 16:25:11 -0600 Subject: [PATCH] ingest: support default pipelines + bulk upserts (#36618) This commit adds support to enable bulk upserts to use an index's default pipeline. Bulk upsert, doc_as_upsert, and script_as_upsert are all supported. However, bulk script_as_upsert has slightly surprising behavior since the pipeline is executed _before_ the script is evaluated. This means that the pipeline only has access the data found in the upsert field of the script_as_upsert. The non-bulk script_as_upsert (existing behavior) runs the pipeline _after_ the script is executed. This commit does _not_ attempt to consolidate the bulk and non-bulk behavior for script_as_upsert. This commit also adds additional testing for the non-bulk behavior, which remains unchanged with this commit. fixes #36219 --- .../test/ingest/200_default_pipeline.yml | 101 ++++++++++++++++-- .../action/bulk/TransportBulkAction.java | 30 ++++-- .../elasticsearch/ingest/IngestService.java | 10 +- .../bulk/TransportBulkActionIngestTests.java | 54 ++++++++++ .../action/bulk/TransportBulkActionTests.java | 21 ++++ 5 files changed, 196 insertions(+), 20 deletions(-) diff --git a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/200_default_pipeline.yml b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/200_default_pipeline.yml index 4695991f3c3b1..d4b39c5e99ac2 100644 --- a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/200_default_pipeline.yml +++ b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/200_default_pipeline.yml @@ -23,7 +23,7 @@ teardown: ] } - match: { acknowledged: true } - +# default pipeline via index - do: indices.create: index: test @@ -48,7 +48,7 @@ teardown: id: 1 - match: { _source.bytes_source_field: "1kb" } - match: { _source.bytes_target_field: 1024 } - +# default pipeline via alias - do: index: index: test_alias @@ -63,12 +63,101 @@ teardown: id: 2 - match: { _source.bytes_source_field: "1kb" } - match: { _source.bytes_target_field: 1024 } +# default pipeline via upsert + - do: + update: + index: test + type: test + id: 3 + body: + script: + source: "ctx._source.ran_script = true" + lang: "painless" + upsert: { "bytes_source_field":"1kb" } + - do: + get: + index: test + type: test + id: 3 + - match: { _source.bytes_source_field: "1kb" } + - match: { _source.bytes_target_field: 1024 } +# default pipeline via scripted upsert + - do: + update: + index: test + type: test + id: 4 + body: + script: + source: "ctx._source.bytes_source_field = '1kb'" + lang: "painless" + upsert : {} + scripted_upsert: true + - do: + get: + index: test + type: test + id: 4 + - match: { _source.bytes_source_field: "1kb" } + - match: { _source.bytes_target_field: 1024 } +# default pipeline via doc_as_upsert + - do: + update: + index: test + type: test + id: 5 + body: + doc: { "bytes_source_field":"1kb" } + doc_as_upsert: true + - do: + get: + index: test + type: test + id: 5 + - match: { _source.bytes_source_field: "1kb" } + - match: { _source.bytes_target_field: 1024 } +# default pipeline via bulk upsert +# note - bulk scripted upsert's execute the pipeline before the script, so any data referenced by the pipeline +# needs to be in the upsert, not the script + - do: + bulk: + refresh: true + body: | + {"update":{"_id":"6","_index":"test","_type":"test"}} + {"script":"ctx._source.ran_script = true","upsert":{"bytes_source_field":"1kb"}} + {"update":{"_id":"7","_index":"test","_type":"test"}} + {"doc":{"bytes_source_field":"2kb"}, "doc_as_upsert":true} + {"update":{"_id":"8","_index":"test","_type":"test"}} + {"script": "ctx._source.ran_script = true","upsert":{"bytes_source_field":"3kb"}, "scripted_upsert" : true} + - do: + mget: + body: + docs: + - { _index: "test", _type: "_doc", _id: "6" } + - { _index: "test", _type: "_doc", _id: "7" } + - { _index: "test", _type: "_doc", _id: "8" } + - match: { docs.0._index: "test" } + - match: { docs.0._id: "6" } + - match: { docs.0._source.bytes_source_field: "1kb" } + - match: { docs.0._source.bytes_target_field: 1024 } + - is_false: docs.0._source.ran_script + - match: { docs.1._index: "test" } + - match: { docs.1._id: "7" } + - match: { docs.1._source.bytes_source_field: "2kb" } + - match: { docs.1._source.bytes_target_field: 2048 } + - match: { docs.2._index: "test" } + - match: { docs.2._id: "8" } + - match: { docs.2._source.bytes_source_field: "3kb" } + - match: { docs.2._source.bytes_target_field: 3072 } + - match: { docs.2._source.ran_script: true } + +# explicit no default pipeline - do: index: index: test type: test - id: 3 + id: 9 pipeline: "_none" body: {bytes_source_field: "1kb"} @@ -76,15 +165,15 @@ teardown: get: index: test type: test - id: 3 + id: 9 - match: { _source.bytes_source_field: "1kb" } - is_false: _source.bytes_target_field - +# bad request - do: catch: bad_request index: index: test type: test - id: 4 + id: 10 pipeline: "" body: {bytes_source_field: "1kb"} diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index fa294a1bb2b6c..a89d162979f5f 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -127,6 +127,24 @@ public TransportBulkAction(ThreadPool threadPool, TransportService transportServ clusterService.addStateApplier(this.ingestForwarder); } + /** + * Retrieves the {@link IndexRequest} from the provided {@link DocWriteRequest} for index or upsert actions. Upserts are + * modeled as {@link IndexRequest} inside the {@link UpdateRequest}. Ignores {@link org.elasticsearch.action.delete.DeleteRequest}'s + * + * @param docWriteRequest The request to find the {@link IndexRequest} + * @return the found {@link IndexRequest} or {@code null} if one can not be found. + */ + public static IndexRequest getIndexWriteRequest(DocWriteRequest docWriteRequest) { + IndexRequest indexRequest = null; + if (docWriteRequest instanceof IndexRequest) { + indexRequest = (IndexRequest) docWriteRequest; + } else if (docWriteRequest instanceof UpdateRequest) { + UpdateRequest updateRequest = (UpdateRequest) docWriteRequest; + indexRequest = updateRequest.docAsUpsert() ? updateRequest.doc() : updateRequest.upsertRequest(); + } + return indexRequest; + } + @Override protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener listener) { final long startTime = relativeTime(); @@ -207,12 +225,12 @@ private void executeIngestAndBulk(Task task, final BulkRequest bulkRequest, fina final MetaData metaData = clusterService.state().getMetaData(); ImmutableOpenMap indicesMetaData = metaData.indices(); for (DocWriteRequest actionRequest : bulkRequest.requests) { - if (actionRequest instanceof IndexRequest) { - IndexRequest indexRequest = (IndexRequest) actionRequest; + IndexRequest indexRequest = getIndexWriteRequest(actionRequest); + if(indexRequest != null){ String pipeline = indexRequest.getPipeline(); if (pipeline == null) { - IndexMetaData indexMetaData = indicesMetaData.get(indexRequest.index()); - if (indexMetaData == null) { + IndexMetaData indexMetaData = indicesMetaData.get(actionRequest.index()); + if (indexMetaData == null && indexRequest.index() != null) { //check the alias AliasOrIndex indexOrAlias = metaData.getAliasAndIndexLookup().get(indexRequest.index()); if (indexOrAlias != null && indexOrAlias.isAlias()) { @@ -626,7 +644,7 @@ ActionListener wrapActionListenerIfNeeded(long ingestTookInMillis, } void markCurrentItemAsDropped() { - IndexRequest indexRequest = (IndexRequest) bulkRequest.requests().get(currentSlot); + IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(currentSlot)); failedSlots.set(currentSlot); itemResponses.add( new BulkItemResponse(currentSlot, indexRequest.opType(), @@ -639,7 +657,7 @@ void markCurrentItemAsDropped() { } void markCurrentItemAsFailed(Exception e) { - IndexRequest indexRequest = (IndexRequest) bulkRequest.requests().get(currentSlot); + IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(currentSlot)); // We hit a error during preprocessing a request, so we: // 1) Remember the request item slot from the bulk, so that we're done processing all requests we know what failed // 2) Add a bulk item failure for this request diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 705e77028a1ef..6951e33d5e741 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -24,11 +24,11 @@ import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.bulk.TransportBulkAction; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.ingest.DeletePipelineRequest; import org.elasticsearch.action.ingest.PutPipelineRequest; import org.elasticsearch.action.support.master.AcknowledgedResponse; -import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; @@ -388,13 +388,7 @@ public void onFailure(Exception e) { @Override protected void doRun() { for (DocWriteRequest actionRequest : actionRequests) { - IndexRequest indexRequest = null; - if (actionRequest instanceof IndexRequest) { - indexRequest = (IndexRequest) actionRequest; - } else if (actionRequest instanceof UpdateRequest) { - UpdateRequest updateRequest = (UpdateRequest) actionRequest; - indexRequest = updateRequest.docAsUpsert() ? updateRequest.doc() : updateRequest.upsertRequest(); - } + IndexRequest indexRequest = TransportBulkAction.getIndexWriteRequest(actionRequest); if (indexRequest == null) { continue; } diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java index f25f8844153a5..219aee9ebe2ff 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java @@ -28,6 +28,7 @@ import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.AutoCreateIndex; +import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateApplier; @@ -408,6 +409,57 @@ public void testUseDefaultPipelineWithAlias() throws Exception { validateDefaultPipeline(new IndexRequest(WITH_DEFAULT_PIPELINE_ALIAS, "type", "id")); } + public void testUseDefaultPipelineWithBulkUpsert() throws Exception { + Exception exception = new Exception("fake exception"); + BulkRequest bulkRequest = new BulkRequest(); + IndexRequest indexRequest1 = new IndexRequest(WITH_DEFAULT_PIPELINE, "type", "id1").source(Collections.emptyMap()); + IndexRequest indexRequest2 = new IndexRequest(WITH_DEFAULT_PIPELINE, "type", "id2").source(Collections.emptyMap()); + IndexRequest indexRequest3 = new IndexRequest(WITH_DEFAULT_PIPELINE, "type", "id3").source(Collections.emptyMap()); + UpdateRequest upsertRequest = new UpdateRequest(WITH_DEFAULT_PIPELINE, "type", "id1").upsert(indexRequest1).script(mockScript("1")); + UpdateRequest docAsUpsertRequest = new UpdateRequest(WITH_DEFAULT_PIPELINE, "type", "id2").doc(indexRequest2).docAsUpsert(true); + // this test only covers the mechanics that scripted bulk upserts will execute a default pipeline. However, in practice scripted + // bulk upserts with a default pipeline are a bit surprising since the script executes AFTER the pipeline. + UpdateRequest scriptedUpsert = new UpdateRequest(WITH_DEFAULT_PIPELINE, "type", "id2").upsert(indexRequest3).script(mockScript("1")) + .scriptedUpsert(true); + bulkRequest.add(upsertRequest).add(docAsUpsertRequest).add(scriptedUpsert); + + AtomicBoolean responseCalled = new AtomicBoolean(false); + AtomicBoolean failureCalled = new AtomicBoolean(false); + assertNull(indexRequest1.getPipeline()); + assertNull(indexRequest2.getPipeline()); + assertNull(indexRequest3.getPipeline()); + action.execute(null, bulkRequest, ActionListener.wrap( + response -> { + BulkItemResponse itemResponse = response.iterator().next(); + assertThat(itemResponse.getFailure().getMessage(), containsString("fake exception")); + responseCalled.set(true); + }, + e -> { + assertThat(e, sameInstance(exception)); + failureCalled.set(true); + })); + + // check failure works, and passes through to the listener + assertFalse(action.isExecuted); // haven't executed yet + assertFalse(responseCalled.get()); + assertFalse(failureCalled.get()); + verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture(), any()); + assertEquals(indexRequest1.getPipeline(), "default_pipeline"); + assertEquals(indexRequest2.getPipeline(), "default_pipeline"); + assertEquals(indexRequest3.getPipeline(), "default_pipeline"); + completionHandler.getValue().accept(exception); + assertTrue(failureCalled.get()); + + // now check success of the transport bulk action + indexRequest1.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing + indexRequest2.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing + indexRequest3.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing + completionHandler.getValue().accept(null); + assertTrue(action.isExecuted); + assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one + verifyZeroInteractions(transportService); + } + public void testCreateIndexBeforeRunPipeline() throws Exception { Exception exception = new Exception("fake exception"); IndexRequest indexRequest = new IndexRequest("missing_index", "type", "id"); @@ -445,6 +497,7 @@ private void validateDefaultPipeline(IndexRequest indexRequest) { indexRequest.source(Collections.emptyMap()); AtomicBoolean responseCalled = new AtomicBoolean(false); AtomicBoolean failureCalled = new AtomicBoolean(false); + assertNull(indexRequest.getPipeline()); singleItemBulkWriteAction.execute(null, indexRequest, ActionListener.wrap( response -> { responseCalled.set(true); @@ -459,6 +512,7 @@ private void validateDefaultPipeline(IndexRequest indexRequest) { assertFalse(responseCalled.get()); assertFalse(failureCalled.get()); verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture(), any()); + assertEquals(indexRequest.getPipeline(), "default_pipeline"); completionHandler.getValue().accept(exception); assertTrue(failureCalled.get()); diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java index a058cf477411f..162ef56553df4 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java @@ -23,8 +23,10 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.bulk.TransportBulkActionTookTests.Resolver; import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.AutoCreateIndex; +import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -132,4 +134,23 @@ public void testDeleteNonExistingDocExternalGteVersionCreatesIndex() throws Exce throw new AssertionError(exception); })); } + + public void testGetIndexWriteRequest() throws Exception { + IndexRequest indexRequest = new IndexRequest("index", "type", "id1").source(Collections.emptyMap()); + UpdateRequest upsertRequest = new UpdateRequest("index", "type", "id1").upsert(indexRequest).script(mockScript("1")); + UpdateRequest docAsUpsertRequest = new UpdateRequest("index", "type", "id2").doc(indexRequest).docAsUpsert(true); + UpdateRequest scriptedUpsert = new UpdateRequest("index", "type", "id2").upsert(indexRequest).script(mockScript("1")) + .scriptedUpsert(true); + + assertEquals(TransportBulkAction.getIndexWriteRequest(indexRequest), indexRequest); + assertEquals(TransportBulkAction.getIndexWriteRequest(upsertRequest), indexRequest); + assertEquals(TransportBulkAction.getIndexWriteRequest(docAsUpsertRequest), indexRequest); + assertEquals(TransportBulkAction.getIndexWriteRequest(scriptedUpsert), indexRequest); + + DeleteRequest deleteRequest = new DeleteRequest("index", "id"); + assertNull(TransportBulkAction.getIndexWriteRequest(deleteRequest)); + + UpdateRequest badUpsertRequest = new UpdateRequest("index", "type", "id1"); + assertNull(TransportBulkAction.getIndexWriteRequest(badUpsertRequest)); + } }