From 38d9291007d8e2ad287707e047f460e1b7fd06ec Mon Sep 17 00:00:00 2001 From: Moritz Meister <8422705+moritzmeister@users.noreply.github.com> Date: Thu, 15 Oct 2020 12:10:49 +0200 Subject: [PATCH] [HOPSWORKS-2009] Append Features to Feature Group schema (#115) * add default_value * add append features * change * rearrange * add update description * fix * fix * send features * fix docstring * deduplicate code * fix docstring --- python/{setup.cfg => .flake8} | 0 python/.pre-commit-config.yaml | 7 +-- python/hsfs/core/feature_group_api.py | 28 +++++++++--- python/hsfs/core/feature_group_engine.py | 35 ++++++++++++++- python/hsfs/feature.py | 19 ++++++++ python/hsfs/feature_group.py | 57 +++++++++++++++++++++++- 6 files changed, 134 insertions(+), 12 deletions(-) rename python/{setup.cfg => .flake8} (100%) diff --git a/python/setup.cfg b/python/.flake8 similarity index 100% rename from python/setup.cfg rename to python/.flake8 diff --git a/python/.pre-commit-config.yaml b/python/.pre-commit-config.yaml index f479373a60..33b39942be 100644 --- a/python/.pre-commit-config.yaml +++ b/python/.pre-commit-config.yaml @@ -1,15 +1,16 @@ exclude: setup.py repos: - repo: https://github.com/psf/black - rev: 19.10b0 + rev: 20.8b1 hooks: - id: black - language_version: python3.6 + language_version: python3 - repo: https://gitlab.com/pycqa/flake8 rev: 3.8.3 hooks: - id: flake8 - language_version: python3.6 + language_version: python3 + args: [--config=python/.flake8] - repo: https://github.com/pre-commit/pre-commit-hooks rev: v2.4.0 hooks: diff --git a/python/hsfs/core/feature_group_api.py b/python/hsfs/core/feature_group_api.py index 3f6a74c61a..62960c741c 100644 --- a/python/hsfs/core/feature_group_api.py +++ b/python/hsfs/core/feature_group_api.py @@ -113,11 +113,27 @@ def delete(self, feature_group_instance): ] _client._send_request("DELETE", path_params) - def update_statistics_config(self, feature_group_instance): - """Update the statistics configuration of a feature group. 
+ def update_metadata( + self, feature_group_instance, feature_group_copy, query_parameter + ): + """Update the metadata of a feature group. - :param feature_group_instance: metadata object of feature group - :type feature_group_instance: FeatureGroup + This only updates description and schema/features. The + `feature_group_copy` is the metadata object sent to the backend, while + `feature_group_instance` is the user object, which is only updated + after a successful REST call. + + # Arguments + feature_group_instance: FeatureGroup. User metadata object of the + feature group. + feature_group_copy: FeatureGroup. Metadata object of the feature + group with the information to be updated. + query_parameter: str. Query parameter that will be set to true to + control which information is updated. E.g. "updateMetadata" or + "updateStatsSettings". + + # Returns + FeatureGroup. The updated feature group metadata object. """ _client = client.get_instance() path_params = [ @@ -129,13 +145,13 @@ def update_statistics_config(self, feature_group_instance): feature_group_instance.id, ] headers = {"content-type": "application/json"} - query_params = {"updateStatsSettings": True} + query_params = {query_parameter: True} return feature_group_instance.update_from_response_json( _client._send_request( "PUT", path_params, query_params, headers=headers, - data=feature_group_instance.json(), + data=feature_group_copy.json(), ), ) diff --git a/python/hsfs/core/feature_group_engine.py b/python/hsfs/core/feature_group_engine.py index 72a95e9ee4..b149f7a903 100644 --- a/python/hsfs/core/feature_group_engine.py +++ b/python/hsfs/core/feature_group_engine.py @@ -15,6 +15,7 @@ # from hsfs import engine +from hsfs import feature_group as fg from hsfs.core import feature_group_api, storage_connector_api, tags_api @@ -105,7 +106,9 @@ def delete(self, feature_group): def update_statistics_config(self, feature_group): """Update the statistics configuration of a feature group.""" - 
self._feature_group_api.update_statistics_config(feature_group) + self._feature_group_api.update_metadata( + feature_group, feature_group, "updateStatsSettings" + ) def _get_table_name(self, feature_group): return ( @@ -139,3 +142,33 @@ def sql(self, query, feature_store_name, dataframe_type, storage): return engine.get_instance().sql( query, feature_store_name, online_conn, dataframe_type ) + + def append_features(self, feature_group, new_features): + """Appends features to a feature group.""" + # perform changes on copy in case the update fails, so we don't leave + # the user object in corrupted state + copy_feature_group = fg.FeatureGroup( + None, + None, + None, + None, + id=feature_group.id, + features=feature_group.features + new_features, + ) + self._feature_group_api.update_metadata( + feature_group, copy_feature_group, "updateMetadata" + ) + + def update_description(self, feature_group, description): + """Updates the description of a feature group.""" + copy_feature_group = fg.FeatureGroup( + None, + None, + description, + None, + id=feature_group.id, + features=feature_group.features, + ) + self._feature_group_api.update_metadata( + feature_group, copy_feature_group, "updateMetadata" + ) diff --git a/python/hsfs/feature.py b/python/hsfs/feature.py index 78e969496c..3898b67e48 100644 --- a/python/hsfs/feature.py +++ b/python/hsfs/feature.py @@ -26,6 +26,7 @@ def __init__( primary=None, partition=None, online_type=None, + default_value=None, ): self._name = name self._type = type @@ -34,6 +35,7 @@ def __init__( self._primary = primary or False self._partition = partition or False self._online_type = online_type + self._default_value = default_value def to_dict(self): return { @@ -43,6 +45,7 @@ def to_dict(self): "partition": self._partition, "primary": self._primary, "onlineType": self._online_type, + "defaultValue": self._default_value, } @classmethod @@ -54,10 +57,18 @@ def from_response_json(cls, json_dict): def name(self): return self._name +
@name.setter + def name(self, name): + self._name = name + @property def type(self): return self._type + @type.setter + def type(self, type): + self._type = type + @property def primary(self): return self._primary @@ -73,3 +84,11 @@ def partition(self): @partition.setter def partition(self, partition): self._partition = partition + + @property + def default_value(self): + return self._default_value + + @default_value.setter + def default_value(self, default_value): + self._default_value = default_value diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py index e828368b0b..e6e5461b79 100644 --- a/python/hsfs/feature_group.py +++ b/python/hsfs/feature_group.py @@ -60,7 +60,10 @@ def __init__( self._version = version self._name = name self._id = id - self._features = [feature.Feature.from_response_json(feat) for feat in features] + self._features = [ + feature.Feature.from_response_json(feat) if isinstance(feat, dict) else feat + for feat in features + ] self._location = location self._jobs = jobs self._online_enabled = online_enabled @@ -133,7 +136,10 @@ def save(self, features, write_options={}): user_version = self._version self._feature_group_engine.save( - self, feature_dataframe, self._default_storage, write_options, + self, + feature_dataframe, + self._default_storage, + write_options, ) if self.statistics_config.enabled: self._statistics_engine.compute_statistics(self, feature_dataframe) @@ -174,6 +180,18 @@ def update_statistics_config(self): self._feature_group_engine.update_statistics_config(self) return self + def update_description(self, description): + """Update the description of the feature group. + + # Arguments + description: str. New description string. + + # Returns + FeatureGroup. The updated feature group object. + """ + self._feature_group_engine.update_description(self, description) + return self + def compute_statistics(self): """Recompute the statistics for the feature group and save them to the feature store.
@@ -195,6 +213,41 @@ def compute_statistics(self): util.StorageWarning, ) + def append_features(self, features): + """Append features to the schema of the feature group. + + It is only possible to append features to a feature group. Removing + features is considered a breaking change. + + # Arguments + features: Feature or list. A feature object or list thereof to append to + the schema of the feature group. + + # Returns + FeatureGroup. The updated feature group object. + """ + new_features = [] + if isinstance(features, feature.Feature): + new_features.append(features) + elif isinstance(features, list): + for feat in features: + if isinstance(feat, feature.Feature): + new_features.append(feat) + else: + raise TypeError( + "The argument `features` has to be of type `Feature` or " + "a list thereof, but an element is of type: `{}`".format( + type(feat) + ) + ) + else: + raise TypeError( + "The argument `features` has to be of type `Feature` or a list " + "thereof, but is of type: `{}`".format(type(features)) + ) + self._feature_group_engine.append_features(self, new_features) + return self + def add_tag(self, name, value=None): """Attach a name/value tag to a feature group.