Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ingest: Add ignore_missing property to foreach filter (#22147) #31578

Merged
merged 2 commits into from
Jun 26, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions docs/reference/ingest/ingest-node.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -1075,9 +1075,10 @@ then it aborts the execution and leaves the array unmodified.
.Foreach Options
[options="header"]
|======
| Name | Required | Default | Description
| `field` | yes | - | The array field
| `processor` | yes | - | The processor to execute against each field
| Name | Required | Default | Description
| `field` | yes | - | The array field
| `processor` | yes | - | The processor to execute against each field
| `ignore_missing` | no | false | If `true` and `field` does not exist or is `null`, the processor quietly exits without modifying the document
|======

Assume the following document:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.Set;

import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;
import static org.elasticsearch.ingest.ConfigurationUtils.readBooleanProperty;
import static org.elasticsearch.ingest.ConfigurationUtils.readMap;
import static org.elasticsearch.ingest.ConfigurationUtils.readStringProperty;

Expand All @@ -47,16 +48,28 @@ public final class ForEachProcessor extends AbstractProcessor {

private final String field;
private final Processor processor;
private final boolean ignoreMissing;

ForEachProcessor(String tag, String field, Processor processor) {
ForEachProcessor(String tag, String field, Processor processor, boolean ignoreMissing) {
super(tag);
this.field = field;
this.processor = processor;
this.ignoreMissing = ignoreMissing;
}

boolean isIgnoreMissing() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why was this method added? It doesn't seem to add anything, other than being checked in tests, but the tests are directly setting it when constructing the processor.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rjernst tbh. I only added this since the GeoIP processor had the same getter (as did all other fields in this processor) so I just stuck with the style. You're right though, this is effectively test only code leaking in :(

return ignoreMissing;
}

@Override
public void execute(IngestDocument ingestDocument) throws Exception {
List values = ingestDocument.getFieldValue(field, List.class);
List values = ingestDocument.getFieldValue(field, List.class, ignoreMissing);
if (values == null) {
if (ignoreMissing) {
return;
}
throw new IllegalArgumentException("field [" + field + "] is null, cannot loop over its elements.");
}
List<Object> newValues = new ArrayList<>(values.size());
for (Object value : values) {
Object previousValue = ingestDocument.getIngestMetadata().put("_value", value);
Expand Down Expand Up @@ -87,14 +100,15 @@ public static final class Factory implements Processor.Factory {
public ForEachProcessor create(Map<String, Processor.Factory> factories, String tag,
Map<String, Object> config) throws Exception {
String field = readStringProperty(TYPE, tag, config, "field");
boolean ignoreMissing = readBooleanProperty(TYPE, tag, config, "ignore_missing", false);
Map<String, Map<String, Object>> processorConfig = readMap(TYPE, tag, config, "processor");
Set<Map.Entry<String, Map<String, Object>>> entries = processorConfig.entrySet();
if (entries.size() != 1) {
throw newConfigurationException(TYPE, tag, "processor", "Must specify exactly one processor type");
}
Map.Entry<String, Map<String, Object>> entry = entries.iterator().next();
Processor processor = ConfigurationUtils.readProcessor(factories, entry.getKey(), entry.getValue());
return new ForEachProcessor(tag, field, processor);
return new ForEachProcessor(tag, field, processor, ignoreMissing);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,24 @@ public void testCreate() throws Exception {
assertThat(forEachProcessor, Matchers.notNullValue());
assertThat(forEachProcessor.getField(), equalTo("_field"));
assertThat(forEachProcessor.getProcessor(), Matchers.sameInstance(processor));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add an assertion that ignore_missing is false by default?

assertFalse(forEachProcessor.isIgnoreMissing());
}

public void testSetIgnoreMissing() throws Exception {
Processor processor = new TestProcessor(ingestDocument -> { });
Map<String, Processor.Factory> registry = new HashMap<>();
registry.put("_name", (r, t, c) -> processor);
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory();

Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
config.put("processor", Collections.singletonMap("_name", Collections.emptyMap()));
config.put("ignore_missing", true);
ForEachProcessor forEachProcessor = forEachFactory.create(registry, null, config);
assertThat(forEachProcessor, Matchers.notNullValue());
assertThat(forEachProcessor.getField(), equalTo("_field"));
assertThat(forEachProcessor.getProcessor(), Matchers.sameInstance(processor));
assertTrue(forEachProcessor.isIgnoreMissing());
}

public void testCreateWithTooManyProcessorTypes() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,22 @@

package org.elasticsearch.ingest.common;

import org.elasticsearch.ingest.CompoundProcessor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.TestProcessor;
import org.elasticsearch.ingest.TestTemplateService;
import org.elasticsearch.script.TemplateScript;
import org.elasticsearch.test.ESTestCase;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import org.elasticsearch.ingest.CompoundProcessor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.TestProcessor;
import org.elasticsearch.ingest.TestTemplateService;
import org.elasticsearch.script.TemplateScript;
import org.elasticsearch.test.ESTestCase;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please change the import order in your IDE so that these sort before java.util and that would have led to these import orderings not changing. For example:

screen shot 2018-06-26 at 2 11 20 pm

It's annoying to see these change in PR after PR based on different IDE settings.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done :), sorry about that


import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument;
import static org.hamcrest.Matchers.equalTo;

public class ForEachProcessorTests extends ESTestCase {
Expand All @@ -49,7 +49,8 @@ public void testExecute() throws Exception {
);

ForEachProcessor processor = new ForEachProcessor(
"_tag", "values", new UppercaseProcessor("_tag", "_ingest._value", false, "_ingest._value")
"_tag", "values", new UppercaseProcessor("_tag", "_ingest._value", false, "_ingest._value"),
false
);
processor.execute(ingestDocument);

Expand All @@ -69,7 +70,7 @@ public void testExecuteWithFailure() throws Exception {
throw new RuntimeException("failure");
}
});
ForEachProcessor processor = new ForEachProcessor("_tag", "values", testProcessor);
ForEachProcessor processor = new ForEachProcessor("_tag", "values", testProcessor, false);
try {
processor.execute(ingestDocument);
fail("exception expected");
Expand All @@ -89,7 +90,8 @@ public void testExecuteWithFailure() throws Exception {
});
Processor onFailureProcessor = new TestProcessor(ingestDocument1 -> {});
processor = new ForEachProcessor(
"_tag", "values", new CompoundProcessor(false, Arrays.asList(testProcessor), Arrays.asList(onFailureProcessor))
"_tag", "values", new CompoundProcessor(false, Arrays.asList(testProcessor), Arrays.asList(onFailureProcessor)),
false
);
processor.execute(ingestDocument);
assertThat(testProcessor.getInvokedCounter(), equalTo(3));
Expand All @@ -109,7 +111,7 @@ public void testMetaDataAvailable() throws Exception {
id.setFieldValue("_ingest._value.type", id.getSourceAndMetadata().get("_type"));
id.setFieldValue("_ingest._value.id", id.getSourceAndMetadata().get("_id"));
});
ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor);
ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor, false);
processor.execute(ingestDocument);

assertThat(innerProcessor.getInvokedCounter(), equalTo(2));
Expand Down Expand Up @@ -137,7 +139,7 @@ public void testRestOfTheDocumentIsAvailable() throws Exception {
ForEachProcessor processor = new ForEachProcessor(
"_tag", "values", new SetProcessor("_tag",
new TestTemplateService.MockTemplateScript.Factory("_ingest._value.new_field"),
(model) -> model.get("other")));
(model) -> model.get("other")), false);
processor.execute(ingestDocument);

assertThat(ingestDocument.getFieldValue("values.0.new_field", String.class), equalTo("value"));
Expand Down Expand Up @@ -174,7 +176,7 @@ public String getTag() {
"_index", "_type", "_id", null, null, null, Collections.singletonMap("values", values)
);

ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor);
ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor, false);
processor.execute(ingestDocument);
@SuppressWarnings("unchecked")
List<String> result = ingestDocument.getFieldValue("values", List.class);
Expand All @@ -199,7 +201,7 @@ public void testModifyFieldsOutsideArray() throws Exception {
"_tag", "values", new CompoundProcessor(false,
Collections.singletonList(new UppercaseProcessor("_tag_upper", "_ingest._value", false, "_ingest._value")),
Collections.singletonList(new AppendProcessor("_tag", template, (model) -> (Collections.singletonList("added"))))
));
), false);
processor.execute(ingestDocument);

