Skip to content

Commit

Permalink
feat: Add registry methods for dealing with all FV types (feast-dev#4435
Browse files Browse the repository at this point in the history
)

* add new registry method for working with any fv type

Signed-off-by: tokoko <togurgenidze@gmail.com>

* fix: different project for each test in test_universal_registry

Signed-off-by: tokoko <togurgenidze@gmail.com>

* revert project names to project in test_universal_registry

Signed-off-by: tokoko <togurgenidze@gmail.com>

* remove print statements from test_universal_registry

Signed-off-by: tokoko <togurgenidze@gmail.com>

---------

Signed-off-by: tokoko <togurgenidze@gmail.com>
  • Loading branch information
tokoko authored Sep 6, 2024
1 parent da24656 commit ac381b2
Show file tree
Hide file tree
Showing 14 changed files with 446 additions and 62 deletions.
35 changes: 34 additions & 1 deletion protos/feast/registry/RegistryServer.proto
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,13 @@ service RegistryServer{

// FeatureView RPCs
rpc ApplyFeatureView (ApplyFeatureViewRequest) returns (google.protobuf.Empty) {}
rpc DeleteFeatureView (DeleteFeatureViewRequest) returns (google.protobuf.Empty) {}
rpc GetAnyFeatureView (GetAnyFeatureViewRequest) returns (GetAnyFeatureViewResponse) {}
rpc ListAllFeatureViews (ListAllFeatureViewsRequest) returns (ListAllFeatureViewsResponse) {}

// plain FeatureView RPCs
rpc GetFeatureView (GetFeatureViewRequest) returns (feast.core.FeatureView) {}
rpc ListFeatureViews (ListFeatureViewsRequest) returns (ListFeatureViewsResponse) {}
rpc DeleteFeatureView (DeleteFeatureViewRequest) returns (google.protobuf.Empty) {}

// StreamFeatureView RPCs
rpc GetStreamFeatureView (GetStreamFeatureViewRequest) returns (feast.core.StreamFeatureView) {}
Expand Down Expand Up @@ -208,6 +212,35 @@ message DeleteFeatureViewRequest {
bool commit = 3;
}

message AnyFeatureView {
oneof any_feature_view {
feast.core.FeatureView feature_view = 1;
feast.core.OnDemandFeatureView on_demand_feature_view = 2;
feast.core.StreamFeatureView stream_feature_view = 3;
}
}

message GetAnyFeatureViewRequest {
string name = 1;
string project = 2;
bool allow_cache = 3;
}

message GetAnyFeatureViewResponse {
AnyFeatureView any_feature_view = 1;
}

message ListAllFeatureViewsRequest {
string project = 1;
bool allow_cache = 2;
map<string,string> tags = 3;
}

message ListAllFeatureViewsResponse {
repeated AnyFeatureView feature_views = 1;
}


// StreamFeatureView

message GetStreamFeatureViewRequest {
Expand Down
5 changes: 2 additions & 3 deletions sdk/python/feast/cli_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ def handle_fv_verbose_permissions_command(
tags=tags_filter # type: ignore[assignment]
)
for fv in feature_views:
if p.match_resource(fv):
if p.match_resource(fv): # type: ignore[arg-type]
feature_views_names.add(fv.name)
if len(feature_views_names) > 0:
Node(
Expand Down Expand Up @@ -207,8 +207,7 @@ def handle_not_verbose_permissions_command(
def fetch_all_feast_objects(store: FeatureStore) -> list[FeastObject]:
objects: list[FeastObject] = []
objects.extend(store.list_entities())
objects.extend(store.list_all_feature_views())
objects.extend(store.list_batch_feature_views())
objects.extend(store.list_all_feature_views()) # type: ignore[arg-type]
objects.extend(store.list_feature_services())
objects.extend(store.list_data_sources())
objects.extend(store.list_validation_references())
Expand Down
62 changes: 18 additions & 44 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import itertools
import logging
import os
import warnings
from datetime import datetime, timedelta
Expand Down Expand Up @@ -247,9 +246,26 @@ def list_feature_services(
"""
return self._registry.list_feature_services(self.project, tags=tags)

def _list_all_feature_views(
self, allow_cache: bool = False, tags: Optional[dict[str, str]] = None
) -> List[BaseFeatureView]:
feature_views = []
for fv in self.registry.list_all_feature_views(
self.project, allow_cache=allow_cache, tags=tags
):
if (
isinstance(fv, FeatureView)
and fv.entities
and fv.entities[0] == DUMMY_ENTITY_NAME
):
fv.entities = []
fv.entity_columns = []
feature_views.append(fv)
return feature_views

def list_all_feature_views(
self, allow_cache: bool = False, tags: Optional[dict[str, str]] = None
) -> List[Union[FeatureView, StreamFeatureView, OnDemandFeatureView]]:
) -> List[BaseFeatureView]:
"""
Retrieves the list of feature views from the registry.
Expand All @@ -274,10 +290,6 @@ def list_feature_views(
Returns:
A list of feature views.
"""
logging.warning(
"list_feature_views will make breaking changes. Please use list_batch_feature_views instead. "
"list_feature_views will behave like list_all_feature_views in the future."
)
return utils._list_feature_views(
self._registry, self.project, allow_cache, tags=tags
)
Expand All @@ -297,44 +309,6 @@ def list_batch_feature_views(
"""
return self._list_batch_feature_views(allow_cache=allow_cache, tags=tags)

def _list_all_feature_views(
self,
allow_cache: bool = False,
tags: Optional[dict[str, str]] = None,
) -> List[Union[FeatureView, StreamFeatureView, OnDemandFeatureView]]:
all_feature_views = (
utils._list_feature_views(
self._registry, self.project, allow_cache, tags=tags
)
+ self._list_stream_feature_views(allow_cache, tags=tags)
+ self.list_on_demand_feature_views(allow_cache, tags=tags)
)
return all_feature_views

def _list_feature_views(
self,
allow_cache: bool = False,
hide_dummy_entity: bool = True,
tags: Optional[dict[str, str]] = None,
) -> List[FeatureView]:
logging.warning(
"_list_feature_views will make breaking changes. Please use _list_batch_feature_views instead. "
"_list_feature_views will behave like _list_all_feature_views in the future."
)
feature_views = []
for fv in self._registry.list_feature_views(
self.project, allow_cache=allow_cache, tags=tags
):
if (
hide_dummy_entity
and fv.entities
and fv.entities[0] == DUMMY_ENTITY_NAME
):
fv.entities = []
fv.entity_columns = []
feature_views.append(fv)
return feature_views

def _list_batch_feature_views(
self,
allow_cache: bool = False,
Expand Down
38 changes: 38 additions & 0 deletions sdk/python/feast/infra/registry/base_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,44 @@ def list_feature_views(
"""
raise NotImplementedError

@abstractmethod
def get_any_feature_view(
self, name: str, project: str, allow_cache: bool = False
) -> BaseFeatureView:
"""
Retrieves a feature view of any type.
Args:
name: Name of feature view
project: Feast project that this feature view belongs to
allow_cache: Allow returning feature view from the cached registry
Returns:
Returns either the specified feature view, or raises an exception if
none is found
"""
raise NotImplementedError

@abstractmethod
def list_all_feature_views(
self,
project: str,
allow_cache: bool = False,
tags: Optional[dict[str, str]] = None,
) -> List[BaseFeatureView]:
"""
Retrieve a list of feature views of all types from the registry
Args:
allow_cache: Allow returning feature views from the cached registry
project: Filter feature views based on project name
tags: Filter by tags
Returns:
List of feature views
"""
raise NotImplementedError

@abstractmethod
def apply_materialization(
self,
Expand Down
34 changes: 34 additions & 0 deletions sdk/python/feast/infra/registry/caching_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from threading import Lock
from typing import List, Optional

from feast.base_feature_view import BaseFeatureView
from feast.data_source import DataSource
from feast.entity import Entity
from feast.feature_service import FeatureService
Expand Down Expand Up @@ -102,6 +103,39 @@ def list_entities(
)
return self._list_entities(project, tags)

@abstractmethod
def _get_any_feature_view(self, name: str, project: str) -> BaseFeatureView:
pass

def get_any_feature_view(
self, name: str, project: str, allow_cache: bool = False
) -> BaseFeatureView:
if allow_cache:
self._refresh_cached_registry_if_necessary()
return proto_registry_utils.get_any_feature_view(
self.cached_registry_proto, name, project
)
return self._get_any_feature_view(name, project)

@abstractmethod
def _list_all_feature_views(
self, project: str, tags: Optional[dict[str, str]]
) -> List[BaseFeatureView]:
pass

def list_all_feature_views(
self,
project: str,
allow_cache: bool = False,
tags: Optional[dict[str, str]] = None,
) -> List[BaseFeatureView]:
if allow_cache:
self._refresh_cached_registry_if_necessary()
return proto_registry_utils.list_all_feature_views(
self.cached_registry_proto, project, tags
)
return self._list_all_feature_views(project, tags)

@abstractmethod
def _get_feature_view(self, name: str, project: str) -> FeatureView:
pass
Expand Down
39 changes: 39 additions & 0 deletions sdk/python/feast/infra/registry/proto_registry_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from typing import List, Optional

from feast import utils
from feast.base_feature_view import BaseFeatureView
from feast.data_source import DataSource
from feast.entity import Entity
from feast.errors import (
Expand Down Expand Up @@ -93,6 +94,33 @@ def get_feature_service(
raise FeatureServiceNotFoundException(name, project=project)


def get_any_feature_view(
registry_proto: RegistryProto, name: str, project: str
) -> BaseFeatureView:
for feature_view_proto in registry_proto.feature_views:
if (
feature_view_proto.spec.name == name
and feature_view_proto.spec.project == project
):
return FeatureView.from_proto(feature_view_proto)

for feature_view_proto in registry_proto.stream_feature_views:
if (
feature_view_proto.spec.name == name
and feature_view_proto.spec.project == project
):
return StreamFeatureView.from_proto(feature_view_proto)

for on_demand_feature_view in registry_proto.on_demand_feature_views:
if (
on_demand_feature_view.spec.project == project
and on_demand_feature_view.spec.name == name
):
return OnDemandFeatureView.from_proto(on_demand_feature_view)

raise FeatureViewNotFoundException(name, project)


def get_feature_view(
registry_proto: RegistryProto, name: str, project: str
) -> FeatureView:
Expand Down Expand Up @@ -179,6 +207,17 @@ def list_feature_services(
return feature_services


@registry_proto_cache_with_tags
def list_all_feature_views(
registry_proto: RegistryProto, project: str, tags: Optional[dict[str, str]]
) -> List[BaseFeatureView]:
return (
list_feature_views(registry_proto, project, tags)
+ list_stream_feature_views(registry_proto, project, tags)
+ list_on_demand_feature_views(registry_proto, project, tags)
)


@registry_proto_cache_with_tags
def list_feature_views(
registry_proto: RegistryProto, project: str, tags: Optional[dict[str, str]]
Expand Down
21 changes: 20 additions & 1 deletion sdk/python/feast/infra/registry/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,26 @@ def apply_materialization(
self.commit()
return

raise FeatureViewNotFoundException(feature_view.name, project)
def list_all_feature_views(
self,
project: str,
allow_cache: bool = False,
tags: Optional[dict[str, str]] = None,
) -> List[BaseFeatureView]:
registry_proto = self._get_registry_proto(
project=project, allow_cache=allow_cache
)
return proto_registry_utils.list_all_feature_views(
registry_proto, project, tags
)

def get_any_feature_view(
self, name: str, project: str, allow_cache: bool = False
) -> BaseFeatureView:
registry_proto = self._get_registry_proto(
project=project, allow_cache=allow_cache
)
return proto_registry_utils.get_any_feature_view(registry_proto, name, project)

def list_feature_views(
self,
Expand Down
Loading

0 comments on commit ac381b2

Please sign in to comment.