Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modify feature_store.plan to produce an InfraDiff #2211

Merged
merged 8 commits into from
Jan 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 28 additions & 13 deletions sdk/python/feast/diff/FcoDiff.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,38 @@
from dataclasses import dataclass
from typing import Any, Iterable, List, Set, Tuple, TypeVar
from typing import Generic, Iterable, List, Set, Tuple, TypeVar

from feast.base_feature_view import BaseFeatureView
from feast.diff.property_diff import PropertyDiff, TransitionType
from feast.entity import Entity
from feast.feature_service import FeatureService
from feast.protos.feast.core.Entity_pb2 import Entity as EntityProto
from feast.protos.feast.core.FeatureService_pb2 import (
FeatureService as FeatureServiceProto,
)
from feast.protos.feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto
from feast.protos.feast.core.OnDemandFeatureView_pb2 import (
OnDemandFeatureView as OnDemandFeatureViewProto,
)
from feast.protos.feast.core.RequestFeatureView_pb2 import (
RequestFeatureView as RequestFeatureViewProto,
)

FcoProto = TypeVar(
"FcoProto",
EntityProto,
FeatureViewProto,
FeatureServiceProto,
OnDemandFeatureViewProto,
RequestFeatureViewProto,
)


@dataclass
class FcoDiff:
class FcoDiff(Generic[FcoProto]):
name: str
fco_type: str
current_fco: Any
new_fco: Any
current_fco: FcoProto
new_fco: FcoProto
fco_property_diffs: List[PropertyDiff]
transition_type: TransitionType

Expand All @@ -30,12 +48,12 @@ def add_fco_diff(self, fco_diff: FcoDiff):
self.fco_diffs.append(fco_diff)


T = TypeVar("T", Entity, BaseFeatureView, FeatureService)
Fco = TypeVar("Fco", Entity, BaseFeatureView, FeatureService)


def tag_objects_for_keep_delete_add(
existing_objs: Iterable[T], desired_objs: Iterable[T]
) -> Tuple[Set[T], Set[T], Set[T]]:
existing_objs: Iterable[Fco], desired_objs: Iterable[Fco]
) -> Tuple[Set[Fco], Set[Fco], Set[Fco]]:
existing_obj_names = {e.name for e in existing_objs}
desired_obj_names = {e.name for e in desired_objs}

Expand All @@ -46,12 +64,9 @@ def tag_objects_for_keep_delete_add(
return objs_to_keep, objs_to_delete, objs_to_add


U = TypeVar("U", EntityProto, FeatureViewProto)


def tag_proto_objects_for_keep_delete_add(
existing_objs: Iterable[U], desired_objs: Iterable[U]
) -> Tuple[Iterable[U], Iterable[U], Iterable[U]]:
existing_objs: Iterable[FcoProto], desired_objs: Iterable[FcoProto]
) -> Tuple[Iterable[FcoProto], Iterable[FcoProto], Iterable[FcoProto]]:
existing_obj_names = {e.spec.name for e in existing_objs}
desired_obj_names = {e.spec.name for e in desired_objs}