List result = ingestDocument.getFieldValue("values", List.class);
Expand All @@ -225,7 +227,7 @@ public void testScalarValueAllowsUnderscoreValueFieldToRemainAccessible() throws

TestProcessor processor = new TestProcessor(doc -> doc.setFieldValue("_ingest._value",
doc.getFieldValue("_source._value", String.class)));
ForEachProcessor forEachProcessor = new ForEachProcessor("_tag", "values", processor);
ForEachProcessor forEachProcessor = new ForEachProcessor("_tag", "values", processor, false);
forEachProcessor.execute(ingestDocument);

List result = ingestDocument.getFieldValue("values", List.class);
Expand Down Expand Up @@ -258,7 +260,7 @@ public void testNestedForEach() throws Exception {
doc -> doc.setFieldValue("_ingest._value", doc.getFieldValue("_ingest._value", String.class).toUpperCase(Locale.ENGLISH))
);
ForEachProcessor processor = new ForEachProcessor(
"_tag", "values1", new ForEachProcessor("_tag", "_ingest._value.values2", testProcessor));
"_tag", "values1", new ForEachProcessor("_tag", "_ingest._value.values2", testProcessor, false), false);
processor.execute(ingestDocument);

List result = ingestDocument.getFieldValue("values1.0.values2", List.class);
Expand All @@ -270,4 +272,16 @@ public void testNestedForEach() throws Exception {
assertThat(result.get(1), equalTo("JKL"));
}

public void testIgnoreMissing() throws Exception {
IngestDocument originalIngestDocument = new IngestDocument(
"_index", "_type", "_id", null, null, null, Collections.emptyMap()
);
IngestDocument ingestDocument = new IngestDocument(originalIngestDocument);
TestProcessor testProcessor = new TestProcessor(doc -> {});
ForEachProcessor processor = new ForEachProcessor("_tag", "_ingest._value", testProcessor, true);
processor.execute(ingestDocument);
assertIngestDocument(originalIngestDocument, ingestDocument);
assertThat(testProcessor.getInvokedCounter(), equalTo(0));
}

}