[HOPSWORKS-2197] Add ADLS connector support (#221)
SirOibaf committed Jan 22, 2021
1 parent 023f97e commit 12bf1ee
Showing 10 changed files with 292 additions and 98 deletions.
44 changes: 38 additions & 6 deletions java/src/main/java/com/logicalclocks/hsfs/StorageConnector.java
@@ -17,6 +17,7 @@
package com.logicalclocks.hsfs;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.logicalclocks.hsfs.metadata.Option;
import com.logicalclocks.hsfs.util.Constants;
import lombok.AllArgsConstructor;
import lombok.Getter;
@@ -28,6 +29,7 @@
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

@@ -120,18 +122,48 @@ public class StorageConnector {
@Setter
private String arguments;

@Getter
@Setter
private Integer generation;

@Getter
@Setter
private String directoryId;

@Getter
@Setter
private String applicationId;

@Getter
@Setter
private String serviceCredentials;

@Getter
@Setter
private String accountName;

@Getter
@Setter
private String containerName;

@Getter
@Setter
private List<Option> sparkOptions;

@Getter
@Setter
private StorageConnectorType storageConnectorType;

@JsonIgnore
public Map<String, String> getSparkOptions() throws FeatureStoreException {
if (StorageConnectorType.JDBC.equals(storageConnectorType)) {
return getJdbcOptions();
} else if (StorageConnectorType.REDSHIFT.equals(storageConnectorType)) {
return getRedshiftOptions();
public Map<String, String> getSparkOptionsInt() throws FeatureStoreException {
switch (storageConnectorType) {
case JDBC:
return getJdbcOptions();
case REDSHIFT:
return getRedshiftOptions();
default:
throw new FeatureStoreException("Spark options are not supported for connector " + storageConnectorType);
}
throw new FeatureStoreException("Spark options are not supported for connector " + storageConnectorType);
}

@JsonIgnore
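The getSparkOptions helper above is renamed to getSparkOptionsInt and now dispatches on the connector type with a switch, throwing a FeatureStoreException for connector types that expose no Spark options (including the new ADLS type, which is configured through the Hadoop configuration instead). A minimal caller-side sketch, not part of the commit and using only the public API visible in this diff; the helper name sparkOptionsOrEmpty is made up for illustration:

import com.logicalclocks.hsfs.FeatureStoreException;
import com.logicalclocks.hsfs.StorageConnector;

import java.util.Collections;
import java.util.Map;

public class SparkOptionsSketch {
  // Hypothetical helper: fall back to an empty map for connector types
  // that do not expose Spark read/write options.
  static Map<String, String> sparkOptionsOrEmpty(StorageConnector connector) {
    try {
      return connector.getSparkOptionsInt();
    } catch (FeatureStoreException e) {
      // Only JDBC and REDSHIFT return options here; S3, HOPSFS and ADLS
      // connectors end up in this branch.
      return Collections.emptyMap();
    }
  }
}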
java/src/main/java/com/logicalclocks/hsfs/StorageConnectorType.java
@@ -20,5 +20,6 @@ public enum StorageConnectorType {
HOPSFS,
S3,
JDBC,
REDSHIFT
REDSHIFT,
ADLS
}
11 changes: 3 additions & 8 deletions java/src/main/java/com/logicalclocks/hsfs/TrainingDataset.java
@@ -21,6 +21,7 @@
import com.logicalclocks.hsfs.engine.StatisticsEngine;
import com.logicalclocks.hsfs.engine.TrainingDatasetEngine;
import com.logicalclocks.hsfs.constructor.Query;
import com.logicalclocks.hsfs.engine.Utils;
import com.logicalclocks.hsfs.metadata.Statistics;
import lombok.Builder;
import lombok.Getter;
@@ -73,7 +74,6 @@ public class TrainingDataset {

@Getter
@Setter
@JsonIgnore
private StorageConnector storageConnector;

@Getter
@@ -103,6 +103,7 @@ public class TrainingDataset {

private TrainingDatasetEngine trainingDatasetEngine = new TrainingDatasetEngine();
private StatisticsEngine statisticsEngine = new StatisticsEngine(EntityEndpointType.TRAINING_DATASET);
private Utils utils = new Utils();

@Builder
public TrainingDataset(@NonNull String name, Integer version, String description, DataFormat dataFormat,
@@ -115,13 +116,7 @@ public TrainingDataset(@NonNull String name, Integer version, String description
this.location = location;
this.storageConnector = storageConnector;

if (storageConnector != null) {
if (storageConnector.getStorageConnectorType() == StorageConnectorType.S3) {
// Default it's already HOPSFS_TRAINING_DATASET
this.trainingDatasetType = TrainingDatasetType.EXTERNAL_TRAINING_DATASET;
}
}

this.trainingDatasetType = utils.getTrainingDatasetType(storageConnector);
this.splits = splits;
this.seed = seed;
this.featureStore = featureStore;
110 changes: 64 additions & 46 deletions java/src/main/java/com/logicalclocks/hsfs/engine/SparkEngine.java
@@ -26,10 +26,10 @@
import com.logicalclocks.hsfs.OnDemandFeatureGroup;
import com.logicalclocks.hsfs.Split;
import com.logicalclocks.hsfs.StorageConnector;
import com.logicalclocks.hsfs.StorageConnectorType;
import com.logicalclocks.hsfs.TimeTravelFormat;
import com.logicalclocks.hsfs.TrainingDataset;
import com.logicalclocks.hsfs.metadata.OnDemandOptions;
import com.logicalclocks.hsfs.metadata.Option;
import com.logicalclocks.hsfs.util.Constants;
import lombok.Getter;
import org.apache.hadoop.fs.Path;
@@ -80,7 +80,7 @@ public Dataset<Row> sql(String query) {
}

public Dataset<Row> jdbc(String query, StorageConnector storageConnector) throws FeatureStoreException {
Map<String, String> readOptions = storageConnector.getSparkOptions();
Map<String, String> readOptions = storageConnector.getSparkOptionsInt();
if (!Strings.isNullOrEmpty(query)) {
readOptions.put("query", query);
}
@@ -124,42 +124,6 @@ public void registerHudiTemporaryTable(FeatureGroup featureGroup, String alias,
leftFeaturegroupStartTimestamp, leftFeaturegroupEndTimestamp, readOptions);
}

public void configureConnector(StorageConnector storageConnector) {
if (storageConnector.getStorageConnectorType() == StorageConnectorType.S3) {
configureS3Connector(storageConnector);
}
}

public static String sparkPath(String path) {
if (path.startsWith(Constants.S3_SCHEME)) {
return path.replaceFirst(Constants.S3_SCHEME, Constants.S3_SPARK_SCHEME);
}
return path;
}

private void configureS3Connector(StorageConnector storageConnector) {
if (!Strings.isNullOrEmpty(storageConnector.getAccessKey())
&& Strings.isNullOrEmpty(storageConnector.getSessionToken())) {
sparkSession.conf().set(Constants.S3_ACCESS_KEY_ENV, storageConnector.getAccessKey());
sparkSession.conf().set(Constants.S3_SECRET_KEY_ENV, storageConnector.getSecretKey());
}
if (!Strings.isNullOrEmpty(storageConnector.getSessionToken())) {
sparkSession.conf().set(Constants.S3_CREDENTIAL_PROVIDER_ENV, Constants.S3_TEMPORARY_CREDENTIAL_PROVIDER);
sparkSession.conf().set(Constants.S3_ACCESS_KEY_ENV, storageConnector.getAccessKey());
sparkSession.conf().set(Constants.S3_SECRET_KEY_ENV, storageConnector.getSecretKey());
sparkSession.conf().set(Constants.S3_SESSION_KEY_ENV, storageConnector.getSessionToken());
}
if (!Strings.isNullOrEmpty(storageConnector.getServerEncryptionAlgorithm())) {
sparkSession.conf().set(
"fs.s3a.server-side-encryption-algorithm",
storageConnector.getServerEncryptionAlgorithm()
);
}
if (!Strings.isNullOrEmpty(storageConnector.getServerEncryptionKey())) {
sparkSession.conf().set("fs.s3a.server-side-encryption.key", storageConnector.getServerEncryptionKey());
}
}

/**
* Setup Spark to write the data on the File System.
*
@@ -171,9 +135,8 @@ private void configureS3Connector(StorageConnector storageConnector) {
public void write(TrainingDataset trainingDataset, Dataset<Row> dataset,
Map<String, String> writeOptions, SaveMode saveMode) {

if (trainingDataset.getStorageConnector() != null) {
SparkEngine.getInstance().configureConnector(trainingDataset.getStorageConnector());
}
setupConnectorHadoopConf(trainingDataset.getStorageConnector());

if (trainingDataset.getSplits() == null) {
// Write a single dataset

@@ -296,10 +259,7 @@ private void writeSingle(Dataset<Row> dataset, DataFormat dataFormat,
// OnDemand Feature Group in TFRecords format. However Spark does not use an enum but a string.
public Dataset<Row> read(StorageConnector storageConnector, String dataFormat,
Map<String, String> readOptions, String path) {

if (storageConnector.getStorageConnectorType() == StorageConnectorType.S3) {
configureS3Connector(storageConnector);
}
setupConnectorHadoopConf(storageConnector);

return SparkEngine.getInstance().getSparkSession()
.read()
@@ -322,7 +282,7 @@ public Dataset<Row> read(StorageConnector storageConnector, String dataFormat,
public Map<String, String> getOnlineOptions(Map<String, String> providedWriteOptions,
FeatureGroup featureGroup,
StorageConnector storageConnector) throws FeatureStoreException {
Map<String, String> writeOptions = storageConnector.getSparkOptions();
Map<String, String> writeOptions = storageConnector.getSparkOptionsInt();
writeOptions.put(Constants.JDBC_TABLE, utils.getFgName(featureGroup));

// add user provided configuration
@@ -400,4 +360,62 @@ public String profile(Dataset<Row> df, boolean correlation, boolean histogram) {
public String profile(Dataset<Row> df) {
return profile(df, null, true, true);
}

public void setupConnectorHadoopConf(StorageConnector storageConnector) {
if (storageConnector == null) {
return;
}

switch (storageConnector.getStorageConnectorType()) {
case S3:
setupS3ConnectorHadoopConf(storageConnector);
break;
case ADLS:
setupAdlsConnectorHadoopConf(storageConnector);
break;
default:
// No-OP
break;
}
}

public static String sparkPath(String path) {
if (path.startsWith(Constants.S3_SCHEME)) {
return path.replaceFirst(Constants.S3_SCHEME, Constants.S3_SPARK_SCHEME);
}
return path;
}

private void setupS3ConnectorHadoopConf(StorageConnector storageConnector) {
if (!Strings.isNullOrEmpty(storageConnector.getAccessKey())) {
sparkSession.sparkContext().hadoopConfiguration()
.set(Constants.S3_ACCESS_KEY_ENV, storageConnector.getAccessKey());
}
if (!Strings.isNullOrEmpty(storageConnector.getSecretKey())) {
sparkSession.sparkContext().hadoopConfiguration()
.set(Constants.S3_SECRET_KEY_ENV, storageConnector.getSecretKey());
}
if (!Strings.isNullOrEmpty(storageConnector.getServerEncryptionAlgorithm())) {
sparkSession.sparkContext().hadoopConfiguration().set(
"fs.s3a.server-side-encryption-algorithm",
storageConnector.getServerEncryptionAlgorithm()
);
}
if (!Strings.isNullOrEmpty(storageConnector.getServerEncryptionKey())) {
sparkSession.sparkContext().hadoopConfiguration()
.set("fs.s3a.server-side-encryption.key", storageConnector.getServerEncryptionKey());
}
if (!Strings.isNullOrEmpty(storageConnector.getSessionToken())) {
sparkSession.sparkContext().hadoopConfiguration()
.set(Constants.S3_CREDENTIAL_PROVIDER_ENV, Constants.S3_TEMPORARY_CREDENTIAL_PROVIDER);
sparkSession.sparkContext().hadoopConfiguration()
.set(Constants.S3_SESSION_KEY_ENV, storageConnector.getSessionToken());
}
}

private void setupAdlsConnectorHadoopConf(StorageConnector storageConnector) {
for (Option confOption : storageConnector.getSparkOptions()) {
sparkSession.sparkContext().hadoopConfiguration().set(confOption.getName(), confOption.getValue());
}
}
}
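The new setupAdlsConnectorHadoopConf above copies every name/value pair from the connector's sparkOptions verbatim into the Spark context's Hadoop configuration. The sketch below is not part of the commit; it shows the kind of pairs this could carry for an ADLS Gen2 account. The fs.azure.* keys are standard Hadoop ABFS OAuth settings, it is only an assumption that the Hopsworks backend sends these exact names, and "myaccount" is a placeholder account name.

import com.logicalclocks.hsfs.metadata.Option;
import org.apache.hadoop.conf.Configuration;

import java.util.Arrays;
import java.util.List;

public class AdlsOptionsSketch {
  public static void main(String[] args) {
    List<Option> sparkOptions = Arrays.asList(
        new Option("fs.azure.account.auth.type.myaccount.dfs.core.windows.net", "OAuth"),
        new Option("fs.azure.account.oauth.provider.type.myaccount.dfs.core.windows.net",
            "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider"),
        new Option("fs.azure.account.oauth2.client.id.myaccount.dfs.core.windows.net",
            "<application-id>"),
        new Option("fs.azure.account.oauth2.client.secret.myaccount.dfs.core.windows.net",
            "<service-credentials>"),
        new Option("fs.azure.account.oauth2.client.endpoint.myaccount.dfs.core.windows.net",
            "https://login.microsoftonline.com/<directory-id>/oauth2/token"));

    // Mirrors setupAdlsConnectorHadoopConf: each option lands as-is in the
    // Hadoop configuration that Spark's ABFS filesystem client reads.
    Configuration hadoopConf = new Configuration();
    for (Option option : sparkOptions) {
      hadoopConf.set(option.getName(), option.getValue());
    }
  }
}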
@@ -114,10 +114,6 @@ public void insert(TrainingDataset trainingDataset, Dataset<Row> dataset,
}

public Dataset<Row> read(TrainingDataset trainingDataset, String split, Map<String, String> providedOptions) {
if (trainingDataset.getStorageConnector() != null) {
SparkEngine.getInstance().configureConnector(trainingDataset.getStorageConnector());
}

String path = "";
if (com.google.common.base.Strings.isNullOrEmpty(split)) {
// ** glob means "all sub directories"
12 changes: 12 additions & 0 deletions java/src/main/java/com/logicalclocks/hsfs/engine/Utils.java
@@ -19,8 +19,10 @@
import com.logicalclocks.hsfs.Feature;
import com.logicalclocks.hsfs.FeatureStoreException;
import com.logicalclocks.hsfs.FeatureGroup;
import com.logicalclocks.hsfs.StorageConnectorType;
import com.logicalclocks.hsfs.TrainingDatasetFeature;
import com.logicalclocks.hsfs.StorageConnector;
import com.logicalclocks.hsfs.TrainingDatasetType;
import com.logicalclocks.hsfs.metadata.StorageConnectorApi;
import org.apache.commons.io.FileUtils;
import org.apache.spark.sql.Dataset;
@@ -84,6 +86,16 @@ public void trainingDatasetSchemaMatch(Dataset<Row> dataset, List<TrainingDatase
}
}

public TrainingDatasetType getTrainingDatasetType(StorageConnector storageConnector) {
if (storageConnector == null) {
return TrainingDatasetType.HOPSFS_TRAINING_DATASET;
} else if (storageConnector.getStorageConnectorType() == StorageConnectorType.HOPSFS) {
return TrainingDatasetType.HOPSFS_TRAINING_DATASET;
} else {
return TrainingDatasetType.EXTERNAL_TRAINING_DATASET;
}
}

// TODO(Fabio): this should be moved in the backend
public String getTableName(FeatureGroup offlineFeatureGroup) {
return offlineFeatureGroup.getFeatureStore().getName() + "."
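The new getTrainingDatasetType helper above replaces the S3-only check that was removed from the TrainingDataset constructor: a null or HOPSFS connector keeps the dataset as HOPSFS_TRAINING_DATASET, while any other connector type (S3, Redshift, JDBC, or the new ADLS) makes it EXTERNAL_TRAINING_DATASET. A small sketch, not part of the commit, assuming StorageConnector keeps a Lombok no-args constructor:

import com.logicalclocks.hsfs.StorageConnector;
import com.logicalclocks.hsfs.StorageConnectorType;
import com.logicalclocks.hsfs.TrainingDatasetType;
import com.logicalclocks.hsfs.engine.Utils;

public class TrainingDatasetTypeSketch {
  public static void main(String[] args) {
    Utils utils = new Utils();

    // No connector: the training dataset stays on HopsFS.
    assert utils.getTrainingDatasetType(null) == TrainingDatasetType.HOPSFS_TRAINING_DATASET;

    // Any non-HOPSFS connector, such as the new ADLS type, is external.
    StorageConnector adls = new StorageConnector(); // assumes a no-args constructor
    adls.setStorageConnectorType(StorageConnectorType.ADLS);
    assert utils.getTrainingDatasetType(adls) == TrainingDatasetType.EXTERNAL_TRAINING_DATASET;
  }
}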
34 changes: 34 additions & 0 deletions java/src/main/java/com/logicalclocks/hsfs/metadata/Option.java
@@ -0,0 +1,34 @@
/*
* Copyright (c) 2021 Logical Clocks AB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*
* See the License for the specific language governing permissions and limitations under the License.
*/

package com.logicalclocks.hsfs.metadata;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

@AllArgsConstructor
@NoArgsConstructor
public class Option {
@Getter
@Setter
private String name;

@Getter
@Setter
private String value;
}
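Option is a plain name/value holder for a connector's sparkOptions; the no-args constructor together with the Lombok setters lets Jackson bind it directly from JSON. A quick deserialization sketch, not part of the commit, where the JSON shape is an assumption based on the field names:

import com.fasterxml.jackson.databind.ObjectMapper;
import com.logicalclocks.hsfs.metadata.Option;

public class OptionJsonSketch {
  public static void main(String[] args) throws Exception {
    String json = "[{\"name\": \"fs.azure.account.auth.type\", \"value\": \"OAuth\"}]";

    // Jackson populates each Option through the no-args constructor and setters.
    Option[] options = new ObjectMapper().readValue(json, Option[].class);
    System.out.println(options[0].getName() + " = " + options[0].getValue());
  }
}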