Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

INGEST: Enable default pipelines #32286

Merged
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
---
teardown:
- do:
ingest.delete_pipeline:
id: "my_pipeline"
ignore: 404

---
"Test index with default pipeline":
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While having rest tests is fine to ensure the param is passed through the rest layer correctly, we should first have unit tests.

- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description": "_description",
"processors": [
{
"bytes" : {
"field" : "bytes_source_field",
"target_field" : "bytes_target_field"
}
}
]
}
- match: { acknowledged: true }

- do:
indices.create:
index: test
body:
settings:
index:
default_pipeline: "my_pipeline"

- do:
index:
index: test
type: test
id: 1
body: {bytes_source_field: "1kb"}

- do:
get:
index: test
type: test
id: 1
- match: { _source.bytes_source_field: "1kb" }
- match: { _source.bytes_target_field: 1024 }

- do:
index:
index: test
type: test
id: 2
pipeline: "_none"
body: {bytes_source_field: "1kb"}

- do:
get:
index: test
type: test
id: 2
- match: { _source.bytes_source_field: "1kb" }
- is_false: _source.bytes_target_field

- do:
catch: bad_request
index:
index: test
type: test
id: 3
pipeline: ""
body: {bytes_source_field: "1kb"}
Original file line number Diff line number Diff line change
Expand Up @@ -523,22 +523,6 @@ private int findNextMarker(byte marker, int from, BytesReference data, int lengt
return -1;
}

/**
* @return Whether this bulk request contains index request with an ingest pipeline enabled.
*/
public boolean hasIndexRequestsWithPipelines() {
for (DocWriteRequest<?> actionRequest : requests) {
if (actionRequest instanceof IndexRequest) {
IndexRequest indexRequest = (IndexRequest) actionRequest;
if (Strings.hasText(indexRequest.getPipeline())) {
return true;
}
}
}

return false;
}