Expand All @@ -65,7 +80,7 @@ def tag_proto_objects_for_keep_delete_add(
FIELDS_TO_IGNORE = {"project"}


def diff_between(current: U, new: U, object_type: str) -> FcoDiff:
def diff_between(current: FcoProto, new: FcoProto, object_type: str) -> FcoDiff:
assert current.DESCRIPTOR.full_name == new.DESCRIPTOR.full_name
property_diffs = []
transition: TransitionType = TransitionType.UNCHANGED
Expand Down
46 changes: 34 additions & 12 deletions sdk/python/feast/diff/infra_diff.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from dataclasses import dataclass
from typing import Any, Iterable, List, Tuple, TypeVar
from typing import Generic, Iterable, List, Tuple, TypeVar

from feast.diff.property_diff import PropertyDiff, TransitionType
from feast.infra.infra_object import (
Expand All @@ -17,13 +17,17 @@
from feast.protos.feast.core.InfraObject_pb2 import Infra as InfraProto
from feast.protos.feast.core.SqliteTable_pb2 import SqliteTable as SqliteTableProto

InfraObjectProto = TypeVar(
"InfraObjectProto", DatastoreTableProto, DynamoDBTableProto, SqliteTableProto
)


@dataclass
class InfraObjectDiff:
class InfraObjectDiff(Generic[InfraObjectProto]):
name: str
infra_object_type: str
current_infra_object: Any
new_infra_object: Any
current_infra_object: InfraObjectProto
new_infra_object: InfraObjectProto
infra_object_property_diffs: List[PropertyDiff]
transition_type: TransitionType

Expand All @@ -36,18 +40,34 @@ def __init__(self):
self.infra_object_diffs = []

def update(self):
pass
"""Apply the infrastructure changes specified in this object."""
for infra_object_diff in self.infra_object_diffs:
if infra_object_diff.transition_type in [
TransitionType.DELETE,
TransitionType.UPDATE,
]:
infra_object = InfraObject.from_proto(
infra_object_diff.current_infra_object
)
infra_object.teardown()
adchia marked this conversation as resolved.
Show resolved Hide resolved
elif infra_object_diff.transition_type in [
TransitionType.CREATE,
TransitionType.UPDATE,
]:
infra_object = InfraObject.from_proto(
infra_object_diff.new_infra_object
)
infra_object.update()

def to_string(self):
pass


U = TypeVar("U", DatastoreTableProto, DynamoDBTableProto, SqliteTableProto)


def tag_infra_proto_objects_for_keep_delete_add(
existing_objs: Iterable[U], desired_objs: Iterable[U]
) -> Tuple[Iterable[U], Iterable[U], Iterable[U]]:
existing_objs: Iterable[InfraObjectProto], desired_objs: Iterable[InfraObjectProto]
) -> Tuple[
Iterable[InfraObjectProto], Iterable[InfraObjectProto], Iterable[InfraObjectProto]
]:
existing_obj_names = {e.name for e in existing_objs}
desired_obj_names = {e.name for e in desired_objs}

Expand Down Expand Up @@ -123,7 +143,7 @@ def diff_infra_protos(

def get_infra_object_protos_by_type(
infra_proto: InfraProto, infra_object_class_type: str
) -> List[U]:
) -> List[InfraObjectProto]:
return [
InfraObject.from_infra_object_proto(infra_object).to_proto()
for infra_object in infra_proto.infra_objects
Expand All @@ -134,7 +154,9 @@ def get_infra_object_protos_by_type(
FIELDS_TO_IGNORE = {"project"}


def diff_between(current: U, new: U, infra_object_type: str) -> InfraObjectDiff:
def diff_between(
current: InfraObjectProto, new: InfraObjectProto, infra_object_type: str
) -> InfraObjectDiff:
assert current.DESCRIPTOR.full_name == new.DESCRIPTOR.full_name
property_diffs = []
transition: TransitionType = TransitionType.UNCHANGED
Expand Down
5 changes: 5 additions & 0 deletions sdk/python/feast/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,3 +293,8 @@ def __init__(self, actual_class: str, expected_class: str):
super().__init__(
f"The registry store class was expected to be {expected_class}, but was instead {actual_class}."
)


class FeastInvalidInfraObjectType(Exception):
def __init__(self):
super().__init__("Could not identify the type of the InfraObject.")
25 changes: 21 additions & 4 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from feast import feature_server, flags, flags_helper, utils
from feast.base_feature_view import BaseFeatureView
from feast.diff.FcoDiff import RegistryDiff
from feast.diff.infra_diff import InfraDiff, diff_infra_protos
from feast.entity import Entity
from feast.errors import (
EntityNotFoundException,
Expand All @@ -63,6 +64,7 @@
from feast.infra.provider import Provider, RetrievalJob, get_provider
from feast.on_demand_feature_view import OnDemandFeatureView
from feast.online_response import OnlineResponse
from feast.protos.feast.core.InfraObject_pb2 import Infra as InfraProto
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
from feast.protos.feast.serving.ServingService_pb2 import (
FieldStatus,
Expand Down Expand Up @@ -405,7 +407,9 @@ def _get_features(
return _feature_refs

@log_exceptions_and_usage
def plan(self, desired_repo_objects: RepoContents) -> RegistryDiff:
def plan(
self, desired_repo_objects: RepoContents
) -> Tuple[RegistryDiff, InfraDiff]:
"""Dry-run registering objects to metadata store.

The plan method dry-runs registering one or more definitions (e.g., Entity, FeatureView), and produces
Expand Down Expand Up @@ -440,7 +444,7 @@ def plan(self, desired_repo_objects: RepoContents) -> RegistryDiff:
... ttl=timedelta(seconds=86400 * 1),
... batch_source=driver_hourly_stats,
... )
>>> diff = fs.plan(RepoContents({driver_hourly_stats_view}, set(), set(), {driver}, set())) # register entity and feature view
>>> registry_diff, infra_diff = fs.plan(RepoContents({driver_hourly_stats_view}, set(), set(), {driver}, set())) # register entity and feature view
"""

current_registry_proto = (
Expand All @@ -450,8 +454,21 @@ def plan(self, desired_repo_objects: RepoContents) -> RegistryDiff:
)

desired_registry_proto = desired_repo_objects.to_registry_proto()
diffs = Registry.diff_between(current_registry_proto, desired_registry_proto)
return diffs
registry_diff = Registry.diff_between(
current_registry_proto, desired_registry_proto
)

current_infra_proto = (
self._registry.cached_registry_proto.infra.__deepcopy__()
if self._registry.cached_registry_proto
else InfraProto()
)
new_infra_proto = self._provider.plan_infra(
self.config, desired_registry_proto
).to_proto()
infra_diff = diff_infra_protos(current_infra_proto, new_infra_proto)

return (registry_diff, infra_diff)

@log_exceptions_and_usage
def apply(
Expand Down
39 changes: 35 additions & 4 deletions sdk/python/feast/infra/infra_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,21 @@
from dataclasses import dataclass, field
from typing import Any, List

from feast.errors import FeastInvalidInfraObjectType
from feast.importer import import_class
from feast.protos.feast.core.DatastoreTable_pb2 import (
DatastoreTable as DatastoreTableProto,
)
from feast.protos.feast.core.DynamoDBTable_pb2 import (
DynamoDBTable as DynamoDBTableProto,
)
from feast.protos.feast.core.InfraObject_pb2 import Infra as InfraProto
from feast.protos.feast.core.InfraObject_pb2 import InfraObject as InfraObjectProto
from feast.protos.feast.core.SqliteTable_pb2 import SqliteTable as SqliteTableProto

DATASTORE_INFRA_OBJECT_CLASS_TYPE = "feast.infra.online_stores.datastore.DatastoreTable"
DYNAMODB_INFRA_OBJECT_CLASS_TYPE = "feast.infra.online_stores.dynamodb.DynamoDBTable"
SQLITE_INFRA_OBJECT_CLASS_TYPE = "feast.infra.online_store.sqlite.SqliteTable"
SQLITE_INFRA_OBJECT_CLASS_TYPE = "feast.infra.online_stores.sqlite.SqliteTable"


class InfraObject(ABC):
Expand Down Expand Up @@ -49,15 +57,38 @@ def from_infra_object_proto(infra_object_proto: InfraObjectProto) -> Any:
infra_object_proto: A protobuf representation of an InfraObject.

Raises:
ValueError: The type of InfraObject could not be identified.
FeastInvalidInfraObjectType: The type of InfraObject could not be identified.
"""
if infra_object_proto.infra_object_class_type:
cls = _get_infra_object_class_from_type(
infra_object_proto.infra_object_class_type
)
return cls.from_infra_object_proto(infra_object_proto)

raise ValueError("Could not identify the type of the InfraObject.")
raise FeastInvalidInfraObjectType()

@staticmethod
def from_proto(infra_object_proto: Any) -> Any:
adchia marked this conversation as resolved.
Show resolved Hide resolved
"""
Converts a protobuf representation of a subclass to an object of that subclass.

Args:
infra_object_proto: A protobuf representation of an InfraObject.

Raises:
FeastInvalidInfraObjectType: The type of InfraObject could not be identified.
"""
if isinstance(infra_object_proto, DatastoreTableProto):
adchia marked this conversation as resolved.
Show resolved Hide resolved
infra_object_class_type = DATASTORE_INFRA_OBJECT_CLASS_TYPE
elif isinstance(infra_object_proto, DynamoDBTableProto):
infra_object_class_type = DYNAMODB_INFRA_OBJECT_CLASS_TYPE
elif isinstance(infra_object_proto, SqliteTableProto):
infra_object_class_type = SQLITE_INFRA_OBJECT_CLASS_TYPE
else:
raise FeastInvalidInfraObjectType()

cls = _get_infra_object_class_from_type(infra_object_class_type)
return cls.from_proto(infra_object_proto)

@abstractmethod
def update(self):
Expand Down Expand Up @@ -94,7 +125,7 @@ def to_proto(self) -> InfraProto:
"""
infra_proto = InfraProto()
for infra_object in self.infra_objects:
infra_object_proto = infra_object.to_proto()
infra_object_proto = infra_object.to_infra_object_proto()
infra_proto.infra_objects.append(infra_object_proto)

return infra_proto
Expand Down
19 changes: 12 additions & 7 deletions sdk/python/feast/infra/local.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import uuid
from datetime import datetime
from pathlib import Path
from typing import List

from feast.feature_view import FeatureView
from feast.infra.infra_object import Infra, InfraObject
from feast.infra.passthrough_provider import PassthroughProvider
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
from feast.registry_store import RegistryStore
from feast.repo_config import RegistryConfig
from feast.repo_config import RegistryConfig, RepoConfig
from feast.usage import log_exceptions_and_usage


Expand All @@ -15,11 +16,15 @@ class LocalProvider(PassthroughProvider):
This class only exists for backwards compatibility.
"""

pass


def _table_id(project: str, table: FeatureView) -> str:
return f"{project}_{table.name}"
def plan_infra(
self, config: RepoConfig, desired_registry_proto: RegistryProto
) -> Infra:
infra_objects: List[InfraObject] = self.online_store.plan(
config, desired_registry_proto
)
infra = Infra()
infra.infra_objects += infra_objects
return infra


class LocalRegistryStore(RegistryStore):
Expand Down
15 changes: 15 additions & 0 deletions sdk/python/feast/infra/online_stores/datastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,7 @@ def from_infra_object_proto(infra_object_proto: InfraObjectProto) -> Any:
name=infra_object_proto.datastore_table.name,
)

# Distinguish between null and empty string, since project_id and namespace are StringValues.
if infra_object_proto.datastore_table.HasField("project_id"):
datastore_table.project_id = (
infra_object_proto.datastore_table.project_id.value
Expand All @@ -387,6 +388,20 @@ def from_infra_object_proto(infra_object_proto: InfraObjectProto) -> Any:

return datastore_table

@staticmethod
def from_proto(datastore_table_proto: DatastoreTableProto) -> Any:
datastore_table = DatastoreTable(
project=datastore_table_proto.project, name=datastore_table_proto.name,
)

# Distinguish between null and empty string, since project_id and namespace are StringValues.
if datastore_table_proto.HasField("project_id"):
felixwang9817 marked this conversation as resolved.
Show resolved Hide resolved
datastore_table.project_id = datastore_table_proto.project_id.value
if datastore_table_proto.HasField("namespace"):
datastore_table.namespace = datastore_table_proto.namespace.value

return datastore_table

def update(self):
client = _initialize_client(self.project_id, self.namespace)
key = client.key("Project", self.project, "Table", self.name)
Expand Down
6 changes: 6 additions & 0 deletions sdk/python/feast/infra/online_stores/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,12 @@ def from_infra_object_proto(infra_object_proto: InfraObjectProto) -> Any:
region=infra_object_proto.dynamodb_table.region,
)

@staticmethod
def from_proto(dynamodb_table_proto: DynamoDBTableProto) -> Any:
return DynamoDBTable(
name=dynamodb_table_proto.name, region=dynamodb_table_proto.region,
)

adchia marked this conversation as resolved.
Show resolved Hide resolved
def update(self):
dynamodb_client = _initialize_dynamodb_client(region=self.region)
dynamodb_resource = _initialize_dynamodb_resource(region=self.region)
Expand Down
Loading