Skip to content

Commit

Permalink
Fixed ingest pipeline script issue (opensearch-project#11725)
Browse files Browse the repository at this point in the history
Signed-off-by: vikasvb90 <vikasvb@amazon.com>
  • Loading branch information
vikasvb90 authored and rayshrey committed Mar 18, 2024
1 parent 0a79177 commit 41b14b7
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,11 @@ public IngestDocument execute(IngestDocument document) {
} else {
ingestScript = precompiledIngestScript;
}
ingestScript.execute(document.getSourceAndMetadata());
CollectionUtils.ensureNoSelfReferences(document.getSourceAndMetadata(), "ingest script");
IngestDocument mutableDocument = new IngestDocument(document);
ingestScript.execute(mutableDocument.getSourceAndMetadata());
CollectionUtils.ensureNoSelfReferences(mutableDocument.getSourceAndMetadata(), "ingest script");
document.getSourceAndMetadata().clear();
document.getSourceAndMetadata().putAll(mutableDocument.getSourceAndMetadata());
return document;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,16 @@ private void assertIngestDocument(IngestDocument ingestDocument) {
int bytesTotal = ingestDocument.getFieldValue("bytes_in", Integer.class) + ingestDocument.getFieldValue("bytes_out", Integer.class);
assertThat(ingestDocument.getSourceAndMetadata().get("bytes_total"), is(bytesTotal));
}

public void testScriptingWithSelfReferencingSourceMetadata() {
ScriptProcessor processor = new ScriptProcessor(randomAlphaOfLength(10), null, script, null, scriptService);
IngestDocument originalIngestDocument = randomDocument();
String index = originalIngestDocument.getSourceAndMetadata().get(IngestDocument.Metadata.INDEX.getFieldName()).toString();
String id = originalIngestDocument.getSourceAndMetadata().get(IngestDocument.Metadata.ID.getFieldName()).toString();
Map<String, Object> sourceMetadata = originalIngestDocument.getSourceAndMetadata();
originalIngestDocument.getSourceAndMetadata().put("_source", sourceMetadata);
IngestDocument ingestDocument = new IngestDocument(index, id, null, null, null, originalIngestDocument.getSourceAndMetadata());
expectThrows(IllegalArgumentException.class, () -> processor.execute(ingestDocument));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -202,3 +202,79 @@ teardown:
id: 1
- match: { _source.source_field: "foo%20bar" }
- match: { _source.target_field: "foo bar" }

---
"Test self referencing source with ignore failure":
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description": "_description",
"processors": [
{
"script" : {
"lang": "painless",
"source" : "ctx.foo['foo']=ctx.foo;ctx['test-field']='test-value'",
"ignore_failure": true
}
},
{
"script" : {
"lang": "painless",
"source" : "ctx.target_field = Processors.uppercase(ctx.source_field)"
}
}
]
}
- match: { acknowledged: true }

- do:
index:
index: test
id: 1
pipeline: "my_pipeline"
body: {source_field: "fooBar", foo: {foo: "bar"}}

- do:
get:
index: test
id: 1
- match: { _source.source_field: "fooBar" }
- match: { _source.target_field: "FOOBAR"}
- match: { _source.test-field: null}

---
"Test self referencing source without ignoring failure":
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description": "_description",
"processors": [
{
"script" : {
"lang": "painless",
"source" : "ctx.foo['foo']=ctx.foo;ctx['test-field']='test-value'"
}
},
{
"script" : {
"lang": "painless",
"source" : "ctx.target_field = Processors.uppercase(ctx.source_field)"
}
}
]
}
- match: { acknowledged: true }

- do:
catch: bad_request
index:
index: test
id: 1
pipeline: "my_pipeline"
body: {source_field: "fooBar", foo: {foo: "bar"}}
- match: { error.root_cause.0.type: "illegal_argument_exception" }
- match: { error.root_cause.0.reason: "Iterable object is self-referencing itself (ingest script)" }
Original file line number Diff line number Diff line change
Expand Up @@ -1113,3 +1113,48 @@ teardown:
- match: { status: 400 }
- match: { error.root_cause.0.type: "illegal_argument_exception" }
- match: { error.root_cause.0.reason: "Failed to parse parameter [_if_primary_term], only int or long is accepted" }

---
"Test simulate with pipeline with ignore failure and cyclic field assignments in script":
- do:
ingest.simulate:
verbose: true
body: >
{
"pipeline": {
"description": "_description",
"processors": [
{
"script" : {
"ignore_failure" : true,
"lang": "painless",
"source": "ctx.foo['foo']=ctx.foo;ctx.tag='recursive'"
}
},
{
"script" : {
"lang": "painless",
"source" : "ctx.target_field = Processors.uppercase(ctx.foo.foo)"
}
}
]
},
"docs": [
{
"_source": {
"foo": {
"foo": "bar"
}
}
}
]
}
- length: { docs: 1 }
- length: { docs.0.processor_results: 2 }
- match: { docs.0.processor_results.0.status: "error_ignored" }
- match: { docs.0.processor_results.0.ignored_error.error.type: "illegal_argument_exception" }
- match: { docs.0.processor_results.0.doc._source.tag: null }
- match: { docs.0.processor_results.1.doc._source.target_field: "BAR" }
- match: { docs.0.processor_results.1.doc._source.foo.foo: "bar" }
- match: { docs.0.processor_results.1.status: "success" }
- match: { docs.0.processor_results.1.processor_type: "script" }
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
package org.opensearch.ingest;

import org.opensearch.core.common.Strings;
import org.opensearch.core.common.util.CollectionUtils;
import org.opensearch.index.VersionType;
import org.opensearch.index.mapper.IdFieldMapper;
import org.opensearch.index.mapper.IndexFieldMapper;
Expand Down Expand Up @@ -752,6 +753,7 @@ public Map<String, Object> getSourceAndMetadata() {

@SuppressWarnings("unchecked")
public static <K, V> Map<K, V> deepCopyMap(Map<K, V> source) {
CollectionUtils.ensureNoSelfReferences(source, "IngestDocument: Self reference present in object.");
return (Map<K, V>) deepCopy(source);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@ public void setTestIngestDocument() {
ingestDocument = new IngestDocument("index", "id", null, null, null, document);
}

public void testSelfReferencingSource() {
Map<String, Object> value = new HashMap<>();
value.put("foo", value);
expectThrows(IllegalArgumentException.class, () -> IngestDocument.deepCopyMap(value));
}

public void testSimpleGetFieldValue() {
assertThat(ingestDocument.getFieldValue("foo", String.class), equalTo("bar"));
assertThat(ingestDocument.getFieldValue("int", Integer.class), equalTo(123));
Expand Down

0 comments on commit 41b14b7

Please sign in to comment.