Skip to content

Commit

Permalink
elastic#104411 Add warning headers for ingest pipelines containing sp…
Browse files Browse the repository at this point in the history
…ecial characters (elastic#114837)

* Add logs and headers

For pipeline creation when name is invalid

* Fix YAML tests and add YAML test for warnings

* Update docs/changelog/114837.yaml

* Changelog entry

* Changelog entry

* Update docs/changelog/114837.yaml

* Changelog entry
  • Loading branch information
lukewhiting authored and javanna committed Oct 16, 2024
1 parent 0a7fec4 commit ac584d7
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 12 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/114837.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 114837
summary: Add warning headers for ingest pipelines containing special characters
area: Ingest Node
type: bug
issues: [ 104411 ]
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@
"Test rolling up json object arrays":
- do:
ingest.put_pipeline:
id: "_id"
id: "pipeline-id"
body: >
{
"processors": [
Expand All @@ -237,7 +237,7 @@
index:
index: test
id: "1"
pipeline: "_id"
pipeline: "pipeline-id"
body: {
values_flat : [],
values: [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"Test with date processor":
- do:
ingest.put_pipeline:
id: "_id"
id: "pipeline-id"
body: >
{
"processors": [
Expand Down Expand Up @@ -44,7 +44,7 @@
index:
index: test
id: "1"
pipeline: "_id"
pipeline: "pipeline-id"
body: {
log: "89.160.20.128 - - [08/Sep/2014:02:54:42 +0000] \"GET /presentations/logstash-scale11x/images/ahhh___rage_face_by_samusmmx-d5g5zap.png HTTP/1.1\" 200 175208 \"http://mobile.rivals.com/board_posts.asp?SID=880&mid=198829575&fid=2208&tid=198829575&Team=&TeamId=&SiteId=\" \"Mozilla/5.0 (Linux; Android 4.2.2; VS980 4G Build/JDQ39B) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/33.0.1750.135 Mobile Safari/537.36\""
}
Expand All @@ -71,7 +71,7 @@
"Test with date processor and ECS-v1":
- do:
ingest.put_pipeline:
id: "_id"
id: "pipeline-id"
body: >
{
"processors": [
Expand Down Expand Up @@ -102,7 +102,7 @@
index:
index: test
id: "1"
pipeline: "_id"
pipeline: "pipeline-id"
body: {
log: "89.160.20.128 - - [08/Sep/2014:02:54:42 +0000] \"GET /presentations/logstash-scale11x/images/ahhh___rage_face_by_samusmmx-d5g5zap.png HTTP/1.1\" 200 175208 \"http://mobile.rivals.com/board_posts.asp?SID=880&mid=198829575&fid=2208&tid=198829575&Team=&TeamId=&SiteId=\" \"Mozilla/5.0 (Linux; Android 4.2.2; VS980 4G Build/JDQ39B) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/33.0.1750.135 Mobile Safari/537.36\""
}
Expand All @@ -128,7 +128,7 @@
"Test mutate":
- do:
ingest.put_pipeline:
id: "_id"
id: "pipeline-id"
body: >
{
"processors": [
Expand Down Expand Up @@ -188,7 +188,7 @@
index:
index: test
id: "1"
pipeline: "_id"
pipeline: "pipeline-id"
body: {
"age" : 33,
"eyeColor" : "brown",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
---
"Test invalid name warnings":
- requires:
cluster_features: [ "ingest.pipeline_name_special_chars_warning" ]
test_runner_features: [ "warnings" ]
reason: verifying deprecation warnings from 9.0 onwards for invalid pipeline names

- do:
cluster.health:
wait_for_status: green

- do:
ingest.put_pipeline:
id: "Invalid*-pipeline:id"
body: >
{
"description": "_description",
"processors": [
{
"set" : {
"field" : "field1",
"value": "_value"
}
}]
}
warnings:
- "Invalid pipeline id: Invalid*-pipeline:id"
- match: { acknowledged: true }
1 change: 1 addition & 0 deletions server/src/main/java/org/elasticsearch/common/Strings.java
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ private static String changeFirstCharacterCase(String str, boolean capitalize) {
static final Set<Character> INVALID_CHARS = Set.of('\\', '/', '*', '?', '"', '<', '>', '|', ' ', ',');

public static final String INVALID_FILENAME_CHARS = INVALID_CHARS.stream()
.sorted()
.map(c -> "'" + c + "'")
.collect(Collectors.joining(",", "[", "]"));

Expand Down
21 changes: 21 additions & 0 deletions server/src/main/java/org/elasticsearch/ingest/IngestService.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,16 @@
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.IndexTemplateMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.MetadataCreateIndexService;
import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.TriConsumer;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.logging.DeprecationCategory;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
Expand All @@ -55,7 +58,9 @@
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.core.UpdateForV10;
import org.elasticsearch.env.Environment;
import org.elasticsearch.features.NodeFeature;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.grok.MatcherWatchdog;
import org.elasticsearch.index.IndexSettings;
Expand Down Expand Up @@ -97,6 +102,7 @@
import java.util.stream.Collectors;

import static org.elasticsearch.core.Strings.format;
import static org.elasticsearch.core.UpdateForV10.Owner.DATA_MANAGEMENT;

/**
* Holder class for several ingest related services.
Expand All @@ -107,7 +113,10 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge

public static final String INGEST_ORIGIN = "ingest";

public static final NodeFeature PIPELINE_NAME_VALIDATION_WARNINGS = new NodeFeature("ingest.pipeline_name_special_chars_warning");

private static final Logger logger = LogManager.getLogger(IngestService.class);
private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(IngestService.class);

private final MasterServiceTaskQueue<PipelineClusterStateUpdateTask> taskQueue;
private final ClusterService clusterService;
Expand Down Expand Up @@ -652,12 +661,24 @@ public IngestMetadata execute(IngestMetadata currentIngestMetadata, Collection<I
}
}

@UpdateForV10(owner = DATA_MANAGEMENT) // Change deprecation log for special characters in name to a failure
void validatePipeline(Map<DiscoveryNode, IngestInfo> ingestInfos, String pipelineId, Map<String, Object> pipelineConfig)
throws Exception {
if (ingestInfos.isEmpty()) {
throw new IllegalStateException("Ingest info is empty");
}

try {
MetadataCreateIndexService.validateIndexOrAliasName(
pipelineId,
(pipelineName, error) -> new IllegalArgumentException(
"Pipeline name [" + pipelineName + "] will be disallowed in a future version for the following reason: " + error
)
);
} catch (IllegalArgumentException e) {
deprecationLogger.critical(DeprecationCategory.API, "pipeline_name_special_chars", e.getMessage());
}

Pipeline pipeline = Pipeline.create(pipelineId, pipelineConfig, processorFactories, scriptService);
List<Exception> exceptions = new ArrayList<>();
for (Processor processor : pipeline.flattenAllProcessors()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.action.ingest.DeletePipelineRequest;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.internal.Client;
Expand All @@ -48,6 +49,7 @@
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.Strings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.IndexVersion;
Expand Down Expand Up @@ -424,7 +426,7 @@ public void testDelete() {

public void testValidateNoIngestInfo() throws Exception {
IngestService ingestService = createWithProcessors();
PutPipelineRequest putRequest = putJsonPipelineRequest("_id", """
PutPipelineRequest putRequest = putJsonPipelineRequest("pipeline-id", """
{"processors": [{"set" : {"field": "_field", "value": "_value"}}]}""");

var pipelineConfig = XContentHelper.convertToMap(putRequest.getSource(), false, putRequest.getXContentType()).v2();
Expand Down Expand Up @@ -965,7 +967,7 @@ public void testGetPipelines() {

public void testValidateProcessorTypeOnAllNodes() throws Exception {
IngestService ingestService = createWithProcessors();
PutPipelineRequest putRequest = putJsonPipelineRequest("_id", """
PutPipelineRequest putRequest = putJsonPipelineRequest("pipeline-id", """
{
"processors": [
{
Expand Down Expand Up @@ -1009,7 +1011,7 @@ public void testValidateConfigurationExceptions() {
// ordinary validation issues happen at processor construction time
throw newConfigurationException("fail_validation", tag, "no_property_name", "validation failure reason");
}));
PutPipelineRequest putRequest = putJsonPipelineRequest("_id", """
PutPipelineRequest putRequest = putJsonPipelineRequest("pipeline-id", """
{
"processors": [
{
Expand Down Expand Up @@ -1043,7 +1045,7 @@ public void extraValidation() throws Exception {
}
};
}));
PutPipelineRequest putRequest = putJsonPipelineRequest("_id", """
PutPipelineRequest putRequest = putJsonPipelineRequest("pipeline-id", """
{
"processors": [
{
Expand All @@ -1067,6 +1069,32 @@ public void extraValidation() throws Exception {
assertEquals("fail_extra_validation", e.getMetadata("es.processor_type").get(0));
}

public void testValidatePipelineName() throws Exception {
IngestService ingestService = createWithProcessors();
for (Character badChar : List.of('\\', '/', '*', '?', '"', '<', '>', '|', ' ', ',')) {
PutPipelineRequest putRequest = new PutPipelineRequest(
TimeValue.timeValueSeconds(10),
AcknowledgedRequest.DEFAULT_ACK_TIMEOUT,
"_id",
new BytesArray("""
{"description":"test processor","processors":[{"set":{"field":"_field","value":"_value"}}]}"""),
XContentType.JSON
);
var pipelineConfig = XContentHelper.convertToMap(putRequest.getSource(), false, putRequest.getXContentType()).v2();
DiscoveryNode node1 = DiscoveryNodeUtils.create("_node_id1", buildNewFakeTransportAddress(), Map.of(), Set.of());
Map<DiscoveryNode, IngestInfo> ingestInfos = new HashMap<>();
ingestInfos.put(node1, new IngestInfo(List.of(new ProcessorInfo("set"))));
final String name = randomAlphaOfLength(5) + badChar + randomAlphaOfLength(5);
ingestService.validatePipeline(ingestInfos, name, pipelineConfig);
assertCriticalWarnings(
"Pipeline name ["
+ name
+ "] will be disallowed in a future version for the following reason: must not contain the following characters"
+ " [' ','\"','*',',','/','<','>','?','\\','|']"
);
}
}

public void testExecuteIndexPipelineExistsButFailedParsing() {
IngestService ingestService = createWithProcessors(
Map.of("mock", (factories, tag, description, config) -> new AbstractProcessor("mock", "description") {
Expand Down

0 comments on commit ac584d7

Please sign in to comment.