Skip to content

Commit

Permalink
code review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
masseyke committed Nov 9, 2023
1 parent 6be766e commit fd12236
Show file tree
Hide file tree
Showing 6 changed files with 160 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public SimulateIndexResponse(
List<String> pipelines
) {
// We don't actually care about most of the IndexResponse fields:
super(new ShardId(index, "", 0), id == null ? "" : id, 0, 0, version, true, pipelines);
super(new ShardId(index, "", 0), id == null ? "<n/a>" : id, 0, 0, version, true, pipelines);
this.source = source;
this.sourceXContentType = sourceXContentType;
setShardInfo(new ReplicationResponse.ShardInfo(0, 0));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
// are loaded, so in the cluster state we just save the pipeline config and here we keep the actual pipelines around.
private volatile Map<String, PipelineHolder> pipelines = Map.of();
private final ThreadPool threadPool;
private final IngestMetric totalMetrics;
private final List<Consumer<ClusterState>> ingestClusterStateListeners;
private final IngestMetric totalMetrics = new IngestMetric();
private final List<Consumer<ClusterState>> ingestClusterStateListeners = new CopyOnWriteArrayList<>();
private volatile ClusterState state;

private static BiFunction<Long, Runnable, Scheduler.ScheduledCancellable> createScheduler(ThreadPool threadPool) {
Expand Down Expand Up @@ -184,8 +184,6 @@ public IngestService(
MatcherWatchdog matcherWatchdog,
Supplier<DocumentParsingObserver> documentParsingObserverSupplier
) {
this.totalMetrics = new IngestMetric();
this.ingestClusterStateListeners = new CopyOnWriteArrayList<>();
this.clusterService = clusterService;
this.scriptService = scriptService;
this.documentParsingObserverSupplier = documentParsingObserverSupplier;
Expand Down Expand Up @@ -214,8 +212,6 @@ public IngestService(
* @param ingestService
*/
IngestService(IngestService ingestService) {
this.totalMetrics = new IngestMetric();
this.ingestClusterStateListeners = new CopyOnWriteArrayList<>();
this.clusterService = ingestService.clusterService;
this.scriptService = ingestService.scriptService;
this.documentParsingObserverSupplier = ingestService.documentParsingObserverSupplier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public SimulateIngestService(IngestService ingestService, BulkRequest request) {
throw new RuntimeException(e);
}
} else {
pipelineSubstitutions = Map.of();
throw new IllegalArgumentException("Expecting a SimulateBulkRequest but got " + request.getClass());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,9 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
* The simulate ingest API is intended to have inputs and outputs that are formatted similarly to the simulate pipeline API for the
* sake of consistency. But internally it uses the same code as the _bulk API, so that we have confidence that we are simulating what
* really happens on ingest. This method transforms simulate-style inputs into an input that the bulk API can accept.
* Non-private for unit testing
*/
private BytesReference convertToBulkRequestXContentBytes(Map<String, Object> sourceMap) throws IOException {
static BytesReference convertToBulkRequestXContentBytes(Map<String, Object> sourceMap) throws IOException {
List<Map<String, Object>> docs = ConfigurationUtils.readList(null, null, sourceMap, "docs");
if (docs.isEmpty()) {
throw new IllegalArgumentException("must specify at least one document in [docs]");
Expand All @@ -108,20 +109,14 @@ private BytesReference convertToBulkRequestXContentBytes(Map<String, Object> sou
throw new IllegalArgumentException("malformed [docs] section, should include an inner object");
}
Map<String, Object> document = ConfigurationUtils.readMap(null, null, doc, "_source");
String index = ConfigurationUtils.readStringOrIntProperty(
null,
null,
doc,
IngestDocument.Metadata.INDEX.getFieldName(),
"_index"
);
String id = ConfigurationUtils.readStringOrIntProperty(null, null, doc, IngestDocument.Metadata.ID.getFieldName(), "_id");
String index = ConfigurationUtils.readOptionalStringProperty(null, null, doc, IngestDocument.Metadata.INDEX.getFieldName());
String id = ConfigurationUtils.readOptionalStringProperty(null, null, doc, IngestDocument.Metadata.ID.getFieldName());
XContentBuilder actionXContentBuilder = XContentFactory.contentBuilder(XContentType.JSON).lfAtEnd();
actionXContentBuilder.startObject().field("index").startObject();
if ("_index".equals(index) == false) {
if (index != null) {
actionXContentBuilder.field("_index", index);
}
if ("id".equals(id) == false) {
if (id != null) {
actionXContentBuilder.field("_id", id);
}
actionXContentBuilder.endObject().endObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.elasticsearch.test.ESTestCase;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -19,7 +20,12 @@
public class SimulateBulkRequestTests extends ESTestCase {

public void testSerialization() throws Exception {
Map<String, Map<String, Object>> pipelineSubstitutions = getTestPipelineSubstitutions();
testSerialization(getTestPipelineSubstitutions());
testSerialization(null);
testSerialization(Map.of());
}

private void testSerialization(Map<String, Map<String, Object>> pipelineSubstitutions) throws IOException {
SimulateBulkRequest simulateBulkRequest = new SimulateBulkRequest(pipelineSubstitutions);
/*
* Note: SimulateBulkRequest does not implement equals or hashCode, so we can't test serialization in the usual way for a
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.rest.action.ingest;

import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xcontent.XContentType;

import java.util.Map;

import static org.hamcrest.Matchers.equalTo;

// Unit tests for RestSimulateIngestAction.convertToBulkRequestXContentBytes, which transforms a
// simulate-ingest style request body (a top-level "docs" array of objects carrying optional
// "_index"/"_id" fields and a required "_source" map) into the newline-delimited
// action-line / source-line pairs that the _bulk API consumes.
public class RestSimulateIngestActionTests extends ESTestCase {

// Each case below is an isolated block scope: a simulate-request JSON fixture, the expected
// bulk-request NDJSON, and a round-trip assertion via testInputJsonConvertsToOutputJson.
public void testConvertToBulkRequestXContentBytes() throws Exception {
{
// No index, no id, which we expect to be fine:
// The action line for each doc is an empty "index" object because neither _index nor _id
// was supplied. Note the "pipeline_substitutions" section is ignored by the conversion —
// only "docs" contributes to the bulk payload.
String simulateRequestJson = """
{
"docs": [
{
"_source": {
"my-keyword-field": "FOO"
}
},
{
"_source": {
"my-keyword-field": "BAR"
}
}
],
"pipeline_substitutions": {
"my-pipeline-2": {
"processors": [
{
"set": {
"field": "my-new-boolean-field",
"value": true
}
}
]
}
}
}
""";
String bulkRequestJson = """
{"index":{}}
{"my-keyword-field":"FOO"}
{"index":{}}
{"my-keyword-field":"BAR"}
""";
testInputJsonConvertsToOutputJson(simulateRequestJson, bulkRequestJson);
}

{
// index and id:
// When _index and _id are present they are copied onto each doc's "index" action line.
String simulateRequestJson = """
{
"docs": [
{
"_index": "index",
"_id": "123",
"_source": {
"foo": "bar"
}
},
{
"_index": "index",
"_id": "456",
"_source": {
"foo": "rab"
}
}
]
}
""";
String bulkRequestJson = """
{"index":{"_index":"index","_id":"123"}}
{"foo":"bar"}
{"index":{"_index":"index","_id":"456"}}
{"foo":"rab"}
""";
testInputJsonConvertsToOutputJson(simulateRequestJson, bulkRequestJson);
}

{
// We expect an IllegalArgumentException if there are no docs:
String simulateRequestJson = """
{
"docs": [
]
}
""";
// The expected-output fixture here is never compared against — the conversion throws
// before any output is produced, so this string is irrelevant to the assertion.
String bulkRequestJson = """
{"index":{"_index":"index","_id":"123"}}
{"foo":"bar"}
{"index":{"_index":"index","_id":"456"}}
{"foo":"rab"}
""";
expectThrows(IllegalArgumentException.class, () -> testInputJsonConvertsToOutputJson(simulateRequestJson, bulkRequestJson));
}

{
// non-trivial source:
// Nested objects and arrays in _source must survive the round trip. The expected output
// reflects map-iteration order of the parsed source, not the order in the input JSON —
// note "some_object" precedes "foo" on the expected source line.
String simulateRequestJson = """
{
"docs": [
{
"_index": "index",
"_id": "123",
"_source": {
"foo": "bar",
"some_object": {
"prop1": "val1",
"some_array": [1, 2, 3, 4]
}
}
}
]
}
""";
String bulkRequestJson = """
{"index":{"_index":"index","_id":"123"}}
{"some_object":{"prop1":"val1","some_array":[1,2,3,4]},"foo":"bar"}
""";
testInputJsonConvertsToOutputJson(simulateRequestJson, bulkRequestJson);
}
}

// Parses inputJson into a generic map, runs it through the conversion under test, renders the
// resulting bytes back to a JSON string, and asserts exact equality with expectedOutputJson.
private void testInputJsonConvertsToOutputJson(String inputJson, String expectedOutputJson) throws Exception {
Map<String, Object> sourceMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), inputJson, false);
BytesReference bulkXcontentBytes = RestSimulateIngestAction.convertToBulkRequestXContentBytes(sourceMap);
String bulkRequestJson = XContentHelper.convertToJson(bulkXcontentBytes, false, XContentType.JSON);
assertThat(bulkRequestJson, equalTo(expectedOutputJson));
}
}

0 comments on commit fd12236

Please sign in to comment.