Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Odfv logic #2088

Merged
merged 3 commits into from
Dec 1, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 34 additions & 18 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -1084,17 +1084,14 @@ def get_online_features(
union_of_entity_keys,
)

initial_response = OnlineResponse(
GetOnlineFeaturesResponse(field_values=result_rows)
)
return self._augment_response_with_on_demand_transforms(
self._augment_response_with_on_demand_transforms(
_feature_refs,
requested_result_row_names,
requested_on_demand_feature_views,
full_feature_names,
initial_response,
result_rows,
)
return OnlineResponse(GetOnlineFeaturesResponse(field_values=result_rows))

def _get_requested_result_fields(
self,
Expand Down Expand Up @@ -1238,28 +1235,49 @@ def _augment_response_with_on_demand_transforms(
self,
feature_refs: List[str],
requested_result_row_names: Set[str],
odfvs: List[OnDemandFeatureView],
requested_on_demand_feature_views: List[OnDemandFeatureView],
full_feature_names: bool,
initial_response: OnlineResponse,
result_rows: List[GetOnlineFeaturesResponse.FieldValues],
) -> OnlineResponse:
all_on_demand_feature_views = {view.name: view for view in odfvs}
all_odfv_feature_names = all_on_demand_feature_views.keys()
):
"""Computes on demand feature values and adds them to the result rows.

if len(all_on_demand_feature_views) == 0:
return initial_response
initial_response_df = initial_response.to_df()
Assumes that 'result_rows' already contains the necessary request data and input feature
views for the on demand feature views. Unneeded feature values such as request data and
unrequested input feature views will be removed from 'result_rows'.

Args:
feature_refs: List of all feature references to be returned.
requested_result_row_names: Fields from 'result_rows' that have been requested, and
therefore should not be dropped.
requested_on_demand_feature_views: List of all odfvs that have been requested.
full_feature_names: A boolean that provides the option to add the feature view prefixes to the feature names,
changing them from the format "feature" to "feature_view__feature" (e.g., "daily_transactions" changes to
"customer_fv__daily_transactions").
result_rows: List of result rows to be augmented with on demand feature values.
"""
if len(requested_on_demand_feature_views) == 0:
return

requested_odfv_map = {
odfv.name: odfv for odfv in requested_on_demand_feature_views
}
requested_odfv_feature_names = requested_odfv_map.keys()

odfv_feature_refs = defaultdict(list)
for feature_ref in feature_refs:
view_name, feature_name = feature_ref.split(":")
if view_name in all_odfv_feature_names:
if view_name in requested_odfv_feature_names:
odfv_feature_refs[view_name].append(feature_name)

# Apply on demand transformations
initial_response = OnlineResponse(
GetOnlineFeaturesResponse(field_values=result_rows)
)
initial_response_df = initial_response.to_df()

# Apply on demand transformations and augment the result rows
odfv_result_names = set()
for odfv_name, _feature_refs in odfv_feature_refs.items():
odfv = all_on_demand_feature_views[odfv_name]
odfv = requested_odfv_map[odfv_name]
transformed_features_df = odfv.get_transformed_features_df(
initial_response_df
)
Expand Down Expand Up @@ -1297,8 +1315,6 @@ def _augment_response_with_on_demand_transforms(
result_row.fields.pop(unneeded_feature)
result_row.statuses.pop(unneeded_feature)

return OnlineResponse(GetOnlineFeaturesResponse(field_values=result_rows))

def _get_feature_views_to_use(
self,
features: Optional[Union[List[str], FeatureService]],
Expand Down