Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor tag methods to infer created, deleted, and kept repo objects #2142

Merged
merged 7 commits into from
Dec 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 76 additions & 0 deletions sdk/python/feast/diff/FcoDiff.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
from dataclasses import dataclass
from enum import Enum
from typing import Any, Iterable, List, Set, Tuple, TypeVar

from feast.base_feature_view import BaseFeatureView
from feast.entity import Entity
from feast.feature_service import FeatureService
from feast.feature_table import FeatureTable
from feast.protos.feast.core.Entity_pb2 import Entity as EntityProto
from feast.protos.feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto


@dataclass
class PropertyDiff:
property_name: str
val_existing: str
val_declared: str


class TransitionType(Enum):
UNKNOWN = 0
CREATE = 1
DELETE = 2
UPDATE = 3
UNCHANGED = 4


@dataclass
class FcoDiff:
current_fco: Any
new_fco: Any
fco_property_diffs: List[PropertyDiff]
transition_type: TransitionType


@dataclass
class RegistryDiff:
fco_diffs: List[FcoDiff]

def __init__(self):
self.fco_diffs = []

def add_fco_diff(self, fco_diff: FcoDiff):
self.fco_diffs.append(fco_diff)


T = TypeVar("T", Entity, BaseFeatureView, FeatureService, FeatureTable)


def tag_objects_for_keep_delete_add(
existing_objs: Iterable[T], desired_objs: Iterable[T]
) -> Tuple[Set[T], Set[T], Set[T]]:
existing_obj_names = {e.name for e in existing_objs}
desired_obj_names = {e.name for e in desired_objs}

objs_to_add = {e for e in desired_objs if e.name not in existing_obj_names}
objs_to_keep = {e for e in desired_objs if e.name in existing_obj_names}
objs_to_delete = {e for e in existing_objs if e.name not in desired_obj_names}

return objs_to_keep, objs_to_delete, objs_to_add


U = TypeVar("U", EntityProto, FeatureViewProto)


def tag_proto_objects_for_keep_delete_add(
existing_objs: Iterable[U], desired_objs: Iterable[U]
) -> Tuple[Iterable[U], Iterable[U], Iterable[U]]:
existing_obj_names = {e.spec.name for e in existing_objs}
desired_obj_names = {e.spec.name for e in desired_objs}

objs_to_add = [e for e in desired_objs if e.spec.name not in existing_obj_names]
objs_to_keep = [e for e in desired_objs if e.spec.name in existing_obj_names]
objs_to_delete = [e for e in existing_objs if e.spec.name not in desired_obj_names]

