diff --git a/CHANGELOG.md b/CHANGELOG.md index 0cd030290d4de..d43fa6ab31308 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add support for index level max slice count setting for concurrent segment search ([#15336](https://github.com/opensearch-project/OpenSearch/pull/15336)) - Add support for centralize snapshot creation with pinned timestamp ([#15124](https://github.com/opensearch-project/OpenSearch/pull/15124)) - Add concurrent search support for Derived Fields ([#15326](https://github.com/opensearch-project/OpenSearch/pull/15326)) +- Add limit on number of processors in Ingest pipelines([#15460](https://github.com/opensearch-project/OpenSearch/issues/15460)). ### Dependencies - Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081)) diff --git a/server/src/main/java/org/opensearch/action/ingest/SimulateExecutionService.java b/server/src/main/java/org/opensearch/action/ingest/SimulateExecutionService.java index c7c0f21eb0876..818100539849a 100644 --- a/server/src/main/java/org/opensearch/action/ingest/SimulateExecutionService.java +++ b/server/src/main/java/org/opensearch/action/ingest/SimulateExecutionService.java @@ -91,6 +91,7 @@ void executeDocument( } public void execute(SimulatePipelineRequest.Parsed request, ActionListener listener) { + threadPool.executor(THREAD_POOL_NAME).execute(ActionRunnable.wrap(listener, l -> { final AtomicInteger counter = new AtomicInteger(); final List responses = new CopyOnWriteArrayList<>( diff --git a/server/src/main/java/org/opensearch/action/ingest/SimulatePipelineTransportAction.java b/server/src/main/java/org/opensearch/action/ingest/SimulatePipelineTransportAction.java index 4753679d370af..22075076f0cf3 100644 --- a/server/src/main/java/org/opensearch/action/ingest/SimulatePipelineTransportAction.java +++ b/server/src/main/java/org/opensearch/action/ingest/SimulatePipelineTransportAction.java @@ -38,6 +38,7 @@ import org.opensearch.common.xcontent.XContentHelper; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.ingest.IngestPipelineValidator; import org.opensearch.ingest.IngestService; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; @@ -88,6 +89,8 @@ protected void doExecute(Task task, SimulatePipelineRequest request, ActionListe return; } + IngestPipelineValidator.validateIngestPipeline(simulateRequest.getPipeline(), ingestService.getClusterService()); + executionService.execute(simulateRequest, listener); } } diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 8daf9125bb27e..4d605dcf5d819 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -132,6 +132,7 @@ import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.store.IndicesStore; +import org.opensearch.ingest.IngestPipelineValidator; import org.opensearch.monitor.fs.FsHealthService; import org.opensearch.monitor.fs.FsService; import org.opensearch.monitor.jvm.JvmGcMonitorService; @@ -405,6 +406,7 @@ public void apply(Settings value, Settings current, Settings previous) { ClusterService.USER_DEFINED_METADATA, ClusterManagerService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, // deprecated ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, + IngestPipelineValidator.MAX_NUMBER_OF_INGEST_PROCESSORS, SearchService.DEFAULT_SEARCH_TIMEOUT_SETTING, SearchService.DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS, TransportSearchAction.SHARD_COUNT_LIMIT_SETTING, diff --git a/server/src/main/java/org/opensearch/ingest/IngestPipelineValidator.java b/server/src/main/java/org/opensearch/ingest/IngestPipelineValidator.java new file mode 100644 index 0000000000000..ff3abee9f5a11 --- /dev/null +++ b/server/src/main/java/org/opensearch/ingest/IngestPipelineValidator.java @@ -0,0 +1,54 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ingest; + +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.Setting; + +import java.util.List; + +/** + * This class contains methods to validate the ingest pipeline. + */ +public class IngestPipelineValidator { + + /** + * Defines the limit for the number of processors which can run on a given document during ingestion. + */ + public static final Setting MAX_NUMBER_OF_INGEST_PROCESSORS = Setting.intSetting( + "cluster.ingest.max_number_processors", + Integer.MAX_VALUE, + 1, + Integer.MAX_VALUE, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + + /** + * Validates that the number of compound processors in the pipeline does not exceed the configured limit. + * + * @param pipeline + * @param clusterService + */ + public static void validateIngestPipeline(Pipeline pipeline, ClusterService clusterService) { + + List processors = pipeline.getCompoundProcessor().getProcessors(); + int maxNumberOfIngestProcessorsAllowed = clusterService.getClusterSettings().get(MAX_NUMBER_OF_INGEST_PROCESSORS); + + if (processors.size() > maxNumberOfIngestProcessorsAllowed) { + throw new IllegalStateException( + "Cannot use more than the maximum processors allowed. Number of processors configured is [" + + processors.size() + + "] which exceeds the maximum allowed configuration of [" + + maxNumberOfIngestProcessorsAllowed + + "] processors." + ); + } + } +} diff --git a/server/src/main/java/org/opensearch/ingest/IngestService.java b/server/src/main/java/org/opensearch/ingest/IngestService.java index 938ca7493926e..d614badcaea2d 100644 --- a/server/src/main/java/org/opensearch/ingest/IngestService.java +++ b/server/src/main/java/org/opensearch/ingest/IngestService.java @@ -494,6 +494,9 @@ void validatePipeline(Map ingestInfos, PutPipelineReq Map pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getMediaType()).v2(); Pipeline pipeline = Pipeline.create(request.getId(), pipelineConfig, processorFactories, scriptService); + + IngestPipelineValidator.validateIngestPipeline(pipeline, clusterService); + List exceptions = new ArrayList<>(); for (Processor processor : pipeline.flattenAllProcessors()) { for (Map.Entry entry : ingestInfos.entrySet()) { @@ -1099,6 +1102,7 @@ void innerUpdatePipelines(IngestMetadata newIngestMetadata) { processorFactories, scriptService ); + IngestPipelineValidator.validateIngestPipeline(newPipeline, clusterService); newPipelines.put(newConfiguration.getId(), new PipelineHolder(newConfiguration, newPipeline)); if (previous == null) { diff --git a/server/src/test/java/org/opensearch/cluster/service/ClusterServiceTests.java b/server/src/test/java/org/opensearch/cluster/service/ClusterServiceTests.java index 4d88683826af7..f7e6d065dca67 100644 --- a/server/src/test/java/org/opensearch/cluster/service/ClusterServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/service/ClusterServiceTests.java @@ -10,6 +10,7 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.ingest.IngestPipelineValidator; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; import org.junit.After; @@ -37,4 +38,16 @@ public void testDeprecatedGetMasterServiceBWC() { assertThat(masterService, equalTo(clusterManagerService)); } } + + public void testUpdateMaxIngestProcessorCountSetting() { + ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + + // verify defaults + assertEquals(Integer.MAX_VALUE, clusterSettings.get(IngestPipelineValidator.MAX_NUMBER_OF_INGEST_PROCESSORS).intValue()); + + // verify update max processor + Settings newSettings = Settings.builder().put("cluster.ingest.max_number_processors", 3).build(); + clusterSettings.applySettings(newSettings); + assertEquals(3, clusterSettings.get(IngestPipelineValidator.MAX_NUMBER_OF_INGEST_PROCESSORS).intValue()); + } }