diff --git a/protos/feast/core/OnDemandFeatureView.proto b/protos/feast/core/OnDemandFeatureView.proto index 50bf8b6f55..741d46e39e 100644 --- a/protos/feast/core/OnDemandFeatureView.proto +++ b/protos/feast/core/OnDemandFeatureView.proto @@ -48,7 +48,9 @@ message OnDemandFeatureViewSpec { // Map of sources for this feature view. map sources = 4; - UserDefinedFunction user_defined_function = 5; + oneof transformation { + UserDefinedFunction user_defined_function = 5; + } // Description of the on demand feature view. string description = 6; diff --git a/protos/feast/registry/RegistryServer.proto b/protos/feast/registry/RegistryServer.proto index 3e7773e89a..ab324f9bd1 100644 --- a/protos/feast/registry/RegistryServer.proto +++ b/protos/feast/registry/RegistryServer.proto @@ -2,7 +2,6 @@ syntax = "proto3"; package feast.registry; -import "google/protobuf/timestamp.proto"; import "google/protobuf/empty.proto"; import "feast/core/Registry.proto"; import "feast/core/Entity.proto"; diff --git a/sdk/python/feast/infra/registry/base_registry.py b/sdk/python/feast/infra/registry/base_registry.py index f23a820d23..d0ab74812e 100644 --- a/sdk/python/feast/infra/registry/base_registry.py +++ b/sdk/python/feast/infra/registry/base_registry.py @@ -665,7 +665,7 @@ def to_dict(self, project: str) -> Dict[str, List[Any]]: odfv_dict["spec"]["userDefinedFunction"][ "body" - ] = on_demand_feature_view.udf_string + ] = on_demand_feature_view.transformation.udf_string registry_dict["onDemandFeatureViews"].append(odfv_dict) for request_feature_view in sorted( self.list_request_feature_views(project=project), diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index fcafeaa2bc..706f2ec4e4 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -16,6 +16,7 @@ from feast.feature_view import FeatureView from feast.feature_view_projection import FeatureViewProjection from feast.field import Field, from_value_type +from feast.on_demand_pandas_transformation import OnDemandPandasTransformation from feast.protos.feast.core.OnDemandFeatureView_pb2 import ( OnDemandFeatureView as OnDemandFeatureViewProto, ) @@ -24,9 +25,6 @@ OnDemandFeatureViewSpec, OnDemandSource, ) -from feast.protos.feast.core.OnDemandFeatureView_pb2 import ( - UserDefinedFunction as UserDefinedFunctionProto, -) from feast.type_map import ( feast_value_type_to_pandas_type, python_type_to_feast_value_type, @@ -51,8 +49,7 @@ class OnDemandFeatureView(BaseFeatureView): sources with type FeatureViewProjection. source_request_sources: A map from input source names to the actual input sources with type RequestSource. - udf: The user defined transformation function, which must take pandas dataframes - as inputs. + transformation: The user defined transformation. description: A human-readable description. tags: A dictionary of key-value pairs to store arbitrary metadata. owner: The owner of the on demand feature view, typically the email of the primary @@ -63,8 +60,7 @@ class OnDemandFeatureView(BaseFeatureView): features: List[Field] source_feature_view_projections: Dict[str, FeatureViewProjection] source_request_sources: Dict[str, RequestSource] - udf: FunctionType - udf_string: str + transformation: Union[OnDemandPandasTransformation] description: str tags: Dict[str, str] owner: str @@ -82,8 +78,9 @@ def __init__( # noqa: C901 FeatureViewProjection, ] ], - udf: FunctionType, + udf: Optional[FunctionType] = None, udf_string: str = "", + transformation: Optional[Union[OnDemandPandasTransformation]] = None, description: str = "", tags: Optional[Dict[str, str]] = None, owner: str = "", @@ -98,9 +95,10 @@ def __init__( # noqa: C901 sources: A map from input source names to the actual input sources, which may be feature views, or request data sources. These sources serve as inputs to the udf, which will refer to them by name. - udf: The user defined transformation function, which must take pandas + udf (deprecated): The user defined transformation function, which must take pandas dataframes as inputs. - udf_string: The source code version of the udf (for diffing and displaying in Web UI) + udf_string (deprecated): The source code version of the udf (for diffing and displaying in Web UI) + transformation: The user defined transformation. description (optional): A human-readable description. tags (optional): A dictionary of key-value pairs to store arbitrary metadata. owner (optional): The owner of the on demand feature view, typically the email @@ -114,6 +112,18 @@ def __init__( # noqa: C901 owner=owner, ) + if not transformation: + if udf: + warnings.warn( + "udf and udf_string parameters are deprecated. Please use transformation=OnDemandPandasTransformation(udf, udf_string) instead.", + DeprecationWarning, + ) + transformation = OnDemandPandasTransformation(udf, udf_string) + else: + raise Exception( + "OnDemandFeatureView needs to be initialized with either transformation or udf arguments" + ) + self.source_feature_view_projections: Dict[str, FeatureViewProjection] = {} self.source_request_sources: Dict[str, RequestSource] = {} for odfv_source in sources: @@ -126,8 +136,7 @@ def __init__( # noqa: C901 odfv_source.name ] = odfv_source.projection - self.udf = udf # type: ignore - self.udf_string = udf_string + self.transformation = transformation @property def proto_class(self) -> Type[OnDemandFeatureViewProto]: @@ -139,8 +148,7 @@ def __copy__(self): schema=self.features, sources=list(self.source_feature_view_projections.values()) + list(self.source_request_sources.values()), - udf=self.udf, - udf_string=self.udf_string, + transformation=self.transformation, description=self.description, tags=self.tags, owner=self.owner, @@ -161,8 +169,7 @@ def __eq__(self, other): self.source_feature_view_projections != other.source_feature_view_projections or self.source_request_sources != other.source_request_sources - or self.udf_string != other.udf_string - or self.udf.__code__.co_code != other.udf.__code__.co_code + or self.transformation != other.transformation ): return False @@ -200,11 +207,9 @@ def to_proto(self) -> OnDemandFeatureViewProto: name=self.name, features=[feature.to_proto() for feature in self.features], sources=sources, - user_defined_function=UserDefinedFunctionProto( - name=self.udf.__name__, - body=dill.dumps(self.udf, recurse=True), - body_text=self.udf_string, - ), + user_defined_function=self.transformation.to_proto() + if type(self.transformation) == OnDemandPandasTransformation + else None, description=self.description, tags=self.tags, owner=self.owner, @@ -243,6 +248,16 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto): RequestSource.from_proto(on_demand_source.request_data_source) ) + if ( + on_demand_feature_view_proto.spec.WhichOneof("transformation") + == "user_defined_function" + ): + transformation = OnDemandPandasTransformation.from_proto( + on_demand_feature_view_proto.spec.user_defined_function + ) + else: + raise Exception("At least one transformation type needs to be provided") + on_demand_feature_view_obj = cls( name=on_demand_feature_view_proto.spec.name, schema=[ @@ -253,10 +268,7 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto): for feature in on_demand_feature_view_proto.spec.features ], sources=sources, - udf=dill.loads( - on_demand_feature_view_proto.spec.user_defined_function.body - ), - udf_string=on_demand_feature_view_proto.spec.user_defined_function.body_text, + transformation=transformation, description=on_demand_feature_view_proto.spec.description, tags=dict(on_demand_feature_view_proto.spec.tags), owner=on_demand_feature_view_proto.spec.owner, @@ -315,7 +327,8 @@ def get_transformed_features_df( columns_to_cleanup.append(full_feature_ref) # Compute transformed values and apply to each result row - df_with_transformed_features = self.udf.__call__(df_with_features) + + df_with_transformed_features = self.transformation.transform(df_with_features) # Work out whether the correct columns names are used. rename_columns: Dict[str, str] = {} @@ -335,7 +348,7 @@ def get_transformed_features_df( df_with_features.drop(columns=columns_to_cleanup, inplace=True) return df_with_transformed_features.rename(columns=rename_columns) - def infer_features(self): + def infer_features(self) -> None: """ Infers the set of features associated to this feature view from the input source. @@ -365,7 +378,7 @@ def infer_features(self): dtype = feast_value_type_to_pandas_type(field.dtype.to_value_type()) sample_val = rand_df_value[dtype] if dtype in rand_df_value else None df[f"{field.name}"] = pd.Series(sample_val, dtype=dtype) - output_df: pd.DataFrame = self.udf.__call__(df) + output_df: pd.DataFrame = self.transformation.transform(df) inferred_features = [] for f, dt in zip(output_df.columns, output_df.dtypes): inferred_features.append( @@ -396,7 +409,9 @@ def infer_features(self): ) @staticmethod - def get_requested_odfvs(feature_refs, project, registry): + def get_requested_odfvs( + feature_refs, project, registry + ) -> List["OnDemandFeatureView"]: all_on_demand_feature_views = registry.list_on_demand_feature_views( project, allow_cache=True ) @@ -438,7 +453,7 @@ def on_demand_feature_view( of the primary maintainer. """ - def mainify(obj): + def mainify(obj) -> None: # Needed to allow dill to properly serialize the udf. Otherwise, clients will need to have a file with the same # name as the original file defining the ODFV. if obj.__module__ != "__main__": @@ -447,15 +462,17 @@ def mainify(obj): def decorator(user_function): udf_string = dill.source.getsource(user_function) mainify(user_function) + + transformation = OnDemandPandasTransformation(user_function, udf_string) + on_demand_feature_view_obj = OnDemandFeatureView( name=user_function.__name__, sources=sources, schema=schema, - udf=user_function, + transformation=transformation, description=description, tags=tags, owner=owner, - udf_string=udf_string, ) functools.update_wrapper( wrapper=on_demand_feature_view_obj, wrapped=user_function diff --git a/sdk/python/feast/on_demand_pandas_transformation.py b/sdk/python/feast/on_demand_pandas_transformation.py new file mode 100644 index 0000000000..52d45893c5 --- /dev/null +++ b/sdk/python/feast/on_demand_pandas_transformation.py @@ -0,0 +1,56 @@ +from types import FunctionType + +import dill +import pandas as pd + +from feast.protos.feast.core.OnDemandFeatureView_pb2 import ( + UserDefinedFunction as UserDefinedFunctionProto, +) + + +class OnDemandPandasTransformation: + def __init__(self, udf: FunctionType, udf_string: str = ""): + """ + Creates an OnDemandPandasTransformation object. + + Args: + udf: The user defined transformation function, which must take pandas + dataframes as inputs. + udf_string: The source code version of the udf (for diffing and displaying in Web UI) + """ + self.udf = udf + self.udf_string = udf_string + + def transform(self, df: pd.DataFrame) -> pd.DataFrame: + return self.udf.__call__(df) + + def __eq__(self, other): + if not isinstance(other, OnDemandPandasTransformation): + raise TypeError( + "Comparisons should only involve OnDemandPandasTransformation class objects." + ) + + if not super().__eq__(other): + return False + + if ( + self.udf_string != other.udf_string + or self.udf.__code__.co_code != other.udf.__code__.co_code + ): + return False + + return True + + def to_proto(self) -> UserDefinedFunctionProto: + return UserDefinedFunctionProto( + name=self.udf.__name__, + body=dill.dumps(self.udf, recurse=True), + body_text=self.udf_string, + ) + + @classmethod + def from_proto(cls, user_defined_function_proto: UserDefinedFunctionProto): + return OnDemandPandasTransformation( + udf=dill.loads(user_defined_function_proto.body), + udf_string=user_defined_function_proto.body_text, + ) diff --git a/sdk/python/tests/integration/feature_repos/universal/feature_views.py b/sdk/python/tests/integration/feature_repos/universal/feature_views.py index 9bb8aae77f..421ef41601 100644 --- a/sdk/python/tests/integration/feature_repos/universal/feature_views.py +++ b/sdk/python/tests/integration/feature_repos/universal/feature_views.py @@ -15,6 +15,7 @@ ) from feast.data_source import DataSource, RequestSource from feast.feature_view_projection import FeatureViewProjection +from feast.on_demand_feature_view import OnDemandPandasTransformation from feast.types import Array, FeastType, Float32, Float64, Int32, Int64 from tests.integration.feature_repos.universal.entities import ( customer, @@ -70,8 +71,9 @@ def conv_rate_plus_100_feature_view( name=conv_rate_plus_100.__name__, schema=[] if infer_features else _features, sources=sources, - udf=conv_rate_plus_100, - udf_string="raw udf source", + transformation=OnDemandPandasTransformation( + udf=conv_rate_plus_100, udf_string="raw udf source" + ), ) @@ -108,8 +110,9 @@ def similarity_feature_view( name=similarity.__name__, sources=sources, schema=[] if infer_features else _fields, - udf=similarity, - udf_string="similarity raw udf", + transformation=OnDemandPandasTransformation( + udf=similarity, udf_string="similarity raw udf" + ), ) diff --git a/sdk/python/tests/unit/test_on_demand_feature_view.py b/sdk/python/tests/unit/test_on_demand_feature_view.py index ca8e7b25cb..721026ea46 100644 --- a/sdk/python/tests/unit/test_on_demand_feature_view.py +++ b/sdk/python/tests/unit/test_on_demand_feature_view.py @@ -17,7 +17,10 @@ from feast.feature_view import FeatureView from feast.field import Field from feast.infra.offline_stores.file_source import FileSource -from feast.on_demand_feature_view import OnDemandFeatureView +from feast.on_demand_feature_view import ( + OnDemandFeatureView, + OnDemandPandasTransformation, +) from feast.types import Float32 @@ -54,8 +57,9 @@ def test_hash(): Field(name="output1", dtype=Float32), Field(name="output2", dtype=Float32), ], - udf=udf1, - udf_string="udf1 source code", + transformation=OnDemandPandasTransformation( + udf=udf1, udf_string="udf1 source code" + ), ) on_demand_feature_view_2 = OnDemandFeatureView( name="my-on-demand-feature-view", @@ -64,8 +68,9 @@ def test_hash(): Field(name="output1", dtype=Float32), Field(name="output2", dtype=Float32), ], - udf=udf1, - udf_string="udf1 source code", + transformation=OnDemandPandasTransformation( + udf=udf1, udf_string="udf1 source code" + ), ) on_demand_feature_view_3 = OnDemandFeatureView( name="my-on-demand-feature-view", @@ -74,10 +79,23 @@ def test_hash(): Field(name="output1", dtype=Float32), Field(name="output2", dtype=Float32), ], - udf=udf2, - udf_string="udf2 source code", + transformation=OnDemandPandasTransformation( + udf=udf2, udf_string="udf2 source code" + ), ) on_demand_feature_view_4 = OnDemandFeatureView( + name="my-on-demand-feature-view", + sources=sources, + schema=[ + Field(name="output1", dtype=Float32), + Field(name="output2", dtype=Float32), + ], + transformation=OnDemandPandasTransformation( + udf=udf2, udf_string="udf2 source code" + ), + description="test", + ) + on_demand_feature_view_5 = OnDemandFeatureView( name="my-on-demand-feature-view", sources=sources, schema=[ @@ -105,3 +123,7 @@ def test_hash(): on_demand_feature_view_4, } assert len(s4) == 3 + + assert on_demand_feature_view_5.transformation == OnDemandPandasTransformation( + udf2, "udf2 source code" + )