Skip to content

Commit

Permalink
[HOPSWORKS-2009] Append Features to Feature Group schema (#115)
Browse files Browse the repository at this point in the history
* add default_value

* add append features

* change

* rearrange

* add update description

* fix

* fix

* send features

* fix docstring

* deduplicate code

* fix docstring
  • Loading branch information
moritzmeister committed Oct 15, 2020
1 parent 21088c9 commit 38d9291
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 12 deletions.
File renamed without changes.
7 changes: 4 additions & 3 deletions python/.pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
exclude: setup.py
repos:
- repo: https://github.com/psf/black
rev: 19.10b0
rev: 20.8b1
hooks:
- id: black
language_version: python3.6
language_version: python3
- repo: https://gitlab.com/pycqa/flake8
rev: 3.8.3
hooks:
- id: flake8
language_version: python3.6
language_version: python3
args: [--config=python/.flake8]
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v2.4.0
hooks:
Expand Down
28 changes: 22 additions & 6 deletions python/hsfs/core/feature_group_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,27 @@ def delete(self, feature_group_instance):
]
_client._send_request("DELETE", path_params)

def update_statistics_config(self, feature_group_instance):
"""Update the statistics configuration of a feature group.
def update_metadata(
self, feature_group_instance, feature_group_copy, query_parameter
):
"""Update the metadata of a feature group.
:param feature_group_instance: metadata object of feature group
:type feature_group_instance: FeatureGroup
This only updates description and schema/features. The
`feature_group_copy` is the metadata object sent to the backend, while
`feature_group_instance` is the user object, which is only updated
after a successful REST call.
# Arguments
feature_group_instance: FeatureGroup. User metadata object of the
feature group.
feature_group_copy: FeatureGroup. Metadata object of the feature
group with the information to be updated.
query_parameter: str. Query parameter that will be set to true to
control which information is updated. E.g. "updateMetadata" or
"updateStatsSettings".
# Returns
FeatureGroup. The updated feature group metadata object.
"""
_client = client.get_instance()
path_params = [
Expand All @@ -129,13 +145,13 @@ def update_statistics_config(self, feature_group_instance):
feature_group_instance.id,
]
headers = {"content-type": "application/json"}
query_params = {"updateStatsSettings": True}
query_params = {query_parameter: True}
return feature_group_instance.update_from_response_json(
_client._send_request(
"PUT",
path_params,
query_params,
headers=headers,
data=feature_group_instance.json(),
data=feature_group_copy.json(),
),
)
35 changes: 34 additions & 1 deletion python/hsfs/core/feature_group_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#

from hsfs import engine
from hsfs import feature_group as fg
from hsfs.core import feature_group_api, storage_connector_api, tags_api


Expand Down Expand Up @@ -105,7 +106,9 @@ def delete(self, feature_group):

def update_statistics_config(self, feature_group):
"""Update the statistics configuration of a feature group."""
self._feature_group_api.update_statistics_config(feature_group)
self._feature_group_api.update_statistics_config(
feature_group, feature_group, "updateStatsSettings"
)

def _get_table_name(self, feature_group):
return (
Expand Down Expand Up @@ -139,3 +142,33 @@ def sql(self, query, feature_store_name, dataframe_type, storage):
return engine.get_instance().sql(
query, feature_store_name, online_conn, dataframe_type
)

def append_features(self, feature_group, new_features):
"""Appends features to a feature group."""
# perform changes on copy in case the update fails, so we don't leave
# the user object in corrupted state
copy_feature_group = fg.FeatureGroup(
None,
None,
None,
None,
id=feature_group.id,
features=feature_group.features + new_features,
)
self._feature_group_api.update_metadata(
feature_group, copy_feature_group, "updateMetadata"
)

def update_description(self, feature_group, description):
"""Updates the description of a feature group."""
copy_feature_group = fg.FeatureGroup(
None,
None,
description,
None,
id=feature_group.id,
features=feature_group.features,
)
self._feature_group_api.update_metadata(
feature_group, copy_feature_group, "updateMetadata"
)
19 changes: 19 additions & 0 deletions python/hsfs/feature.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ def __init__(
primary=None,
partition=None,
online_type=None,
default_value=None,
):
self._name = name
self._type = type
Expand All @@ -34,6 +35,7 @@ def __init__(
self._primary = primary or False
self._partition = partition or False
self._online_type = online_type
self._default_value = default_value

def to_dict(self):
return {
Expand All @@ -43,6 +45,7 @@ def to_dict(self):
"partition": self._partition,
"primary": self._primary,
"onlineType": self._online_type,
"defaultValue": self._default_value,
}

@classmethod
Expand All @@ -54,10 +57,18 @@ def from_response_json(cls, json_dict):
def name(self):
return self._name

@name.setter
def name(self, name):
self._name = name

@property
def type(self):
return self._type

@type.setter
def type(self, type):
self._type = type

@property
def primary(self):
return self._primary
Expand All @@ -73,3 +84,11 @@ def partition(self):
@partition.setter
def partition(self, partition):
self._partition = partition

@property
def default_value(self):
return self._default_value

@default_value.setter
def default_value(self, default_value):
self._default_value = default_value
57 changes: 55 additions & 2 deletions python/hsfs/feature_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,10 @@ def __init__(
self._version = version
self._name = name
self._id = id
self._features = [feature.Feature.from_response_json(feat) for feat in features]
self._features = [
feature.Feature.from_response_json(feat) if isinstance(feat, dict) else feat
for feat in features
]
self._location = location
self._jobs = jobs
self._online_enabled = online_enabled
Expand Down Expand Up @@ -133,7 +136,10 @@ def save(self, features, write_options={}):

user_version = self._version
self._feature_group_engine.save(
self, feature_dataframe, self._default_storage, write_options,
self,
feature_dataframe,
self._default_storage,
write_options,
)
if self.statistics_config.enabled:
self._statistics_engine.compute_statistics(self, feature_dataframe)
Expand Down Expand Up @@ -174,6 +180,18 @@ def update_statistics_config(self):
self._feature_group_engine.update_statistics_config(self)
return self

def update_description(self, description):
"""Update the description of the feature gorup.
# Arguments
description: str. New description string.
# Returns
FeatureGroup. The updated feature group object.
"""
self._feature_group_engine.update_description(self, description)
return self

def compute_statistics(self):
"""Recompute the statistics for the feature group and save them to the
feature store.
Expand All @@ -195,6 +213,41 @@ def compute_statistics(self):
util.StorageWarning,
)

def append_features(self, features):
"""Append features to the schema of the feature group.
It is only possible to append features to a feature group. Removing
features is considered a breaking change.
# Arguments
features: Feature or list. A feature object or list thereof to append to
the schema of the feature group.
# Returns
FeatureGroup. The updated feature group object.
"""
new_features = []
if isinstance(features, feature.Feature):
new_features.append(features)
elif isinstance(features, list):
for feat in features:
if isinstance(feat, feature.Feature):
new_features.append(features)
else:
raise TypeError(
"The argument `features` has to be of type `Feature` or "
"a list thereof, but an element is of type: `{}`".format(
type(features)
)
)
else:
raise TypeError(
"The argument `features` has to be of type `Feature` or a list "
"thereof, but is of type: `{}`".format(type(features))
)
self._feature_group_engine.append_features(self, new_features)
return self

def add_tag(self, name, value=None):
"""Attach a name/value tag to a feature group.
Expand Down

0 comments on commit 38d9291

Please sign in to comment.