Skip to content

Commit

Permalink
Add Security Lake data source type.
Browse files Browse the repository at this point in the history
This change adds Security Lake as a data source type. Security Lake as
a data source is simply a set of specific options applied on top of the base S3Glue
data source.

Signed-off-by: Adi Suresh <adsuresh@amazon.com>
  • Loading branch information
asuresh8 committed Aug 7, 2024
1 parent b32d890 commit 79b8f9a
Show file tree
Hide file tree
Showing 6 changed files with 196 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.common.inject.Singleton;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.datasource.model.DataSource;
import org.opensearch.sql.datasource.model.DataSourceType;
import org.opensearch.sql.legacy.metrics.GaugeMetric;
import org.opensearch.sql.legacy.metrics.Metrics;
Expand Down Expand Up @@ -161,6 +162,8 @@ public SparkSubmitParametersBuilderProvider sparkSubmitParametersBuilderProvider
SparkParameterComposerCollection collection = new SparkParameterComposerCollection();
collection.register(
DataSourceType.S3GLUE, new S3GlueDataSourceSparkParameterComposer(clusterSettingLoader));
collection.register(
DataSourceType.SECURITY_LAKE, new S3GlueDataSourceSparkParameterComposer(clusterSettingLoader));
collection.register(new OpenSearchExtraParameterComposer(clusterSettingLoader));
return new SparkSubmitParametersBuilderProvider(collection);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,10 @@ protected AsyncQueryExecutorService createAsyncQueryExecutorService(
DataSourceType.S3GLUE,
new S3GlueDataSourceSparkParameterComposer(
getSparkExecutionEngineConfigClusterSettingLoader()));
sparkParameterComposerCollection.register(
DataSourceType.SECURITY_LAKE,
new S3GlueDataSourceSparkParameterComposer(
getSparkExecutionEngineConfigClusterSettingLoader()));
SparkSubmitParametersBuilderProvider sparkSubmitParametersBuilderProvider =
new SparkSubmitParametersBuilderProvider(sparkParameterComposerCollection);
QueryHandlerFactory queryHandlerFactory =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@ public class DataSourceType {
public static DataSourceType OPENSEARCH = new DataSourceType("OPENSEARCH");
public static DataSourceType SPARK = new DataSourceType("SPARK");
public static DataSourceType S3GLUE = new DataSourceType("S3GLUE");
public static DataSourceType SECURITY_LAKE = new DataSourceType("SECURITY_LAKE");

// Map from uppercase DataSourceType name to DataSourceType object
private static Map<String, DataSourceType> knownValues = new HashMap<>();

static {
register(PROMETHEUS, OPENSEARCH, SPARK, S3GLUE);
register(PROMETHEUS, OPENSEARCH, SPARK, S3GLUE, SECURITY_LAKE);
}

private final String name;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package org.opensearch.sql.datasources.glue;

import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.Map;
import java.util.Set;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.datasource.model.DataSource;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasource.model.DataSourceType;
import org.opensearch.sql.datasources.auth.AuthenticationType;
import org.opensearch.sql.datasources.utils.DatasourceValidationUtils;
import org.opensearch.sql.storage.DataSourceFactory;

/**
 * Data source factory for the {@link DataSourceType#SECURITY_LAKE} data source type.
 *
 * <p>Builds on {@link GlueDataSourceFactory}: the supplied properties are validated, the Iceberg
 * and Lake Formation flags are forced to {@link #TRUE}, and creation is then delegated to the
 * parent factory.
 */
public class SecurityLakeDataSourceFactory extends GlueDataSourceFactory {

  /** Value written into the properties for the flags this factory forces on. */
  public static final String TRUE = "true";

  /**
   * Creates the factory.
   *
   * @param pluginSettings plugin settings, passed through to {@link GlueDataSourceFactory}
   */
  public SecurityLakeDataSourceFactory(final Settings pluginSettings) {
    super(pluginSettings);
  }

  @Override
  public DataSourceType getDataSourceType() {
    return DataSourceType.SECURITY_LAKE;
  }

  /**
   * Validates the metadata properties, forces the Iceberg and Lake Formation flags on, and
   * delegates to {@link GlueDataSourceFactory#createDataSource(DataSourceMetadata)}.
   *
   * <p>Note that the properties map of {@code metadata} is modified in place.
   *
   * @param metadata metadata describing the data source to create
   * @return the data source produced by the parent factory
   * @throws IllegalArgumentException if either flag is present with a non-true value, or if the
   *     Lake Formation session tag is blank or missing
   */
  @Override
  public DataSource createDataSource(DataSourceMetadata metadata) {
    Map<String, String> properties = metadata.getProperties();
    // Validate before overwriting, so an explicit "false" from the caller is rejected
    // instead of being silently replaced.
    validateProperties(properties);
    properties.put(GlueDataSourceFactory.GLUE_ICEBERG_ENABLED, TRUE);
    properties.put(GlueDataSourceFactory.GLUE_LAKEFORMATION_ENABLED, TRUE);
    return super.createDataSource(metadata);
  }

  /**
   * Checks the Security Lake specific constraints on the data source properties.
   *
   * @param properties data source properties to check
   * @throws IllegalArgumentException if a constraint is violated
   */
  private void validateProperties(Map<String, String> properties) {
    // Iceberg and Lake Formation may be omitted (they are forced on later) but not disabled.
    requireNotDisabled(properties, GlueDataSourceFactory.GLUE_ICEBERG_ENABLED);
    requireNotDisabled(properties, GlueDataSourceFactory.GLUE_LAKEFORMATION_ENABLED);

    // The Lake Formation session tag has no default and must be supplied by the caller.
    if (StringUtils.isBlank(properties.get(GlueDataSourceFactory.GLUE_LAKEFORMATION_SESSION_TAG))) {
      throw new IllegalArgumentException(
          GlueDataSourceFactory.GLUE_LAKEFORMATION_SESSION_TAG
              + " must be specified when using Security Lake data source");
    }
  }

  /**
   * Rejects a flag that is present but does not evaluate to true.
   *
   * <p>An absent key is accepted. A present value is rejected whenever {@link
   * BooleanUtils#toBoolean(String)} returns {@code false} for it.
   *
   * @param properties data source properties to read the flag from
   * @param key property name of the flag
   * @throws IllegalArgumentException if the flag is present and not true
   */
  private static void requireNotDisabled(Map<String, String> properties, String key) {
    String value = properties.get(key);
    if (value != null && !BooleanUtils.toBoolean(value)) {
      throw new IllegalArgumentException(
          key + " cannot be false when using Security Lake data source.");
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package org.opensearch.sql.datasources.glue;

import static org.mockito.Mockito.when;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import lombok.SneakyThrows;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.sql.DataSourceSchemaName;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.datasource.model.DataSource;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasource.model.DataSourceType;

/** Unit tests for {@link SecurityLakeDataSourceFactory}. */
@ExtendWith(MockitoExtension.class)
public class SecurityLakeSourceFactoryTest {

  @Mock private Settings settings;

  @Test
  void testGetConnectorType() {
    SecurityLakeDataSourceFactory securityLakeDataSourceFactory =
        new SecurityLakeDataSourceFactory(settings);
    Assertions.assertEquals(
        DataSourceType.SECURITY_LAKE, securityLakeDataSourceFactory.getDataSourceType());
  }

  @Test
  @SneakyThrows
  void testCreateSecurityLakeDataSource() {
    when(settings.getSettingValue(Settings.Key.DATASOURCES_URI_HOSTS_DENY_LIST))
        .thenReturn(Collections.emptyList());
    SecurityLakeDataSourceFactory securityLakeDataSourceFactory =
        new SecurityLakeDataSourceFactory(settings);

    Map<String, String> properties = baseProperties();
    properties.put(GlueDataSourceFactory.GLUE_LAKEFORMATION_SESSION_TAG, "session_tag");
    DataSourceMetadata metadata = securityLakeMetadata(properties);

    DataSource dataSource = securityLakeDataSourceFactory.createDataSource(metadata);
    Assertions.assertEquals(DataSourceType.SECURITY_LAKE, dataSource.getConnectorType());

    // The factory writes both flags into the metadata's properties before delegating.
    Assertions.assertEquals(
        SecurityLakeDataSourceFactory.TRUE,
        metadata.getProperties().get(GlueDataSourceFactory.GLUE_ICEBERG_ENABLED));
    Assertions.assertEquals(
        SecurityLakeDataSourceFactory.TRUE,
        metadata.getProperties().get(GlueDataSourceFactory.GLUE_LAKEFORMATION_ENABLED));
  }

  @Test
  @SneakyThrows
  void testCreateSecurityLakeDataSourceIcebergCannotBeDisabled() {
    SecurityLakeDataSourceFactory securityLakeDataSourceFactory =
        new SecurityLakeDataSourceFactory(settings);

    Map<String, String> properties = baseProperties();
    // A valid session tag is supplied so that the missing-session-tag check cannot be the
    // reason for the failure; only the disabled Iceberg flag is invalid here.
    properties.put(GlueDataSourceFactory.GLUE_LAKEFORMATION_SESSION_TAG, "session_tag");
    properties.put(GlueDataSourceFactory.GLUE_ICEBERG_ENABLED, "false");
    DataSourceMetadata metadata = securityLakeMetadata(properties);

    IllegalArgumentException exception =
        Assertions.assertThrows(
            IllegalArgumentException.class,
            () -> securityLakeDataSourceFactory.createDataSource(metadata));
    Assertions.assertTrue(
        exception.getMessage().contains(GlueDataSourceFactory.GLUE_ICEBERG_ENABLED));
  }

  @Test
  @SneakyThrows
  void testCreateSecurityLakeDataSourceLakeFormationCannotBeDisabled() {
    SecurityLakeDataSourceFactory securityLakeDataSourceFactory =
        new SecurityLakeDataSourceFactory(settings);

    Map<String, String> properties = baseProperties();
    // A valid session tag is supplied so that the missing-session-tag check cannot be the
    // reason for the failure; only the disabled Lake Formation flag is invalid here.
    properties.put(GlueDataSourceFactory.GLUE_LAKEFORMATION_SESSION_TAG, "session_tag");
    properties.put(GlueDataSourceFactory.GLUE_LAKEFORMATION_ENABLED, "false");
    DataSourceMetadata metadata = securityLakeMetadata(properties);

    IllegalArgumentException exception =
        Assertions.assertThrows(
            IllegalArgumentException.class,
            () -> securityLakeDataSourceFactory.createDataSource(metadata));
    Assertions.assertTrue(
        exception.getMessage().contains(GlueDataSourceFactory.GLUE_LAKEFORMATION_ENABLED));
  }

  @Test
  @SneakyThrows
  void testCreateGlueDataSourceWithLakeFormationNoSessionTags() {
    SecurityLakeDataSourceFactory securityLakeDataSourceFactory =
        new SecurityLakeDataSourceFactory(settings);

    // No session tag and no explicit flags: only the session-tag requirement is violated.
    DataSourceMetadata metadata = securityLakeMetadata(baseProperties());

    IllegalArgumentException exception =
        Assertions.assertThrows(
            IllegalArgumentException.class,
            () -> securityLakeDataSourceFactory.createDataSource(metadata));
    Assertions.assertTrue(
        exception.getMessage().contains(GlueDataSourceFactory.GLUE_LAKEFORMATION_SESSION_TAG));
  }

  /** Returns a fresh mutable map with the properties shared by every test in this class. */
  private static Map<String, String> baseProperties() {
    Map<String, String> properties = new HashMap<>();
    properties.put("glue.auth.type", "iam_role");
    properties.put("glue.auth.role_arn", "role_arn");
    properties.put("glue.indexstore.opensearch.uri", "http://localhost:9200");
    properties.put("glue.indexstore.opensearch.auth", "noauth");
    properties.put("glue.indexstore.opensearch.region", "us-west-2");
    return properties;
  }

  /** Builds Security Lake metadata named {@code my_sl} around the given properties. */
  private static DataSourceMetadata securityLakeMetadata(Map<String, String> properties) {
    return new DataSourceMetadata.Builder()
        .setName("my_sl")
        .setConnector(DataSourceType.SECURITY_LAKE)
        .setProperties(properties)
        .build();
  }
}
2 changes: 2 additions & 0 deletions plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelperImpl;
import org.opensearch.sql.datasources.encryptor.EncryptorImpl;
import org.opensearch.sql.datasources.glue.GlueDataSourceFactory;
import org.opensearch.sql.datasources.glue.SecurityLakeDataSourceFactory;
import org.opensearch.sql.datasources.model.transport.*;
import org.opensearch.sql.datasources.rest.RestDataSourceQueryAction;
import org.opensearch.sql.datasources.service.DataSourceMetadataStorage;
Expand Down Expand Up @@ -326,6 +327,7 @@ private DataSourceServiceImpl createDataSourceService() {
.add(new PrometheusStorageFactory(pluginSettings))
.add(new SparkStorageFactory(this.client, pluginSettings))
.add(new GlueDataSourceFactory(pluginSettings))
.add(new SecurityLakeDataSourceFactory(pluginSettings))
.build(),
dataSourceMetadataStorage,
dataSourceUserAuthorizationHelper);
Expand Down

0 comments on commit 79b8f9a

Please sign in to comment.