Skip to content

Commit

Permalink
feat: Adding support for Native Python feature transformations for OD…
Browse files Browse the repository at this point in the history
…FVs (#4045)
  • Loading branch information
franciscojavierarceo authored Mar 28, 2024
1 parent ef62def commit 73bc853
Show file tree
Hide file tree
Showing 10 changed files with 488 additions and 38 deletions.
5 changes: 5 additions & 0 deletions sdk/python/feast/embedded_go/online_features_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,11 @@ def transformation_callback(
# the typeguard requirement.
full_feature_names = bool(full_feature_names)

if odfv.mode != "pandas":
raise Exception(
f"OnDemandFeatureView mode '{odfv.mode} not supported by EmbeddedOnlineFeatureServer."
)

output = odfv.get_transformed_features_df(
input_record.to_pandas(), full_feature_names=full_feature_names
)
Expand Down
55 changes: 41 additions & 14 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -1995,26 +1995,53 @@ def _augment_response_with_on_demand_transforms(
)

initial_response = OnlineResponse(online_features_response)
initial_response_df = initial_response.to_df()
initial_response_df: Optional[pd.DataFrame] = None
initial_response_dict: Optional[Dict[str, List[Any]]] = None

# Apply on demand transformations and augment the result rows
odfv_result_names = set()
for odfv_name, _feature_refs in odfv_feature_refs.items():
odfv = requested_odfv_map[odfv_name]
transformed_features_df = odfv.get_transformed_features_df(
initial_response_df,
full_feature_names,
)
selected_subset = [
f for f in transformed_features_df.columns if f in _feature_refs
]

proto_values = [
python_values_to_proto_values(
transformed_features_df[feature].values, ValueType.UNKNOWN
if odfv.mode == "python":
if initial_response_dict is None:
initial_response_dict = initial_response.to_dict()
transformed_features_dict: Dict[
str, List[Any]
] = odfv.get_transformed_features(
initial_response_dict,
full_feature_names,
)
for feature in selected_subset
]
elif odfv.mode in {"pandas", "substrait"}:
if initial_response_df is None:
initial_response_df = initial_response.to_df()
transformed_features_df: pd.DataFrame = odfv.get_transformed_features(
initial_response_df,
full_feature_names,
)
else:
raise Exception(
f"Invalid OnDemandFeatureMode: {odfv.mode}. Expected one of 'pandas', 'python', or 'substrait'."
)

transformed_features = (
transformed_features_dict
if odfv.mode == "python"
else transformed_features_df
)
transformed_columns = (
transformed_features.columns
if isinstance(transformed_features, pd.DataFrame)
else transformed_features
)
selected_subset = [f for f in transformed_columns if f in _feature_refs]

proto_values = []
for selected_feature in selected_subset:
if odfv.mode in ["python", "pandas"]:
feature_vector = transformed_features[selected_feature]
proto_values.append(
python_values_to_proto_values(feature_vector, ValueType.UNKNOWN)
)

odfv_result_names |= set(selected_subset)

Expand Down
8 changes: 8 additions & 0 deletions sdk/python/feast/infra/offline_stores/offline_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ def to_df(
if self.on_demand_feature_views:
# TODO(adchia): Fix requirement to specify dependent feature views in feature_refs
for odfv in self.on_demand_feature_views:
if odfv.mode not in {"pandas", "substrait"}:
raise Exception(
f'OnDemandFeatureView mode "{odfv.mode}" not supported for offline processing.'
)
features_df = features_df.join(
odfv.get_transformed_features_df(
features_df,
Expand Down Expand Up @@ -124,6 +128,10 @@ def to_arrow(
features_df = self._to_df_internal(timeout=timeout)
if self.on_demand_feature_views:
for odfv in self.on_demand_feature_views:
if odfv.mode != "pandas":
raise Exception(
f'OnDemandFeatureView mode "{odfv.mode}" not supported for offline processing.'
)
features_df = features_df.join(
odfv.get_transformed_features_df(
features_df,
Expand Down
Loading

0 comments on commit 73bc853

Please sign in to comment.