@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,15 @@
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndexClosedException;
Expand Down Expand Up @@ -125,7 +127,29 @@ public TransportBulkAction(Settings settings, ThreadPool threadPool, TransportSe

@Override
protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
if (bulkRequest.hasIndexRequestsWithPipelines()) {
boolean hasIndexRequestsWithPipelines = false;
ImmutableOpenMap<String, IndexMetaData> indicesMetaData = clusterService.state().getMetaData().indices();
for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
if (actionRequest instanceof IndexRequest) {
IndexRequest indexRequest = (IndexRequest) actionRequest;
String pipeline = indexRequest.getPipeline();
if (pipeline == null) {
IndexMetaData indexMetaData = indicesMetaData.get(indexRequest.index());
if (indexMetaData == null) {
indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME);
} else {
String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexMetaData.getSettings());
indexRequest.setPipeline(defaultPipeline);
if (IngestService.NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) {
hasIndexRequestsWithPipelines = true;
}
}
} else if (IngestService.NOOP_PIPELINE_NAME.equals(pipeline) == false) {
hasIndexRequestsWithPipelines = true;
}
}
}
if (hasIndexRequestsWithPipelines) {
if (clusterService.localNode().isIngestNode()) {
processBulkIndexIngestRequest(task, bulkRequest, listener);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,10 @@ public ActionRequestValidationException validate() {
validationException = addValidationError("an id must be provided if version type or value are set", validationException);
}

if (pipeline != null && pipeline.isEmpty()) {
validationException = addValidationError("pipeline cannot be an empty string", validationException);
}

return validationException;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
EngineConfig.INDEX_CODEC_SETTING,
EngineConfig.INDEX_OPTIMIZE_AUTO_GENERATED_IDS,
IndexMetaData.SETTING_WAIT_FOR_ACTIVE_SHARDS,
IndexSettings.DEFAULT_PIPELINE,

// validate that built-in similarities don't get redefined
Setting.groupSetting("index.similarity.", (s) -> {
Expand Down
20 changes: 20 additions & 0 deletions server/src/main/java/org/elasticsearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.node.Node;

import java.util.Collections;
Expand Down Expand Up @@ -254,6 +255,14 @@ public final class IndexSettings {
public static final Setting<Integer> MAX_REGEX_LENGTH_SETTING = Setting.intSetting("index.max_regex_length",
1000, 1, Property.Dynamic, Property.IndexScope);

public static final Setting<String> DEFAULT_PIPELINE =
new Setting<>("index.default_pipeline", IngestService.NOOP_PIPELINE_NAME, s -> {
if (s == null || s.isEmpty()) {
throw new IllegalArgumentException("Value for [index.default_pipeline] must be a non-empty string.");
}
return s;
}, Property.Dynamic, Property.IndexScope);

private final Index index;
private final Version version;
private final Logger logger;
Expand Down Expand Up @@ -293,6 +302,7 @@ public final class IndexSettings {
private volatile TimeValue searchIdleAfter;
private volatile int maxAnalyzedOffset;
private volatile int maxTermsCount;
private volatile String defaultPipeline;

/**
* The maximum number of refresh listeners allows on this shard.
Expand Down Expand Up @@ -408,6 +418,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti
this.mergePolicyConfig = new MergePolicyConfig(logger, this);
this.indexSortConfig = new IndexSortConfig(this);
searchIdleAfter = scopedSettings.get(INDEX_SEARCH_IDLE_AFTER);
defaultPipeline = scopedSettings.get(DEFAULT_PIPELINE);

scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_COMPOUND_FORMAT_SETTING, mergePolicyConfig::setNoCFSRatio);
scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED_SETTING, mergePolicyConfig::setExpungeDeletesAllowed);
Expand Down Expand Up @@ -446,6 +457,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti
scopedSettings.addSettingsUpdateConsumer(DEFAULT_FIELD_SETTING, this::setDefaultFields);
scopedSettings.addSettingsUpdateConsumer(INDEX_SEARCH_IDLE_AFTER, this::setSearchIdleAfter);
scopedSettings.addSettingsUpdateConsumer(MAX_REGEX_LENGTH_SETTING, this::setMaxRegexLength);
scopedSettings.addSettingsUpdateConsumer(DEFAULT_PIPELINE, this::setDefaultPipeline);
}

private void setSearchIdleAfter(TimeValue searchIdleAfter) { this.searchIdleAfter = searchIdleAfter; }
Expand Down Expand Up @@ -821,4 +833,12 @@ public boolean isExplicitRefresh() {
* Returns the time that an index shard becomes search idle unless it's accessed in between
*/
public TimeValue getSearchIdleAfter() { return searchIdleAfter; }

public String getDefaultPipeline() {
return defaultPipeline;
}

public void setDefaultPipeline(String defaultPipeline) {
this.defaultPipeline = defaultPipeline;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@
* Holder class for several ingest related services.
*/
public class IngestService {

public static final String NOOP_PIPELINE_NAME = "_none";

private final PipelineStore pipelineStore;
private final PipelineExecutionService pipelineExecutionService;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateApplier;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
Expand Down Expand Up @@ -73,12 +72,16 @@ protected void doRun() throws Exception {
UpdateRequest updateRequest = (UpdateRequest) actionRequest;
indexRequest = updateRequest.docAsUpsert() ? updateRequest.doc() : updateRequest.upsertRequest();
}
if (indexRequest != null && Strings.hasText(indexRequest.getPipeline())) {
if (indexRequest == null) {
continue;
}
String pipeline = indexRequest.getPipeline();
if (IngestService.NOOP_PIPELINE_NAME.equals(pipeline) == false) {
try {
innerExecute(indexRequest, getPipeline(indexRequest.getPipeline()));
//this shouldn't be needed here but we do it for consistency with index api
// which requires it to prevent double execution
indexRequest.setPipeline(null);
indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME);
} catch (Exception e) {
itemFailureHandler.accept(indexRequest, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
Expand All @@ -45,6 +46,7 @@
import static java.util.Collections.emptySet;
import static java.util.Collections.singleton;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class TransportBulkActionIndicesThatCannotBeCreatedTests extends ESTestCase {
public void testNonExceptional() {
Expand Down Expand Up @@ -97,7 +99,11 @@ public void testSomeFail() {

private void indicesThatCannotBeCreatedTestCase(Set<String> expected,
BulkRequest bulkRequest, Function<String, Boolean> shouldAutoCreate) {
TransportBulkAction action = new TransportBulkAction(Settings.EMPTY, null, mock(TransportService.class), mock(ClusterService.class),
ClusterService clusterService = mock(ClusterService.class);
ClusterState state = mock(ClusterState.class);
when(state.getMetaData()).thenReturn(MetaData.EMPTY_META_DATA);
when(clusterService.state()).thenReturn(state);
TransportBulkAction action = new TransportBulkAction(Settings.EMPTY, null, mock(TransportService.class), clusterService,
null, null, null, mock(ActionFilters.class), null, null) {
@Override
void executeBulk(Task task, BulkRequest bulkRequest, long startTimeNanos, ActionListener<BulkResponse> listener,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.action.bulk;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.index.IndexAction;
Expand All @@ -28,13 +29,16 @@
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateApplier;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.ingest.PipelineExecutionService;
import org.elasticsearch.tasks.Task;
Expand Down Expand Up @@ -68,6 +72,11 @@

public class TransportBulkActionIngestTests extends ESTestCase {

/**
* Index for which mock settings contain a default pipeline.
*/
private static final String WITH_DEFAULT_PIPELINE = "index_with_default_pipeline";

/** Services needed by bulk action */
TransportService transportService;
ClusterService clusterService;
Expand Down Expand Up @@ -153,6 +162,15 @@ public void setupAction() {
when(nodes.getIngestNodes()).thenReturn(ingestNodes);
ClusterState state = mock(ClusterState.class);
when(state.getNodes()).thenReturn(nodes);
when(state.getMetaData()).thenReturn(MetaData.builder().indices(ImmutableOpenMap.<String, IndexMetaData>builder()
.putAll(
Collections.singletonMap(
WITH_DEFAULT_PIPELINE,
IndexMetaData.builder(WITH_DEFAULT_PIPELINE).settings(
settings(Version.CURRENT).put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline")
.build()
).numberOfShards(1).numberOfReplicas(1).build()))
.build()).build());
when(clusterService.state()).thenReturn(state);
doAnswer(invocation -> {
ClusterChangedEvent event = mock(ClusterChangedEvent.class);
Expand Down Expand Up @@ -227,7 +245,7 @@ public void testIngestLocal() throws Exception {
// now check success
Iterator<DocWriteRequest<?>> req = bulkDocsItr.getValue().iterator();
failureHandler.getValue().accept((IndexRequest)req.next(), exception); // have an exception for our one index request
indexRequest2.setPipeline(null); // this is done by the real pipeline execution service when processing
indexRequest2.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
completionHandler.getValue().accept(null);
assertTrue(action.isExecuted);
assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one
Expand Down Expand Up @@ -259,7 +277,7 @@ public void testSingleItemBulkActionIngestLocal() throws Exception {
assertTrue(failureCalled.get());

// now check success
indexRequest.setPipeline(null); // this is done by the real pipeline execution service when processing
indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
completionHandler.getValue().accept(null);
assertTrue(action.isExecuted);
assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one
Expand Down Expand Up @@ -359,4 +377,35 @@ public void testSingleItemBulkActionIngestForward() throws Exception {
}
}

public void testUseDefaultPipeline() throws Exception {
Exception exception = new Exception("fake exception");
IndexRequest indexRequest = new IndexRequest(WITH_DEFAULT_PIPELINE, "type", "id");
indexRequest.source(Collections.emptyMap());
AtomicBoolean responseCalled = new AtomicBoolean(false);
AtomicBoolean failureCalled = new AtomicBoolean(false);
singleItemBulkWriteAction.execute(null, indexRequest, ActionListener.wrap(
response -> {
responseCalled.set(true);
},
e -> {
assertThat(e, sameInstance(exception));
failureCalled.set(true);
}));

// check failure works, and passes through to the listener
assertFalse(action.isExecuted); // haven't executed yet
assertFalse(responseCalled.get());
assertFalse(failureCalled.get());
verify(executionService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture());
completionHandler.getValue().accept(exception);
assertTrue(failureCalled.get());

// now check success
indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
completionHandler.getValue().accept(null);
assertTrue(action.isExecuted);
assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one
verifyZeroInteractions(transportService);
}

}
Loading