Skip to content

Commit

Permalink
Fix conflict
Browse files Browse the repository at this point in the history
Signed-off-by: Rishab Nahata <rnnahata@amazon.com>
  • Loading branch information
imRishN committed Aug 27, 2024
2 parents b1b5b99 + 46a269e commit 5d2cd14
Show file tree
Hide file tree
Showing 105 changed files with 2,839 additions and 124 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/gradle-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ jobs:
- uses: actions/checkout@v4
- name: Get changed files
id: changed-files-specific
uses: tj-actions/changed-files@v44
uses: tj-actions/changed-files@v45
with:
files_ignore: |
release-notes/*.md
Expand Down
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix for hasInitiatedFetching to fix allocation explain and manual reroute APIs (([#14972](https://github.com/opensearch-project/OpenSearch/pull/14972))
- [Workload Management] Add queryGroupId to Task ([14708](https://github.com/opensearch-project/OpenSearch/pull/14708))
- Add setting to ignore throttling nodes for allocation of unassigned primaries in remote restore ([#14991](https://github.com/opensearch-project/OpenSearch/pull/14991))
- [Workload Management] Add Delete QueryGroup API Logic ([#14735](https://github.com/opensearch-project/OpenSearch/pull/14735))
- [Streaming Indexing] Enhance RestClient with a new streaming API support ([#14437](https://github.com/opensearch-project/OpenSearch/pull/14437))
- Add basic aggregation support for derived fields ([#14618](https://github.com/opensearch-project/OpenSearch/pull/14618))
- [Workload Management] Add Create QueryGroup API Logic ([#14680](https://github.com/opensearch-project/OpenSearch/pull/14680))- [Workload Management] Add Create QueryGroup API Logic ([#14680](https://github.com/opensearch-project/OpenSearch/pull/14680))
Expand All @@ -23,7 +24,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Support filtering on a large list encoded by bitmap ([#14774](https://github.com/opensearch-project/OpenSearch/pull/14774))
- Add slice execution listeners to SearchOperationListener interface ([#15153](https://github.com/opensearch-project/OpenSearch/pull/15153))
- Make balanced shards allocator timebound ([#15239](https://github.com/opensearch-project/OpenSearch/pull/15239))
- Add allowlist setting for ingest-geoip and ingest-useragent ([#15325](https://github.com/opensearch-project/OpenSearch/pull/15325))
- Adding access to noSubMatches and noOverlappingMatches in Hyphenation ([#13895](https://github.com/opensearch-project/OpenSearch/pull/13895))
- Add support for index level max slice count setting for concurrent segment search ([#15336](https://github.com/opensearch-project/OpenSearch/pull/15336))

### Dependencies
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))
Expand All @@ -44,6 +47,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Bump `com.azure:azure-core-http-netty` from 1.15.1 to 1.15.3 ([#15300](https://github.com/opensearch-project/OpenSearch/pull/15300))
- Bump `com.gradle.develocity` from 3.17.6 to 3.18 ([#15297](https://github.com/opensearch-project/OpenSearch/pull/15297))
- Bump `commons-cli:commons-cli` from 1.8.0 to 1.9.0 ([#15298](https://github.com/opensearch-project/OpenSearch/pull/15298))
- Bump `opentelemetry` from 1.40.0 to 1.41.0 ([#15361](https://github.com/opensearch-project/OpenSearch/pull/15361))
- Bump `opentelemetry-semconv` from 1.26.0-alpha to 1.27.0-alpha ([#15361](https://github.com/opensearch-project/OpenSearch/pull/15361))
- Bump `tj-actions/changed-files` from 44 to 45 ([#15422](https://github.com/opensearch-project/OpenSearch/pull/15422))

### Changed
- Add lower limit for primary and replica batch allocators timeout ([#14979](https://github.com/opensearch-project/OpenSearch/pull/14979))
Expand All @@ -63,6 +69,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fixed array field name omission in flat_object function for nested JSON ([#13620](https://github.com/opensearch-project/OpenSearch/pull/13620))
- Fix range aggregation optimization ignoring top level queries ([#15194](https://github.com/opensearch-project/OpenSearch/pull/15194))
- Fix incorrect parameter names in MinHash token filter configuration handling ([#15233](https://github.com/opensearch-project/OpenSearch/pull/15233))
- Fix indexing error when flat_object field is explicitly null ([#15375](https://github.com/opensearch-project/OpenSearch/pull/15375))
- Fix split response processor not included in allowlist ([#15393](https://github.com/opensearch-project/OpenSearch/pull/15393))
- Fix unchecked cast in dynamic action map getter ([#15394](https://github.com/opensearch-project/OpenSearch/pull/15394))

### Security

Expand Down
4 changes: 2 additions & 2 deletions buildSrc/version.properties
Original file line number Diff line number Diff line change
Expand Up @@ -74,5 +74,5 @@ jzlib = 1.1.3
resteasy = 6.2.4.Final

# opentelemetry dependencies
opentelemetry = 1.40.0
opentelemetrysemconv = 1.26.0-alpha
opentelemetry = 1.41.0
opentelemetrysemconv = 1.27.0-alpha
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.common.cache.CacheBuilder;
import org.opensearch.common.io.PathUtils;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.ingest.Processor;
import org.opensearch.plugins.IngestPlugin;
Expand All @@ -62,10 +63,18 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class IngestGeoIpModulePlugin extends Plugin implements IngestPlugin, Closeable {
static final Setting<List<String>> PROCESSORS_ALLOWLIST_SETTING = Setting.listSetting(
"ingest.geoip.processors.allowed",
List.of(),
Function.identity(),
Setting.Property.NodeScope
);
public static final Setting<Long> CACHE_SIZE = Setting.longSetting("ingest.geoip.cache_size", 1000, 0, Setting.Property.NodeScope);

static String[] DEFAULT_DATABASE_FILENAMES = new String[] { "GeoLite2-ASN.mmdb", "GeoLite2-City.mmdb", "GeoLite2-Country.mmdb" };
Expand All @@ -74,7 +83,7 @@ public class IngestGeoIpModulePlugin extends Plugin implements IngestPlugin, Clo

@Override
public List<Setting<?>> getSettings() {
return Arrays.asList(CACHE_SIZE);
return Arrays.asList(CACHE_SIZE, PROCESSORS_ALLOWLIST_SETTING);
}

@Override
Expand All @@ -90,7 +99,10 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
} catch (IOException e) {
throw new RuntimeException(e);
}
return Collections.singletonMap(GeoIpProcessor.TYPE, new GeoIpProcessor.Factory(databaseReaders, new GeoIpCache(cacheSize)));
return filterForAllowlistSetting(
parameters.env.settings(),
Map.of(GeoIpProcessor.TYPE, new GeoIpProcessor.Factory(databaseReaders, new GeoIpCache(cacheSize)))
);
}

/*
Expand Down Expand Up @@ -175,6 +187,30 @@ public void close() throws IOException {
}
}

private Map<String, Processor.Factory> filterForAllowlistSetting(Settings settings, Map<String, Processor.Factory> map) {
if (PROCESSORS_ALLOWLIST_SETTING.exists(settings) == false) {
return Map.copyOf(map);
}
final Set<String> allowlist = Set.copyOf(PROCESSORS_ALLOWLIST_SETTING.get(settings));
// Assert that no unknown processors are defined in the allowlist
final Set<String> unknownAllowlistProcessors = allowlist.stream()
.filter(p -> map.containsKey(p) == false)
.collect(Collectors.toUnmodifiableSet());
if (unknownAllowlistProcessors.isEmpty() == false) {
throw new IllegalArgumentException(
"Processor(s) "
+ unknownAllowlistProcessors
+ " were defined in ["
+ PROCESSORS_ALLOWLIST_SETTING.getKey()
+ "] but do not exist"
);
}
return map.entrySet()
.stream()
.filter(e -> allowlist.contains(e.getKey()))
.collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue));
}

/**
* The in-memory cache for the geoip data. There should only be 1 instance of this class..
* This cache differs from the maxmind's {@link NodeCache} such that this cache stores the deserialized Json objects to avoid the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,20 @@
import com.maxmind.geoip2.model.AbstractResponse;

import org.opensearch.common.network.InetAddresses;
import org.opensearch.common.settings.Settings;
import org.opensearch.env.TestEnvironment;
import org.opensearch.ingest.Processor;
import org.opensearch.ingest.geoip.IngestGeoIpModulePlugin.GeoIpCache;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.StreamsUtils;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Set;

import static org.mockito.Mockito.mock;

Expand Down Expand Up @@ -77,4 +89,87 @@ public void testInvalidInit() {
IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> new GeoIpCache(-1));
assertEquals("geoip max cache size must be 0 or greater", ex.getMessage());
}

public void testAllowList() throws IOException {
runAllowListTest(List.of());
runAllowListTest(List.of("geoip"));
}

public void testInvalidAllowList() throws IOException {
List<String> invalidAllowList = List.of("set");
Settings.Builder settingsBuilder = Settings.builder()
.putList(IngestGeoIpModulePlugin.PROCESSORS_ALLOWLIST_SETTING.getKey(), invalidAllowList);
createDb(settingsBuilder);
try (IngestGeoIpModulePlugin plugin = new IngestGeoIpModulePlugin()) {
IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> plugin.getProcessors(createParameters(settingsBuilder.build()))
);
assertEquals(
"Processor(s) "
+ invalidAllowList
+ " were defined in ["
+ IngestGeoIpModulePlugin.PROCESSORS_ALLOWLIST_SETTING.getKey()
+ "] but do not exist",
e.getMessage()
);
}
}

public void testAllowListNotSpecified() throws IOException {
Settings.Builder settingsBuilder = Settings.builder();
settingsBuilder.remove(IngestGeoIpModulePlugin.PROCESSORS_ALLOWLIST_SETTING.getKey());
createDb(settingsBuilder);
try (IngestGeoIpModulePlugin plugin = new IngestGeoIpModulePlugin()) {
final Set<String> expected = Set.of("geoip");
assertEquals(expected, plugin.getProcessors(createParameters(settingsBuilder.build())).keySet());
}
}

private void runAllowListTest(List<String> allowList) throws IOException {
Settings.Builder settingsBuilder = Settings.builder();
createDb(settingsBuilder);
final Settings settings = settingsBuilder.putList(IngestGeoIpModulePlugin.PROCESSORS_ALLOWLIST_SETTING.getKey(), allowList).build();
try (IngestGeoIpModulePlugin plugin = new IngestGeoIpModulePlugin()) {
assertEquals(Set.copyOf(allowList), plugin.getProcessors(createParameters(settings)).keySet());
}
}

private void createDb(Settings.Builder settingsBuilder) throws IOException {
Path configDir = createTempDir();
Path userAgentConfigDir = configDir.resolve("ingest-geoip");
Files.createDirectories(userAgentConfigDir);
settingsBuilder.put("ingest.geoip.database_path", configDir).put("path.home", configDir);
try {
Files.copy(
new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-City.mmdb")),
configDir.resolve("GeoLite2-City.mmdb")
);
Files.copy(
new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-Country.mmdb")),
configDir.resolve("GeoLite2-Country.mmdb")
);
Files.copy(
new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-ASN.mmdb")),
configDir.resolve("GeoLite2-ASN.mmdb")
);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

private static Processor.Parameters createParameters(Settings settings) {
return new Processor.Parameters(
TestEnvironment.newEnvironment(settings),
null,
null,
null,
() -> 0L,
(a, b) -> null,
null,
null,
$ -> {},
null
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
package org.opensearch.ingest.useragent;

import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.ingest.Processor;
import org.opensearch.plugins.IngestPlugin;
import org.opensearch.plugins.Plugin;
Expand All @@ -47,10 +48,19 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class IngestUserAgentModulePlugin extends Plugin implements IngestPlugin {

static final Setting<List<String>> PROCESSORS_ALLOWLIST_SETTING = Setting.listSetting(
"ingest.useragent.processors.allowed",
List.of(),
Function.identity(),
Setting.Property.NodeScope
);
private final Setting<Long> CACHE_SIZE_SETTING = Setting.longSetting(
"ingest.user_agent.cache_size",
1000,
Expand All @@ -77,7 +87,34 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
} catch (IOException e) {
throw new RuntimeException(e);
}
return Collections.singletonMap(UserAgentProcessor.TYPE, new UserAgentProcessor.Factory(userAgentParsers));
return filterForAllowlistSetting(
parameters.env.settings(),
Collections.singletonMap(UserAgentProcessor.TYPE, new UserAgentProcessor.Factory(userAgentParsers))
);
}

private Map<String, Processor.Factory> filterForAllowlistSetting(Settings settings, Map<String, Processor.Factory> map) {
if (PROCESSORS_ALLOWLIST_SETTING.exists(settings) == false) {
return Map.copyOf(map);
}
final Set<String> allowlist = Set.copyOf(PROCESSORS_ALLOWLIST_SETTING.get(settings));
// Assert that no unknown processors are defined in the allowlist
final Set<String> unknownAllowlistProcessors = allowlist.stream()
.filter(p -> map.containsKey(p) == false)
.collect(Collectors.toUnmodifiableSet());
if (unknownAllowlistProcessors.isEmpty() == false) {
throw new IllegalArgumentException(
"Processor(s) "
+ unknownAllowlistProcessors
+ " were defined in ["
+ PROCESSORS_ALLOWLIST_SETTING.getKey()
+ "] but do not exist"
);
}
return map.entrySet()
.stream()
.filter(e -> allowlist.contains(e.getKey()))
.collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue));
}

static Map<String, UserAgentParser> createUserAgentParsers(Path userAgentConfigDirectory, UserAgentCache cache) throws IOException {
Expand Down
Loading

0 comments on commit 5d2cd14

Please sign in to comment.