From 1d923f3fb4e8492b5822e91cd2693770fcf94e41 Mon Sep 17 00:00:00 2001 From: tokoko Date: Tue, 2 Jul 2024 13:11:59 +0000 Subject: [PATCH] extend get_feature_view to return stream and on demand views Signed-off-by: tokoko --- protos/feast/registry/RegistryServer.proto | 10 +++++- sdk/python/feast/cli.py | 4 ++- sdk/python/feast/feature_logging.py | 32 +++++-------------- sdk/python/feast/feature_store.py | 19 ++++++++--- .../infra/materialization/kubernetes/main.py | 7 ++-- .../feast/infra/registry/base_registry.py | 2 +- .../feast/infra/registry/caching_registry.py | 5 +-- .../infra/registry/proto_registry_utils.py | 15 +++++++++ sdk/python/feast/infra/registry/registry.py | 2 +- sdk/python/feast/infra/registry/remote.py | 14 ++++++-- sdk/python/feast/infra/registry/snowflake.py | 30 +++++++++++++++-- sdk/python/feast/infra/registry/sql.py | 31 ++++++++++++++++-- sdk/python/feast/offline_server.py | 2 ++ sdk/python/feast/registry_server.py | 23 +++++++++++-- .../registration/test_universal_registry.py | 7 ++++ sdk/python/tests/utils/test_log_creator.py | 17 +++++----- 16 files changed, 163 insertions(+), 57 deletions(-) diff --git a/protos/feast/registry/RegistryServer.proto b/protos/feast/registry/RegistryServer.proto index 44529f5409..2bc9707fa0 100644 --- a/protos/feast/registry/RegistryServer.proto +++ b/protos/feast/registry/RegistryServer.proto @@ -30,7 +30,7 @@ service RegistryServer{ // FeatureView RPCs rpc ApplyFeatureView (ApplyFeatureViewRequest) returns (google.protobuf.Empty) {} - rpc GetFeatureView (GetFeatureViewRequest) returns (feast.core.FeatureView) {} + rpc GetFeatureView (GetFeatureViewRequest) returns (GetFeatureViewResponse) {} rpc ListFeatureViews (ListFeatureViewsRequest) returns (ListFeatureViewsResponse) {} rpc DeleteFeatureView (DeleteFeatureViewRequest) returns (google.protobuf.Empty) {} @@ -178,6 +178,14 @@ message GetFeatureViewRequest { bool allow_cache = 3; } +message GetFeatureViewResponse { + oneof base_feature_view { + feast.core.FeatureView feature_view = 1; + feast.core.OnDemandFeatureView on_demand_feature_view = 2; + feast.core.StreamFeatureView stream_feature_view = 3; + } +} + message ListFeatureViewsRequest { string project = 1; bool allow_cache = 2; diff --git a/sdk/python/feast/cli.py b/sdk/python/feast/cli.py index f0655c40f2..c32b97e355 100644 --- a/sdk/python/feast/cli.py +++ b/sdk/python/feast/cli.py @@ -401,7 +401,9 @@ def feature_view_list(ctx: click.Context, tags: list[str]): entities.update(feature_view.entities) elif isinstance(feature_view, OnDemandFeatureView): for backing_fv in feature_view.source_feature_view_projections.values(): - entities.update(store.get_feature_view(backing_fv.name).entities) + backing_feature_view = store.get_feature_view(backing_fv.name) + assert isinstance(backing_feature_view, FeatureView) + entities.update(backing_feature_view.entities) table.append( [ feature_view.name, diff --git a/sdk/python/feast/feature_logging.py b/sdk/python/feast/feature_logging.py index 2843f87121..a7a7de95ea 100644 --- a/sdk/python/feast/feature_logging.py +++ b/sdk/python/feast/feature_logging.py @@ -6,12 +6,8 @@ from feast.data_source import DataSource from feast.embedded_go.type_map import FEAST_TYPE_TO_ARROW_TYPE, PA_TIMESTAMP_TYPE -from feast.errors import ( - FeastObjectNotFoundException, - FeatureViewNotFoundException, - OnDemandFeatureViewNotFoundException, -) -from feast.feature_view import DUMMY_ENTITY_ID +from feast.feature_view import DUMMY_ENTITY_ID, FeatureView +from feast.on_demand_feature_view import OnDemandFeatureView from feast.protos.feast.core.FeatureService_pb2 import ( LoggingConfig as LoggingConfigProto, ) @@ -57,25 +53,9 @@ def get_schema(self, registry: "BaseRegistry") -> pa.Schema: # Otherwise, some offline stores might not accept parquet files (produced by Go). # Go code can be found here: # https://github.com/feast-dev/feast/blob/master/go/internal/feast/server/logging/memorybuffer.go#L51 - try: - feature_view = registry.get_feature_view(projection.name, self._project) - except FeatureViewNotFoundException: - try: - on_demand_feature_view = registry.get_on_demand_feature_view( - projection.name, self._project - ) - except OnDemandFeatureViewNotFoundException: - raise FeastObjectNotFoundException( - f"Can't recognize feature view with a name {projection.name}" - ) - - for ( - request_source - ) in on_demand_feature_view.source_request_sources.values(): - for field in request_source.schema: - fields[field.name] = FEAST_TYPE_TO_ARROW_TYPE[field.dtype] + feature_view = registry.get_feature_view(projection.name, self._project) - else: + if isinstance(feature_view, FeatureView): for entity_column in feature_view.entity_columns: if entity_column.name == DUMMY_ENTITY_ID: continue @@ -84,6 +64,10 @@ def get_schema(self, registry: "BaseRegistry") -> pa.Schema: entity_column.name, entity_column.name ) fields[join_key] = FEAST_TYPE_TO_ARROW_TYPE[entity_column.dtype] + elif isinstance(feature_view, OnDemandFeatureView): + for request_source in feature_view.source_request_sources.values(): + for field in request_source.schema: + fields[field.name] = FEAST_TYPE_TO_ARROW_TYPE[field.dtype] for feature in projection.features: fields[f"{projection.name_to_use()}__{feature.name}"] = ( diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 9600732e17..704e31f932 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -456,7 +456,7 @@ def get_feature_service( def get_feature_view( self, name: str, allow_registry_cache: bool = False - ) -> FeatureView: + ) -> BaseFeatureView: """ Retrieves a feature view. @@ -477,11 +477,15 @@ def _get_feature_view( name: str, hide_dummy_entity: bool = True, allow_registry_cache: bool = False, - ) -> FeatureView: + ) -> BaseFeatureView: feature_view = self._registry.get_feature_view( name, self.project, allow_cache=allow_registry_cache ) - if hide_dummy_entity and feature_view.entities[0] == DUMMY_ENTITY_NAME: + if ( + isinstance(feature_view, FeatureView) + and hide_dummy_entity + and feature_view.entities[0] == DUMMY_ENTITY_NAME + ): feature_view.entities = [] return feature_view @@ -686,6 +690,7 @@ def _get_feature_views_to_materialize( name, hide_dummy_entity=False ) + assert isinstance(feature_view, FeatureView) if not feature_view.online: raise ValueError( f"FeatureView {feature_view.name} is not configured to be served online." @@ -1450,9 +1455,11 @@ def write_to_online_store( feature_view_name, allow_registry_cache=allow_registry_cache ) except FeatureViewNotFoundException: - feature_view = self.get_feature_view( + fv = self.get_feature_view( feature_view_name, allow_registry_cache=allow_registry_cache ) + assert isinstance(fv, FeatureView) + feature_view = fv if df is not None and inputs is not None: raise ValueError("Both df and inputs cannot be provided at the same time.") if df is None and inputs is not None: @@ -1487,9 +1494,11 @@ def write_to_offline_store( feature_view_name, allow_registry_cache=allow_registry_cache ) except FeatureViewNotFoundException: - feature_view = self.get_feature_view( + fv = self.get_feature_view( feature_view_name, allow_registry_cache=allow_registry_cache ) + assert isinstance(fv, FeatureView) + feature_view = fv # Get columns of the batch source and the input dataframe. column_names_and_types = ( diff --git a/sdk/python/feast/infra/materialization/kubernetes/main.py b/sdk/python/feast/infra/materialization/kubernetes/main.py index d80cad3edb..e49efc4a8b 100644 --- a/sdk/python/feast/infra/materialization/kubernetes/main.py +++ b/sdk/python/feast/infra/materialization/kubernetes/main.py @@ -75,11 +75,12 @@ def run(self): config = RepoConfig(**feast_config) store = FeatureStore(config=config) + fv = store.get_feature_view(materialization_cfg["feature_view"]) + assert isinstance(fv, FeatureView) + KubernetesMaterializer( config=config, - feature_view=store.get_feature_view( - materialization_cfg["feature_view"] - ), + feature_view=fv, paths=materialization_cfg["paths"], worker_index=int(os.environ["JOB_COMPLETION_INDEX"]), ).run() diff --git a/sdk/python/feast/infra/registry/base_registry.py b/sdk/python/feast/infra/registry/base_registry.py index 03bec64830..847e9acd43 100644 --- a/sdk/python/feast/infra/registry/base_registry.py +++ b/sdk/python/feast/infra/registry/base_registry.py @@ -352,7 +352,7 @@ def list_on_demand_feature_views( @abstractmethod def get_feature_view( self, name: str, project: str, allow_cache: bool = False - ) -> FeatureView: + ) -> BaseFeatureView: """ Retrieves a feature view. diff --git a/sdk/python/feast/infra/registry/caching_registry.py b/sdk/python/feast/infra/registry/caching_registry.py index f7eab7d70a..209fc24abb 100644 --- a/sdk/python/feast/infra/registry/caching_registry.py +++ b/sdk/python/feast/infra/registry/caching_registry.py @@ -4,6 +4,7 @@ from threading import Lock from typing import List, Optional +from feast.base_feature_view import BaseFeatureView from feast.data_source import DataSource from feast.entity import Entity from feast.feature_service import FeatureService @@ -99,12 +100,12 @@ def list_entities( return self._list_entities(project, tags) @abstractmethod - def _get_feature_view(self, name: str, project: str) -> FeatureView: + def _get_feature_view(self, name: str, project: str) -> BaseFeatureView: pass def get_feature_view( self, name: str, project: str, allow_cache: bool = False - ) -> FeatureView: + ) -> BaseFeatureView: if allow_cache: self._refresh_cached_registry_if_necessary() return proto_registry_utils.get_feature_view( diff --git a/sdk/python/feast/infra/registry/proto_registry_utils.py b/sdk/python/feast/infra/registry/proto_registry_utils.py index 0e85f5b0a9..35693b4dd7 100644 --- a/sdk/python/feast/infra/registry/proto_registry_utils.py +++ b/sdk/python/feast/infra/registry/proto_registry_utils.py @@ -106,6 +106,21 @@ def get_feature_view( and feature_view_proto.spec.project == project ): return FeatureView.from_proto(feature_view_proto) + + for feature_view_proto in registry_proto.stream_feature_views: + if ( + feature_view_proto.spec.name == name + and feature_view_proto.spec.project == project + ): + return StreamFeatureView.from_proto(feature_view_proto) + + for on_demand_feature_view in registry_proto.on_demand_feature_views: + if ( + on_demand_feature_view.spec.project == project + and on_demand_feature_view.spec.name == name + ): + return OnDemandFeatureView.from_proto(on_demand_feature_view) + raise FeatureViewNotFoundException(name, project) diff --git a/sdk/python/feast/infra/registry/registry.py b/sdk/python/feast/infra/registry/registry.py index fe44e6253a..2b31f76714 100644 --- a/sdk/python/feast/infra/registry/registry.py +++ b/sdk/python/feast/infra/registry/registry.py @@ -566,7 +566,7 @@ def list_feature_views( def get_feature_view( self, name: str, project: str, allow_cache: bool = False - ) -> FeatureView: + ) -> BaseFeatureView: registry_proto = self._get_registry_proto( project=project, allow_cache=allow_cache ) diff --git a/sdk/python/feast/infra/registry/remote.py b/sdk/python/feast/infra/registry/remote.py index 9fa6d8ebee..c11ea89f47 100644 --- a/sdk/python/feast/infra/registry/remote.py +++ b/sdk/python/feast/infra/registry/remote.py @@ -257,14 +257,24 @@ def list_on_demand_feature_views( def get_feature_view( self, name: str, project: str, allow_cache: bool = False - ) -> FeatureView: + ) -> BaseFeatureView: request = RegistryServer_pb2.GetFeatureViewRequest( name=name, project=project, allow_cache=allow_cache ) response = self.stub.GetFeatureView(request) - return FeatureView.from_proto(response) + feature_view_type = response.WhichOneof("base_feature_view") + if feature_view_type == "feature_view": + feature_view = FeatureView.from_proto(response.feature_view) + elif feature_view_type == "on_demand_feature_view": + feature_view = OnDemandFeatureView.from_proto( + response.on_demand_feature_view + ) + elif feature_view_type == "stream_feature_view": + feature_view = StreamFeatureView.from_proto(response.stream_feature_view) + + return feature_view def list_feature_views( self, diff --git a/sdk/python/feast/infra/registry/snowflake.py b/sdk/python/feast/infra/registry/snowflake.py index f2bc09e7e4..431fb676c2 100644 --- a/sdk/python/feast/infra/registry/snowflake.py +++ b/sdk/python/feast/infra/registry/snowflake.py @@ -476,13 +476,13 @@ def get_feature_service( def get_feature_view( self, name: str, project: str, allow_cache: bool = False - ) -> FeatureView: + ) -> BaseFeatureView: if allow_cache: self._refresh_cached_registry_if_necessary() return proto_registry_utils.get_feature_view( self.cached_registry_proto, name, project ) - return self._get_object( + fv = self._get_object( "FEATURE_VIEWS", name, project, @@ -490,9 +490,33 @@ def get_feature_view( FeatureView, "FEATURE_VIEW_NAME", "FEATURE_VIEW_PROTO", - FeatureViewNotFoundException, + None, ) + if not fv: + fv = self._get_object( + "STREAM_FEATURE_VIEWS", + name, + project, + StreamFeatureViewProto, + StreamFeatureView, + "STREAM_FEATURE_VIEW_NAME", + "STREAM_FEATURE_VIEW_PROTO", + None, + ) + if not fv: + fv = self._get_object( + "ON_DEMAND_FEATURE_VIEWS", + name, + project, + OnDemandFeatureViewProto, + OnDemandFeatureView, + "ON_DEMAND_FEATURE_VIEW_NAME", + "ON_DEMAND_FEATURE_VIEW_PROTO", + FeatureViewNotFoundException, + ) + return fv + def get_infra(self, project: str, allow_cache: bool = False) -> Infra: infra_object = self._get_object( "MANAGED_INFRA", diff --git a/sdk/python/feast/infra/registry/sql.py b/sdk/python/feast/infra/registry/sql.py index 6ef08989b7..e396099f44 100644 --- a/sdk/python/feast/infra/registry/sql.py +++ b/sdk/python/feast/infra/registry/sql.py @@ -255,8 +255,8 @@ def _get_entity(self, name: str, project: str) -> Entity: not_found_exception=EntityNotFoundException, ) - def _get_feature_view(self, name: str, project: str) -> FeatureView: - return self._get_object( + def _get_feature_view(self, name: str, project: str) -> BaseFeatureView: + fv = self._get_object( table=feature_views, name=name, project=project, @@ -264,9 +264,34 @@ def _get_feature_view(self, name: str, project: str) -> FeatureView: python_class=FeatureView, id_field_name="feature_view_name", proto_field_name="feature_view_proto", - not_found_exception=FeatureViewNotFoundException, + not_found_exception=None, ) + if not fv: + fv = self._get_object( + table=on_demand_feature_views, + name=name, + project=project, + proto_class=OnDemandFeatureViewProto, + python_class=OnDemandFeatureView, + id_field_name="feature_view_name", + proto_field_name="feature_view_proto", + not_found_exception=None, + ) + + if not fv: + fv = self._get_object( + table=stream_feature_views, + name=name, + project=project, + proto_class=StreamFeatureViewProto, + python_class=StreamFeatureView, + id_field_name="feature_view_name", + proto_field_name="feature_view_proto", + not_found_exception=FeatureViewNotFoundException, + ) + return fv + def _get_on_demand_feature_view( self, name: str, project: str ) -> OnDemandFeatureView: diff --git a/sdk/python/feast/offline_server.py b/sdk/python/feast/offline_server.py index be92620d68..aff4f6d960 100644 --- a/sdk/python/feast/offline_server.py +++ b/sdk/python/feast/offline_server.py @@ -127,6 +127,8 @@ def get_feature_view_by_name( f"Found matching FeatureService {fs.name} with projection {p}" ) fv = fv.with_projection(p) + + assert isinstance(fv, FeatureView) return fv except Exception: try: diff --git a/sdk/python/feast/registry_server.py b/sdk/python/feast/registry_server.py index 4a96ba76a8..5842587f09 100644 --- a/sdk/python/feast/registry_server.py +++ b/sdk/python/feast/registry_server.py @@ -94,9 +94,28 @@ def DeleteDataSource( def GetFeatureView( self, request: RegistryServer_pb2.GetFeatureViewRequest, context ): - return self.proxied_registry.get_feature_view( + feature_view = self.proxied_registry.get_feature_view( name=request.name, project=request.project, allow_cache=request.allow_cache - ).to_proto() + ) + + if isinstance(feature_view, StreamFeatureView): + arg_name = "stream_feature_view" + elif isinstance(feature_view, FeatureView): + arg_name = "feature_view" + elif isinstance(feature_view, OnDemandFeatureView): + arg_name = "on_demand_feature_view" + + return RegistryServer_pb2.GetFeatureViewResponse( + feature_view=feature_view.to_proto() + if arg_name == "feature_view" + else None, + stream_feature_view=feature_view.to_proto() + if arg_name == "stream_feature_view" + else None, + on_demand_feature_view=feature_view.to_proto() + if arg_name == "on_demand_feature_view" + else None, + ) def ApplyFeatureView( self, request: RegistryServer_pb2.ApplyFeatureViewRequest, context diff --git a/sdk/python/tests/integration/registration/test_universal_registry.py b/sdk/python/tests/integration/registration/test_universal_registry.py index c06ccf2d4d..1a6bc8fbd5 100644 --- a/sdk/python/tests/integration/registration/test_universal_registry.py +++ b/sdk/python/tests/integration/registration/test_universal_registry.py @@ -868,6 +868,13 @@ def odfv1(feature_df: pd.DataFrame) -> pd.DataFrame: existing_sfv = test_registry.get_stream_feature_view( "test kafka stream feature view", project ) + + existing_sfv_2 = test_registry.get_feature_view( + "test kafka stream feature view", project + ) + + assert existing_sfv == existing_sfv_2 + # Apply the modified sfv test_registry.apply_feature_view(sfv, project) diff --git a/sdk/python/tests/utils/test_log_creator.py b/sdk/python/tests/utils/test_log_creator.py index f072f4c886..50755ecb03 100644 --- a/sdk/python/tests/utils/test_log_creator.py +++ b/sdk/python/tests/utils/test_log_creator.py @@ -10,8 +10,8 @@ import pyarrow from feast import FeatureService, FeatureStore, FeatureView -from feast.errors import FeatureViewNotFoundException from feast.feature_logging import LOG_DATE_FIELD, LOG_TIMESTAMP_FIELD, REQUEST_ID_FIELD +from feast.on_demand_feature_view import OnDemandFeatureView from feast.protos.feast.serving.ServingService_pb2 import FieldStatus from feast.utils import _utc_now @@ -84,17 +84,16 @@ def prepare_logs( logs_df[LOG_DATE_FIELD] = logs_df[LOG_TIMESTAMP_FIELD].dt.date for projection in feature_service.feature_view_projections: - try: - view = store.get_feature_view(projection.name) - except FeatureViewNotFoundException: - view = store.get_on_demand_feature_view(projection.name) - for source in view.source_request_sources.values(): - for field in source.schema: - logs_df[field.name] = source_df[field.name] - else: + view = store.get_feature_view(projection.name) + + if isinstance(view, FeatureView): for entity_name in view.entities: entity = store.get_entity(entity_name) logs_df[entity.join_key] = source_df[entity.join_key] + elif isinstance(view, OnDemandFeatureView): + for source in view.source_request_sources.values(): + for field in source.schema: + logs_df[field.name] = source_df[field.name] for feature in projection.features: source_field = (