diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/IngestClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/IngestClient.java index 5c5a82b52f438..340e14653971b 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/IngestClient.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/IngestClient.java @@ -24,6 +24,8 @@ import org.elasticsearch.action.ingest.GetPipelineRequest; import org.elasticsearch.action.ingest.GetPipelineResponse; import org.elasticsearch.action.ingest.PutPipelineRequest; +import org.elasticsearch.action.ingest.SimulatePipelineRequest; +import org.elasticsearch.action.ingest.SimulatePipelineResponse; import org.elasticsearch.action.ingest.WritePipelineResponse; import java.io.IOException; @@ -125,4 +127,37 @@ public void deletePipelineAsync(DeletePipelineRequest request, RequestOptions op restHighLevelClient.performRequestAsyncAndParseEntity( request, RequestConverters::deletePipeline, options, WritePipelineResponse::fromXContent, listener, emptySet()); } + + /** + * Simulate a pipeline on a set of documents provided in the request + *

+ * See + * + * Simulate Pipeline API on elastic.co + * @param request the request + * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @return the response + * @throws IOException in case there is a problem sending the request or parsing back the response + */ + public SimulatePipelineResponse simulatePipeline(SimulatePipelineRequest request, RequestOptions options) throws IOException { + return restHighLevelClient.performRequestAndParseEntity( request, RequestConverters::simulatePipeline, options, + SimulatePipelineResponse::fromXContent, emptySet()); + } + + /** + * Asynchronously simulate a pipeline on a set of documents provided in the request + *

+ * See + * + * Simulate Pipeline API on elastic.co + * @param request the request + * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @param listener the listener to be notified upon request completion + */ + public void simulatePipelineAsync(SimulatePipelineRequest request, + RequestOptions options, + ActionListener listener) { + restHighLevelClient.performRequestAsyncAndParseEntity( request, RequestConverters::simulatePipeline, options, + SimulatePipelineResponse::fromXContent, listener, emptySet()); + } } diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java index c6c53501e0dd6..3b92d09b8ed56 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java @@ -72,6 +72,7 @@ import org.elasticsearch.action.ingest.DeletePipelineRequest; import org.elasticsearch.action.ingest.PutPipelineRequest; import org.elasticsearch.action.ingest.GetPipelineRequest; +import org.elasticsearch.action.ingest.SimulatePipelineRequest; import org.elasticsearch.action.search.ClearScrollRequest; import org.elasticsearch.action.search.MultiSearchRequest; import org.elasticsearch.action.search.SearchRequest; @@ -927,6 +928,20 @@ static Request validateQuery(ValidateQueryRequest validateQueryRequest) throws I return request; } + static Request simulatePipeline(SimulatePipelineRequest simulatePipelineRequest) throws IOException { + EndpointBuilder builder = new EndpointBuilder().addPathPartAsIs("_ingest/pipeline"); + if (simulatePipelineRequest.getId() != null && !simulatePipelineRequest.getId().isEmpty()) { + builder.addPathPart(simulatePipelineRequest.getId()); + } + builder.addPathPartAsIs("_simulate"); + String endpoint = builder.build(); + Request request = new Request(HttpPost.METHOD_NAME, endpoint); + Params params = new Params(request); + params.putParam("verbose", Boolean.toString(simulatePipelineRequest.isVerbose())); + request.setEntity(createEntity(simulatePipelineRequest, REQUEST_BODY_CONTENT_TYPE)); + return request; + } + static Request getAlias(GetAliasesRequest getAliasesRequest) { String[] indices = getAliasesRequest.indices() == null ? Strings.EMPTY_ARRAY : getAliasesRequest.indices(); String[] aliases = getAliasesRequest.aliases() == null ? Strings.EMPTY_ARRAY : getAliasesRequest.aliases(); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ESRestHighLevelClientTestCase.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ESRestHighLevelClientTestCase.java index 14fe0e01d31f9..d9d57a49b4f8a 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ESRestHighLevelClientTestCase.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ESRestHighLevelClientTestCase.java @@ -123,9 +123,7 @@ private HighLevelClient(RestClient restClient) { } } - protected static XContentBuilder buildRandomXContentPipeline() throws IOException { - XContentType xContentType = randomFrom(XContentType.values()); - XContentBuilder pipelineBuilder = XContentBuilder.builder(xContentType.xContent()); + protected static XContentBuilder buildRandomXContentPipeline(XContentBuilder pipelineBuilder) throws IOException { pipelineBuilder.startObject(); { pipelineBuilder.field(Pipeline.DESCRIPTION_KEY, "some random set of processors"); @@ -152,6 +150,12 @@ protected static XContentBuilder buildRandomXContentPipeline() throws IOExceptio return pipelineBuilder; } + protected static XContentBuilder buildRandomXContentPipeline() throws IOException { + XContentType xContentType = randomFrom(XContentType.values()); + XContentBuilder pipelineBuilder = XContentBuilder.builder(xContentType.xContent()); + return buildRandomXContentPipeline(pipelineBuilder); + } + protected static void createPipeline(String pipelineId) throws IOException { XContentBuilder builder = buildRandomXContentPipeline(); createPipeline(new PutPipelineRequest(pipelineId, BytesReference.bytes(builder), builder.contentType())); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/IngestClientIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/IngestClientIT.java index ecc0d0052d415..6fd6f95059577 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/IngestClientIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/IngestClientIT.java @@ -23,12 +23,22 @@ import org.elasticsearch.action.ingest.GetPipelineRequest; import org.elasticsearch.action.ingest.GetPipelineResponse; import org.elasticsearch.action.ingest.PutPipelineRequest; +import org.elasticsearch.action.ingest.SimulateDocumentBaseResult; +import org.elasticsearch.action.ingest.SimulateDocumentResult; +import org.elasticsearch.action.ingest.SimulateDocumentVerboseResult; +import org.elasticsearch.action.ingest.SimulatePipelineRequest; +import org.elasticsearch.action.ingest.SimulatePipelineResponse; import org.elasticsearch.action.ingest.WritePipelineResponse; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.ingest.PipelineConfiguration; import java.io.IOException; +import java.util.List; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.core.IsInstanceOf.instanceOf; public class IngestClientIT extends ESRestHighLevelClientTestCase { @@ -80,4 +90,93 @@ public void testDeletePipeline() throws IOException { execute(request, highLevelClient().ingest()::deletePipeline, highLevelClient().ingest()::deletePipelineAsync); assertTrue(response.isAcknowledged()); } + + public void testSimulatePipeline() throws IOException { + testSimulatePipeline(false, false); + } + + public void testSimulatePipelineWithFailure() throws IOException { + testSimulatePipeline(false, true); + } + + public void testSimulatePipelineVerbose() throws IOException { + testSimulatePipeline(true, false); + } + + public void testSimulatePipelineVerboseWithFailure() throws IOException { + testSimulatePipeline(true, true); + } + + private void testSimulatePipeline(boolean isVerbose, + boolean isFailure) throws IOException { + XContentType xContentType = randomFrom(XContentType.values()); + XContentBuilder builder = XContentBuilder.builder(xContentType.xContent()); + String rankValue = isFailure ? "non-int" : Integer.toString(1234); + builder.startObject(); + { + builder.field("pipeline"); + buildRandomXContentPipeline(builder); + builder.startArray("docs"); + { + builder.startObject() + .field("_index", "index") + .field("_type", "doc") + .field("_id", "doc_" + 1) + .startObject("_source").field("foo", "rab_" + 1).field("rank", rankValue).endObject() + .endObject(); + } + builder.endArray(); + } + builder.endObject(); + + SimulatePipelineRequest request = new SimulatePipelineRequest( + BytesReference.bytes(builder), + builder.contentType() + ); + request.setVerbose(isVerbose); + SimulatePipelineResponse response = + execute(request, highLevelClient().ingest()::simulatePipeline, highLevelClient().ingest()::simulatePipelineAsync); + List results = response.getResults(); + assertEquals(1, results.size()); + if (isVerbose) { + assertThat(results.get(0), instanceOf(SimulateDocumentVerboseResult.class)); + SimulateDocumentVerboseResult verboseResult = (SimulateDocumentVerboseResult) results.get(0); + assertEquals(2, verboseResult.getProcessorResults().size()); + if (isFailure) { + assertNotNull(verboseResult.getProcessorResults().get(1).getFailure()); + assertThat(verboseResult.getProcessorResults().get(1).getFailure().getMessage(), + containsString("unable to convert [non-int] to integer")); + } else { + assertEquals( + verboseResult.getProcessorResults().get(0).getIngestDocument() + .getFieldValue("foo", String.class), + "bar" + ); + assertEquals( + Integer.valueOf(1234), + verboseResult.getProcessorResults().get(1).getIngestDocument() + .getFieldValue("rank", Integer.class) + ); + } + } else { + assertThat(results.get(0), instanceOf(SimulateDocumentBaseResult.class)); + SimulateDocumentBaseResult baseResult = (SimulateDocumentBaseResult)results.get(0); + if (isFailure) { + assertNotNull(baseResult.getFailure()); + assertThat(baseResult.getFailure().getMessage(), + containsString("unable to convert [non-int] to integer")); + } else { + assertNotNull(baseResult.getIngestDocument()); + assertEquals( + baseResult.getIngestDocument().getFieldValue("foo", String.class), + "bar" + ); + assertEquals( + Integer.valueOf(1234), + baseResult.getIngestDocument() + .getFieldValue("rank", Integer.class) + ); + } + } + } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java index e416b3bd29fe8..8035e1582c2dd 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java @@ -75,6 +75,7 @@ import org.elasticsearch.action.ingest.DeletePipelineRequest; import org.elasticsearch.action.ingest.GetPipelineRequest; import org.elasticsearch.action.ingest.PutPipelineRequest; +import org.elasticsearch.action.ingest.SimulatePipelineRequest; import org.elasticsearch.action.search.ClearScrollRequest; import org.elasticsearch.action.search.MultiSearchRequest; import org.elasticsearch.action.search.SearchRequest; @@ -1622,6 +1623,34 @@ public void testDeletePipeline() { assertEquals(expectedParams, expectedRequest.getParameters()); } + public void testSimulatePipeline() throws IOException { + String pipelineId = randomBoolean() ? "some_pipeline_id" : null; + boolean verbose = randomBoolean(); + String json = "{\"pipeline\":{" + + "\"description\":\"_description\"," + + "\"processors\":[{\"set\":{\"field\":\"field2\",\"value\":\"_value\"}}]}," + + "\"docs\":[{\"_index\":\"index\",\"_type\":\"_doc\",\"_id\":\"id\",\"_source\":{\"foo\":\"rab\"}}]}"; + SimulatePipelineRequest request = new SimulatePipelineRequest( + new BytesArray(json.getBytes(StandardCharsets.UTF_8)), + XContentType.JSON + ); + request.setId(pipelineId); + request.setVerbose(verbose); + Map expectedParams = new HashMap<>(); + expectedParams.put("verbose", Boolean.toString(verbose)); + + Request expectedRequest = RequestConverters.simulatePipeline(request); + StringJoiner endpoint = new StringJoiner("/", "/", ""); + endpoint.add("_ingest/pipeline"); + if (pipelineId != null && !pipelineId.isEmpty()) + endpoint.add(pipelineId); + endpoint.add("_simulate"); + assertEquals(endpoint.toString(), expectedRequest.getEndpoint()); + assertEquals(HttpPost.METHOD_NAME, expectedRequest.getMethod()); + assertEquals(expectedParams, expectedRequest.getParameters()); + assertToXContentBody(request, expectedRequest.getEntity()); + } + public void testClusterHealth() { ClusterHealthRequest healthRequest = new ClusterHealthRequest(); Map expectedParams = new HashMap<>(); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/IngestClientDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/IngestClientDocumentationIT.java index f5bdc9f2f3ee5..c53ec2b5d7cc7 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/IngestClientDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/IngestClientDocumentationIT.java @@ -25,6 +25,12 @@ import org.elasticsearch.action.ingest.GetPipelineRequest; import org.elasticsearch.action.ingest.GetPipelineResponse; import org.elasticsearch.action.ingest.PutPipelineRequest; +import org.elasticsearch.action.ingest.SimulateDocumentBaseResult; +import org.elasticsearch.action.ingest.SimulateDocumentResult; +import org.elasticsearch.action.ingest.SimulateDocumentVerboseResult; +import org.elasticsearch.action.ingest.SimulatePipelineRequest; +import org.elasticsearch.action.ingest.SimulatePipelineResponse; +import org.elasticsearch.action.ingest.SimulateProcessorResult; import org.elasticsearch.action.ingest.WritePipelineResponse; import org.elasticsearch.client.ESRestHighLevelClientTestCase; import org.elasticsearch.client.RequestOptions; @@ -277,4 +283,109 @@ public void onFailure(Exception e) { } } + public void testSimulatePipeline() throws IOException { + RestHighLevelClient client = highLevelClient(); + + { + // tag::simulate-pipeline-request + String source = + "{\"" + + "pipeline\":{" + + "\"description\":\"_description\"," + + "\"processors\":[{\"set\":{\"field\":\"field2\",\"value\":\"_value\"}}]" + + "}," + + "\"docs\":[" + + "{\"_index\":\"index\",\"_type\":\"_doc\",\"_id\":\"id\",\"_source\":{\"foo\":\"bar\"}}," + + "{\"_index\":\"index\",\"_type\":\"_doc\",\"_id\":\"id\",\"_source\":{\"foo\":\"rab\"}}" + + "]" + + "}"; + SimulatePipelineRequest request = new SimulatePipelineRequest( + new BytesArray(source.getBytes(StandardCharsets.UTF_8)), // <1> + XContentType.JSON // <2> + ); + // end::simulate-pipeline-request + + // tag::simulate-pipeline-request-pipeline-id + request.setId("my-pipeline-id"); // <1> + // end::simulate-pipeline-request-pipeline-id + + // For testing we set this back to null + request.setId(null); + + // tag::simulate-pipeline-request-verbose + request.setVerbose(true); // <1> + // end::simulate-pipeline-request-verbose + + // tag::simulate-pipeline-execute + SimulatePipelineResponse response = client.ingest().simulatePipeline(request, RequestOptions.DEFAULT); // <1> + // end::simulate-pipeline-execute + + // tag::simulate-pipeline-response + for (SimulateDocumentResult result: response.getResults()) { // <1> + if (request.isVerbose()) { + assert result instanceof SimulateDocumentVerboseResult; + SimulateDocumentVerboseResult verboseResult = (SimulateDocumentVerboseResult)result; // <2> + for (SimulateProcessorResult processorResult: verboseResult.getProcessorResults()) { // <3> + processorResult.getIngestDocument(); // <4> + processorResult.getFailure(); // <5> + } + } else { + assert result instanceof SimulateDocumentBaseResult; + SimulateDocumentBaseResult baseResult = (SimulateDocumentBaseResult)result; // <6> + baseResult.getIngestDocument(); // <7> + baseResult.getFailure(); // <8> + } + } + // end::simulate-pipeline-response + assert(response.getResults().size() > 0); + } + } + + public void testSimulatePipelineAsync() throws Exception { + RestHighLevelClient client = highLevelClient(); + + { + String source = + "{\"" + + "pipeline\":{" + + "\"description\":\"_description\"," + + "\"processors\":[{\"set\":{\"field\":\"field2\",\"value\":\"_value\"}}]" + + "}," + + "\"docs\":[" + + "{\"_index\":\"index\",\"_type\":\"_doc\",\"_id\":\"id\",\"_source\":{\"foo\":\"bar\"}}," + + "{\"_index\":\"index\",\"_type\":\"_doc\",\"_id\":\"id\",\"_source\":{\"foo\":\"rab\"}}" + + "]" + + "}"; + SimulatePipelineRequest request = new SimulatePipelineRequest( + new BytesArray(source.getBytes(StandardCharsets.UTF_8)), + XContentType.JSON + ); + + // tag::simulate-pipeline-execute-listener + ActionListener listener = + new ActionListener() { + @Override + public void onResponse(SimulatePipelineResponse response) { + // <1> + } + + @Override + public void onFailure(Exception e) { + // <2> + } + }; + // end::simulate-pipeline-execute-listener + + // Replace the empty listener by a blocking listener in test + final CountDownLatch latch = new CountDownLatch(1); + listener = new LatchedActionListener<>(listener, latch); + + // tag::simulate-pipeline-execute-async + client.ingest().simulatePipelineAsync(request, RequestOptions.DEFAULT, listener); // <1> + // end::simulate-pipeline-execute-async + + assertTrue(latch.await(30L, TimeUnit.SECONDS)); + } + } + } diff --git a/docs/java-rest/high-level/ingest/simulate_pipeline.asciidoc b/docs/java-rest/high-level/ingest/simulate_pipeline.asciidoc new file mode 100644 index 0000000000000..9d1bbd06ceb26 --- /dev/null +++ b/docs/java-rest/high-level/ingest/simulate_pipeline.asciidoc @@ -0,0 +1,90 @@ +[[java-rest-high-ingest-simulate-pipeline]] +=== Simulate Pipeline API + +[[java-rest-high-ingest-simulate-pipeline-request]] +==== Simulate Pipeline Request + +A `SimulatePipelineRequest` requires a source and a `XContentType`. The source consists +of the request body. See the https://www.elastic.co/guide/en/elasticsearch/reference/master/simulate-pipeline-api.html[docs] +for more details on the request body. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/IngestClientDocumentationIT.java[simulate-pipeline-request] +-------------------------------------------------- +<1> The request body as a `ByteArray`. +<2> The XContentType for the request body supplied above. + +==== Optional arguments +The following arguments can optionally be provided: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/IngestClientDocumentationIT.java[simulate-pipeline-request-pipeline-id] +-------------------------------------------------- +<1> You can either specify an existing pipeline to execute against the provided documents, or supply a +pipeline definition in the body of the request. This option sets the id for an existing pipeline. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/IngestClientDocumentationIT.java[simulate-pipeline-request-verbose] +-------------------------------------------------- +<1> To see the intermediate results of each processor in the simulate request, you can add the verbose parameter +to the request. + +[[java-rest-high-ingest-simulate-pipeline-sync]] +==== Synchronous Execution + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/IngestClientDocumentationIT.java[simulate-pipeline-execute] +-------------------------------------------------- +<1> Execute the request and get back the response in a `SimulatePipelineResponse` object. + +[[java-rest-high-ingest-simulate-pipeline-async]] +==== Asynchronous Execution + +The asynchronous execution of a simulate pipeline request requires both the `SimulatePipelineRequest` +instance and an `ActionListener` instance to be passed to the asynchronous +method: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/IngestClientDocumentationIT.java[simulate-pipeline-execute-async] +-------------------------------------------------- +<1> The `SimulatePipelineRequest` to execute and the `ActionListener` to use when +the execution completes + +The asynchronous method does not block and returns immediately. Once it is +completed the `ActionListener` is called back using the `onResponse` method +if the execution successfully completed or using the `onFailure` method if +it failed. + +A typical listener for `SimulatePipelineResponse` looks like: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/IngestClientDocumentationIT.java[simulate-pipeline-execute-listener] +-------------------------------------------------- +<1> Called when the execution is successfully completed. The response is +provided as an argument +<2> Called in case of failure. The raised exception is provided as an argument + +[[java-rest-high-ingest-simulate-pipeline-response]] +==== Simulate Pipeline Response + +The returned `SimulatePipelineResponse` allows to retrieve information about the executed + operation as follows: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/IngestClientDocumentationIT.java[simulate-pipeline-response] +-------------------------------------------------- +<1> Get results for each of the documents provided as instance of `List`. +<2> If the request was in verbose mode cast the response to `SimulateDocumentVerboseResult`. +<3> Check the result after each processor is applied. +<4> Get the ingest document for the result obtained in 3. +<5> Or get the failure for the result obtained in 3. +<6> Get the result as `SimulateDocumentBaseResult` if the result was not verbose. +<7> Get the ingest document for the result obtained in 6. +<8> Or get the failure for the result obtained in 6. diff --git a/docs/java-rest/high-level/supported-apis.asciidoc b/docs/java-rest/high-level/supported-apis.asciidoc index 3caab5100ca0f..9ed54db817551 100644 --- a/docs/java-rest/high-level/supported-apis.asciidoc +++ b/docs/java-rest/high-level/supported-apis.asciidoc @@ -125,10 +125,12 @@ The Java High Level REST Client supports the following Ingest APIs: * <> * <> * <> +* <> include::ingest/put_pipeline.asciidoc[] include::ingest/get_pipeline.asciidoc[] include::ingest/delete_pipeline.asciidoc[] +include::ingest/simulate_pipeline.asciidoc[] == Snapshot APIs diff --git a/server/src/main/java/org/elasticsearch/action/ingest/SimulateDocumentBaseResult.java b/server/src/main/java/org/elasticsearch/action/ingest/SimulateDocumentBaseResult.java index c6252feea276c..f7f76a2bbca7d 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/SimulateDocumentBaseResult.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/SimulateDocumentBaseResult.java @@ -19,13 +19,18 @@ package org.elasticsearch.action.ingest; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.ingest.IngestDocument; import java.io.IOException; +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg; + /** * Holds the end result of what a pipeline did to sample document provided via the simulate api. */ @@ -33,6 +38,33 @@ public final class SimulateDocumentBaseResult implements SimulateDocumentResult private final WriteableIngestDocument ingestDocument; private final Exception failure; + public static final ConstructingObjectParser PARSER = + new ConstructingObjectParser<>( + "simulate_document_base_result", + true, + a -> { + if (a[1] == null) { + assert a[0] != null; + return new SimulateDocumentBaseResult(((WriteableIngestDocument)a[0]).getIngestDocument()); + } else { + assert a[0] == null; + return new SimulateDocumentBaseResult((ElasticsearchException)a[1]); + } + } + ); + static { + PARSER.declareObject( + optionalConstructorArg(), + WriteableIngestDocument.INGEST_DOC_PARSER, + new ParseField(WriteableIngestDocument.DOC_FIELD) + ); + PARSER.declareObject( + optionalConstructorArg(), + (p, c) -> ElasticsearchException.fromXContent(p), + new ParseField("error") + ); + } + public SimulateDocumentBaseResult(IngestDocument ingestDocument) { this.ingestDocument = new WriteableIngestDocument(ingestDocument); failure = null; @@ -89,4 +121,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.endObject(); return builder; } + + public static SimulateDocumentBaseResult fromXContent(XContentParser parser) { + return PARSER.apply(parser, null); + } } diff --git a/server/src/main/java/org/elasticsearch/action/ingest/SimulateDocumentVerboseResult.java b/server/src/main/java/org/elasticsearch/action/ingest/SimulateDocumentVerboseResult.java index 21e802981850c..099e238f2d25e 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/SimulateDocumentVerboseResult.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/SimulateDocumentVerboseResult.java @@ -18,21 +18,38 @@ */ package org.elasticsearch.action.ingest; +import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; + /** * Holds the result of what a pipeline did to a sample document via the simulate api, but instead of {@link SimulateDocumentBaseResult} * this result class holds the intermediate result each processor did to the sample document. */ public final class SimulateDocumentVerboseResult implements SimulateDocumentResult { + public static final String PROCESSOR_RESULT_FIELD = "processor_results"; private final List processorResults; + @SuppressWarnings("unchecked") + public static final ConstructingObjectParser PARSER = + new ConstructingObjectParser<>( + "simulate_document_verbose_result", + true, + a -> new SimulateDocumentVerboseResult((List)a[0]) + ); + static { + PARSER.declareObjectArray(constructorArg(), SimulateProcessorResult.PARSER, new ParseField(PROCESSOR_RESULT_FIELD)); + } + public SimulateDocumentVerboseResult(List processorResults) { this.processorResults = processorResults; } @@ -63,7 +80,7 @@ public List getProcessorResults() { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); - builder.startArray("processor_results"); + builder.startArray(PROCESSOR_RESULT_FIELD); for (SimulateProcessorResult processorResult : processorResults) { processorResult.toXContent(builder, params); } @@ -71,4 +88,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.endObject(); return builder; } + + public static SimulateDocumentVerboseResult fromXContent(XContentParser parser) { + return PARSER.apply(parser, null); + } } diff --git a/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java b/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java index 205add8cc543b..53a7ec1d1f7c9 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java @@ -25,6 +25,8 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.VersionType; @@ -42,7 +44,7 @@ import static org.elasticsearch.ingest.IngestDocument.MetaData; -public class SimulatePipelineRequest extends ActionRequest { +public class SimulatePipelineRequest extends ActionRequest implements ToXContentObject { private String id; private boolean verbose; @@ -122,6 +124,12 @@ public void writeTo(StreamOutput out) throws IOException { } } + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.rawValue(source.streamInput(), xContentType); + return builder; + } + public static final class Fields { static final String PIPELINE = "pipeline"; static final String DOCS = "docs"; diff --git a/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineResponse.java b/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineResponse.java index e9ea1a7750738..991e81a14553b 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineResponse.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineResponse.java @@ -19,22 +19,90 @@ package org.elasticsearch.action.ingest; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentParser.Token; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; +import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken; + public class SimulatePipelineResponse extends ActionResponse implements ToXContentObject { private String pipelineId; private boolean verbose; private List results; + @SuppressWarnings("unchecked") + public static final ConstructingObjectParser PARSER = + new ConstructingObjectParser<>( + "simulate_pipeline_response", + true, + a -> { + List results = (List)a[0]; + boolean verbose = false; + if (results.size() > 0) { + if (results.get(0) instanceof SimulateDocumentVerboseResult) { + verbose = true; + } + } + return new SimulatePipelineResponse(null, verbose, results); + } + ); + static { + PARSER.declareObjectArray( + constructorArg(), + (parser, context) -> { + Token token = parser.currentToken(); + ensureExpectedToken(Token.START_OBJECT, token, parser::getTokenLocation); + SimulateDocumentResult result = null; + while ((token = parser.nextToken()) != Token.END_OBJECT) { + ensureExpectedToken(token, Token.FIELD_NAME, parser::getTokenLocation); + String fieldName = parser.currentName(); + token = parser.nextToken(); + if (token == Token.START_ARRAY) { + if (fieldName.equals(SimulateDocumentVerboseResult.PROCESSOR_RESULT_FIELD)) { + List results = new ArrayList<>(); + while ((token = parser.nextToken()) == Token.START_OBJECT) { + results.add(SimulateProcessorResult.fromXContent(parser)); + } + ensureExpectedToken(Token.END_ARRAY, token, parser::getTokenLocation); + result = new SimulateDocumentVerboseResult(results); + } else { + parser.skipChildren(); + } + } else if (token.equals(Token.START_OBJECT)) { + switch (fieldName) { + case WriteableIngestDocument.DOC_FIELD: + result = new SimulateDocumentBaseResult( + WriteableIngestDocument.INGEST_DOC_PARSER.apply(parser, null).getIngestDocument() + ); + break; + case "error": + result = new SimulateDocumentBaseResult(ElasticsearchException.fromXContent(parser)); + break; + default: + parser.skipChildren(); + break; + } + } // else it is a value skip it + } + assert result != null; + return result; + }, + new ParseField(Fields.DOCUMENTS)); + } + public SimulatePipelineResponse() { } @@ -98,6 +166,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws return builder; } + public static SimulatePipelineResponse fromXContent(XContentParser parser) { + return PARSER.apply(parser, null); + } + static final class Fields { static final String DOCUMENTS = "docs"; } diff --git a/server/src/main/java/org/elasticsearch/action/ingest/SimulateProcessorResult.java b/server/src/main/java/org/elasticsearch/action/ingest/SimulateProcessorResult.java index 386a00b391f3c..101ce7ec260e1 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/SimulateProcessorResult.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/SimulateProcessorResult.java @@ -19,33 +19,91 @@ package org.elasticsearch.action.ingest; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.common.xcontent.ToXContent.Params; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.ingest.ConfigurationUtils; import org.elasticsearch.ingest.IngestDocument; import java.io.IOException; -class SimulateProcessorResult implements Writeable, ToXContentObject { +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg; +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; + +public class SimulateProcessorResult implements Writeable, ToXContentObject { + + private static final String IGNORED_ERROR_FIELD = "ignored_error"; private final String processorTag; private final WriteableIngestDocument ingestDocument; private final Exception failure; - SimulateProcessorResult(String processorTag, IngestDocument ingestDocument, Exception failure) { + @SuppressWarnings("unchecked") + private static final ConstructingObjectParser IGNORED_ERROR_PARSER = + new ConstructingObjectParser<>( + "ignored_error_parser", + true, + a -> (ElasticsearchException)a[0] + ); + static { + IGNORED_ERROR_PARSER.declareObject( + constructorArg(), + (p, c) -> ElasticsearchException.fromXContent(p), + new ParseField("error") + ); + } + + @SuppressWarnings("unchecked") + public static final ConstructingObjectParser PARSER = + new ConstructingObjectParser<>( + "simulate_processor_result", + true, + a -> { + String processorTag = a[0] == null ? null : (String)a[0]; + IngestDocument document = a[1] == null ? null : ((WriteableIngestDocument)a[1]).getIngestDocument(); + Exception failure = null; + if (a[2] != null) { + failure = (ElasticsearchException)a[2]; + } else if (a[3] != null) { + failure = (ElasticsearchException)a[3]; + } + return new SimulateProcessorResult(processorTag, document, failure); + } + ); + static { + PARSER.declareString(optionalConstructorArg(), new ParseField(ConfigurationUtils.TAG_KEY)); + PARSER.declareObject( + optionalConstructorArg(), + WriteableIngestDocument.INGEST_DOC_PARSER, + new ParseField(WriteableIngestDocument.DOC_FIELD) + ); + PARSER.declareObject( + optionalConstructorArg(), + IGNORED_ERROR_PARSER, + new ParseField(IGNORED_ERROR_FIELD) + ); + PARSER.declareObject( + optionalConstructorArg(), + (p, c) -> ElasticsearchException.fromXContent(p), + new ParseField("error") + ); + } + + public SimulateProcessorResult(String processorTag, IngestDocument ingestDocument, Exception failure) { this.processorTag = processorTag; this.ingestDocument = (ingestDocument == null) ? null : new WriteableIngestDocument(ingestDocument); this.failure = failure; } - SimulateProcessorResult(String processorTag, IngestDocument ingestDocument) { + public SimulateProcessorResult(String processorTag, IngestDocument ingestDocument) { this(processorTag, ingestDocument, null); } - SimulateProcessorResult(String processorTag, Exception failure) { + public SimulateProcessorResult(String processorTag, Exception failure) { this(processorTag, null, failure); } @@ -98,7 +156,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws } if (failure != null && ingestDocument != null) { - builder.startObject("ignored_error"); + builder.startObject(IGNORED_ERROR_FIELD); ElasticsearchException.generateFailureXContent(builder, params, failure, true); builder.endObject(); } else if (failure != null) { @@ -112,4 +170,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.endObject(); return builder; } + + public static SimulateProcessorResult fromXContent(XContentParser parser) { + return PARSER.apply(parser, null); + } } diff --git a/server/src/main/java/org/elasticsearch/action/ingest/WriteableIngestDocument.java b/server/src/main/java/org/elasticsearch/action/ingest/WriteableIngestDocument.java index 87168cb7a9bba..06d32f54bc5c0 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/WriteableIngestDocument.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/WriteableIngestDocument.java @@ -20,24 +20,95 @@ package org.elasticsearch.action.ingest; import org.elasticsearch.Version; +import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.common.xcontent.ToXContent.Params; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.ToXContentFragment; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.ingest.IngestDocument.MetaData; import java.io.IOException; import java.time.ZoneId; +import java.time.ZonedDateTime; import java.util.Date; +import java.util.HashMap; import java.util.Map; import java.util.Objects; +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg; + final class WriteableIngestDocument implements Writeable, ToXContentFragment { + static final String SOURCE_FIELD = "_source"; + static final String INGEST_FIELD = "_ingest"; + static final String DOC_FIELD = "doc"; private final IngestDocument ingestDocument; + @SuppressWarnings("unchecked") + public static final ConstructingObjectParser INGEST_DOC_PARSER = + new ConstructingObjectParser<>( + "ingest_document", + true, + a -> { + HashMap sourceAndMetadata = new HashMap<>(); + sourceAndMetadata.put(MetaData.INDEX.getFieldName(), a[0]); + sourceAndMetadata.put(MetaData.TYPE.getFieldName(), a[1]); + sourceAndMetadata.put(MetaData.ID.getFieldName(), a[2]); + if (a[3] != null) { + sourceAndMetadata.put(MetaData.PARENT.getFieldName(), a[3]); + } + if (a[4] != null) { + sourceAndMetadata.put(MetaData.ROUTING.getFieldName(), a[4]); + } + if (a[5] != null) { + sourceAndMetadata.put(MetaData.VERSION.getFieldName(), a[5]); + } + if (a[6] != null) { + sourceAndMetadata.put(MetaData.VERSION_TYPE.getFieldName(), a[6]); + } + sourceAndMetadata.putAll((Map)a[7]); + return new WriteableIngestDocument(new IngestDocument(sourceAndMetadata, (Map)a[8])); + } + ); + static { + INGEST_DOC_PARSER.declareString(constructorArg(), new ParseField(MetaData.INDEX.getFieldName())); + INGEST_DOC_PARSER.declareString(constructorArg(), new ParseField(MetaData.TYPE.getFieldName())); + INGEST_DOC_PARSER.declareString(constructorArg(), new ParseField(MetaData.ID.getFieldName())); + INGEST_DOC_PARSER.declareString(optionalConstructorArg(), new ParseField(MetaData.PARENT.getFieldName())); + INGEST_DOC_PARSER.declareString(optionalConstructorArg(), new ParseField(MetaData.ROUTING.getFieldName())); + INGEST_DOC_PARSER.declareLong(optionalConstructorArg(), new ParseField(MetaData.VERSION.getFieldName())); + INGEST_DOC_PARSER.declareString(optionalConstructorArg(), new ParseField(MetaData.VERSION_TYPE.getFieldName())); + INGEST_DOC_PARSER.declareObject(constructorArg(), (p, c) -> p.map(), new ParseField(SOURCE_FIELD)); + INGEST_DOC_PARSER.declareObject( + constructorArg(), + (p, c) -> { + Map ingestMap = p.map(); + ingestMap.computeIfPresent( + "timestamp", + (k, o) -> ZonedDateTime.parse((String)o) + ); + return ingestMap; + }, + new ParseField(INGEST_FIELD) + ); + } + + @SuppressWarnings("unchecked") + public static final ConstructingObjectParser PARSER = + new ConstructingObjectParser<>( + "writeable_ingest_document", + true, + a -> (WriteableIngestDocument)a[0] + ); + static { + PARSER.declareObject(constructorArg(), INGEST_DOC_PARSER, new ParseField(DOC_FIELD)); + } + WriteableIngestDocument(IngestDocument ingestDocument) { assert ingestDocument != null; this.ingestDocument = ingestDocument; @@ -67,19 +138,25 @@ IngestDocument getIngestDocument() { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject("doc"); - Map metadataMap = ingestDocument.extractMetadata(); + builder.startObject(DOC_FIELD); + Map metadataMap = ingestDocument.getMetadata(); for (Map.Entry metadata : metadataMap.entrySet()) { if (metadata.getValue() != null) { builder.field(metadata.getKey().getFieldName(), metadata.getValue().toString()); } } - builder.field("_source", ingestDocument.getSourceAndMetadata()); - builder.field("_ingest", ingestDocument.getIngestMetadata()); + Map source = IngestDocument.deepCopyMap(ingestDocument.getSourceAndMetadata()); + metadataMap.keySet().forEach(mD -> source.remove(mD.getFieldName())); + builder.field(SOURCE_FIELD, source); + builder.field(INGEST_FIELD, ingestDocument.getIngestMetadata()); builder.endObject(); return builder; } + public static WriteableIngestDocument fromXContent(XContentParser parser) { + return PARSER.apply(parser, null); + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java index 89e945780c8f5..5a9683c481513 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java @@ -574,6 +574,17 @@ public Map extractMetadata() { return metadataMap; } + /** + * Does the same thing as {@link #extractMetadata} but does not mutate the map. + */ + public Map getMetadata() { + Map metadataMap = new EnumMap<>(MetaData.class); + for (MetaData metaData : MetaData.values()) { + metadataMap.put(metaData, sourceAndMetadata.get(metaData.getFieldName())); + } + return metadataMap; + } + /** * Returns the available ingest metadata fields, by default only timestamp, but it is possible to set additional ones. * Use only for reading values, modify them instead using {@link #setFieldValue(String, Object)} and {@link #removeField(String)} @@ -592,7 +603,7 @@ public Map getSourceAndMetadata() { } @SuppressWarnings("unchecked") - private static Map deepCopyMap(Map source) { + public static Map deepCopyMap(Map source) { return (Map) deepCopy(source); } diff --git a/server/src/test/java/org/elasticsearch/action/ingest/SimulateDocumentBaseResultTests.java b/server/src/test/java/org/elasticsearch/action/ingest/SimulateDocumentBaseResultTests.java new file mode 100644 index 0000000000000..bfa6c1eb9b8c3 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/ingest/SimulateDocumentBaseResultTests.java @@ -0,0 +1,138 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.ingest; + +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.test.AbstractXContentTestCase; + +import java.io.IOException; +import java.util.StringJoiner; +import java.util.function.Predicate; +import java.util.function.Supplier; + +import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument; +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.elasticsearch.action.ingest.WriteableIngestDocumentTests.createRandomIngestDoc; + +public class SimulateDocumentBaseResultTests extends AbstractXContentTestCase { + + public void testSerialization() throws IOException { + boolean isFailure = randomBoolean(); + SimulateDocumentBaseResult simulateDocumentBaseResult = createTestInstance(isFailure); + + BytesStreamOutput out = new BytesStreamOutput(); + simulateDocumentBaseResult.writeTo(out); + StreamInput streamInput = out.bytes().streamInput(); + SimulateDocumentBaseResult otherSimulateDocumentBaseResult = new SimulateDocumentBaseResult(streamInput); + + if (isFailure) { + assertThat(otherSimulateDocumentBaseResult.getIngestDocument(), equalTo(simulateDocumentBaseResult.getIngestDocument())); + assertThat(otherSimulateDocumentBaseResult.getFailure(), instanceOf(IllegalArgumentException.class)); + IllegalArgumentException e = (IllegalArgumentException) otherSimulateDocumentBaseResult.getFailure(); + assertThat(e.getMessage(), equalTo("test")); + } else { + assertIngestDocument(otherSimulateDocumentBaseResult.getIngestDocument(), simulateDocumentBaseResult.getIngestDocument()); + } + } + + static SimulateDocumentBaseResult createTestInstance(boolean isFailure) { + SimulateDocumentBaseResult simulateDocumentBaseResult; + if (isFailure) { + simulateDocumentBaseResult = new SimulateDocumentBaseResult(new IllegalArgumentException("test")); + } else { + IngestDocument ingestDocument = createRandomIngestDoc(); + simulateDocumentBaseResult = new SimulateDocumentBaseResult(ingestDocument); + } + return simulateDocumentBaseResult; + } + + private static SimulateDocumentBaseResult createTestInstanceWithFailures() { + return createTestInstance(randomBoolean()); + } + + @Override + protected SimulateDocumentBaseResult createTestInstance() { + return createTestInstance(false); + } + + @Override + protected SimulateDocumentBaseResult doParseInstance(XContentParser parser) { + return SimulateDocumentBaseResult.fromXContent(parser); + } + + @Override + protected boolean supportsUnknownFields() { + return true; + } + + @Override + protected Predicate getRandomFieldsExcludeFilter() { + // We cannot have random fields in the _source field and _ingest field + return field -> + field.contains( + new StringJoiner(".") + .add(WriteableIngestDocument.DOC_FIELD) + .add(WriteableIngestDocument.SOURCE_FIELD).toString() + ) || + field.contains( + new StringJoiner(".") + .add(WriteableIngestDocument.DOC_FIELD) + .add(WriteableIngestDocument.INGEST_FIELD).toString() + ); + } + + public static void assertEqualDocs(SimulateDocumentBaseResult response, SimulateDocumentBaseResult parsedResponse) { + assertEquals(response.getIngestDocument(), parsedResponse.getIngestDocument()); + if (response.getFailure() != null) { + assertNotNull(parsedResponse.getFailure()); + assertThat( + parsedResponse.getFailure().getMessage(), + containsString(response.getFailure().getMessage()) + ); + } else { + assertNull(parsedResponse.getFailure()); + } + } + + @Override + public void assertEqualInstances(SimulateDocumentBaseResult response, SimulateDocumentBaseResult parsedResponse) { + assertEqualDocs(response, parsedResponse); + } + + /** + * Test parsing {@link SimulateDocumentBaseResult} with inner failures as they don't support asserting on xcontent + * equivalence, given that exceptions are not parsed back as the same original class. We run the usual + * {@link AbstractXContentTestCase#testFromXContent()} without failures, and this other test with failures where + * we disable asserting on xcontent equivalence at the end. + */ + public void testFromXContentWithFailures() throws IOException { + Supplier instanceSupplier = SimulateDocumentBaseResultTests::createTestInstanceWithFailures; + //exceptions are not of the same type whenever parsed back + boolean assertToXContentEquivalence = false; + AbstractXContentTestCase.testFromXContent(NUMBER_OF_TEST_RUNS, instanceSupplier, supportsUnknownFields(), + getShuffleFieldsExceptions(), getRandomFieldsExcludeFilter(), this::createParser, this::doParseInstance, + this::assertEqualInstances, assertToXContentEquivalence, getToXContentParams()); + } +} diff --git a/server/src/test/java/org/elasticsearch/action/ingest/SimulateDocumentSimpleResultTests.java b/server/src/test/java/org/elasticsearch/action/ingest/SimulateDocumentSimpleResultTests.java deleted file mode 100644 index 83aad26f6a07b..0000000000000 --- a/server/src/test/java/org/elasticsearch/action/ingest/SimulateDocumentSimpleResultTests.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.ingest; - -import org.elasticsearch.common.io.stream.BytesStreamOutput; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.ingest.RandomDocumentPicks; -import org.elasticsearch.ingest.IngestDocument; -import org.elasticsearch.test.ESTestCase; - -import java.io.IOException; - -import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.CoreMatchers.instanceOf; - -public class SimulateDocumentSimpleResultTests extends ESTestCase { - - public void testSerialization() throws IOException { - boolean isFailure = randomBoolean(); - SimulateDocumentBaseResult simulateDocumentBaseResult; - if (isFailure) { - simulateDocumentBaseResult = new SimulateDocumentBaseResult(new IllegalArgumentException("test")); - } else { - IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); - simulateDocumentBaseResult = new SimulateDocumentBaseResult(ingestDocument); - } - - BytesStreamOutput out = new BytesStreamOutput(); - simulateDocumentBaseResult.writeTo(out); - StreamInput streamInput = out.bytes().streamInput(); - SimulateDocumentBaseResult otherSimulateDocumentBaseResult = new SimulateDocumentBaseResult(streamInput); - - if (isFailure) { - assertThat(otherSimulateDocumentBaseResult.getIngestDocument(), equalTo(simulateDocumentBaseResult.getIngestDocument())); - assertThat(otherSimulateDocumentBaseResult.getFailure(), instanceOf(IllegalArgumentException.class)); - IllegalArgumentException e = (IllegalArgumentException) otherSimulateDocumentBaseResult.getFailure(); - assertThat(e.getMessage(), equalTo("test")); - } else { - assertIngestDocument(otherSimulateDocumentBaseResult.getIngestDocument(), simulateDocumentBaseResult.getIngestDocument()); - } - } -} diff --git a/server/src/test/java/org/elasticsearch/action/ingest/SimulateDocumentVerboseResultTests.java b/server/src/test/java/org/elasticsearch/action/ingest/SimulateDocumentVerboseResultTests.java new file mode 100644 index 0000000000000..5701bcc27800f --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/ingest/SimulateDocumentVerboseResultTests.java @@ -0,0 +1,113 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.ingest; + +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractXContentTestCase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.StringJoiner; +import java.util.function.Predicate; +import java.util.function.Supplier; + +public class SimulateDocumentVerboseResultTests extends AbstractXContentTestCase { + + static SimulateDocumentVerboseResult createTestInstance(boolean withFailures) { + int numDocs = randomIntBetween(0, 10); + List results = new ArrayList<>(); + for (int i = 0; i getRandomFieldsExcludeFilter() { + // We cannot have random fields in the _source field and _ingest field + return field -> + field.contains( + new StringJoiner(".") + .add(WriteableIngestDocument.DOC_FIELD) + .add(WriteableIngestDocument.SOURCE_FIELD).toString() + ) || + field.contains( + new StringJoiner(".") + .add(WriteableIngestDocument.DOC_FIELD) + .add(WriteableIngestDocument.INGEST_FIELD).toString() + ); + } + + /** + * Test parsing {@link SimulateDocumentVerboseResult} with inner failures as they don't support asserting on xcontent + * equivalence, given that exceptions are not parsed back as the same original class. We run the usual + * {@link AbstractXContentTestCase#testFromXContent()} without failures, and this other test with failures where we + * disable asserting on xcontent equivalence at the end. + */ + public void testFromXContentWithFailures() throws IOException { + Supplier instanceSupplier = SimulateDocumentVerboseResultTests::createTestInstanceWithFailures; + //exceptions are not of the same type whenever parsed back + boolean assertToXContentEquivalence = false; + AbstractXContentTestCase.testFromXContent(NUMBER_OF_TEST_RUNS, instanceSupplier, supportsUnknownFields(), + getShuffleFieldsExceptions(), getRandomFieldsExcludeFilter(), this::createParser, this::doParseInstance, + this::assertEqualInstances, assertToXContentEquivalence, getToXContentParams()); + } +} diff --git a/server/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineResponseTests.java b/server/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineResponseTests.java index be448a09db892..60bad4aad460f 100644 --- a/server/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineResponseTests.java +++ b/server/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineResponseTests.java @@ -21,57 +21,29 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.ingest.IngestDocument; -import org.elasticsearch.ingest.RandomDocumentPicks; -import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractXContentTestCase; import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.StringJoiner; +import java.util.function.Predicate; +import java.util.function.Supplier; import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.nullValue; -public class SimulatePipelineResponseTests extends ESTestCase { +public class SimulatePipelineResponseTests extends AbstractXContentTestCase { public void testSerialization() throws IOException { boolean isVerbose = randomBoolean(); String id = randomBoolean() ? randomAlphaOfLengthBetween(1, 10) : null; - int numResults = randomIntBetween(1, 10); - List results = new ArrayList<>(numResults); - for (int i = 0; i < numResults; i++) { - boolean isFailure = randomBoolean(); - IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); - if (isVerbose) { - int numProcessors = randomIntBetween(1, 10); - List processorResults = new ArrayList<>(numProcessors); - for (int j = 0; j < numProcessors; j++) { - String processorTag = randomAlphaOfLengthBetween(1, 10); - SimulateProcessorResult processorResult; - if (isFailure) { - processorResult = new SimulateProcessorResult(processorTag, new IllegalArgumentException("test")); - } else { - processorResult = new SimulateProcessorResult(processorTag, ingestDocument); - } - processorResults.add(processorResult); - } - results.add(new SimulateDocumentVerboseResult(processorResults)); - } else { - results.add(new SimulateDocumentBaseResult(ingestDocument)); - SimulateDocumentBaseResult simulateDocumentBaseResult; - if (isFailure) { - simulateDocumentBaseResult = new SimulateDocumentBaseResult(new IllegalArgumentException("test")); - } else { - simulateDocumentBaseResult = new SimulateDocumentBaseResult(ingestDocument); - } - results.add(simulateDocumentBaseResult); - } - } - SimulatePipelineResponse response = new SimulatePipelineResponse(id, isVerbose, results); + SimulatePipelineResponse response = createInstance(id, isVerbose, true); BytesStreamOutput out = new BytesStreamOutput(); response.writeTo(out); StreamInput streamInput = out.bytes().streamInput(); @@ -120,4 +92,97 @@ public void testSerialization() throws IOException { } } } + + static SimulatePipelineResponse createInstance(String pipelineId, boolean isVerbose, boolean withFailure) { + int numResults = randomIntBetween(1, 10); + List results = new ArrayList<>(numResults); + for (int i = 0; i < numResults; i++) { + if (isVerbose) { + results.add( + SimulateDocumentVerboseResultTests.createTestInstance(withFailure) + ); + } else { + results.add( + SimulateDocumentBaseResultTests.createTestInstance(withFailure && randomBoolean()) + ); + } + } + return new SimulatePipelineResponse(pipelineId, isVerbose, results); + } + + private static SimulatePipelineResponse createTestInstanceWithFailures() { + boolean isVerbose = randomBoolean(); + return createInstance(null, isVerbose, false); + } + + @Override + protected SimulatePipelineResponse createTestInstance() { + boolean isVerbose = randomBoolean(); + // since the pipeline id is not serialized with XContent we set it to null for equality tests. + // we test failures separately since comparing XContent is not possible with failures + return createInstance(null, isVerbose, false); + } + + @Override + protected SimulatePipelineResponse doParseInstance(XContentParser parser) { + return SimulatePipelineResponse.fromXContent(parser); + } + + @Override + protected boolean supportsUnknownFields() { + return true; + } + + @Override + protected void assertEqualInstances(SimulatePipelineResponse response, + SimulatePipelineResponse parsedResponse) { + assertEquals(response.getPipelineId(), parsedResponse.getPipelineId()); + assertEquals(response.isVerbose(), parsedResponse.isVerbose()); + assertEquals(response.getResults().size(), parsedResponse.getResults().size()); + for (int i=0; i < response.getResults().size(); i++) { + if (response.isVerbose()) { + assertThat(response.getResults().get(i), instanceOf(SimulateDocumentVerboseResult.class)); + assertThat(parsedResponse.getResults().get(i), instanceOf(SimulateDocumentVerboseResult.class)); + SimulateDocumentVerboseResult responseResult = (SimulateDocumentVerboseResult)response.getResults().get(i); + SimulateDocumentVerboseResult parsedResult = (SimulateDocumentVerboseResult)parsedResponse.getResults().get(i); + SimulateDocumentVerboseResultTests.assertEqualDocs(responseResult, parsedResult); + } else { + assertThat(response.getResults().get(i), instanceOf(SimulateDocumentBaseResult.class)); + assertThat(parsedResponse.getResults().get(i), instanceOf(SimulateDocumentBaseResult.class)); + SimulateDocumentBaseResult responseResult = (SimulateDocumentBaseResult)response.getResults().get(i); + SimulateDocumentBaseResult parsedResult = (SimulateDocumentBaseResult)parsedResponse.getResults().get(i); + SimulateDocumentBaseResultTests.assertEqualDocs(responseResult, parsedResult); + } + } + } + + @Override + protected Predicate getRandomFieldsExcludeFilter() { + // We cannot have random fields in the _source field and _ingest field + return field -> + field.contains( + new StringJoiner(".") + .add(WriteableIngestDocument.DOC_FIELD) + .add(WriteableIngestDocument.SOURCE_FIELD).toString() + ) || + field.contains( + new StringJoiner(".") + .add(WriteableIngestDocument.DOC_FIELD) + .add(WriteableIngestDocument.INGEST_FIELD).toString() + ); + } + + /** + * Test parsing {@link SimulatePipelineResponse} with inner failures as they don't support asserting on xcontent equivalence, given that + * exceptions are not parsed back as the same original class. We run the usual {@link AbstractXContentTestCase#testFromXContent()} + * without failures, and this other test with failures where we disable asserting on xcontent equivalence at the end. + */ + public void testFromXContentWithFailures() throws IOException { + Supplier instanceSupplier = SimulatePipelineResponseTests::createTestInstanceWithFailures; + //exceptions are not of the same type whenever parsed back + boolean assertToXContentEquivalence = false; + AbstractXContentTestCase.testFromXContent(NUMBER_OF_TEST_RUNS, instanceSupplier, supportsUnknownFields(), getShuffleFieldsExceptions(), + getRandomFieldsExcludeFilter(), this::createParser, this::doParseInstance, + this::assertEqualInstances, assertToXContentEquivalence, getToXContentParams()); + } } diff --git a/server/src/test/java/org/elasticsearch/action/ingest/SimulateProcessorResultTests.java b/server/src/test/java/org/elasticsearch/action/ingest/SimulateProcessorResultTests.java index 3014a1a4ae61d..2e0d6a75749bb 100644 --- a/server/src/test/java/org/elasticsearch/action/ingest/SimulateProcessorResultTests.java +++ b/server/src/test/java/org/elasticsearch/action/ingest/SimulateProcessorResultTests.java @@ -21,35 +21,29 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.ingest.RandomDocumentPicks; +import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.ingest.IngestDocument; -import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.AbstractXContentTestCase; import java.io.IOException; +import java.util.StringJoiner; +import java.util.function.Predicate; +import java.util.function.Supplier; import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument; +import static org.elasticsearch.action.ingest.WriteableIngestDocumentTests.createRandomIngestDoc; +import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; -public class SimulateProcessorResultTests extends ESTestCase { +public class SimulateProcessorResultTests extends AbstractXContentTestCase { public void testSerialization() throws IOException { - String processorTag = randomAlphaOfLengthBetween(1, 10); boolean isSuccessful = randomBoolean(); boolean isIgnoredException = randomBoolean(); - SimulateProcessorResult simulateProcessorResult; - if (isSuccessful) { - IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); - if (isIgnoredException) { - simulateProcessorResult = new SimulateProcessorResult(processorTag, ingestDocument, new IllegalArgumentException("test")); - } else { - simulateProcessorResult = new SimulateProcessorResult(processorTag, ingestDocument); - } - } else { - simulateProcessorResult = new SimulateProcessorResult(processorTag, new IllegalArgumentException("test")); - } + SimulateProcessorResult simulateProcessorResult = createTestInstance(isSuccessful, isIgnoredException); BytesStreamOutput out = new BytesStreamOutput(); simulateProcessorResult.writeTo(out); @@ -72,4 +66,96 @@ public void testSerialization() throws IOException { assertThat(e.getMessage(), equalTo("test")); } } + + static SimulateProcessorResult createTestInstance(boolean isSuccessful, + boolean isIgnoredException) { + String processorTag = randomAlphaOfLengthBetween(1, 10); + SimulateProcessorResult simulateProcessorResult; + if (isSuccessful) { + IngestDocument ingestDocument = createRandomIngestDoc(); + if (isIgnoredException) { + simulateProcessorResult = new SimulateProcessorResult(processorTag, ingestDocument, new IllegalArgumentException("test")); + } else { + simulateProcessorResult = new SimulateProcessorResult(processorTag, ingestDocument); + } + } else { + simulateProcessorResult = new SimulateProcessorResult(processorTag, new IllegalArgumentException("test")); + } + return simulateProcessorResult; + } + + private static SimulateProcessorResult createTestInstanceWithFailures() { + boolean isSuccessful = randomBoolean(); + boolean isIgnoredException = randomBoolean(); + return createTestInstance(isSuccessful, isIgnoredException); + } + + @Override + protected SimulateProcessorResult createTestInstance() { + // we test failures separately since comparing XContent is not possible with failures + return createTestInstance(true, false); + } + + @Override + protected SimulateProcessorResult doParseInstance(XContentParser parser) { + return SimulateProcessorResult.fromXContent(parser); + } + + @Override + protected boolean supportsUnknownFields() { + return true; + } + + @Override + protected Predicate getRandomFieldsExcludeFilter() { + // We cannot have random fields in the _source field and _ingest field + return field -> + field.startsWith( + new StringJoiner(".") + .add(WriteableIngestDocument.DOC_FIELD) + .add(WriteableIngestDocument.SOURCE_FIELD).toString() + ) || + field.startsWith( + new StringJoiner(".") + .add(WriteableIngestDocument.DOC_FIELD) + .add(WriteableIngestDocument.INGEST_FIELD).toString() + ); + } + + static void assertEqualProcessorResults(SimulateProcessorResult response, + SimulateProcessorResult parsedResponse) { + assertEquals(response.getProcessorTag(), parsedResponse.getProcessorTag()); + assertEquals(response.getIngestDocument(), parsedResponse.getIngestDocument()); + if (response.getFailure() != null ) { + assertNotNull(parsedResponse.getFailure()); + assertThat( + parsedResponse.getFailure().getMessage(), + containsString(response.getFailure().getMessage()) + ); + } else { + assertNull(parsedResponse.getFailure()); + } + } + + @Override + protected void assertEqualInstances(SimulateProcessorResult response, SimulateProcessorResult parsedResponse) { + assertEqualProcessorResults(response, parsedResponse); + } + + /** + * Test parsing {@link SimulateProcessorResult} with inner failures as they don't support asserting on xcontent equivalence, given that + * exceptions are not parsed back as the same original class. We run the usual {@link AbstractXContentTestCase#testFromXContent()} + * without failures, and this other test with failures where we disable asserting on xcontent equivalence at the end. + */ + public void testFromXContentWithFailures() throws IOException { + Supplier instanceSupplier = SimulateProcessorResultTests::createTestInstanceWithFailures; + //with random fields insertion in the inner exceptions, some random stuff may be parsed back as metadata, + //but that does not bother our assertions, as we only want to test that we don't break. + boolean supportsUnknownFields = true; + //exceptions are not of the same type whenever parsed back + boolean assertToXContentEquivalence = false; + AbstractXContentTestCase.testFromXContent(NUMBER_OF_TEST_RUNS, instanceSupplier, supportsUnknownFields, + getShuffleFieldsExceptions(), getRandomFieldsExcludeFilter(), this::createParser, this::doParseInstance, + this::assertEqualInstances, assertToXContentEquivalence, getToXContentParams()); + } } diff --git a/server/src/test/java/org/elasticsearch/action/ingest/WriteableIngestDocumentTests.java b/server/src/test/java/org/elasticsearch/action/ingest/WriteableIngestDocumentTests.java index 4d8e0f544c458..bc4589ff5d36c 100644 --- a/server/src/test/java/org/elasticsearch/action/ingest/WriteableIngestDocumentTests.java +++ b/server/src/test/java/org/elasticsearch/action/ingest/WriteableIngestDocumentTests.java @@ -25,14 +25,19 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.ingest.RandomDocumentPicks; import org.elasticsearch.ingest.IngestDocument; -import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.AbstractXContentTestCase; +import org.elasticsearch.test.RandomObjects; import java.io.IOException; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.StringJoiner; +import java.util.function.Predicate; import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS; import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument; @@ -40,7 +45,7 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; -public class WriteableIngestDocumentTests extends ESTestCase { +public class WriteableIngestDocumentTests extends AbstractXContentTestCase { public void testEqualsAndHashcode() throws Exception { Map sourceAndMetadata = RandomDocumentPicks.randomSource(random()); @@ -147,4 +152,42 @@ public void testToXContent() throws IOException { IngestDocument serializedIngestDocument = new IngestDocument(toXContentSource, toXContentIngestMetadata); assertThat(serializedIngestDocument, equalTo(serializedIngestDocument)); } + + static IngestDocument createRandomIngestDoc() { + XContentType xContentType = randomFrom(XContentType.values()); + BytesReference sourceBytes = RandomObjects.randomSource(random(), xContentType); + Map randomSource = XContentHelper.convertToMap(sourceBytes, false, xContentType).v2(); + return RandomDocumentPicks.randomIngestDocument(random(), randomSource); + } + + @Override + protected boolean supportsUnknownFields() { + return true; + } + + @Override + protected WriteableIngestDocument createTestInstance() { + return new WriteableIngestDocument(createRandomIngestDoc()); + } + + @Override + protected WriteableIngestDocument doParseInstance(XContentParser parser) { + return WriteableIngestDocument.fromXContent(parser); + } + + @Override + protected Predicate getRandomFieldsExcludeFilter() { + // We cannot have random fields in the _source field and _ingest field + return field -> + field.startsWith( + new StringJoiner(".") + .add(WriteableIngestDocument.DOC_FIELD) + .add(WriteableIngestDocument.SOURCE_FIELD).toString() + ) || + field.startsWith( + new StringJoiner(".") + .add(WriteableIngestDocument.DOC_FIELD) + .add(WriteableIngestDocument.INGEST_FIELD).toString() + ); + } }