Skip to content

Commit

Permalink
Add geoip service extension (#3944)
Browse files Browse the repository at this point in the history
* Add geoip service extension

Signed-off-by: Asif Sohail Mohammed <nsifmoh@amazon.com>

---------

Signed-off-by: Asif Sohail Mohammed <nsifmoh@amazon.com>
  • Loading branch information
asifsmohammed authored Jan 17, 2024
1 parent f21937a commit 29b0630
Show file tree
Hide file tree
Showing 21 changed files with 558 additions and 49 deletions.
2 changes: 2 additions & 0 deletions data-prepper-plugins/geoip-processor/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ dependencies {
implementation 'software.amazon.awssdk.crt:aws-crt:0.21.17'
implementation 'com.maxmind.geoip2:geoip2:4.0.1'
implementation 'com.maxmind.db:maxmind-db:3.0.0'
implementation 'org.hibernate.validator:hibernate-validator:8.0.1.Final'

implementation libs.commons.lang3
testImplementation project(':data-prepper-core')
testImplementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
testImplementation project(':data-prepper-test-common')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.processor.configuration.KeysConfig;
import org.opensearch.dataprepper.plugins.processor.databaseenrich.EnrichFailedException;
import org.opensearch.dataprepper.plugins.processor.extension.GeoIpConfigSupplier;
import org.opensearch.dataprepper.plugins.processor.utils.IPValidationcheck;
import org.opensearch.dataprepper.logging.DataPrepperMarkers;
import org.slf4j.Logger;
Expand Down Expand Up @@ -52,14 +53,17 @@ public class GeoIPProcessor extends AbstractProcessor<Record<Event>, Record<Even
*/
@DataPrepperPluginConstructor
public GeoIPProcessor(PluginSetting pluginSetting,
final GeoIPProcessorConfig geoCodingProcessorConfig) {
final GeoIPProcessorConfig geoCodingProcessorConfig,
final GeoIpConfigSupplier geoIpConfigSupplier) {
super(pluginSetting);
this.geoIPProcessorConfig = geoCodingProcessorConfig;
this.tempPath = System.getProperty("java.io.tmpdir")+ File.separator + TEMP_PATH_FOLDER;
geoIPProcessorService = new GeoIPProcessorService(geoCodingProcessorConfig,tempPath);
tagsOnSourceNotFoundFailure = geoCodingProcessorConfig.getTagsOnSourceNotFoundFailure();
this.geoIpProcessingMatchCounter = pluginMetrics.counter(GEO_IP_PROCESSING_MATCH);
this.geoIpProcessingMismatchCounter = pluginMetrics.counter(GEO_IP_PROCESSING_MISMATCH);
// TODO: use this service and clean up MaxMind service config from pipeline.yaml
//geoIPProcessorService = geoIpConfigSupplier.getGeoIPProcessorService();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import jakarta.validation.constraints.NotNull;
import org.opensearch.dataprepper.plugins.processor.configuration.KeysConfig;
import org.opensearch.dataprepper.plugins.processor.configuration.ServiceTypeOptions;
import org.opensearch.dataprepper.plugins.processor.configuration.AwsAuthenticationOptions;
import org.opensearch.dataprepper.plugins.processor.extension.AwsAuthenticationOptionsConfig;

import java.util.List;

Expand All @@ -22,7 +22,7 @@ public class GeoIPProcessorConfig {
@JsonProperty("aws")
@NotNull
@Valid
private AwsAuthenticationOptions awsAuthenticationOptions;
private AwsAuthenticationOptionsConfig awsAuthenticationOptionsConfig;

@JsonProperty("keys")
@NotNull
Expand All @@ -39,8 +39,8 @@ public class GeoIPProcessorConfig {
* Aws Authentication configuration Options
* @return AwsAuthenticationOptions
*/
public AwsAuthenticationOptions getAwsAuthenticationOptions() {
return awsAuthenticationOptions;
public AwsAuthenticationOptionsConfig getAwsAuthenticationOptions() {
return awsAuthenticationOptionsConfig;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.configuration;
package org.opensearch.dataprepper.plugins.processor.extension;

import java.util.Map;
import java.util.UUID;
Expand All @@ -20,8 +20,7 @@
/**
* An implementation class AWS Authentication configuration
*/
public class AwsAuthenticationOptions {

public class AwsAuthenticationOptionsConfig {
@JsonProperty("region")
@Size(min = 1, message = "Region cannot be empty string")
private String awsRegion;
Expand Down Expand Up @@ -66,8 +65,10 @@ public AwsCredentialsProvider authenticateAwsConfiguration() {
configuration -> awsStsHeaderOverrides.forEach(configuration::putHeader));
}

awsCredentialsProvider = StsAssumeRoleCredentialsProvider.builder().stsClient(stsClient)
.refreshRequest(assumeRoleRequestBuilder.build()).build();
awsCredentialsProvider = StsAssumeRoleCredentialsProvider.builder()
.stsClient(stsClient)
.refreshRequest(assumeRoleRequestBuilder.build())
.build();

} else {
awsCredentialsProvider = DefaultCredentialsProvider.create();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.extension;

import org.opensearch.dataprepper.plugins.processor.GeoIPProcessorService;

public class DefaultGeoIpConfigSupplier implements GeoIpConfigSupplier {
private final GeoIpServiceConfig geoIpServiceConfig;

public DefaultGeoIpConfigSupplier(final GeoIpServiceConfig geoIpServiceConfig) {
this.geoIpServiceConfig = geoIpServiceConfig;
}

@Override
public GeoIPProcessorService getGeoIPProcessorService() {
//TODO: use GeoIpServiceConfig and return GeoIPProcessorService
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.extension;

import org.opensearch.dataprepper.model.annotations.DataPrepperExtensionPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.plugin.ExtensionPlugin;
import org.opensearch.dataprepper.model.plugin.ExtensionPoints;

@DataPrepperExtensionPlugin(modelType = GeoIpServiceConfig.class, rootKeyJsonPath = "/geoip_service")
public class GeoIpConfigExtension implements ExtensionPlugin {
private final DefaultGeoIpConfigSupplier defaultGeoIpConfigSupplier;

@DataPrepperPluginConstructor
public GeoIpConfigExtension(final GeoIpServiceConfig geoIpServiceConfig) {
this.defaultGeoIpConfigSupplier = new DefaultGeoIpConfigSupplier(geoIpServiceConfig != null ? geoIpServiceConfig : new GeoIpServiceConfig());
}

@Override
public void apply(final ExtensionPoints extensionPoints) {
extensionPoints.addExtensionProvider(new GeoIpConfigProvider(this.defaultGeoIpConfigSupplier));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.extension;

import org.opensearch.dataprepper.model.plugin.ExtensionProvider;

import java.util.Optional;

public class GeoIpConfigProvider implements ExtensionProvider<GeoIpConfigSupplier> {
private final GeoIpConfigSupplier geoIpConfigSupplier;

public GeoIpConfigProvider(final GeoIpConfigSupplier geoIpConfigSupplier) {
this.geoIpConfigSupplier = geoIpConfigSupplier;
}

@Override
public Optional<GeoIpConfigSupplier> provideInstance(Context context) {
return Optional.of(this.geoIpConfigSupplier);
}

@Override
public Class<GeoIpConfigSupplier> supportedClass() {
return GeoIpConfigSupplier.class;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.extension;

import org.opensearch.dataprepper.plugins.processor.GeoIPProcessorService;

/**
* Interface for supplying {@link GeoIPProcessorService} to {@link GeoIpConfigExtension}
*
* @since 2.7
*/
public interface GeoIpConfigSupplier {
/**
* Returns the {@link GeoIPProcessorService}
*
* @since 2.7
*/
GeoIPProcessorService getGeoIPProcessorService();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.extension;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.Valid;

public class GeoIpServiceConfig {
private static final MaxMindConfig DEFAULT_MAXMIND_CONFIG = new MaxMindConfig();

public GeoIpServiceConfig() {
// This default constructor is used if geoip_service is not configured
}

@JsonProperty("maxmind")
@Valid
private MaxMindConfig maxMindConfig = DEFAULT_MAXMIND_CONFIG;

/**
* Gets the configuration for MaxMind.
*
* @return The MaxMind configuration
* @since 2.7
*/
public MaxMindConfig getMaxMindConfig() {
return maxMindConfig;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.extension;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.Valid;
import jakarta.validation.constraints.AssertTrue;
import jakarta.validation.constraints.Min;
import org.hibernate.validator.constraints.time.DurationMax;
import org.hibernate.validator.constraints.time.DurationMin;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

public class MaxMindConfig {
private static final String S3_PREFIX = "s3://";

//TODO: Add validations to database paths
//TODO: Make default path to be a public CDN endpoint
private static final List<String> DEFAULT_DATABASE_PATHS = new ArrayList<>();
private static final Duration DEFAULT_DATABASE_REFRESH_INTERVAL = Duration.ofDays(7);
private static final int DEFAULT_CACHE_SIZE = 4096;

@JsonProperty("database_paths")
private List<String> databasePaths = DEFAULT_DATABASE_PATHS;

@JsonProperty("database_refresh_interval")
@DurationMin(days = 1)
@DurationMax(days = 30)
private Duration databaseRefreshInterval = DEFAULT_DATABASE_REFRESH_INTERVAL;

@JsonProperty("cache_size")
@Min(1)
//TODO: Add a Max limit on cache size
private int cacheSize = DEFAULT_CACHE_SIZE;

//TODO: Add a destination path to store database files
@JsonProperty("aws")
@Valid
private AwsAuthenticationOptionsConfig awsAuthenticationOptionsConfig;

public MaxMindConfig() {
// This default constructor is used if maxmind is not configured
}

@AssertTrue(message = "aws should be configured if any path in database_paths is S3 bucket path.")
boolean isAwsAuthenticationOptionsRequired() {
for (final String databasePath : databasePaths) {
if (databasePath.startsWith(S3_PREFIX)) {
return awsAuthenticationOptionsConfig != null;
}
}
return true;
}

/**
* Gets the MaxMind database paths
*
* @return The MaxMind database paths
* @since 2.7
*/
public List<String> getDatabasePaths() {
return databasePaths;
}

/**
* Gets the database refresh duration. This loads the database from the paths into memory again in case if there are any updates.
*
* @return The refresh duration
* @since 2.7
*/
public Duration getDatabaseRefreshInterval() {
return databaseRefreshInterval;
}

/**
* Gets the cache size used in CHM cache for MaxMind DatabaseReader.
*
* @return The cache size
* @since 2.7
*/
public int getCacheSize() {
return cacheSize;
}

/**
* Gets the AWS authentication config used for reading from S3 bucket
*
* @return The AWS authentication options
* @since 2.7
*/
public AwsAuthenticationOptionsConfig getAwsAuthenticationOptionsConfig() {
return awsAuthenticationOptionsConfig;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.plugins.processor.configuration.AwsAuthenticationOptions;
import org.opensearch.dataprepper.plugins.processor.configuration.KeysConfig;
import org.opensearch.dataprepper.plugins.processor.configuration.ServiceTypeOptions;
import org.opensearch.dataprepper.test.helper.ReflectivelySetField;
Expand All @@ -30,14 +29,6 @@ void setUp() {
geoIPProcessorConfig = new GeoIPProcessorConfig();
}

@Test
void getAwsAuthenticationOptionsTestPositive() throws NoSuchFieldException, IllegalAccessException {
AwsAuthenticationOptions awsAuthenticationOptions = new AwsAuthenticationOptions();
ReflectivelySetField.setField(GeoIPProcessorConfig.class,
geoIPProcessorConfig, "awsAuthenticationOptions", awsAuthenticationOptions);
assertThat(geoIPProcessorConfig.getAwsAuthenticationOptions(), equalTo(awsAuthenticationOptions));
}

@Test
void getAwsAuthenticationOptionsTestNegative() {
assertThat(new GeoIPProcessorConfig().getAwsAuthenticationOptions(), equalTo(null));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.opensearch.dataprepper.plugins.processor.configuration.MaxMindServiceConfig;
import org.opensearch.dataprepper.plugins.processor.configuration.ServiceTypeOptions;
import org.opensearch.dataprepper.plugins.processor.databaseenrich.EnrichFailedException;
import org.opensearch.dataprepper.plugins.processor.extension.GeoIpConfigSupplier;
import org.opensearch.dataprepper.plugins.processor.loadtype.LoadTypeOptions;
import org.opensearch.dataprepper.test.helper.ReflectivelySetField;

Expand All @@ -40,7 +41,6 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
Expand All @@ -56,6 +56,8 @@ class GeoIPProcessorTest {
@Mock
private GeoIPProcessorConfig geoCodingProcessorConfig;
@Mock
private GeoIpConfigSupplier geoIpConfigSupplier;
@Mock
private PluginSetting pluginSetting;
@Mock
private ServiceTypeOptions serviceTypeOptions;
Expand Down Expand Up @@ -164,7 +166,7 @@ private Map<String, Object> prepareGeoData() {
}

private GeoIPProcessor createObjectUnderTest() throws MalformedURLException {
return new GeoIPProcessor(pluginSetting, geoCodingProcessorConfig);
return new GeoIPProcessor(pluginSetting, geoCodingProcessorConfig, geoIpConfigSupplier);
}

private List<String> setAttributes() {
Expand Down
Loading

0 comments on commit 29b0630

Please sign in to comment.