Skip to content

Commit

Permalink
Add support for parsing inline script (#23824) (#26846)
Browse files Browse the repository at this point in the history
* Add support for parsing inline script (#23824)

* Fix test
  • Loading branch information
kel authored and talevy committed Oct 11, 2017
1 parent 592ab04 commit 2e36f19
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -294,13 +295,13 @@ public static ElasticsearchException newConfigurationException(String processorT
return exception;
}

public static List<Processor> readProcessorConfigs(List<Map<String, Map<String, Object>>> processorConfigs,
public static List<Processor> readProcessorConfigs(List<Map<String, Object>> processorConfigs,
Map<String, Processor.Factory> processorFactories) throws Exception {
Exception exception = null;
List<Processor> processors = new ArrayList<>();
if (processorConfigs != null) {
for (Map<String, Map<String, Object>> processorConfigWithKey : processorConfigs) {
for (Map.Entry<String, Map<String, Object>> entry : processorConfigWithKey.entrySet()) {
for (Map<String, Object> processorConfigWithKey : processorConfigs) {
for (Map.Entry<String, Object> entry : processorConfigWithKey.entrySet()) {
try {
processors.add(readProcessor(processorFactories, entry.getKey(), entry.getValue()));
} catch (Exception e) {
Expand Down Expand Up @@ -353,13 +354,28 @@ private static void addHeadersToException(ElasticsearchException exception, Stri
}
}

@SuppressWarnings("unchecked")
public static Processor readProcessor(Map<String, Processor.Factory> processorFactories,
String type, Object config) throws Exception {
if (config instanceof Map) {
return readProcessor(processorFactories, type, (Map<String, Object>) config);
} else if (config instanceof String && "script".equals(type)) {
Map<String, Object> normalizedScript = new HashMap<>(1);
normalizedScript.put(ScriptType.INLINE.getName(), config);
return readProcessor(processorFactories, type, normalizedScript);
} else {
throw newConfigurationException(type, null, null,
"property isn't a map, but of type [" + config.getClass().getName() + "]");
}
}

public static Processor readProcessor(Map<String, Processor.Factory> processorFactories,
String type, Map<String, Object> config) throws Exception {
String tag = ConfigurationUtils.readOptionalStringProperty(null, null, config, TAG_KEY);
Processor.Factory factory = processorFactories.get(type);
if (factory != null) {
boolean ignoreFailure = ConfigurationUtils.readBooleanProperty(null, null, config, "ignore_failure", false);
List<Map<String, Map<String, Object>>> onFailureProcessorConfigs =
List<Map<String, Object>> onFailureProcessorConfigs =
ConfigurationUtils.readOptionalList(null, null, config, Pipeline.ON_FAILURE_KEY);

List<Processor> onFailureProcessors = readProcessorConfigs(onFailureProcessorConfigs, processorFactories);
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/org/elasticsearch/ingest/Pipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,9 @@ public static final class Factory {
public Pipeline create(String id, Map<String, Object> config, Map<String, Processor.Factory> processorFactories) throws Exception {
String description = ConfigurationUtils.readOptionalStringProperty(null, null, config, DESCRIPTION_KEY);
Integer version = ConfigurationUtils.readIntProperty(null, null, config, VERSION_KEY, null);
List<Map<String, Map<String, Object>>> processorConfigs = ConfigurationUtils.readList(null, null, config, PROCESSORS_KEY);
List<Map<String, Object>> processorConfigs = ConfigurationUtils.readList(null, null, config, PROCESSORS_KEY);
List<Processor> processors = ConfigurationUtils.readProcessorConfigs(processorConfigs, processorFactories);
List<Map<String, Map<String, Object>>> onFailureProcessorConfigs =
List<Map<String, Object>> onFailureProcessorConfigs =
ConfigurationUtils.readOptionalList(null, null, config, ON_FAILURE_KEY);
List<Processor> onFailureProcessors = ConfigurationUtils.readProcessorConfigs(onFailureProcessorConfigs, processorFactories);
if (config.isEmpty() == false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public void testReadProcessors() throws Exception {
Map<String, Processor.Factory> registry =
Collections.singletonMap("test_processor", (factories, tag, config) -> processor);

List<Map<String, Map<String, Object>>> config = new ArrayList<>();
List<Map<String, Object>> config = new ArrayList<>();
Map<String, Object> emptyConfig = Collections.emptyMap();
config.add(Collections.singletonMap("test_processor", emptyConfig));
config.add(Collections.singletonMap("test_processor", emptyConfig));
Expand All @@ -135,7 +135,7 @@ public void testReadProcessors() throws Exception {
assertThat(e.getHeader("processor_type"), equalTo(Collections.singletonList("unknown_processor")));
assertThat(e.getHeader("property_name"), is(nullValue()));

List<Map<String, Map<String, Object>>> config2 = new ArrayList<>();
List<Map<String, Object>> config2 = new ArrayList<>();
unknownTaggedConfig = new HashMap<>();
unknownTaggedConfig.put("tag", "my_unknown");
config2.add(Collections.singletonMap("unknown_processor", unknownTaggedConfig));
Expand All @@ -157,4 +157,27 @@ public void testReadProcessors() throws Exception {
assertThat(e2.getHeader("property_name"), is(nullValue()));
}

public void testReadProcessorFromObjectOrMap() throws Exception {
Processor processor = mock(Processor.class);
Map<String, Processor.Factory> registry =
Collections.singletonMap("script", (processorFactories, tag, config) -> {
config.clear();
return processor;
});

Object emptyConfig = Collections.emptyMap();
Processor processor1 = ConfigurationUtils.readProcessor(registry, "script", emptyConfig);
assertThat(processor1, sameInstance(processor));

Object inlineScript = "test_script";
Processor processor2 = ConfigurationUtils.readProcessor(registry, "script", inlineScript);
assertThat(processor2, sameInstance(processor));

Object invalidConfig = 12L;

ElasticsearchParseException ex = expectThrows(ElasticsearchParseException.class,
() -> ConfigurationUtils.readProcessor(registry, "unknown_processor", invalidConfig));
assertThat(ex.getMessage(), equalTo("property isn't a map, but of type [" + invalidConfig.getClass().getName() + "]"));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@

package org.elasticsearch.ingest.common;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import com.fasterxml.jackson.core.JsonFactory;

import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.common.xcontent.json.JsonXContentParser;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;
Expand All @@ -31,22 +33,18 @@
import org.elasticsearch.script.ScriptException;
import org.elasticsearch.script.ScriptService;

import java.util.Arrays;
import java.util.Map;

import static java.util.Collections.emptyMap;
import static org.elasticsearch.common.Strings.hasLength;
import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;
import static org.elasticsearch.ingest.ConfigurationUtils.readOptionalMap;
import static org.elasticsearch.ingest.ConfigurationUtils.readOptionalStringProperty;
import static org.elasticsearch.script.ScriptType.INLINE;
import static org.elasticsearch.script.ScriptType.STORED;

/**
* Processor that evaluates a script with an ingest document in its context.
*/
public final class ScriptProcessor extends AbstractProcessor {

public static final String TYPE = "script";
private static final JsonFactory JSON_FACTORY = new JsonFactory();

private final Script script;
private final ScriptService scriptService;
Expand Down Expand Up @@ -87,66 +85,27 @@ Script getScript() {
}

public static final class Factory implements Processor.Factory {
private final Logger logger = ESLoggerFactory.getLogger(Factory.class);
private final DeprecationLogger deprecationLogger = new DeprecationLogger(logger);

private final ScriptService scriptService;

public Factory(ScriptService scriptService) {
this.scriptService = scriptService;
}

@Override
@SuppressWarnings("unchecked")
public ScriptProcessor create(Map<String, Processor.Factory> registry, String processorTag,
Map<String, Object> config) throws Exception {
String lang = readOptionalStringProperty(TYPE, processorTag, config, "lang");
String source = readOptionalStringProperty(TYPE, processorTag, config, "source");
String id = readOptionalStringProperty(TYPE, processorTag, config, "id");
Map<String, ?> params = readOptionalMap(TYPE, processorTag, config, "params");

if (source == null) {
source = readOptionalStringProperty(TYPE, processorTag, config, "inline");
if (source != null) {
deprecationLogger.deprecated("Specifying script source with [inline] is deprecated, use [source] instead.");
}
}

boolean containsNoScript = !hasLength(id) && !hasLength(source);
if (containsNoScript) {
throw newConfigurationException(TYPE, processorTag, null, "Need [id] or [source] parameter to refer to scripts");
}

boolean moreThanOneConfigured = Strings.hasLength(id) && Strings.hasLength(source);
if (moreThanOneConfigured) {
throw newConfigurationException(TYPE, processorTag, null, "Only one of [id] or [source] may be configured");
}

if (lang == null) {
lang = Script.DEFAULT_SCRIPT_LANG;
}
XContentBuilder builder = XContentBuilder.builder(JsonXContent.jsonXContent).map(config);
JsonXContentParser parser = new JsonXContentParser(NamedXContentRegistry.EMPTY,
JSON_FACTORY.createParser(builder.bytes().streamInput()));
Script script = Script.parse(parser);

if (params == null) {
params = emptyMap();
}

final Script script;
String scriptPropertyUsed;
if (Strings.hasLength(source)) {
script = new Script(INLINE, lang, source, (Map<String, Object>)params);
scriptPropertyUsed = "source";
} else if (Strings.hasLength(id)) {
script = new Script(STORED, null, id, (Map<String, Object>)params);
scriptPropertyUsed = "id";
} else {
throw newConfigurationException(TYPE, processorTag, null, "Could not initialize script");
}
Arrays.asList("id", "source", "inline", "lang", "params", "options").forEach(config::remove);

// verify script is able to be compiled before successfully creating processor.
try {
scriptService.compile(script, ExecutableScript.INGEST_CONTEXT);
} catch (ScriptException e) {
throw newConfigurationException(TYPE, processorTag, scriptPropertyUsed, e);
throw newConfigurationException(TYPE, processorTag, null, e);
}

return new ScriptProcessor(processorTag, script, scriptService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,25 +82,25 @@ public void testFactoryValidationForMultipleScriptingTypes() throws Exception {

ElasticsearchException exception = expectThrows(ElasticsearchException.class,
() -> factory.create(null, randomAlphaOfLength(10), configMap));
assertThat(exception.getMessage(), is("Only one of [id] or [source] may be configured"));
assertThat(exception.getMessage(), is("[script] failed to parse field [source]"));
}

public void testFactoryValidationAtLeastOneScriptingType() throws Exception {
Map<String, Object> configMap = new HashMap<>();
configMap.put("lang", "mockscript");

ElasticsearchException exception = expectThrows(ElasticsearchException.class,
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class,
() -> factory.create(null, randomAlphaOfLength(10), configMap));

assertThat(exception.getMessage(), is("Need [id] or [source] parameter to refer to scripts"));
assertThat(exception.getMessage(), is("must specify either [source] for an inline script or [id] for a stored script"));
}

public void testInlineBackcompat() throws Exception {
Map<String, Object> configMap = new HashMap<>();
configMap.put("inline", "code");

factory.create(null, randomAlphaOfLength(10), configMap);
assertWarnings("Specifying script source with [inline] is deprecated, use [source] instead.");
assertWarnings("Deprecated field [inline] used, expected [source] instead");
}

public void testFactoryInvalidateWithInvalidCompiledScript() throws Exception {
Expand All @@ -112,7 +112,6 @@ public void testFactoryInvalidateWithInvalidCompiledScript() throws Exception {
factory = new ScriptProcessor.Factory(mockedScriptService);

Map<String, Object> configMap = new HashMap<>();
configMap.put("lang", "mockscript");
configMap.put(randomType, "my_script");

ElasticsearchException exception = expectThrows(ElasticsearchException.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@
]
}
- match: { error.header.processor_type: "script" }
- match: { error.header.property_name: "source" }
- match: { error.type: "script_exception" }
- match: { error.reason: "compile error" }

0 comments on commit 2e36f19

Please sign in to comment.