Skip to content

Commit

Permalink
[HOPSWORKS-2188] Remove partition key requirements for HUDI feature groups and allow users to set precombine keys (#198)
Browse files Browse the repository at this point in the history

Also fixes [HOPSWORKS-2196] remove hudiEnabled.
  • Loading branch information
davitbzh committed Jan 7, 2021
1 parent 867a179 commit cf9fa72
Show file tree
Hide file tree
Showing 9 changed files with 120 additions and 23 deletions.
4 changes: 4 additions & 0 deletions java/src/main/java/com/logicalclocks/hsfs/Feature.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ public class Feature {
@Setter
private Boolean partition;

@Getter
@Setter
private Boolean hudiPrecombineKey = false;

@Getter
@Setter
private String defaultValue;
Expand Down
20 changes: 16 additions & 4 deletions java/src/main/java/com/logicalclocks/hsfs/FeatureGroup.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,22 +85,28 @@ public class FeatureGroup extends FeatureGroupBase {
// These are only used in the client. In the server they are aggregated in the `features` field
private List<String> partitionKeys;

@JsonIgnore
// This is only used in the client. In the server they are aggregated in the `features` field
private String hudiPrecombineKey;

private FeatureGroupEngine featureGroupEngine = new FeatureGroupEngine();
private StatisticsEngine statisticsEngine = new StatisticsEngine(EntityEndpointType.FEATURE_GROUP);

private static final Logger LOGGER = LoggerFactory.getLogger(FeatureGroup.class);

@Builder
public FeatureGroup(FeatureStore featureStore, @NonNull String name, Integer version, String description,
List<String> primaryKeys, List<String> partitionKeys, boolean onlineEnabled,
TimeTravelFormat timeTravelFormat, List<Feature> features, Boolean statisticsEnabled,
Boolean histograms, Boolean correlations, List<String> statisticColumns) {
List<String> primaryKeys, List<String> partitionKeys, String hudiPrecombineKey,
boolean onlineEnabled, TimeTravelFormat timeTravelFormat, List<Feature> features,
Boolean statisticsEnabled, Boolean histograms, Boolean correlations,
List<String> statisticColumns) {
this.featureStore = featureStore;
this.name = name;
this.version = version;
this.description = description;
this.primaryKeys = primaryKeys;
this.partitionKeys = partitionKeys;
this.hudiPrecombineKey = timeTravelFormat == TimeTravelFormat.HUDI ? hudiPrecombineKey : null;
this.onlineEnabled = onlineEnabled;
this.timeTravelFormat = timeTravelFormat != null ? timeTravelFormat : TimeTravelFormat.HUDI;
this.features = features;
Expand Down Expand Up @@ -198,7 +204,8 @@ public void save(Dataset<Row> featureData) throws FeatureStoreException, IOExcep

public void save(Dataset<Row> featureData, Map<String, String> writeOptions)
throws FeatureStoreException, IOException {
featureGroupEngine.saveFeatureGroup(this, featureData, primaryKeys, partitionKeys, writeOptions);
featureGroupEngine.saveFeatureGroup(this, featureData, primaryKeys, partitionKeys, hudiPrecombineKey,
writeOptions);
if (statisticsEnabled) {
statisticsEngine.computeStatistics(this, featureData);
}
Expand All @@ -208,6 +215,11 @@ public void insert(Dataset<Row> featureData) throws IOException, FeatureStoreExc
insert(featureData, null, false);
}

/**
 * Inserts the given dataframe into this feature group with additional write options.
 *
 * Delegates to the five-argument {@code insert} overload with no explicit storage,
 * {@code overwrite = false} and no operation, passing {@code writeOptions} through.
 *
 * @param featureData dataframe of rows to insert
 * @param writeOptions extra engine write options forwarded to the underlying writer
 * @throws FeatureStoreException if the feature store rejects the operation
 * @throws IOException if communication with the backend fails
 */
public void insert(Dataset<Row> featureData, Map<String, String> writeOptions)
throws FeatureStoreException, IOException {
insert(featureData, null, false, null, writeOptions);
}

/**
 * Inserts the given dataframe into this feature group, targeting a specific storage.
 *
 * Delegates to the five-argument {@code insert} overload with {@code overwrite = false},
 * no operation and no extra write options.
 *
 * @param featureData dataframe of rows to insert
 * @param storage target storage — NOTE(review): exact tier semantics are defined by the
 *                {@code Storage} type elsewhere in the project; confirm against its declaration
 * @throws IOException if communication with the backend fails
 * @throws FeatureStoreException if the feature store rejects the operation
 */
public void insert(Dataset<Row> featureData, Storage storage) throws IOException, FeatureStoreException {
insert(featureData, storage, false, null, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.logicalclocks.hsfs.engine;

import com.logicalclocks.hsfs.Feature;
import com.logicalclocks.hsfs.FeatureGroup;
import com.logicalclocks.hsfs.FeatureGroupCommit;
import com.logicalclocks.hsfs.FeatureStoreException;
Expand Down Expand Up @@ -57,7 +58,7 @@ public class FeatureGroupEngine {
* @throws IOException
*/
public void saveFeatureGroup(FeatureGroup featureGroup, Dataset<Row> dataset, List<String> primaryKeys,
List<String> partitionKeys, Map<String, String> writeOptions)
List<String> partitionKeys, String hudiPrecombineKey, Map<String, String> writeOptions)
throws FeatureStoreException, IOException {

if (featureGroup.getFeatures() == null) {
Expand Down Expand Up @@ -86,6 +87,15 @@ public void saveFeatureGroup(FeatureGroup featureGroup, Dataset<Row> dataset, Li
}));
}

/* set hudi precombine key name */
if (hudiPrecombineKey != null) {
featureGroup.getFeatures().forEach(f -> {
if (f.getName().equals(hudiPrecombineKey)) {
f.setHudiPrecombineKey(true);
}
});
}

// Send Hopsworks the request to create a new feature group
FeatureGroup apiFG = featureGroupApi.save(featureGroup);

Expand All @@ -102,6 +112,12 @@ public void saveFeatureGroup(FeatureGroup featureGroup, Dataset<Row> dataset, Li
featureGroup.setCorrelations(apiFG.getCorrelations());
featureGroup.setHistograms(apiFG.getHistograms());

/* if hudi precombine key was not provided and TimeTravelFormat is HUDI, retrieve from backend and set */
if (featureGroup.getTimeTravelFormat() == TimeTravelFormat.HUDI & hudiPrecombineKey == null) {
List<Feature> features = apiFG.getFeatures();
featureGroup.setFeatures(features);
}

// Write the dataframe
saveDataframe(featureGroup, dataset, null, SaveMode.Append,
featureGroup.getTimeTravelFormat() == TimeTravelFormat.HUDI
Expand Down
23 changes: 14 additions & 9 deletions java/src/main/java/com/logicalclocks/hsfs/engine/HudiEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.logicalclocks.hsfs.engine;

import com.logicalclocks.hsfs.Feature;
import com.logicalclocks.hsfs.FeatureGroup;
import com.logicalclocks.hsfs.FeatureGroupCommit;
import com.logicalclocks.hsfs.FeatureStoreException;
Expand Down Expand Up @@ -67,6 +68,8 @@ public class HudiEngine {
"hoodie.datasource.hive_sync.partition_extractor_class";
private static final String DEFAULT_HIVE_PARTITION_EXTRACTOR_CLASS_OPT_VAL =
"org.apache.hudi.hive.MultiPartKeysValueExtractor";
private static final String HIVE_NON_PARTITION_EXTRACTOR_CLASS_OPT_VAL =
"org.apache.hudi.hive.NonPartitionedExtractor";
private static final String HIVE_AUTO_CREATE_DATABASE_OPT_KEY = "hoodie.datasource.hive_sync.auto_create_database";
private static final String HIVE_AUTO_CREATE_DATABASE_OPT_VAL = "false";

Expand Down Expand Up @@ -155,24 +158,26 @@ private Map<String, String> setupHudiWriteOpts(FeatureGroup featureGroup, HudiOp

// primary keys
Seq<String> primaryColumns = utils.getPrimaryColumns(featureGroup);
if (primaryColumns.isEmpty()) {
throw new FeatureStoreException("For time travel enabled feature groups You must provide at least 1 primary key");
}
hudiArgs.put(HUDI_RECORD_KEY, primaryColumns.mkString(","));

// table name
String tableName = utils.getFgName(featureGroup);
hudiArgs.put(HUDI_TABLE_NAME, tableName);

// partition keys
Seq<String> partitionColumns = utils.getPartitionColumns(featureGroup);
if (!partitionColumns.isEmpty()) {
hudiArgs.put(HUDI_PARTITION_FIELD, partitionColumns.mkString(":SIMPLE,") + ":SIMPLE");
// For precombine key take 1st primary key
hudiArgs.put(HUDI_PRECOMBINE_FIELD, primaryColumns.head());
hudiArgs.put(HUDI_HIVE_SYNC_PARTITION_FIELDS, partitionColumns.mkString(","));
hudiArgs.put(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, DEFAULT_HIVE_PARTITION_EXTRACTOR_CLASS_OPT_VAL);
} else {
hudiArgs.put(HUDI_PARTITION_FIELD, "");
hudiArgs.put(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, HIVE_NON_PARTITION_EXTRACTOR_CLASS_OPT_VAL);
}

// table name
String tableName = utils.getFgName(featureGroup);
hudiArgs.put(HUDI_TABLE_NAME, tableName);
hudiArgs.put(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, DEFAULT_HIVE_PARTITION_EXTRACTOR_CLASS_OPT_VAL);
String precombineKey = featureGroup.getFeatures().stream().filter(Feature::getHudiPrecombineKey).findFirst()
.orElseThrow(() -> new FeatureStoreException("Can't find hudi precombine key")).getName();
hudiArgs.put(HUDI_PRECOMBINE_FIELD, precombineKey);

// Hive args
hudiArgs.put(HUDI_HIVE_SYNC_ENABLE, "true");
Expand Down
5 changes: 5 additions & 0 deletions python/hsfs/core/feature_group_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ def save(self, feature_group, feature_dataframe, write_options):
feat.primary = True
if feat.name in feature_group.partition_key:
feat.partition = True
if (
feature_group.hudi_precombine_key is not None
and feat.name == feature_group.hudi_precombine_key
):
feat.hudi_precombine_key = True

self._feature_group_api.save(feature_group)

Expand Down
30 changes: 24 additions & 6 deletions python/hsfs/core/hudi_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ class HudiEngine:
DEFAULT_HIVE_PARTITION_EXTRACTOR_CLASS_OPT_VAL = (
"org.apache.hudi.hive.MultiPartKeysValueExtractor"
)
HIVE_NON_PARTITION_EXTRACTOR_CLASS_OPT_VAL = (
"org.apache.hudi.hive.NonPartitionedExtractor"
)
HUDI_COPY_ON_WRITE = "COPY_ON_WRITE"
HUDI_BULK_INSERT = "bulk_insert"
HUDI_INSERT = "insert"
Expand Down Expand Up @@ -71,9 +74,21 @@ def __init__(
self._table_name = feature_group.name + "_" + str(feature_group.version)

self._primary_key = ",".join(feature_group.primary_key)
self._partition_key = ",".join(feature_group.partition_key)
self._partition_path = ":SIMPLE,".join(feature_group.partition_key) + ":SIMPLE"
self._pre_combine_key = feature_group.primary_key[0]
self._partition_key = (
",".join(feature_group.partition_key)
if len(feature_group.partition_key) >= 1
else ""
)
self._partition_path = (
":SIMPLE,".join(feature_group.partition_key) + ":SIMPLE"
if len(feature_group.partition_key) >= 1
else ""
)
self._pre_combine_key = (
feature_group.hudi_precombine_key
if feature_group.hudi_precombine_key
else feature_group.primary_key[0]
)

self._feature_group_api = feature_group_api.FeatureGroupApi(feature_store_id)
self._storage_connector_api = storage_connector_api.StorageConnectorApi(
Expand Down Expand Up @@ -108,7 +123,6 @@ def register_temporary_table(

def _write_hudi_dataset(self, dataset, save_mode, operation, write_options):
hudi_options = self._setup_hudi_write_opts(operation, write_options)

dataset.write.format(HudiEngine.HUDI_SPARK_FORMAT).options(**hudi_options).mode(
save_mode
).save(self._base_path)
Expand All @@ -123,11 +137,15 @@ def _setup_hudi_write_opts(self, operation, write_options):
_jdbc_url = self._get_conn_str()
hudi_options = {
self.HUDI_KEY_GENERATOR_OPT_KEY: self.HUDI_COMPLEX_KEY_GENERATOR_OPT_VAL,
self.HUDI_PRECOMBINE_FIELD: self._pre_combine_key,
self.HUDI_PRECOMBINE_FIELD: self._pre_combine_key[0]
if isinstance(self._pre_combine_key, list)
else self._pre_combine_key,
self.HUDI_RECORD_KEY: self._primary_key,
self.HUDI_PARTITION_FIELD: self._partition_path,
self.HUDI_TABLE_NAME: self._table_name,
self.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY: self.DEFAULT_HIVE_PARTITION_EXTRACTOR_CLASS_OPT_VAL,
self.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY: self.DEFAULT_HIVE_PARTITION_EXTRACTOR_CLASS_OPT_VAL
if len(self._partition_key) >= 1
else self.HIVE_NON_PARTITION_EXTRACTOR_CLASS_OPT_VAL,
self.HUDI_HIVE_SYNC_ENABLE: "true",
self.HUDI_HIVE_SYNC_TABLE: self._table_name,
self.HUDI_HIVE_SYNC_JDBC_URL: _jdbc_url,
Expand Down
12 changes: 12 additions & 0 deletions python/hsfs/feature.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def __init__(
description=None,
primary=None,
partition=None,
hudi_precombine_key=None,
online_type=None,
default_value=None,
feature_group_id=None,
Expand All @@ -45,6 +46,7 @@ def __init__(
self._description = description
self._primary = primary or False
self._partition = partition or False
self._hudi_precombine_key = hudi_precombine_key or False
self._online_type = online_type
self._default_value = default_value
if feature_group is not None:
Expand All @@ -58,6 +60,7 @@ def to_dict(self):
"type": self._type,
"description": self._description,
"partition": self._partition,
"hudiPrecombineKey": self._hudi_precombine_key,
"primary": self._primary,
"onlineType": self._online_type,
"defaultValue": self._default_value,
Expand Down Expand Up @@ -123,6 +126,15 @@ def partition(self):
def partition(self, partition):
self._partition = partition

@property
def hudi_precombine_key(self):
"""Whether this feature is the Hudi precombine key of its feature group (boolean flag, defaults to False)."""
return self._hudi_precombine_key

@hudi_precombine_key.setter
def hudi_precombine_key(self, hudi_precombine_key):
# Marks/unmarks this feature as the Hudi precombine key; the engine sets
# True on the single feature whose name matches the configured key.
self._hudi_precombine_key = hudi_precombine_key

@property
def default_value(self):
"""Default value of the feature as string, if the feature was appended to the
Expand Down
25 changes: 22 additions & 3 deletions python/hsfs/feature_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ def __init__(
description="",
partition_key=None,
primary_key=None,
hudi_precombine_key=None,
featurestore_name=None,
created=None,
creator=None,
Expand All @@ -263,7 +264,6 @@ def __init__(
statistic_columns=None,
online_enabled=False,
time_travel_format=None,
hudi_enabled=False,
statistics_config=None,
):
super().__init__(featurestore_id)
Expand All @@ -287,7 +287,6 @@ def __init__(
self._time_travel_format = (
time_travel_format.upper() if time_travel_format is not None else None
)
self._hudi_enabled = hudi_enabled

if id is not None:
# initialized by backend
Expand All @@ -303,12 +302,23 @@ def __init__(
self._partition_key = [
feat.name for feat in self._features if feat.partition is True
]

if time_travel_format.upper() == "HUDI":
# hudi precombine key is always a single feature
self._hudi_precombine_key = [
feat.name
for feat in self._features
if feat.hudi_precombine_key is True
][0]
else:
self._hudi_precombine_key = None
else:
# initialized by user
self.statistics_config = statistics_config
self._primary_key = primary_key
self._partition_key = partition_key
self._hudi_precombine_key = (
hudi_precombine_key if time_travel_format.upper() == "HUDI" else None
)

self._feature_group_engine = feature_group_engine.FeatureGroupEngine(
featurestore_id
Expand Down Expand Up @@ -759,6 +769,11 @@ def partition_key(self):
"""List of features building the partition key."""
return self._partition_key

@property
def hudi_precombine_key(self):
"""Name of the feature used as the Hudi precombine key, or None when the
feature group does not use the HUDI time travel format."""
return self._hudi_precombine_key

@property
def feature_store_id(self):
return self._feature_store_id
Expand Down Expand Up @@ -802,6 +817,10 @@ def primary_key(self, new_primary_key):
def partition_key(self, new_partition_key):
self._partition_key = new_partition_key

@hudi_precombine_key.setter
def hudi_precombine_key(self, hudi_precombine_key):
# Expects a single feature name (str); only meaningful for HUDI feature groups.
self._hudi_precombine_key = hudi_precombine_key

@online_enabled.setter
def online_enabled(self, new_online_enabled):
self._online_enabled = new_online_enabled
Expand Down
6 changes: 6 additions & 0 deletions python/hsfs/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ def create_feature_group(
time_travel_format: Optional[str] = "HUDI",
partition_key: Optional[List[str]] = [],
primary_key: Optional[List[str]] = [],
hudi_precombine_key: Optional[str] = None,
features: Optional[List[feature.Feature]] = [],
statistics_config: Optional[Union[StatisticsConfig, bool, dict]] = None,
):
Expand Down Expand Up @@ -260,6 +261,10 @@ def create_feature_group(
features and will be used as joining key, if not specified otherwise.
Defaults to empty list `[]`, and the first column of the DataFrame will
be used as primary key.
hudi_precombine_key: A feature name to be used as a precombine key for the `"HUDI"`
feature group. Defaults to `None`. If feature group has time travel format
`"HUDI"` and hudi precombine key was not specified then the first primary key of
the feature group will be used as hudi precombine key.
features: Optionally, define the schema of the feature group manually as a
list of `Feature` objects. Defaults to empty list `[]` and will use the
schema information of the DataFrame provided in the `save` method.
Expand All @@ -282,6 +287,7 @@ def create_feature_group(
time_travel_format=time_travel_format,
partition_key=partition_key,
primary_key=primary_key,
hudi_precombine_key=hudi_precombine_key,
featurestore_id=self._id,
featurestore_name=self._name,
features=features,
Expand Down

0 comments on commit cf9fa72

Please sign in to comment.