return objs_to_keep, objs_to_delete, objs_to_add
Empty file.
34 changes: 18 additions & 16 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from feast.feature_service import FeatureService
from feast.feature_table import FeatureTable
from feast.feature_view import (
DUMMY_ENTITY,
DUMMY_ENTITY_ID,
DUMMY_ENTITY_NAME,
DUMMY_ENTITY_VAL,
Expand All @@ -61,7 +62,6 @@
from feast.request_feature_view import RequestFeatureView
from feast.type_map import python_value_to_proto_value
from feast.usage import log_exceptions, log_exceptions_and_usage, set_usage_attribute
from feast.value_type import ValueType
from feast.version import get_version

warnings.simplefilter("once", DeprecationWarning)
Expand Down Expand Up @@ -379,16 +379,18 @@ def apply(
]
],
],
objects_to_delete: List[
Union[
FeatureView,
OnDemandFeatureView,
RequestFeatureView,
Entity,
FeatureService,
FeatureTable,
objects_to_delete: Optional[
List[
Union[
FeatureView,
OnDemandFeatureView,
RequestFeatureView,
Entity,
FeatureService,
FeatureTable,
]
]
] = [],
] = None,
partial: bool = True,
):
"""Register objects to metadata store and update related infrastructure.
Expand Down Expand Up @@ -435,6 +437,9 @@ def apply(

assert isinstance(objects, list)

if not objects_to_delete:
objects_to_delete = []

# Separate all objects into entities, feature services, and different feature view types.
entities_to_update = [ob for ob in objects if isinstance(ob, Entity)]
views_to_update = [ob for ob in objects if isinstance(ob, FeatureView)]
Expand Down Expand Up @@ -484,11 +489,6 @@ def apply(
odfv.infer_features()

# Handle all entityless feature views by using DUMMY_ENTITY as a placeholder entity.
DUMMY_ENTITY = Entity(
name=DUMMY_ENTITY_NAME,
join_key=DUMMY_ENTITY_ID,
value_type=ValueType.INT32,
)
entities_to_update.append(DUMMY_ENTITY)

# Add all objects to the registry and update the provider's infrastructure.
Expand Down Expand Up @@ -1560,7 +1560,9 @@ def _validate_feature_views(feature_views: List[BaseFeatureView]):
case_insensitive_fv_name = fv.name.lower()
if case_insensitive_fv_name in fv_names:
raise ValueError(
f"More than one feature view with name {case_insensitive_fv_name} found. Please ensure that all feature view names are case-insensitively unique. It may be necessary to ignore certain files in your feature repository by using a .feastignore file."
f"More than one feature view with name {case_insensitive_fv_name} found. "
f"Please ensure that all feature view names are case-insensitively unique. "
f"It may be necessary to ignore certain files in your feature repository by using a .feastignore file."
)
else:
fv_names.add(case_insensitive_fv_name)
4 changes: 4 additions & 0 deletions sdk/python/feast/feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from feast import utils
from feast.base_feature_view import BaseFeatureView
from feast.data_source import DataSource
from feast.entity import Entity
from feast.feature import Feature
from feast.feature_view_projection import FeatureViewProjection
from feast.protos.feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto
Expand All @@ -42,6 +43,9 @@
DUMMY_ENTITY_ID = "__dummy_id"
DUMMY_ENTITY_NAME = "__dummy"
DUMMY_ENTITY_VAL = ""
DUMMY_ENTITY = Entity(
name=DUMMY_ENTITY_NAME, join_key=DUMMY_ENTITY_ID, value_type=ValueType.INT32,
)


class FeatureView(BaseFeatureView):
Expand Down
67 changes: 54 additions & 13 deletions sdk/python/feast/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from collections import defaultdict
from datetime import datetime, timedelta
from pathlib import Path
Expand All @@ -24,6 +24,12 @@

from feast import importer
from feast.base_feature_view import BaseFeatureView
from feast.diff.FcoDiff import (
FcoDiff,
RegistryDiff,
TransitionType,
tag_proto_objects_for_keep_delete_add,
)
from feast.entity import Entity
from feast.errors import (
ConflictingFeatureViewNames,
Expand Down Expand Up @@ -57,6 +63,8 @@
"": "LocalRegistryStore",
}

logger = logging.getLogger(__name__)


def get_registry_store_class_from_type(registry_store_type: str):
if not registry_store_type.endswith("RegistryStore"):
Expand Down Expand Up @@ -95,7 +103,9 @@ class Registry:
cached_registry_proto_ttl: timedelta
cache_being_updated: bool = False

def __init__(self, registry_config: RegistryConfig, repo_path: Path):
def __init__(
self, registry_config: Optional[RegistryConfig], repo_path: Optional[Path]
):
"""
Create the Registry object.

Expand All @@ -104,20 +114,50 @@ def __init__(self, registry_config: RegistryConfig, repo_path: Path):
repo_path: Path to the base of the Feast repository
or where it will be created if it does not exist yet.
"""
registry_store_type = registry_config.registry_store_type
registry_path = registry_config.path
if registry_store_type is None:
cls = get_registry_store_class_from_scheme(registry_path)
else:
cls = get_registry_store_class_from_type(str(registry_store_type))

self._registry_store = cls(registry_config, repo_path)
self.cached_registry_proto_ttl = timedelta(
seconds=registry_config.cache_ttl_seconds
if registry_config.cache_ttl_seconds is not None
else 0
if registry_config:
registry_store_type = registry_config.registry_store_type
registry_path = registry_config.path
if registry_store_type is None:
cls = get_registry_store_class_from_scheme(registry_path)
else:
cls = get_registry_store_class_from_type(str(registry_store_type))

self._registry_store = cls(registry_config, repo_path)
self.cached_registry_proto_ttl = timedelta(
seconds=registry_config.cache_ttl_seconds
if registry_config.cache_ttl_seconds is not None
else 0
)

# TODO(achals): This method needs to be filled out and used in the feast plan/apply methods.
@staticmethod
def diff_between(
current_registry: RegistryProto, new_registry: RegistryProto
) -> RegistryDiff:
diff = RegistryDiff()

# Handle Entities
(
entities_to_keep,
entities_to_delete,
entities_to_add,
) = tag_proto_objects_for_keep_delete_add(
current_registry.entities, new_registry.entities,
)

for e in entities_to_add:
diff.add_fco_diff(FcoDiff(None, e, [], TransitionType.CREATE))
for e in entities_to_delete:
diff.add_fco_diff(FcoDiff(e, None, [], TransitionType.DELETE))

# Handle Feature Views
# Handle On Demand Feature Views
# Handle Request Feature Views
# Handle Feature Services
logger.info(f"Diff: {diff}")
return diff

def _initialize_registry(self):
"""Explicitly initializes the registry with an empty proto if it doesn't exist."""
try:
Expand Down Expand Up @@ -752,6 +792,7 @@ def _get_registry_proto(self, allow_cache: bool = False) -> RegistryProto:
> (self.cached_registry_proto_created + self.cached_registry_proto_ttl)
)
)

if allow_cache and (not expired or self.cache_being_updated):
assert isinstance(self.cached_registry_proto, RegistryProto)
return self.cached_registry_proto
Expand Down
Loading