Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Infer features for feature services when they depend on feature views without schemas #2653

Merged
merged 3 commits into from
May 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 23 additions & 10 deletions sdk/python/feast/feature_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class FeatureService:
"""

name: str
_features: List[Union[FeatureView, OnDemandFeatureView]]
feature_view_projections: List[FeatureViewProjection]
description: str
tags: Dict[str, str]
Expand Down Expand Up @@ -93,24 +94,36 @@ def __init__(
_features = []

self.name = _name
self._features = _features
self.feature_view_projections = []
self.description = description
self.tags = tags or {}
self.owner = owner
self.created_timestamp = None
self.last_updated_timestamp = None
self.logging_config = logging_config
self.infer_features()

for feature_grouping in _features:
def infer_features(self, fvs_to_update: Optional[Dict[str, FeatureView]] = None):
self.feature_view_projections = []
for feature_grouping in self._features:
if isinstance(feature_grouping, BaseFeatureView):
# For feature services that depend on an unspecified feature view, apply inferred schema
if (
fvs_to_update is not None
and len(feature_grouping.projection.features) == 0
and feature_grouping.name in fvs_to_update
):
feature_grouping.projection.features = fvs_to_update[
feature_grouping.name
].features
self.feature_view_projections.append(feature_grouping.projection)
else:
raise ValueError(
f"The feature service {name} has been provided with an invalid type "
f"The feature service {self.name} has been provided with an invalid type "
f'{type(feature_grouping)} as part of the "features" argument.)'
)

self.description = description
self.tags = tags or {}
self.owner = owner
self.created_timestamp = None
self.last_updated_timestamp = None
self.logging_config = logging_config

def __repr__(self):
items = (f"{k} = {v}" for k, v in self.__dict__.items())
return f"<{self.__class__.__name__}({', '.join(items)})>"
Expand All @@ -119,7 +132,7 @@ def __str__(self):
return str(MessageToJson(self.to_proto()))

def __hash__(self):
return hash((self.name))
return hash(self.name)

def __eq__(self, other):
if not isinstance(other, FeatureService):
Expand Down
14 changes: 12 additions & 2 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -475,8 +475,9 @@ def _make_inferences(
entities_to_update: List[Entity],
views_to_update: List[FeatureView],
odfvs_to_update: List[OnDemandFeatureView],
feature_services_to_update: List[FeatureService],
):
"""Makes inferences for entities, feature views, and odfvs."""
"""Makes inferences for entities, feature views, odfvs, and feature services."""
update_entities_with_inferred_types_from_feature_views(
entities_to_update, views_to_update, self.config
)
Expand All @@ -498,6 +499,10 @@ def _make_inferences(
for odfv in odfvs_to_update:
odfv.infer_features()

fvs_to_update_map = {view.name: view for view in views_to_update}
for feature_service in feature_services_to_update:
feature_service.infer_features(fvs_to_update=fvs_to_update_map)

@log_exceptions_and_usage
def _plan(
self, desired_repo_contents: RepoContents
Expand Down Expand Up @@ -553,6 +558,7 @@ def _plan(
desired_repo_contents.entities,
desired_repo_contents.feature_views,
desired_repo_contents.on_demand_feature_views,
desired_repo_contents.feature_services,
)

# Compute the desired difference between the current objects in the registry and
Expand Down Expand Up @@ -692,7 +698,11 @@ def apply(
views_to_update, odfvs_to_update, request_views_to_update
)
self._make_inferences(
data_sources_to_update, entities_to_update, views_to_update, odfvs_to_update
data_sources_to_update,
entities_to_update,
views_to_update,
odfvs_to_update,
services_to_update,
)

# Handle all entityless feature views by using DUMMY_ENTITY as a placeholder entity.
Expand Down
23 changes: 23 additions & 0 deletions sdk/python/tests/integration/registration/test_inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
BigQuerySource,
Entity,
Feature,
FeatureService,
FileSource,
RedshiftSource,
RepoConfig,
Expand Down Expand Up @@ -332,3 +333,25 @@ def test_update_feature_views_with_inferred_features():
)
assert len(feature_view_2.schema) == 1
assert len(feature_view_2.features) == 1


def test_update_feature_services_with_inferred_features(simple_dataset_1):
with prep_file_source(df=simple_dataset_1, timestamp_field="ts_1") as file_source:
entity1 = Entity(name="test1", join_keys=["id_join_key"])
feature_view_1 = FeatureView(
name="test1", entities=[entity1], source=file_source,
)
feature_service = FeatureService(name="fs_1", features=[feature_view_1])
assert len(feature_service.feature_view_projections) == 1
assert len(feature_service.feature_view_projections[0].features) == 0

update_feature_views_with_inferred_features(
[feature_view_1], [entity1], RepoConfig(provider="local", project="test")
)
feature_service.infer_features(
fvs_to_update={feature_view_1.name: feature_view_1}
)

assert len(feature_view_1.schema) == 3
assert len(feature_view_1.features) == 3
assert len(feature_service.feature_view_projections[0].features) == 3