Skip to content

Commit

Permalink
INGEST: Add Pipeline Processor (#32473)
Browse files Browse the repository at this point in the history
* INGEST: Add Pipeline Processor

* Adds Processor capable of invoking other pipelines
* Closes #31842
  • Loading branch information
original-brownbear authored Aug 29, 2018
1 parent 48b388c commit f690b49
Show file tree
Hide file tree
Showing 7 changed files with 330 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
processors.put(KeyValueProcessor.TYPE, new KeyValueProcessor.Factory());
processors.put(URLDecodeProcessor.TYPE, new URLDecodeProcessor.Factory());
processors.put(BytesProcessor.TYPE, new BytesProcessor.Factory());
processors.put(PipelineProcessor.TYPE, new PipelineProcessor.Factory(parameters.ingestService));
processors.put(DissectProcessor.TYPE, new DissectProcessor.Factory());
return Collections.unmodifiableMap(processors);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.ingest.common;

import java.util.Map;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.Processor;

public class PipelineProcessor extends AbstractProcessor {

public static final String TYPE = "pipeline";

private final String pipelineName;

private final IngestService ingestService;

private PipelineProcessor(String tag, String pipelineName, IngestService ingestService) {
super(tag);
this.pipelineName = pipelineName;
this.ingestService = ingestService;
}

@Override
public void execute(IngestDocument ingestDocument) throws Exception {
Pipeline pipeline = ingestService.getPipeline(pipelineName);
if (pipeline == null) {
throw new IllegalStateException("Pipeline processor configured for non-existent pipeline [" + pipelineName + ']');
}
ingestDocument.executePipeline(pipeline);
}

@Override
public String getType() {
return TYPE;
}

public static final class Factory implements Processor.Factory {

private final IngestService ingestService;

public Factory(IngestService ingestService) {
this.ingestService = ingestService;
}

@Override
public PipelineProcessor create(Map<String, Processor.Factory> registry, String processorTag,
Map<String, Object> config) throws Exception {
String pipeline =
ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "pipeline");
return new PipelineProcessor(processorTag, pipeline, ingestService);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.ingest.common;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ingest.CompoundProcessor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.test.ESTestCase;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class PipelineProcessorTests extends ESTestCase {

public void testExecutesPipeline() throws Exception {
String pipelineId = "pipeline";
IngestService ingestService = mock(IngestService.class);
CompletableFuture<IngestDocument> invoked = new CompletableFuture<>();
IngestDocument testIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
Pipeline pipeline = new Pipeline(
pipelineId, null, null,
new CompoundProcessor(new Processor() {
@Override
public void execute(final IngestDocument ingestDocument) throws Exception {
invoked.complete(ingestDocument);
}

@Override
public String getType() {
return null;
}

@Override
public String getTag() {
return null;
}
})
);
when(ingestService.getPipeline(pipelineId)).thenReturn(pipeline);
PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService);
Map<String, Object> config = new HashMap<>();
config.put("pipeline", pipelineId);
factory.create(Collections.emptyMap(), null, config).execute(testIngestDocument);
assertEquals(testIngestDocument, invoked.get());
}

public void testThrowsOnMissingPipeline() throws Exception {
IngestService ingestService = mock(IngestService.class);
IngestDocument testIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService);
Map<String, Object> config = new HashMap<>();
config.put("pipeline", "missingPipelineId");
IllegalStateException e = expectThrows(
IllegalStateException.class,
() -> factory.create(Collections.emptyMap(), null, config).execute(testIngestDocument)
);
assertEquals(
"Pipeline processor configured for non-existent pipeline [missingPipelineId]", e.getMessage()
);
}

public void testThrowsOnRecursivePipelineInvocations() throws Exception {
String innerPipelineId = "inner";
String outerPipelineId = "outer";
IngestService ingestService = mock(IngestService.class);
IngestDocument testIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
Map<String, Object> outerConfig = new HashMap<>();
outerConfig.put("pipeline", innerPipelineId);
PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService);
Pipeline outer = new Pipeline(
outerPipelineId, null, null,
new CompoundProcessor(factory.create(Collections.emptyMap(), null, outerConfig))
);
Map<String, Object> innerConfig = new HashMap<>();
innerConfig.put("pipeline", outerPipelineId);
Pipeline inner = new Pipeline(
innerPipelineId, null, null,
new CompoundProcessor(factory.create(Collections.emptyMap(), null, innerConfig))
);
when(ingestService.getPipeline(outerPipelineId)).thenReturn(outer);
when(ingestService.getPipeline(innerPipelineId)).thenReturn(inner);
outerConfig.put("pipeline", innerPipelineId);
ElasticsearchException e = expectThrows(
ElasticsearchException.class,
() -> factory.create(Collections.emptyMap(), null, outerConfig).execute(testIngestDocument)
);
assertEquals(
"Recursive invocation of pipeline [inner] detected.", e.getRootCause().getMessage()
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
---
teardown:
- do:
ingest.delete_pipeline:
id: "inner"
ignore: 404

- do:
ingest.delete_pipeline:
id: "outer"
ignore: 404

---
"Test Pipeline Processor with Simple Inner Pipeline":
- do:
ingest.put_pipeline:
id: "inner"
body: >
{
"description" : "inner pipeline",
"processors" : [
{
"set" : {
"field": "foo",
"value": "bar"
}
},
{
"set" : {
"field": "baz",
"value": "blub"
}
}
]
}
- match: { acknowledged: true }

- do:
ingest.put_pipeline:
id: "outer"
body: >
{
"description" : "outer pipeline",
"processors" : [
{
"pipeline" : {
"pipeline": "inner"
}
}
]
}
- match: { acknowledged: true }

- do:
index:
index: test
type: test
id: 1
pipeline: "outer"
body: {}

- do:
get:
index: test
type: test
id: 1
- match: { _source.foo: "bar" }
- match: { _source.baz: "blub" }

---
"Test Pipeline Processor with Circular Pipelines":
- do:
ingest.put_pipeline:
id: "outer"
body: >
{
"description" : "outer pipeline",
"processors" : [
{
"pipeline" : {
"pipeline": "inner"
}
}
]
}
- match: { acknowledged: true }

- do:
ingest.put_pipeline:
id: "inner"
body: >
{
"description" : "inner pipeline",
"processors" : [
{
"pipeline" : {
"pipeline": "outer"
}
}
]
}
- match: { acknowledged: true }

- do:
catch: /illegal_state_exception/
index:
index: test
type: test
id: 1
pipeline: "outer"
body: {}
- match: { error.root_cause.0.type: "exception" }
- match: { error.root_cause.0.reason: "java.lang.IllegalArgumentException: java.lang.IllegalStateException: Recursive invocation of pipeline [inner] detected." }
19 changes: 19 additions & 0 deletions server/src/main/java/org/elasticsearch/ingest/IngestDocument.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

package org.elasticsearch.ingest;

import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;
import org.elasticsearch.common.Strings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.IdFieldMapper;
Expand Down Expand Up @@ -55,6 +58,9 @@ public final class IngestDocument {
private final Map<String, Object> sourceAndMetadata;
private final Map<String, Object> ingestMetadata;

// Contains all pipelines that have been executed for this document
private final Set<Pipeline> executedPipelines = Collections.newSetFromMap(new IdentityHashMap<>());

public IngestDocument(String index, String type, String id, String routing,
Long version, VersionType versionType, Map<String, Object> source) {
this.sourceAndMetadata = new HashMap<>();
Expand Down Expand Up @@ -632,6 +638,19 @@ private static Object deepCopy(Object value) {
}
}

/**
* Executes the given pipeline with for this document unless the pipeline has already been executed
* for this document.
* @param pipeline Pipeline to execute
* @throws Exception On exception in pipeline execution
*/
public void executePipeline(Pipeline pipeline) throws Exception {
if (this.executedPipelines.add(pipeline) == false) {
throw new IllegalStateException("Recursive invocation of pipeline [" + pipeline.getId() + "] detected.");
}
pipeline.execute(this);
}

@Override
public boolean equals(Object obj) {
if (obj == this) { return true; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public IngestService(ClusterService clusterService, ThreadPool threadPool,
threadPool.getThreadContext(), threadPool::relativeTimeInMillis,
(delay, command) -> threadPool.schedule(
TimeValue.timeValueMillis(delay), ThreadPool.Names.GENERIC, command
)
), this
)
);
this.threadPool = threadPool;
Expand Down
10 changes: 7 additions & 3 deletions server/src/main/java/org/elasticsearch/ingest/Processor.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,22 +97,26 @@ class Parameters {
* instances that have run prior to in ingest.
*/
public final ThreadContext threadContext;

public final LongSupplier relativeTimeSupplier;


public final IngestService ingestService;

/**
* Provides scheduler support
*/
public final BiFunction<Long, Runnable, ScheduledFuture<?>> scheduler;

public Parameters(Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry, ThreadContext threadContext,
LongSupplier relativeTimeSupplier, BiFunction<Long, Runnable, ScheduledFuture<?>> scheduler) {
LongSupplier relativeTimeSupplier, BiFunction<Long, Runnable, ScheduledFuture<?>> scheduler,
IngestService ingestService) {
this.env = env;
this.scriptService = scriptService;
this.threadContext = threadContext;
this.analysisRegistry = analysisRegistry;
this.relativeTimeSupplier = relativeTimeSupplier;
this.scheduler = scheduler;
this.ingestService = ingestService;
}

}
Expand Down

0 comments on commit f690b49

Please sign in to comment.