On demand feature group over S3 buckets (#185)
SirOibaf committed Jan 7, 2021
1 parent cf9fa72 commit ce659ad
Showing 22 changed files with 415 additions and 246 deletions.
77 changes: 0 additions & 77 deletions java/src/main/java/com/logicalclocks/hsfs/FeatureGroup.java
@@ -18,11 +18,8 @@

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.logicalclocks.hsfs.engine.FeatureGroupEngine;
import com.logicalclocks.hsfs.engine.StatisticsEngine;
import com.logicalclocks.hsfs.metadata.FeatureGroupBase;
import com.logicalclocks.hsfs.metadata.Statistics;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
@@ -58,25 +55,6 @@ public class FeatureGroup extends FeatureGroupBase {
@Setter
protected String location;

@Getter
@Setter
@JsonProperty("descStatsEnabled")
private Boolean statisticsEnabled;

@Getter
@Setter
@JsonProperty("featHistEnabled")
private Boolean histograms;

@Getter
@Setter
@JsonProperty("featCorrEnabled")
private Boolean correlations;

@Getter
@Setter
private List<String> statisticColumns;

@JsonIgnore
// These are only used in the client. In the server they are aggregated in the `features` field
private List<String> primaryKeys;
Expand All @@ -90,7 +68,6 @@ public class FeatureGroup extends FeatureGroupBase {
private String hudiPrecombineKey;

private FeatureGroupEngine featureGroupEngine = new FeatureGroupEngine();
private StatisticsEngine statisticsEngine = new StatisticsEngine(EntityEndpointType.FEATURE_GROUP);

private static final Logger LOGGER = LoggerFactory.getLogger(FeatureGroup.class);

@@ -318,58 +295,4 @@ public Map<String, Map<String, String>> commitDetails() throws IOException, Feat
public Map<String, Map<String, String>> commitDetails(Integer limit) throws IOException, FeatureStoreException {
return featureGroupEngine.commitDetails(this, limit);
}

/**
* Update the statistics configuration of the feature group.
* Change the `statisticsEnabled`, `histograms`, `correlations` or `statisticColumns` attributes and persist
* the changes by calling this method.
*
* @throws FeatureStoreException
* @throws IOException
*/
public void updateStatisticsConfig() throws FeatureStoreException, IOException {
featureGroupEngine.updateStatisticsConfig(this);
}

/**
* Recompute the statistics for the feature group and save them to the feature store.
*
* @return statistics object of computed statistics
* @throws FeatureStoreException
* @throws IOException
*/
public Statistics computeStatistics() throws FeatureStoreException, IOException {
if (statisticsEnabled) {
return statisticsEngine.computeStatistics(this, read());
} else {
LOGGER.info("StorageWarning: The statistics are not enabled of feature group `" + name + "`, with version `"
+ version + "`. No statistics computed.");
}
return null;
}

/**
* Get the last statistics commit for the feature group.
*
* @return statistics object of latest commit
* @throws FeatureStoreException
* @throws IOException
*/
@JsonIgnore
public Statistics getStatistics() throws FeatureStoreException, IOException {
return statisticsEngine.getLast(this);
}

/**
* Get the statistics of a specific commit time for the feature group.
*
* @param commitTime commit time in the format "YYYYMMDDhhmmss"
* @return statistics object for the commit time
* @throws FeatureStoreException
* @throws IOException
*/
@JsonIgnore
public Statistics getStatistics(String commitTime) throws FeatureStoreException, IOException {
return statisticsEngine.get(this, commitTime);
}
}
26 changes: 26 additions & 0 deletions java/src/main/java/com/logicalclocks/hsfs/OnDemandDataFormat.java
@@ -0,0 +1,26 @@
/*
* Copyright (c) 2020 Logical Clocks AB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*
* See the License for the specific language governing permissions and limitations under the License.
*/

package com.logicalclocks.hsfs;

public enum OnDemandDataFormat {
ORC,
PARQUET,
AVRO,
CSV,
HUDI,
DELTA
}
OnDemandFeatureGroup.java
@@ -19,6 +19,7 @@
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.logicalclocks.hsfs.engine.OnDemandFeatureGroupEngine;
import com.logicalclocks.hsfs.metadata.FeatureGroupBase;
import com.logicalclocks.hsfs.metadata.OnDemandOptions;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
@@ -29,6 +30,8 @@

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

@AllArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
@@ -42,31 +45,62 @@ public class OnDemandFeatureGroup extends FeatureGroupBase {
@Setter
private String query;

@Getter
@Setter
private OnDemandDataFormat dataFormat;

@Getter
@Setter
private String path;

@Getter
@Setter
private List<OnDemandOptions> options;

@Getter
@Setter
private String type = "onDemandFeaturegroupDTO";


private OnDemandFeatureGroupEngine onDemandFeatureGroupEngine = new OnDemandFeatureGroupEngine();

@Builder
public OnDemandFeatureGroup(FeatureStore featureStore, @NonNull String name, Integer version, String query,
@NonNull StorageConnector storageConnector, String description, List<Feature> features) {
OnDemandDataFormat dataFormat, String path, Map<String, String> options,
@NonNull StorageConnector storageConnector, String description, List<Feature> features,
Boolean statisticsEnabled, Boolean histograms, Boolean correlations,
List<String> statisticColumns) {
this.featureStore = featureStore;
this.name = name;
this.version = version;
this.query = query;
this.dataFormat = dataFormat;
this.path = path;
this.options = options != null ? options.entrySet().stream()
.map(e -> new OnDemandOptions(e.getKey(), e.getValue()))
.collect(Collectors.toList())
: null;
this.description = description;
this.storageConnector = storageConnector;
this.features = features;
this.statisticsEnabled = statisticsEnabled != null ? statisticsEnabled : true;
this.histograms = histograms;
this.correlations = correlations;
this.statisticColumns = statisticColumns;
}

public OnDemandFeatureGroup() {
}

public void save() throws FeatureStoreException, IOException {
onDemandFeatureGroupEngine.saveFeatureGroup(this);

if (statisticsEnabled) {
statisticsEngine.computeStatistics(this, read());
}
}

@Override
public Dataset<Row> read() throws FeatureStoreException, IOException {
return selectAll().read();
}
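
Example usage (not part of this commit's diff): a minimal sketch of how the extended builder above might be used to define an on-demand feature group over an S3 connector. The fs and s3Connector handles are hypothetical, as is the way they are obtained; only the builder fields visible in the constructor are assumed.

// Hypothetical handles: a FeatureStore and an S3 StorageConnector obtained elsewhere.
OnDemandFeatureGroup salesFg = OnDemandFeatureGroup.builder()
    .featureStore(fs)
    .name("sales_s3")
    .version(1)
    .dataFormat(OnDemandDataFormat.CSV)                   // new in this commit
    .path("sales/2021/")                                  // sub-path inside the connector's bucket
    .options(Collections.singletonMap("header", "true"))  // forwarded as OnDemandOptions
    .storageConnector(s3Connector)
    .description("Sales data stored on S3")
    .build();

// save() persists the metadata; with statistics enabled (the default) it also
// reads the data back and computes statistics, as save() above shows.
salesFg.save();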
15 changes: 15 additions & 0 deletions java/src/main/java/com/logicalclocks/hsfs/StorageConnector.java
@@ -16,6 +16,7 @@

package com.logicalclocks.hsfs;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.logicalclocks.hsfs.util.Constants;
import lombok.AllArgsConstructor;
import lombok.Getter;
@@ -123,6 +124,7 @@ public class StorageConnector {
@Setter
private StorageConnectorType storageConnectorType;

@JsonIgnore
public Map<String, String> getSparkOptions() throws FeatureStoreException {
if (StorageConnectorType.JDBC.equals(storageConnectorType)) {
return getJdbcOptions();
@@ -132,6 +134,7 @@ public Map<String, String> getSparkOptions() throws FeatureStoreException {
throw new FeatureStoreException("Spark options are not supported for connector " + storageConnectorType);
}

@JsonIgnore
private Map<String, String> getJdbcOptions() throws FeatureStoreException {
Map<String, String> options = Arrays.stream(arguments.split(","))
.map(arg -> arg.split("="))
@@ -140,6 +143,7 @@ private Map<String, String> getJdbcOptions() throws FeatureStoreException {
return options;
}

@JsonIgnore
private Map<String, String> getRedshiftOptions() {
String constr =
"jdbc:redshift://" + clusterIdentifier + "." + databaseEndpoint + ":" + databasePort + "/" + databaseName;
@@ -156,4 +160,15 @@ private Map<String, String> getRedshiftOptions() {
}
return options;
}

@JsonIgnore
public String getPath(String subPath) throws FeatureStoreException {
switch (storageConnectorType) {
case S3:
return "s3://" + bucket + "/" + (Strings.isNullOrEmpty(subPath) ? "" : subPath);
default:
throw new FeatureStoreException(
"Path method not supported for storage connector type: " + storageConnectorType);
}
}
}
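
Example (not part of this commit's diff): a small sketch of how the new getPath() helper resolves S3 locations, assuming Lombok generates the usual no-args constructor and setters for StorageConnector and that the bucket field is named as used in getPath() above. For any non-S3 connector type the method throws a FeatureStoreException.

StorageConnector connector = new StorageConnector();
connector.setStorageConnectorType(StorageConnectorType.S3);
connector.setBucket("my-feature-data");   // hypothetical bucket name

connector.getPath("sales/2021");          // -> "s3://my-feature-data/sales/2021"
connector.getPath(null);                  // -> "s3://my-feature-data/"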
FsQuery.java
@@ -47,8 +47,8 @@ public class FsQuery {
private List<HudiFeatureGroupAlias> hudiCachedFeatureGroups;

public void removeNewLines() {
query = query.replace("\n", " ");
queryOnline = queryOnline.replace("\n", " ");
query = query != null ? query.replace("\n", " ") : null;
queryOnline = queryOnline != null ? queryOnline.replace("\n", " ") : null;
}

public String getStorageQuery(Storage storage) throws FeatureStoreException {
Query.java
@@ -168,7 +168,7 @@ public Dataset<Row> read(boolean online, Map<String, String> readOptions) throws
LOGGER.info("Executing query: " + fsQuery.getStorageQuery(Storage.ONLINE));
StorageConnector onlineConnector =
storageConnectorApi.getOnlineStorageConnector(leftFeatureGroup.getFeatureStore());
return SparkEngine.getInstance().jdbc(onlineConnector, fsQuery.getStorageQuery(Storage.ONLINE));
return SparkEngine.getInstance().jdbc(fsQuery.getStorageQuery(Storage.ONLINE), onlineConnector);
} else {
registerOnDemandFeatureGroups(fsQuery.getOnDemandFeatureGroups());
registerHudiFeatureGroups(fsQuery.getHudiCachedFeatureGroups(), readOptions);
@@ -210,8 +210,7 @@ private void registerOnDemandFeatureGroups(List<OnDemandFeatureGroupAlias> onDem
String alias = onDemandFeatureGroupAlias.getAlias();
OnDemandFeatureGroup onDemandFeatureGroup = onDemandFeatureGroupAlias.getOnDemandFeatureGroup();

SparkEngine.getInstance().registerOnDemandTemporaryTable(onDemandFeatureGroup.getQuery(),
onDemandFeatureGroup.getStorageConnector(), alias);
SparkEngine.getInstance().registerOnDemandTemporaryTable(onDemandFeatureGroup, alias);
}
}

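
The matching SparkEngine changes are among the files not shown above. As a hypothetical sketch of why registerOnDemandTemporaryTable() now receives the whole OnDemandFeatureGroup instead of (query, connector): the engine needs the new dataFormat, path and options fields to read file-based sources such as S3. The sparkSession field, the OnDemandOptions accessors and the branch structure below are assumptions, not code from this commit.

public Dataset<Row> registerOnDemandTemporaryTable(OnDemandFeatureGroup fg, String alias)
    throws FeatureStoreException {
  StorageConnector connector = fg.getStorageConnector();
  Dataset<Row> dataset;
  if (connector.getStorageConnectorType() == StorageConnectorType.S3) {
    // File-based source: honour the data format, user options and bucket sub-path.
    DataFrameReader reader = sparkSession.read()
        .format(fg.getDataFormat().toString().toLowerCase());
    if (fg.getOptions() != null) {
      fg.getOptions().forEach(o -> reader.option(o.getName(), o.getValue()));
    }
    dataset = reader.load(connector.getPath(fg.getPath()));
  } else {
    // Query-based source (e.g. JDBC/Redshift): reuse jdbc() with the new argument order.
    dataset = jdbc(fg.getQuery(), connector);
  }
  dataset.createOrReplaceTempView(alias);
  return dataset;
}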
FeatureGroupBaseEngine.java
@@ -66,4 +66,10 @@ public void appendFeatures(FeatureGroupBase featureGroup, List<Feature> features
FeatureGroup apiFG = featureGroupApi.updateMetadata(fgBaseSend, "updateMetadata");
featureGroup.setFeatures(apiFG.getFeatures());
}

public void updateStatisticsConfig(FeatureGroupBase featureGroup) throws FeatureStoreException, IOException {
FeatureGroup apiFG = featureGroupApi.updateMetadata(featureGroup, "updateStatsSettings");
featureGroup.setCorrelations(apiFG.getCorrelations());
featureGroup.setHistograms(apiFG.getHistograms());
}
}
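
Not part of the diff: since updateStatisticsConfig() now lives in FeatureGroupBaseEngine, and the OnDemandFeatureGroup constructor above sets statisticsEnabled, histograms, correlations and statisticColumns without declaring them, those fields have evidently moved to FeatureGroupBase (changed in one of the files not shown). A sketch of the intended usage, assuming the feature-group-level updateStatisticsConfig() entry point moved along with them:

// fg may be a cached FeatureGroup or an OnDemandFeatureGroup.
fg.setStatisticsEnabled(true);
fg.setHistograms(false);
fg.setCorrelations(false);
fg.updateStatisticsConfig();   // assumed to delegate to FeatureGroupBaseEngine.updateStatisticsConfig(this)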
FeatureGroupEngine.java
@@ -176,12 +176,6 @@ private void saveOnlineDataframe(FeatureGroup featureGroup, Dataset<Row> dataset
SparkEngine.getInstance().writeOnlineDataframe(dataset, saveMode, writeOptions);
}

public void updateStatisticsConfig(FeatureGroup featureGroup) throws FeatureStoreException, IOException {
FeatureGroup apiFG = featureGroupApi.updateMetadata(featureGroup, "updateStatsSettings");
featureGroup.setCorrelations(apiFG.getCorrelations());
featureGroup.setHistograms(apiFG.getHistograms());
}

public Map<String, Map<String, String>> commitDetails(FeatureGroup featureGroup, Integer limit)
throws IOException, FeatureStoreException {
List<FeatureGroupCommit> featureGroupCommits = featureGroupApi.commitDetails(featureGroup, limit);
OnDemandFeatureGroupEngine.java
@@ -34,7 +34,7 @@ public void saveFeatureGroup(OnDemandFeatureGroup onDemandFeatureGroup)
throws FeatureStoreException, IOException {
if (onDemandFeatureGroup.getFeatures() == null) {
Dataset<Row> onDemandDataset = SparkEngine.getInstance()
.jdbc(onDemandFeatureGroup.getStorageConnector(), onDemandFeatureGroup.getQuery());
.registerOnDemandTemporaryTable(onDemandFeatureGroup, "read_ondmd");
onDemandFeatureGroup.setFeatures(utils.parseFeatureGroupSchema(onDemandDataset));
}
