[HOPSWORKS-2177] Refactor storage connectors (#193)
SirOibaf committed Dec 15, 2020
1 parent 2fc5687 commit 42ace4d
Showing 9 changed files with 165 additions and 67 deletions.
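In short: storage connectors are now looked up by name alone. The connector-type argument is removed from the Java client (FeatureStore.getStorageConnector and StorageConnectorApi, where getByNameAndType becomes getByName), from the Python client (FeatureStore.get_storage_connector and StorageConnectorApi.get), and from the REST path both clients call. A minimal before/after sketch in Python, assuming `fs` is a feature store handle obtained via hsfs.connection() (setup not part of this commit) and reusing the connector name from the docstring example in the diff below:

```python
import hsfs

# Assumed setup; connection arguments depend on your deployment.
fs = hsfs.connection().get_feature_store()

# Before this commit: the connector type was a required second argument.
# sc = fs.get_storage_connector("demo_fs_meb10000_Training_Datasets", "HOPSFS")

# After this commit: the name alone identifies the connector.
sc = fs.get_storage_connector("demo_fs_meb10000_Training_Datasets")

# sc can then be passed on, e.g. to fs.create_training_dataset(..., storage_connector=sc, ...)
```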
java/src/main/java/com/logicalclocks/hsfs/FeatureStore.java (5 changes: 2 additions & 3 deletions)

@@ -121,9 +121,8 @@ public Dataset<Row> sql(String query) {
     return SparkEngine.getInstance().sql(query);
   }

-  public StorageConnector getStorageConnector(String name, StorageConnectorType type)
-      throws FeatureStoreException, IOException {
-    return storageConnectorApi.getByNameAndType(this, name, type);
+  public StorageConnector getStorageConnector(String name) throws FeatureStoreException, IOException {
+    return storageConnectorApi.getByName(this, name);
   }

   public StorageConnector getOnlineStorageConnector() throws FeatureStoreException, IOException {
java/src/main/java/com/logicalclocks/hsfs/TrainingDataset.java

@@ -71,10 +71,6 @@ public class TrainingDataset {
   @JsonIgnore
   private FeatureStore featureStore;

-  @Getter
-  @Setter
-  private Integer storageConnectorId;
-
   @Getter
   @Setter
   @JsonIgnore
@@ -137,7 +133,6 @@ public TrainingDataset(@NonNull String name, Integer version, String description
     this.storageConnector = storageConnector;

     if (storageConnector != null) {
-      this.storageConnectorId = storageConnector.getId();
       if (storageConnector.getStorageConnectorType() == StorageConnectorType.S3) {
         // Default it's already HOPSFS_TRAINING_DATASET
         this.trainingDatasetType = TrainingDatasetType.EXTERNAL_TRAINING_DATASET;
java/src/main/java/com/logicalclocks/hsfs/engine/Utils.java (6 changes: 2 additions & 4 deletions)

@@ -21,8 +21,6 @@
 import com.logicalclocks.hsfs.FeatureGroup;
 import com.logicalclocks.hsfs.TrainingDatasetFeature;
 import com.logicalclocks.hsfs.StorageConnector;
-import com.logicalclocks.hsfs.StorageConnectorType;
-import com.logicalclocks.hsfs.TrainingDatasetFeature;
 import com.logicalclocks.hsfs.metadata.StorageConnectorApi;
 import org.apache.commons.io.FileUtils;
 import org.apache.spark.sql.Dataset;
@@ -115,8 +113,8 @@ public String getFgName(FeatureGroup featureGroup) {
   }

   public String getHiveMetastoreConnector(FeatureGroup featureGroup) throws IOException, FeatureStoreException {
-    StorageConnector storageConnector = storageConnectorApi.getByNameAndType(featureGroup.getFeatureStore(),
-        featureGroup.getFeatureStore().getName(), StorageConnectorType.JDBC);
+    StorageConnector storageConnector = storageConnectorApi.getByName(featureGroup.getFeatureStore(),
+        featureGroup.getFeatureStore().getName());
     String connStr = storageConnector.getConnectionString();
     String pw = FileUtils.readFileToString(new File("material_passwd"));
     return connStr + "sslTrustStore=t_certificate;trustStorePassword=" + pw
java/src/main/java/com/logicalclocks/hsfs/metadata/StorageConnectorApi.java

@@ -20,7 +20,6 @@
 import com.logicalclocks.hsfs.FeatureStore;
 import com.logicalclocks.hsfs.FeatureStoreException;
 import com.logicalclocks.hsfs.StorageConnector;
-import com.logicalclocks.hsfs.StorageConnectorType;
 import org.apache.http.client.methods.HttpGet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,7 +35,7 @@ public class StorageConnectorApi {

   private static final Logger LOGGER = LoggerFactory.getLogger(StorageConnectorApi.class);

-  public StorageConnector getByNameAndType(FeatureStore featureStore, String name, StorageConnectorType type)
+  public StorageConnector getByName(FeatureStore featureStore, String name)
       throws IOException, FeatureStoreException {
     HopsworksClient hopsworksClient = HopsworksClient.getInstance();
     String pathTemplate = HopsworksClient.PROJECT_PATH
@@ -46,7 +45,6 @@ public StorageConnector getByNameAndType(FeatureStore featureStore, String name,
     String uri = UriTemplate.fromTemplate(pathTemplate)
         .set("projectId", featureStore.getProjectId())
         .set("fsId", featureStore.getId())
-        .set("connType", type)
         .set("name", name)
         .set("temporaryCredentials", true)
         .expand();
python/hsfs/core/storage_connector_api.py (5 changes: 1 addition & 4 deletions)

@@ -23,13 +23,11 @@ class StorageConnectorApi:
     def __init__(self, feature_store_id):
         self._feature_store_id = feature_store_id

-    def get(self, name, connector_type):
+    def get(self, name):
         """Get storage connector with name and type.

         :param name: name of the storage connector
         :type name: str
-        :param connector_type: connector type
-        :type connector_type: str
         :return: the storage connector
         :rtype: StorageConnector
         """
@@ -40,7 +38,6 @@ def get(self, name, connector_type):
             "featurestores",
             self._feature_store_id,
             "storageconnectors",
-            connector_type,
             name,
         ]
         query_params = {"temporaryCredentials": True}
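With the connector-type segment gone, the connector resource is addressed as .../featurestores/{fsId}/storageconnectors/{name} by both clients. A small illustrative sketch of the old versus new path built from the segment list above (the feature store id and the "JDBC" type are made up for illustration):

```python
feature_store_id = 67  # illustrative id, not from this commit
name = "demo_fs_meb10000_Training_Datasets"

# Old resource path (before this commit), with the connector type as a segment:
old_path = f"featurestores/{feature_store_id}/storageconnectors/JDBC/{name}"

# New resource path: the name alone, still requesting temporary credentials:
new_path = f"featurestores/{feature_store_id}/storageconnectors/{name}"
print(new_path + "?temporaryCredentials=true")
```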
python/hsfs/engine/spark.py (17 changes: 4 additions & 13 deletions)

@@ -185,13 +185,7 @@ def save_dataframe(
         )

     def _save_offline_dataframe(
-        self,
-        table_name,
-        feature_group,
-        dataframe,
-        save_mode,
-        operation,
-        write_options,
+        self, table_name, feature_group, dataframe, save_mode, operation, write_options,
     ):
         if feature_group.time_travel_format == "HUDI":
             hudi_engine_instance = hudi_engine.HudiEngine(
@@ -246,10 +240,8 @@ def read(self, storage_connector, data_format, read_options, path):

     def profile(self, dataframe, relevant_columns, correlations, histograms):
         """Profile a dataframe with Deequ."""
-        return (
-            self._jvm.com.logicalclocks.hsfs.engine.SparkEngine.getInstance().profile(
-                dataframe._jdf, relevant_columns, correlations, histograms
-            )
+        return self._jvm.com.logicalclocks.hsfs.engine.SparkEngine.getInstance().profile(
+            dataframe._jdf, relevant_columns, correlations, histograms
         )

     def write_options(self, data_format, provided_options):
@@ -362,8 +354,7 @@ def _setup_s3(self, storage_connector, path):
             "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider",
         )
         self._spark_context._jsc.hadoopConfiguration().set(
-            "fs.s3a.session.token",
-            storage_connector.session_token,
+            "fs.s3a.session.token", storage_connector.session_token,
         )
         return path.replace("s3", "s3a", 1)
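The _setup_s3 hunk above only reflows arguments, but it shows the pattern the engine uses for temporary S3 credentials: register Hadoop's TemporaryAWSCredentialsProvider, supply the session token from the storage connector, and rewrite the s3 scheme to s3a. A standalone sketch of that configuration, with placeholder credential values where hsfs would read them from the connector:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()

# Temporary (session) credentials require this provider; the three values
# below are placeholders for what hsfs reads off the storage connector.
hadoop_conf.set(
    "fs.s3a.aws.credentials.provider",
    "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider",
)
hadoop_conf.set("fs.s3a.access.key", "ACCESS_KEY")
hadoop_conf.set("fs.s3a.secret.key", "SECRET_KEY")
hadoop_conf.set("fs.s3a.session.token", "SESSION_TOKEN")

# As in _setup_s3, rewrite the scheme before handing the path to Spark.
path = "s3://my-bucket/training_datasets".replace("s3", "s3a", 1)
df = spark.read.format("parquet").load(path)  # needs hadoop-aws on the classpath
```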
python/hsfs/feature_store.py (8 changes: 3 additions & 5 deletions)

@@ -180,7 +180,7 @@ def get_training_dataset(self, name: str, version: int = None):
             version = self.DEFAULT_VERSION
         return self._training_dataset_api.get(name, version)

-    def get_storage_connector(self, name: str, connector_type: str):
+    def get_storage_connector(self, name: str):
         """Get a previously created storage connector from the feature store.

         Storage connectors encapsulate all information needed for the execution engine
@@ -194,20 +194,18 @@ def get_storage_connector(self, name: str):
         !!! example "Getting a Storage Connector"
             ```python
-            sc = fs.get_storage_connector("demo_fs_meb10000_Training_Datasets", "HOPSFS")
+            sc = fs.get_storage_connector("demo_fs_meb10000_Training_Datasets")

             td = fs.create_training_dataset(..., storage_connector=sc, ...)
             ```

         # Arguments
             name: Name of the storage connector to retrieve.
-            connector_type: Type of the storage connector, e.g. `"JDBC"`, `"HOPSFS"`
-                or `"S3"`.

         # Returns
             `StorageConnector`. Storage connector object.
         """
-        return self._storage_connector_api.get(name, connector_type)
+        return self._storage_connector_api.get(name)

     def sql(self, query, dataframe_type="default", online=False):
         return self._feature_group_engine.sql(query, self._name, dataframe_type, online)