From 2bee52ac40ee5c444581ddeacc9029a9e6d7ea2e Mon Sep 17 00:00:00 2001 From: Danny Chiao Date: Mon, 9 May 2022 17:40:27 -0400 Subject: [PATCH 1/3] fix: Infer features for feature services when they depend on feature views that have no schema Signed-off-by: Danny Chiao --- sdk/python/feast/feature_service.py | 31 +++++++++++++------ sdk/python/feast/feature_store.py | 13 ++++++-- .../registration/test_inference.py | 21 +++++++++++++ 3 files changed, 53 insertions(+), 12 deletions(-) diff --git a/sdk/python/feast/feature_service.py b/sdk/python/feast/feature_service.py index bfa48b3bf4..c5125bd1f4 100644 --- a/sdk/python/feast/feature_service.py +++ b/sdk/python/feast/feature_service.py @@ -39,6 +39,7 @@ class FeatureService: """ name: str + _features: List[Union[FeatureView, OnDemandFeatureView]] feature_view_projections: List[FeatureViewProjection] description: str tags: Dict[str, str] @@ -93,24 +94,34 @@ def __init__( _features = [] self.name = _name + self._features = _features self.feature_view_projections = [] + self.description = description + self.tags = tags or {} + self.owner = owner + self.created_timestamp = None + self.last_updated_timestamp = None + self.logging_config = logging_config + self.infer_features() - for feature_grouping in _features: + def infer_features(self, fvs_to_update: Optional[List[FeatureView]] = None): + self.feature_view_projections = [] + for feature_grouping in self._features: if isinstance(feature_grouping, BaseFeatureView): + # For feature services that depend on an unspecified feature view, apply inferred schema + if len(feature_grouping.projection.features) == 0: + if fvs_to_update is not None: + for fv in fvs_to_update: + if fv.name == feature_grouping.name: + feature_grouping.projection.features = fv.features + break self.feature_view_projections.append(feature_grouping.projection) else: raise ValueError( - f"The feature service {name} has been provided with an invalid type " + f"The feature service {self.name} has been provided with an invalid type " f'{type(feature_grouping)} as part of the "features" argument.)' ) - self.description = description - self.tags = tags or {} - self.owner = owner - self.created_timestamp = None - self.last_updated_timestamp = None - self.logging_config = logging_config - def __repr__(self): items = (f"{k} = {v}" for k, v in self.__dict__.items()) return f"<{self.__class__.__name__}({', '.join(items)})>" @@ -119,7 +130,7 @@ def __str__(self): return str(MessageToJson(self.to_proto())) def __hash__(self): - return hash((self.name)) + return hash(self.name) def __eq__(self, other): if not isinstance(other, FeatureService): diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 4b015e8ab8..cb33949f82 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -475,8 +475,9 @@ def _make_inferences( entities_to_update: List[Entity], views_to_update: List[FeatureView], odfvs_to_update: List[OnDemandFeatureView], + feature_services_to_update: List[FeatureService], ): - """Makes inferences for entities, feature views, and odfvs.""" + """Makes inferences for entities, feature views, odfvs, and feature services.""" update_entities_with_inferred_types_from_feature_views( entities_to_update, views_to_update, self.config ) @@ -498,6 +499,9 @@ def _make_inferences( for odfv in odfvs_to_update: odfv.infer_features() + for feature_service in feature_services_to_update: + feature_service.infer_features(fvs_to_update=views_to_update) + @log_exceptions_and_usage def _plan( self, desired_repo_contents: RepoContents @@ -553,6 +557,7 @@ def _plan( desired_repo_contents.entities, desired_repo_contents.feature_views, desired_repo_contents.on_demand_feature_views, + desired_repo_contents.feature_services, ) # Compute the desired difference between the current objects in the registry and @@ -692,7 +697,11 @@ def apply( views_to_update, odfvs_to_update, request_views_to_update ) self._make_inferences( - data_sources_to_update, entities_to_update, views_to_update, odfvs_to_update + data_sources_to_update, + entities_to_update, + views_to_update, + odfvs_to_update, + services_to_update, ) # Handle all entityless feature views by using DUMMY_ENTITY as a placeholder entity. diff --git a/sdk/python/tests/integration/registration/test_inference.py b/sdk/python/tests/integration/registration/test_inference.py index 8b719eb733..04104e34fd 100644 --- a/sdk/python/tests/integration/registration/test_inference.py +++ b/sdk/python/tests/integration/registration/test_inference.py @@ -7,6 +7,7 @@ BigQuerySource, Entity, Feature, + FeatureService, FileSource, RedshiftSource, RepoConfig, @@ -332,3 +333,23 @@ def test_update_feature_views_with_inferred_features(): ) assert len(feature_view_2.schema) == 1 assert len(feature_view_2.features) == 1 + + +def test_update_feature_services_with_inferred_features(simple_dataset_1): + with prep_file_source(df=simple_dataset_1, timestamp_field="ts_1") as file_source: + entity1 = Entity(name="test1", join_keys=["id_join_key"]) + feature_view_1 = FeatureView( + name="test1", entities=[entity1], source=file_source, + ) + feature_service = FeatureService(name="fs_1", features=[feature_view_1]) + assert len(feature_service.feature_view_projections) == 1 + assert len(feature_service.feature_view_projections[0].features) == 0 + + update_feature_views_with_inferred_features( + [feature_view_1], [entity1], RepoConfig(provider="local", project="test") + ) + feature_service.infer_features(fvs_to_update=[feature_view_1]) + + assert len(feature_view_1.schema) == 3 + assert len(feature_view_1.features) == 3 + assert len(feature_service.feature_view_projections[0].features) == 3 From b3ad4d77a18a51e34f6cbaa1409a3193ee476aae Mon Sep 17 00:00:00 2001 From: Danny Chiao Date: Mon, 9 May 2022 17:47:16 -0400 Subject: [PATCH 2/3] fix Signed-off-by: Danny Chiao --- sdk/python/feast/feature_service.py | 10 +++++----- sdk/python/feast/feature_store.py | 3 ++- .../tests/integration/registration/test_inference.py | 4 +++- 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/sdk/python/feast/feature_service.py b/sdk/python/feast/feature_service.py index c5125bd1f4..b59bfad5cf 100644 --- a/sdk/python/feast/feature_service.py +++ b/sdk/python/feast/feature_service.py @@ -104,17 +104,17 @@ def __init__( self.logging_config = logging_config self.infer_features() - def infer_features(self, fvs_to_update: Optional[List[FeatureView]] = None): + def infer_features(self, fvs_to_update: Optional[Dict[str, FeatureView]] = None): self.feature_view_projections = [] for feature_grouping in self._features: if isinstance(feature_grouping, BaseFeatureView): # For feature services that depend on an unspecified feature view, apply inferred schema if len(feature_grouping.projection.features) == 0: if fvs_to_update is not None: - for fv in fvs_to_update: - if fv.name == feature_grouping.name: - feature_grouping.projection.features = fv.features - break + if feature_grouping.name in fvs_to_update: + feature_grouping.projection.features = fvs_to_update[ + feature_grouping.name + ].features self.feature_view_projections.append(feature_grouping.projection) else: raise ValueError( diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index cb33949f82..115f0fd971 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -499,8 +499,9 @@ def _make_inferences( for odfv in odfvs_to_update: odfv.infer_features() + fvs_to_update_map = {view.name: view for view in views_to_update} for feature_service in feature_services_to_update: - feature_service.infer_features(fvs_to_update=views_to_update) + feature_service.infer_features(fvs_to_update=fvs_to_update_map) @log_exceptions_and_usage def _plan( diff --git a/sdk/python/tests/integration/registration/test_inference.py b/sdk/python/tests/integration/registration/test_inference.py index 04104e34fd..b1fb509d75 100644 --- a/sdk/python/tests/integration/registration/test_inference.py +++ b/sdk/python/tests/integration/registration/test_inference.py @@ -348,7 +348,9 @@ def test_update_feature_services_with_inferred_features(simple_dataset_1): update_feature_views_with_inferred_features( [feature_view_1], [entity1], RepoConfig(provider="local", project="test") ) - feature_service.infer_features(fvs_to_update=[feature_view_1]) + feature_service.infer_features( + fvs_to_update={feature_view_1.name: feature_view_1} + ) assert len(feature_view_1.schema) == 3 assert len(feature_view_1.features) == 3 From 9443392b184f593e6fde653c476467c420ab9d12 Mon Sep 17 00:00:00 2001 From: Danny Chiao Date: Mon, 9 May 2022 17:52:40 -0400 Subject: [PATCH 3/3] lint Signed-off-by: Danny Chiao --- sdk/python/feast/feature_service.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/sdk/python/feast/feature_service.py b/sdk/python/feast/feature_service.py index b59bfad5cf..9490de38c9 100644 --- a/sdk/python/feast/feature_service.py +++ b/sdk/python/feast/feature_service.py @@ -109,12 +109,14 @@ def infer_features(self, fvs_to_update: Optional[Dict[str, FeatureView]] = None) for feature_grouping in self._features: if isinstance(feature_grouping, BaseFeatureView): # For feature services that depend on an unspecified feature view, apply inferred schema - if len(feature_grouping.projection.features) == 0: - if fvs_to_update is not None: - if feature_grouping.name in fvs_to_update: - feature_grouping.projection.features = fvs_to_update[ - feature_grouping.name - ].features + if ( + fvs_to_update is not None + and len(feature_grouping.projection.features) == 0 + and feature_grouping.name in fvs_to_update + ): + feature_grouping.projection.features = fvs_to_update[ + feature_grouping.name + ].features self.feature_view_projections.append(feature_grouping.projection) else: raise ValueError(