Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Make arrow primary interchange for online ODFV execution #4143

Merged
merged 2 commits into from
Apr 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 14 additions & 14 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -2138,7 +2138,7 @@ def _augment_response_with_on_demand_transforms(
)

initial_response = OnlineResponse(online_features_response)
initial_response_df: Optional[pd.DataFrame] = None
initial_response_arrow: Optional[pa.Table] = None
initial_response_dict: Optional[Dict[str, List[Any]]] = None

# Apply on demand transformations and augment the result rows
Expand All @@ -2148,18 +2148,14 @@ def _augment_response_with_on_demand_transforms(
if odfv.mode == "python":
if initial_response_dict is None:
initial_response_dict = initial_response.to_dict()
transformed_features_dict: Dict[str, List[Any]] = (
odfv.get_transformed_features(
initial_response_dict,
full_feature_names,
)
transformed_features_dict: Dict[str, List[Any]] = odfv.transform_dict(
initial_response_dict
)
elif odfv.mode in {"pandas", "substrait"}:
if initial_response_df is None:
initial_response_df = initial_response.to_df()
transformed_features_df: pd.DataFrame = odfv.get_transformed_features(
initial_response_df,
full_feature_names,
if initial_response_arrow is None:
initial_response_arrow = initial_response.to_arrow()
transformed_features_arrow = odfv.transform_arrow(
initial_response_arrow, full_feature_names
)
else:
raise Exception(
Expand All @@ -2169,11 +2165,11 @@ def _augment_response_with_on_demand_transforms(
transformed_features = (
transformed_features_dict
if odfv.mode == "python"
else transformed_features_df
else transformed_features_arrow
)
transformed_columns = (
transformed_features.columns
if isinstance(transformed_features, pd.DataFrame)
transformed_features.column_names
if isinstance(transformed_features, pa.Table)
else transformed_features
)
selected_subset = [f for f in transformed_columns if f in _feature_refs]
Expand All @@ -2183,6 +2179,10 @@ def _augment_response_with_on_demand_transforms(
feature_vector = transformed_features[selected_feature]
proto_values.append(
python_values_to_proto_values(feature_vector, ValueType.UNKNOWN)
if odfv.mode == "python"
else python_values_to_proto_values(
feature_vector.to_numpy(), ValueType.UNKNOWN
)
)

odfv_result_names |= set(selected_subset)
Expand Down
89 changes: 1 addition & 88 deletions sdk/python/feast/on_demand_feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
from typeguard import typechecked

from feast.base_feature_view import BaseFeatureView
from feast.batch_feature_view import BatchFeatureView
from feast.data_source import RequestSource
from feast.errors import RegistryInferenceFailure, SpecifiedFeaturesNotPresentError
from feast.feature_view import FeatureView
Expand Down Expand Up @@ -493,53 +492,7 @@ def transform_arrow(
]
)

def get_transformed_features_df(
self,
df_with_features: pd.DataFrame,
full_feature_names: bool = False,
) -> pd.DataFrame:
# Apply on demand transformations
if not isinstance(df_with_features, pd.DataFrame):
raise TypeError("get_transformed_features_df only accepts pd.DataFrame")
columns_to_cleanup = []
for source_fv_projection in self.source_feature_view_projections.values():
for feature in source_fv_projection.features:
full_feature_ref = f"{source_fv_projection.name}__{feature.name}"
if full_feature_ref in df_with_features.keys():
# Make sure the partial feature name is always present
df_with_features[feature.name] = df_with_features[full_feature_ref]
columns_to_cleanup.append(feature.name)
elif feature.name in df_with_features.keys():
# Make sure the full feature name is always present
df_with_features[full_feature_ref] = df_with_features[feature.name]
columns_to_cleanup.append(full_feature_ref)

# Compute transformed values and apply to each result row
df_with_transformed_features: pd.DataFrame = (
self.feature_transformation.transform(df_with_features)
)

# Work out whether the correct columns names are used.
rename_columns: Dict[str, str] = {}
for feature in self.features:
short_name = feature.name
long_name = self._get_projected_feature_name(feature.name)
if (
short_name in df_with_transformed_features.columns
and full_feature_names
):
rename_columns[short_name] = long_name
elif not full_feature_names:
# Long name must be in dataframe.
rename_columns[long_name] = short_name

# Cleanup extra columns used for transformation
df_with_transformed_features = df_with_transformed_features[
[f.name for f in self.features]
]
return df_with_transformed_features.rename(columns=rename_columns)

def get_transformed_features_dict(
def transform_dict(
self,
feature_dict: Dict[str, Any], # type: ignore
) -> Dict[str, Any]:
Expand All @@ -566,29 +519,6 @@ def get_transformed_features_dict(
del output_dict[feature_name]
return output_dict

def get_transformed_features(
self,
features: Union[Dict[str, Any], pd.DataFrame],
full_feature_names: bool = False,
) -> Union[Dict[str, Any], pd.DataFrame]:
# TODO: classic inheritance pattern....maybe fix this
if self.mode == "python" and isinstance(features, Dict):
# note full_feature_names is not needed for the dictionary
return self.get_transformed_features_dict(
feature_dict=features,
)
elif self.mode in {"pandas", "substrait"} and isinstance(
features, pd.DataFrame
):
return self.get_transformed_features_df(
df_with_features=features,
full_feature_names=full_feature_names,
)
else:
raise Exception(
f'Invalid OnDemandFeatureMode: {self.mode}. Expected one of "pandas" or "python".'
)

def infer_features(self) -> None:
inferred_features = self.feature_transformation.infer_features(
self._construct_random_input()
Expand Down Expand Up @@ -745,23 +675,6 @@ def decorator(user_function):
return decorator


def feature_view_to_batch_feature_view(fv: FeatureView) -> BatchFeatureView:
bfv = BatchFeatureView(
name=fv.name,
entities=fv.entities,
ttl=fv.ttl,
tags=fv.tags,
online=fv.online,
owner=fv.owner,
schema=fv.schema,
source=fv.batch_source,
)

bfv.features = copy.copy(fv.features)
bfv.entities = copy.copy(fv.entities)
return bfv


def _empty_odfv_udf_fn(x: Any) -> Any:
# just an identity mapping, otherwise we risk tripping some downstream tests
return x
11 changes: 11 additions & 0 deletions sdk/python/feast/online_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from typing import Any, Dict, List

import pandas as pd
import pyarrow as pa

from feast.feature_view import DUMMY_ENTITY_ID
from feast.protos.feast.serving.ServingService_pb2 import GetOnlineFeaturesResponse
Expand Down Expand Up @@ -77,3 +78,13 @@ def to_df(self, include_event_timestamps: bool = False) -> pd.DataFrame:
"""

return pd.DataFrame(self.to_dict(include_event_timestamps))

def to_arrow(self, include_event_timestamps: bool = False) -> pa.Table:
"""
Converts GetOnlineFeaturesResponse features into pyarrow Table.

Args:
include_event_timestamps: bool Optionally include feature timestamps in the table
"""

return pa.Table.from_pydict(self.to_dict(include_event_timestamps))
3 changes: 0 additions & 3 deletions sdk/python/feast/transformation/python_transformation.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,6 @@ def __eq__(self, other):
"Comparisons should only involve PythonTransformation class objects."
)

if not super().__eq__(other):
return False

if (
self.udf_string != other.udf_string
or self.udf.__code__.co_code != other.udf.__code__.co_code
Expand Down
3 changes: 0 additions & 3 deletions sdk/python/feast/transformation/substrait_transformation.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,6 @@ def __eq__(self, other):
"Comparisons should only involve SubstraitTransformation class objects."
)

if not super().__eq__(other):
return False

return (
self.substrait_plan == other.substrait_plan
and self.ibis_function.__code__.co_code
Expand Down
5 changes: 2 additions & 3 deletions sdk/python/feast/transformation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,14 @@ def TransformFeatures(self, request, context):
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
raise

df = pa.ipc.open_file(request.transformation_input.arrow_value).read_pandas()
df = pa.ipc.open_file(request.transformation_input.arrow_value).read_all()

if odfv.mode != "pandas":
raise Exception(
f'OnDemandFeatureView mode "{odfv.mode}" not supported by TransformationServer.'
)

result_df = odfv.get_transformed_features_df(df, True)
result_arrow = pa.Table.from_pandas(result_df)
result_arrow = odfv.transform_arrow(df, True)
sink = pa.BufferOutputStream()
writer = pa.ipc.new_file(sink, result_arrow.schema)
writer.write_table(result_arrow)
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/tests/unit/test_on_demand_feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ def test_python_native_transformation_mode():
}
)

assert on_demand_feature_view_python_native.get_transformed_features(
assert on_demand_feature_view_python_native.transform_dict(
{
"feature1": 0,
"feature2": 1,
Expand Down
Loading