Skip to content

Commit

Permalink
[HOPSWORKS-2009] Append Features to Feature Group schema (#117)
Browse files Browse the repository at this point in the history
  • Loading branch information
moritzmeister authored Oct 15, 2020
1 parent df68ffc commit 21088c9
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 12 deletions.
2 changes: 1 addition & 1 deletion java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
<slf4j.version>1.7.16</slf4j.version>
<log4j.version>1.2.17</log4j.version>
<handy.version>2.1.8</handy.version>
<lombok.version>1.18.6</lombok.version>
<lombok.version>1.18.10</lombok.version>
<fasterxml.jackson.databind.version>2.6.7.1</fasterxml.jackson.databind.version>
<spark.version>2.4.3.2</spark.version>
<deequ.version>1.1.0-SNAPSHOT</deequ.version>
Expand Down
17 changes: 14 additions & 3 deletions java/src/main/java/com/logicalclocks/hsfs/Feature.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.Setter;
import org.apache.parquet.Strings;

Expand All @@ -44,17 +45,26 @@ public class Feature {
@Getter @Setter
private Boolean partition;

public Feature(String name) {
@Getter @Setter
private String defaultValue;

/**
* Create a feature from its name only.
* Only the name field is assigned by this constructor.
*
* @param name name of the feature; annotated with Lombok's {@code @NonNull}, so null is not accepted
*/
public Feature(@NonNull String name) {
this.name = name;
}

/**
* Create a feature from its name and type.
* Only the name and type fields are assigned by this constructor.
*
* @param name name of the feature; annotated with Lombok's {@code @NonNull}, so null is not accepted
* @param type type of the feature; annotated with Lombok's {@code @NonNull}, so null is not accepted
*/
public Feature(@NonNull String name, @NonNull String type) {
this.name = name;
this.type = type;
}

public Feature(String name, String type) {
/**
* Create a feature from its name, type and default value.
* Only the name, type and defaultValue fields are assigned by this constructor.
*
* @param name name of the feature; annotated with Lombok's {@code @NonNull}, so null is not accepted
* @param type type of the feature; annotated with Lombok's {@code @NonNull}, so null is not accepted
* @param defaultValue default value of the feature, given as a string; annotated with Lombok's
*     {@code @NonNull}, so null is not accepted
*/
public Feature(@NonNull String name, @NonNull String type, @NonNull String defaultValue) {
this.name = name;
this.type = type;
this.defaultValue = defaultValue;
}

@Builder
public Feature(String name, String type, String onlineType, Boolean primary, Boolean partition)
public Feature(String name, String type, String onlineType, Boolean primary, Boolean partition, String defaultValue)
throws FeatureStoreException {
if (Strings.isNullOrEmpty(name)) {
throw new FeatureStoreException("Name is required when creating a feature");
Expand All @@ -68,5 +78,6 @@ public Feature(String name, String type, String onlineType, Boolean primary, Boo
this.onlineType = onlineType;
this.primary = primary;
this.partition = partition;
this.defaultValue = defaultValue;
}
}
49 changes: 46 additions & 3 deletions java/src/main/java/com/logicalclocks/hsfs/FeatureGroup.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,28 @@
import com.logicalclocks.hsfs.engine.StatisticsEngine;
import com.logicalclocks.hsfs.metadata.Query;
import com.logicalclocks.hsfs.metadata.Statistics;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NonNull;
import lombok.Setter;
import lombok.With;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

@AllArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
public class FeatureGroup {
@Getter @Setter
Expand All @@ -50,16 +56,16 @@ public class FeatureGroup {
@Getter @Setter
private Integer version;

@Getter @Setter
@Getter @Setter @With
private String description;

@Getter @Setter
private FeatureStore featureStore;

@Getter @Setter
@Getter @Setter @With
private List<Feature> features;

@Getter
@Getter @Setter
private Date created;

@Getter
Expand Down Expand Up @@ -211,6 +217,43 @@ public void updateStatisticsConfig() throws FeatureStoreException, IOException {
featureGroupEngine.updateStatisticsConfig(this);
}

/**
* Update the description of the feature group.
*
* <p>Delegates to the feature group engine, passing this feature group and the new description.
*
* @param description the new description text for this feature group
* @throws FeatureStoreException propagated from the feature group engine
* @throws IOException propagated from the feature group engine
*/
public void updateDescription(String description) throws FeatureStoreException, IOException {
featureGroupEngine.updateDescription(this, description);
}

/**
* Append features to the schema of the feature group.
* It is only possible to append features to a feature group. Removing features is considered a breaking change.
*
* <p>The given list is copied into a new {@code ArrayList} before it is handed to the engine, so the engine
* works on its own list instance rather than on the caller's list.
*
* @param features the features to add to this feature group's schema
* @throws FeatureStoreException propagated from the feature group engine
* @throws IOException propagated from the feature group engine
*/
public void appendFeatures(List<Feature> features) throws FeatureStoreException, IOException {
featureGroupEngine.appendFeatures(this, new ArrayList<>(features));
}

/**
 * Append a single feature to the schema of the feature group.
 * Features can only be appended to a feature group; removing features is considered a breaking change.
 *
 * @param features the one feature to append (a single feature, despite the plural parameter name)
 * @throws FeatureStoreException propagated from the feature group engine
 * @throws IOException propagated from the feature group engine
 */
public void appendFeatures(Feature features) throws FeatureStoreException, IOException {
  // The engine is handed a fresh, mutable one-element list holding the given feature.
  featureGroupEngine.appendFeatures(this, new ArrayList<>(Collections.singletonList(features)));
}

/**
* Recompute the statistics for the feature group and save them to the feature store.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.logicalclocks.hsfs.engine;

import com.logicalclocks.hsfs.EntityEndpointType;
import com.logicalclocks.hsfs.Feature;
import com.logicalclocks.hsfs.FeatureGroup;
import com.logicalclocks.hsfs.FeatureStoreException;
import com.logicalclocks.hsfs.Storage;
Expand Down Expand Up @@ -178,8 +179,21 @@ public void deleteTag(FeatureGroup featureGroup, String name) throws FeatureStor
}

public void updateStatisticsConfig(FeatureGroup featureGroup) throws FeatureStoreException, IOException {
FeatureGroup apiFG = featureGroupApi.updateStatsConfig(featureGroup);
FeatureGroup apiFG = featureGroupApi.updateMetadata(featureGroup, "updateStatsSettings");
featureGroup.setCorrelations(apiFG.getCorrelations());
featureGroup.setHistograms(apiFG.getHistograms());
}

/**
 * Update the description of a feature group through the metadata update call.
 *
 * <p>A copy of the feature group carrying the new description (created with {@code withDescription}) is sent
 * with the {@code "updateMetadata"} query parameter; the description returned by that call is then set on the
 * passed-in feature group.
 *
 * @param featureGroup the feature group to update; its description is overwritten with the returned value
 * @param description the new description
 * @throws FeatureStoreException propagated from the feature group API call
 * @throws IOException propagated from the feature group API call
 */
public void updateDescription(FeatureGroup featureGroup, String description)
    throws FeatureStoreException, IOException {
  FeatureGroup withNewDescription = featureGroup.withDescription(description);
  FeatureGroup updated = featureGroupApi.updateMetadata(withNewDescription, "updateMetadata");
  String returnedDescription = updated.getDescription();
  featureGroup.setDescription(returnedDescription);
}

/**
 * Append features to the schema of a feature group through the metadata update call.
 *
 * <p>Builds the complete feature list (the given features followed by the group's current features), sends it
 * on a copy of the feature group (created with {@code withFeatures}) using the {@code "updateMetadata"} query
 * parameter, and then sets the features returned by that call on the passed-in feature group.
 *
 * @param featureGroup the feature group whose schema is extended; its feature list is replaced with the
 *     list returned by the API call
 * @param features the features to append; this list is not modified
 * @throws FeatureStoreException propagated from the feature group API call
 * @throws IOException propagated from the feature group API call
 */
public void appendFeatures(FeatureGroup featureGroup, List<Feature> features)
    throws FeatureStoreException, IOException {
  // Work on a private copy: the caller's list must not be mutated, and immutable lists
  // (e.g. Collections.singletonList) would otherwise fail on addAll.
  // Fully qualified so the block does not depend on an ArrayList import in this file.
  List<Feature> allFeatures = new java.util.ArrayList<>(features);
  allFeatures.addAll(featureGroup.getFeatures());
  FeatureGroup apiFG = featureGroupApi.updateMetadata(featureGroup.withFeatures(allFeatures), "updateMetadata");
  featureGroup.setFeatures(apiFG.getFeatures());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public List<Feature> parseFeatureGroupSchema(Dataset<Row> dataset) throws Featur
for (StructField structField : dataset.schema().fields()) {
// TODO(Fabio): unit test this one for complex types
features.add(new Feature(structField.name(), structField.dataType().catalogString(),
structField.dataType().catalogString(), false, false));
structField.dataType().catalogString(), false, false, null));
}

return features;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ public class FeatureGroupApi {

public static final String FEATURE_GROUP_ROOT_PATH = "/featuregroups";
public static final String FEATURE_GROUP_PATH = FEATURE_GROUP_ROOT_PATH + "{/fgName}{?version}";
public static final String FEATURE_GROUP_ID_PATH = FEATURE_GROUP_ROOT_PATH + "{/fgId}{?updateStatsSettings}";
public static final String FEATURE_GROUP_ID_PATH = FEATURE_GROUP_ROOT_PATH + "{/fgId}{?updateStatsSettings,"
+ "updateMetadata}";
public static final String FEATURE_GROUP_CLEAR_PATH = FEATURE_GROUP_ID_PATH + "/clear";

private static final Logger LOGGER = LoggerFactory.getLogger(FeatureGroupApi.class);
Expand Down Expand Up @@ -123,7 +124,7 @@ public void deleteContent(FeatureGroup featureGroup) throws FeatureStoreExceptio
hopsworksClient.handleRequest(postRequest);
}

public FeatureGroup updateStatsConfig(FeatureGroup featureGroup)
public FeatureGroup updateMetadata(FeatureGroup featureGroup, String queryParameter)
throws FeatureStoreException, IOException {
HopsworksClient hopsworksClient = HopsworksClient.getInstance();
String pathTemplate = PROJECT_PATH
Expand All @@ -134,7 +135,7 @@ public FeatureGroup updateStatsConfig(FeatureGroup featureGroup)
.set("projectId", featureGroup.getFeatureStore().getProjectId())
.set("fsId", featureGroup.getFeatureStore().getId())
.set("fgId", featureGroup.getId())
.set("updateStatsSettings", true)
.set(queryParameter, true)
.expand();

String featureGroupJson = hopsworksClient.getObjectMapper().writeValueAsString(featureGroup);
Expand Down

0 comments on commit 21088c9

Please sign in to comment.