Skip to content

Commit

Permalink
Add limit on number of processors in Ingest pipelines
Browse files Browse the repository at this point in the history
Signed-off-by: Rai <nndri@amazon.com>
  • Loading branch information
Rai committed Aug 28, 2024
1 parent 23cba28 commit 8a39f44
Show file tree
Hide file tree
Showing 7 changed files with 78 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add support for index level max slice count setting for concurrent segment search ([#15336](https://github.com/opensearch-project/OpenSearch/pull/15336))
- Add support for centralize snapshot creation with pinned timestamp ([#15124](https://github.com/opensearch-project/OpenSearch/pull/15124))
- Add concurrent search support for Derived Fields ([#15326](https://github.com/opensearch-project/OpenSearch/pull/15326))
- Add limit on number of processors in Ingest pipelines ([#15460](https://github.com/opensearch-project/OpenSearch/issues/15460))

### Dependencies
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ void executeDocument(
}

public void execute(SimulatePipelineRequest.Parsed request, ActionListener<SimulatePipelineResponse> listener) {

threadPool.executor(THREAD_POOL_NAME).execute(ActionRunnable.wrap(listener, l -> {
final AtomicInteger counter = new AtomicInteger();
final List<SimulateDocumentResult> responses = new CopyOnWriteArrayList<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.ingest.IngestPipelineValidator;
import org.opensearch.ingest.IngestService;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -88,6 +89,8 @@ protected void doExecute(Task task, SimulatePipelineRequest request, ActionListe
return;
}

IngestPipelineValidator.validateIngestPipeline(simulateRequest.getPipeline(), ingestService.getClusterService());

executionService.execute(simulateRequest, listener);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.store.IndicesStore;
import org.opensearch.ingest.IngestPipelineValidator;
import org.opensearch.monitor.fs.FsHealthService;
import org.opensearch.monitor.fs.FsService;
import org.opensearch.monitor.jvm.JvmGcMonitorService;
Expand Down Expand Up @@ -405,6 +406,7 @@ public void apply(Settings value, Settings current, Settings previous) {
ClusterService.USER_DEFINED_METADATA,
ClusterManagerService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, // deprecated
ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
IngestPipelineValidator.MAX_NUMBER_OF_INGEST_PROCESSORS,
SearchService.DEFAULT_SEARCH_TIMEOUT_SETTING,
SearchService.DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS,
TransportSearchAction.SHARD_COUNT_LIMIT_SETTING,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.ingest;

import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Setting;

import java.util.List;

/**
 * Static helpers for validating an ingest {@link Pipeline} against cluster-level limits.
 * <p>
 * This class only exposes static members and is not meant to be instantiated.
 */
public final class IngestPipelineValidator {

    /**
     * Defines the limit for the number of processors which can run on a given document during ingestion.
     * Registered under the key {@code cluster.ingest.max_number_processors} as a dynamic, node-scoped
     * setting with a default of {@link Integer#MAX_VALUE} and an allowed range of
     * {@code 1} to {@link Integer#MAX_VALUE}.
     */
    public static final Setting<Integer> MAX_NUMBER_OF_INGEST_PROCESSORS = Setting.intSetting(
        "cluster.ingest.max_number_processors",
        Integer.MAX_VALUE,
        1,
        Integer.MAX_VALUE,
        Setting.Property.NodeScope,
        Setting.Property.Dynamic
    );

    /** Utility class: no instances. */
    private IngestPipelineValidator() {}

    /**
     * Validates that the number of processors held by the pipeline's compound processor does not
     * exceed the limit currently configured through {@link #MAX_NUMBER_OF_INGEST_PROCESSORS}.
     * <p>
     * Only the list returned by {@code pipeline.getCompoundProcessor().getProcessors()} is counted.
     * NOTE(review): whether processors nested inside other processors are part of that list is not
     * visible from this class - confirm against {@code CompoundProcessor} if nested processors must
     * count towards the limit.
     *
     * @param pipeline       the pipeline whose processors are counted
     * @param clusterService the cluster service whose cluster settings supply the current limit
     * @throws IllegalStateException if the pipeline has more processors than the configured limit
     */
    public static void validateIngestPipeline(Pipeline pipeline, ClusterService clusterService) {
        final List<Processor> processors = pipeline.getCompoundProcessor().getProcessors();
        // Read the limit on every call so that dynamic updates of the setting take effect.
        final int maxNumberOfIngestProcessorsAllowed = clusterService.getClusterSettings().get(MAX_NUMBER_OF_INGEST_PROCESSORS);
        final int configuredProcessorCount = processors.size();

        if (configuredProcessorCount > maxNumberOfIngestProcessorsAllowed) {
            throw new IllegalStateException(
                "Cannot use more than the maximum processors allowed. Number of processors configured is ["
                    + configuredProcessorCount
                    + "] which exceeds the maximum allowed configuration of ["
                    + maxNumberOfIngestProcessorsAllowed
                    + "] processors."
            );
        }
    }
}
4 changes: 4 additions & 0 deletions server/src/main/java/org/opensearch/ingest/IngestService.java
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,9 @@ void validatePipeline(Map<DiscoveryNode, IngestInfo> ingestInfos, PutPipelineReq

Map<String, Object> pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getMediaType()).v2();
Pipeline pipeline = Pipeline.create(request.getId(), pipelineConfig, processorFactories, scriptService);

IngestPipelineValidator.validateIngestPipeline(pipeline, clusterService);

List<Exception> exceptions = new ArrayList<>();
for (Processor processor : pipeline.flattenAllProcessors()) {
for (Map.Entry<DiscoveryNode, IngestInfo> entry : ingestInfos.entrySet()) {
Expand Down Expand Up @@ -1099,6 +1102,7 @@ void innerUpdatePipelines(IngestMetadata newIngestMetadata) {
processorFactories,
scriptService
);
IngestPipelineValidator.validateIngestPipeline(newPipeline, clusterService);
newPipelines.put(newConfiguration.getId(), new PipelineHolder(newConfiguration, newPipeline));

if (previous == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.ingest.IngestPipelineValidator;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.TestThreadPool;
import org.junit.After;
Expand Down Expand Up @@ -37,4 +38,16 @@ public void testDeprecatedGetMasterServiceBWC() {
assertThat(masterService, equalTo(clusterManagerService));
}
}

/**
 * Checks that the max ingest processor count setting reports its default on a fresh
 * {@link ClusterSettings} instance and reflects a new value once updated settings are applied.
 */
public void testUpdateMaxIngestProcessorCountSetting() {
    final Settings initialSettings = Settings.builder().build();
    final ClusterSettings settingsUnderTest = new ClusterSettings(initialSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);

    // With nothing configured, the setting falls back to its default.
    final int defaultLimit = settingsUnderTest.get(IngestPipelineValidator.MAX_NUMBER_OF_INGEST_PROCESSORS).intValue();
    assertEquals(Integer.MAX_VALUE, defaultLimit);

    // After applying an explicit value, the same lookup returns the new limit.
    final Settings updatedSettings = Settings.builder().put("cluster.ingest.max_number_processors", 3).build();
    settingsUnderTest.applySettings(updatedSettings);
    final int updatedLimit = settingsUnderTest.get(IngestPipelineValidator.MAX_NUMBER_OF_INGEST_PROCESSORS).intValue();
    assertEquals(3, updatedLimit);
}
}

0 comments on commit 8a39f44

Please sign in to comment.