Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add remove_by_pattern ingest processor #11920

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add copy ingest processor ([#11870](https://github.com/opensearch-project/OpenSearch/pull/11870))
- Introduce new feature flag "WRITEABLE_REMOTE_INDEX" to gate the writeable remote index functionality ([#11717](https://github.com/opensearch-project/OpenSearch/pull/11170))
- Bump OpenTelemetry from 1.32.0 to 1.34.1 ([#11891](https://github.com/opensearch-project/OpenSearch/pull/11891))
- Add remove_by_pattern ingest processor ([#11920](https://github.com/opensearch-project/OpenSearch/pull/11920))
- Support index level allocation filtering for searchable snapshot index ([#11563](https://github.com/opensearch-project/OpenSearch/pull/11563))
- Add `org.opensearch.rest.MethodHandlers` and `RestController#getAllHandlers` ([11876](https://github.com/opensearch-project/OpenSearch/pull/11876))
- New DateTime format for RFC3339 compatible date fields ([#11465](https://github.com/opensearch-project/OpenSearch/pull/11465))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@
processors.put(HtmlStripProcessor.TYPE, new HtmlStripProcessor.Factory());
processors.put(CsvProcessor.TYPE, new CsvProcessor.Factory());
processors.put(CopyProcessor.TYPE, new CopyProcessor.Factory(parameters.scriptService));
processors.put(RemoveByPatternProcessor.TYPE, new RemoveByPatternProcessor.Factory());

Check warning on line 110 in modules/ingest-common/src/main/java/org/opensearch/ingest/common/IngestCommonModulePlugin.java

View check run for this annotation

Codecov / codecov/patch

modules/ingest-common/src/main/java/org/opensearch/ingest/common/IngestCommonModulePlugin.java#L110

Added line #L110 was not covered by tests
return Collections.unmodifiableMap(processors);
}

Expand Down
gaobinlong marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.ingest.common;

import org.opensearch.common.Nullable;
import org.opensearch.common.ValidationException;
import org.opensearch.common.regex.Regex;
import org.opensearch.core.common.Strings;
import org.opensearch.ingest.AbstractProcessor;
import org.opensearch.ingest.ConfigurationUtils;
import org.opensearch.ingest.IngestDocument;
import org.opensearch.ingest.Processor;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static org.opensearch.ingest.ConfigurationUtils.newConfigurationException;

/**
* Processor that removes existing fields by field patterns or excluding field patterns.
*/
public final class RemoveByPatternProcessor extends AbstractProcessor {

public static final String TYPE = "remove_by_pattern";
gaobinlong marked this conversation as resolved.
Show resolved Hide resolved
private final List<String> fieldPatterns;
private final List<String> excludeFieldPatterns;

RemoveByPatternProcessor(
String tag,
String description,
@Nullable List<String> fieldPatterns,
@Nullable List<String> excludeFieldPatterns
) {
super(tag, description);
if (fieldPatterns != null && excludeFieldPatterns != null || fieldPatterns == null && excludeFieldPatterns == null) {
throw new IllegalArgumentException("either fieldPatterns and excludeFieldPatterns must be set");
}
if (fieldPatterns == null) {
this.fieldPatterns = null;
this.excludeFieldPatterns = new ArrayList<>(excludeFieldPatterns);
} else {
this.fieldPatterns = new ArrayList<>(fieldPatterns);
this.excludeFieldPatterns = null;
}
}

public List<String> getFieldPatterns() {
return fieldPatterns;
}

public List<String> getExcludeFieldPatterns() {
return excludeFieldPatterns;
}

@Override
public IngestDocument execute(IngestDocument document) {
Set<String> existingFields = new HashSet<>(document.getSourceAndMetadata().keySet());
Set<String> metadataFields = document.getMetadata()
.keySet()
.stream()
.map(IngestDocument.Metadata::getFieldName)
.collect(Collectors.toSet());

if (fieldPatterns != null && !fieldPatterns.isEmpty()) {
existingFields.forEach(field -> {
// ignore metadata fields such as _index, _id, etc.
if (!metadataFields.contains(field)) {
final boolean matched = fieldPatterns.stream().anyMatch(pattern -> Regex.simpleMatch(pattern, field));
gaobinlong marked this conversation as resolved.
Show resolved Hide resolved
if (matched) {
document.removeField(field);
}
}
});
}

if (excludeFieldPatterns != null && !excludeFieldPatterns.isEmpty()) {
existingFields.forEach(field -> {
// ignore metadata fields such as _index, _id, etc.
if (!metadataFields.contains(field)) {
final boolean matched = excludeFieldPatterns.stream().anyMatch(pattern -> Regex.simpleMatch(pattern, field));
gaobinlong marked this conversation as resolved.
Show resolved Hide resolved
if (!matched) {
document.removeField(field);
}
}
});
}

return document;
}

@Override
public String getType() {
return TYPE;

Check warning on line 103 in modules/ingest-common/src/main/java/org/opensearch/ingest/common/RemoveByPatternProcessor.java

View check run for this annotation

Codecov / codecov/patch

modules/ingest-common/src/main/java/org/opensearch/ingest/common/RemoveByPatternProcessor.java#L103

Added line #L103 was not covered by tests
}

public static final class Factory implements Processor.Factory {

public Factory() {}

@Override
public RemoveByPatternProcessor create(
Map<String, Processor.Factory> registry,
String processorTag,
String description,
Map<String, Object> config
) throws Exception {
final List<String> fieldPatterns = new ArrayList<>();
final List<String> excludeFieldPatterns = new ArrayList<>();
final Object fieldPattern = ConfigurationUtils.readOptionalObject(config, "field_pattern");
final Object excludeFieldPattern = ConfigurationUtils.readOptionalObject(config, "exclude_field_pattern");
gaobinlong marked this conversation as resolved.
Show resolved Hide resolved

if (fieldPattern == null && excludeFieldPattern == null || fieldPattern != null && excludeFieldPattern != null) {
throw newConfigurationException(
TYPE,
processorTag,
"field_pattern",
"either field_pattern or exclude_field_pattern must be set"
);
}

if (fieldPattern != null) {
if (fieldPattern instanceof List) {
@SuppressWarnings("unchecked")
List<String> fieldPatternList = (List<String>) fieldPattern;
fieldPatterns.addAll(fieldPatternList);
} else {
fieldPatterns.add((String) fieldPattern);
}
validateFieldPatterns(processorTag, fieldPatterns, "field_pattern");
return new RemoveByPatternProcessor(processorTag, description, fieldPatterns, null);
} else {
if (excludeFieldPattern instanceof List) {
@SuppressWarnings("unchecked")
List<String> excludeFieldPatternList = (List<String>) excludeFieldPattern;
excludeFieldPatterns.addAll(excludeFieldPatternList);
} else {
excludeFieldPatterns.add((String) excludeFieldPattern);
}
validateFieldPatterns(processorTag, excludeFieldPatterns, "exclude_field_pattern");
return new RemoveByPatternProcessor(processorTag, description, null, excludeFieldPatterns);
}
}

private void validateFieldPatterns(String processorTag, List<String> patterns, String patternKey) {
gaobinlong marked this conversation as resolved.
Show resolved Hide resolved
List<String> validationErrors = new ArrayList<>();
for (String fieldPattern : patterns) {
if (fieldPattern.contains("#")) {
validationErrors.add(patternKey + " [" + fieldPattern + "] must not contain a '#'");
}
if (fieldPattern.contains(":")) {
validationErrors.add(patternKey + " [" + fieldPattern + "] must not contain a ':'");
}
if (fieldPattern.startsWith("_")) {
validationErrors.add(patternKey + " [" + fieldPattern + "] must not start with '_'");
}
if (Strings.validFileNameExcludingAstrix(fieldPattern) == false) {
validationErrors.add(
patternKey + " [" + fieldPattern + "] must not contain the following characters " + Strings.INVALID_FILENAME_CHARS
gaobinlong marked this conversation as resolved.
Show resolved Hide resolved
);
}
}

if (validationErrors.size() > 0) {
ValidationException validationException = new ValidationException();
validationException.addValidationErrors(validationErrors);
throw newConfigurationException(TYPE, processorTag, patternKey, validationException.getMessage());
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.ingest.common;

import org.opensearch.OpenSearchException;
import org.opensearch.OpenSearchParseException;
import org.opensearch.test.OpenSearchTestCase;
import org.junit.Before;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.hamcrest.CoreMatchers.equalTo;

public class RemoveByPatternProcessorFactoryTests extends OpenSearchTestCase {

private RemoveByPatternProcessor.Factory factory;

@Before
public void init() {
factory = new RemoveByPatternProcessor.Factory();
}

public void testCreateFieldPatterns() throws Exception {
Map<String, Object> config = new HashMap<>();
config.put("field_pattern", "field1*");
String processorTag = randomAlphaOfLength(10);
RemoveByPatternProcessor removeByPatternProcessor = factory.create(null, processorTag, null, config);
assertThat(removeByPatternProcessor.getTag(), equalTo(processorTag));
assertThat(removeByPatternProcessor.getFieldPatterns().get(0), equalTo("field1*"));

Map<String, Object> config2 = new HashMap<>();
config2.put("field_pattern", List.of("field1*", "field2*"));
removeByPatternProcessor = factory.create(null, processorTag, null, config2);
assertThat(removeByPatternProcessor.getTag(), equalTo(processorTag));
assertThat(removeByPatternProcessor.getFieldPatterns().get(0), equalTo("field1*"));
assertThat(removeByPatternProcessor.getFieldPatterns().get(1), equalTo("field2*"));

Map<String, Object> config3 = new HashMap<>();
List<String> patterns = Arrays.asList("foo*", "*", " ", ",", "#", ":", "_");
config3.put("field_pattern", patterns);
Exception exception = expectThrows(OpenSearchParseException.class, () -> factory.create(null, processorTag, null, config3));
assertThat(
exception.getMessage(),
equalTo(
"[field_pattern] Validation Failed: "
+ "1: field_pattern [ ] must not contain the following characters [ , \", *, \\, <, |, ,, >, /, ?];"
+ "2: field_pattern [,] must not contain the following characters [ , \", *, \\, <, |, ,, >, /, ?];"
+ "3: field_pattern [#] must not contain a '#';"
+ "4: field_pattern [:] must not contain a ':';"
+ "5: field_pattern [_] must not start with '_';"
)
);
}

public void testCreateExcludeFieldPatterns() throws Exception {
Map<String, Object> config = new HashMap<>();
config.put("exclude_field_pattern", "field1*");
String processorTag = randomAlphaOfLength(10);
RemoveByPatternProcessor removeByPatternProcessor = factory.create(null, processorTag, null, config);
assertThat(removeByPatternProcessor.getTag(), equalTo(processorTag));
assertThat(removeByPatternProcessor.getExcludeFieldPatterns().get(0), equalTo("field1*"));

Map<String, Object> config2 = new HashMap<>();
config2.put("exclude_field_pattern", List.of("field1*", "field2*"));
removeByPatternProcessor = factory.create(null, processorTag, null, config2);
assertThat(removeByPatternProcessor.getTag(), equalTo(processorTag));
assertThat(removeByPatternProcessor.getExcludeFieldPatterns().get(0), equalTo("field1*"));
assertThat(removeByPatternProcessor.getExcludeFieldPatterns().get(1), equalTo("field2*"));

Map<String, Object> config3 = new HashMap<>();
List<String> patterns = Arrays.asList("foo*", "*", " ", ",", "#", ":", "_");
config3.put("exclude_field_pattern", patterns);
Exception exception = expectThrows(OpenSearchParseException.class, () -> factory.create(null, processorTag, null, config3));
assertThat(
exception.getMessage(),
equalTo(
"[exclude_field_pattern] Validation Failed: "
+ "1: exclude_field_pattern [ ] must not contain the following characters [ , \", *, \\, <, |, ,, >, /, ?];"
+ "2: exclude_field_pattern [,] must not contain the following characters [ , \", *, \\, <, |, ,, >, /, ?];"
+ "3: exclude_field_pattern [#] must not contain a '#';"
+ "4: exclude_field_pattern [:] must not contain a ':';"
+ "5: exclude_field_pattern [_] must not start with '_';"
)
);
}

public void testCreatePatternsFailed() throws Exception {
Map<String, Object> config = new HashMap<>();
config.put("field_pattern", List.of("foo*"));
config.put("exclude_field_pattern", List.of("bar*"));
String processorTag = randomAlphaOfLength(10);
OpenSearchException exception = expectThrows(
OpenSearchParseException.class,
() -> factory.create(null, processorTag, null, config)
);
assertThat(exception.getMessage(), equalTo("[field_pattern] either field_pattern or exclude_field_pattern must be set"));

Map<String, Object> config2 = new HashMap<>();
config2.put("field_pattern", null);
config2.put("exclude_field_pattern", null);

exception = expectThrows(OpenSearchParseException.class, () -> factory.create(null, processorTag, null, config2));
assertThat(exception.getMessage(), equalTo("[field_pattern] either field_pattern or exclude_field_pattern must be set"));
}
}
Loading
Loading