Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modify feature_store.plan to produce an InfraDiff #2211

Merged
merged 8 commits into from
Jan 11, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 24 additions & 7 deletions sdk/python/feast/diff/FcoDiff.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,40 @@
from dataclasses import dataclass
from typing import Any, Iterable, List, Set, Tuple, TypeVar
from typing import Generic, Iterable, List, Set, Tuple, TypeVar

from feast.base_feature_view import BaseFeatureView
from feast.diff.property_diff import PropertyDiff, TransitionType
from feast.entity import Entity
from feast.feature_service import FeatureService
from feast.protos.feast.core.Entity_pb2 import Entity as EntityProto
from feast.protos.feast.core.FeatureService_pb2 import (
FeatureService as FeatureServiceProto,
)
from feast.protos.feast.core.FeatureTable_pb2 import FeatureTable as FeatureTableProto
felixwang9817 marked this conversation as resolved.
Show resolved Hide resolved
from feast.protos.feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto
from feast.protos.feast.core.OnDemandFeatureView_pb2 import (
OnDemandFeatureView as OnDemandFeatureViewProto,
)
from feast.protos.feast.core.RequestFeatureView_pb2 import (
RequestFeatureView as RequestFeatureViewProto,
)

U = TypeVar(
felixwang9817 marked this conversation as resolved.
Show resolved Hide resolved
"U",
EntityProto,
FeatureViewProto,
FeatureServiceProto,
FeatureTableProto,
OnDemandFeatureViewProto,
RequestFeatureViewProto,
)


@dataclass
class FcoDiff:
class FcoDiff(Generic[U]):
name: str
fco_type: str
current_fco: Any
new_fco: Any
current_fco: U
new_fco: U
fco_property_diffs: List[PropertyDiff]
transition_type: TransitionType

Expand Down Expand Up @@ -46,9 +66,6 @@ def tag_objects_for_keep_delete_add(
return objs_to_keep, objs_to_delete, objs_to_add


U = TypeVar("U", EntityProto, FeatureViewProto)


def tag_proto_objects_for_keep_delete_add(
existing_objs: Iterable[U], desired_objs: Iterable[U]
) -> Tuple[Iterable[U], Iterable[U], Iterable[U]]:
Expand Down
32 changes: 24 additions & 8 deletions sdk/python/feast/diff/infra_diff.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from dataclasses import dataclass
from typing import Any, Iterable, List, Tuple, TypeVar
from typing import Generic, Iterable, List, Tuple, TypeVar

from feast.diff.property_diff import PropertyDiff, TransitionType
from feast.infra.infra_object import (
Expand All @@ -17,13 +17,15 @@
from feast.protos.feast.core.InfraObject_pb2 import Infra as InfraProto
from feast.protos.feast.core.SqliteTable_pb2 import SqliteTable as SqliteTableProto

U = TypeVar("U", DatastoreTableProto, DynamoDBTableProto, SqliteTableProto)
felixwang9817 marked this conversation as resolved.
Show resolved Hide resolved


@dataclass
class InfraObjectDiff:
class InfraObjectDiff(Generic[U]):
name: str
infra_object_type: str
current_infra_object: Any
new_infra_object: Any
current_infra_object: U
new_infra_object: U
infra_object_property_diffs: List[PropertyDiff]
transition_type: TransitionType

Expand All @@ -36,15 +38,29 @@ def __init__(self):
self.infra_object_diffs = []

def update(self):
pass
"""Apply the infrastructure changes specified in this object."""
for infra_object_diff in self.infra_object_diffs:
if infra_object_diff.transition_type in [
TransitionType.DELETE,
TransitionType.UPDATE,
]:
infra_object = InfraObject.from_proto(
infra_object_diff.current_infra_object
)
infra_object.teardown()
adchia marked this conversation as resolved.
Show resolved Hide resolved
elif infra_object_diff.transition_type in [
TransitionType.CREATE,
TransitionType.UPDATE,
]:
infra_object = InfraObject.from_proto(
infra_object_diff.new_infra_object
)
infra_object.update()

def to_string(self):
pass


U = TypeVar("U", DatastoreTableProto, DynamoDBTableProto, SqliteTableProto)


def tag_infra_proto_objects_for_keep_delete_add(
existing_objs: Iterable[U], desired_objs: Iterable[U]
) -> Tuple[Iterable[U], Iterable[U], Iterable[U]]:
Expand Down
5 changes: 5 additions & 0 deletions sdk/python/feast/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,3 +293,8 @@ def __init__(self, actual_class: str, expected_class: str):
super().__init__(
f"The registry store class was expected to be {expected_class}, but was instead {actual_class}."
)


class FeastInvalidInfraObjectType(Exception):
def __init__(self):
super().__init__("Could not identify the type of the InfraObject.")
25 changes: 21 additions & 4 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from feast import feature_server, flags, flags_helper, utils
from feast.base_feature_view import BaseFeatureView
from feast.diff.FcoDiff import RegistryDiff
from feast.diff.infra_diff import InfraDiff, diff_infra_protos
from feast.entity import Entity
from feast.errors import (
EntityNotFoundException,
Expand All @@ -63,6 +64,7 @@
from feast.infra.provider import Provider, RetrievalJob, get_provider
from feast.on_demand_feature_view import OnDemandFeatureView
from feast.online_response import OnlineResponse
from feast.protos.feast.core.InfraObject_pb2 import Infra as InfraProto
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
from feast.protos.feast.serving.ServingService_pb2 import (
FieldStatus,
Expand Down Expand Up @@ -405,7 +407,9 @@ def _get_features(
return _feature_refs

@log_exceptions_and_usage
def plan(self, desired_repo_objects: RepoContents) -> RegistryDiff:
def plan(
self, desired_repo_objects: RepoContents
) -> Tuple[RegistryDiff, InfraDiff]:
"""Dry-run registering objects to metadata store.

The plan method dry-runs registering one or more definitions (e.g., Entity, FeatureView), and produces
Expand Down Expand Up @@ -440,7 +444,7 @@ def plan(self, desired_repo_objects: RepoContents) -> RegistryDiff:
... ttl=timedelta(seconds=86400 * 1),
... batch_source=driver_hourly_stats,
... )
>>> diff = fs.plan(RepoContents({driver_hourly_stats_view}, set(), set(), {driver}, set())) # register entity and feature view
>>> registry_diff, infra_diff = fs.plan(RepoContents({driver_hourly_stats_view}, set(), set(), {driver}, set())) # register entity and feature view
"""

current_registry_proto = (
Expand All @@ -450,8 +454,21 @@ def plan(self, desired_repo_objects: RepoContents) -> RegistryDiff:
)

desired_registry_proto = desired_repo_objects.to_registry_proto()
diffs = Registry.diff_between(current_registry_proto, desired_registry_proto)
return diffs
registry_diff = Registry.diff_between(
current_registry_proto, desired_registry_proto
)

current_infra_proto = (
self._registry.cached_registry_proto.infra.__deepcopy__()
if self._registry.cached_registry_proto
else InfraProto()
)
new_infra_proto = self._provider.plan_infra(
self.config, desired_registry_proto
).to_proto()
infra_diff = diff_infra_protos(current_infra_proto, new_infra_proto)

return (registry_diff, infra_diff)

@log_exceptions_and_usage
def apply(
Expand Down
39 changes: 35 additions & 4 deletions sdk/python/feast/infra/infra_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,21 @@
from dataclasses import dataclass, field
from typing import Any, List

from feast.errors import FeastInvalidInfraObjectType
from feast.importer import import_class
from feast.protos.feast.core.DatastoreTable_pb2 import (
DatastoreTable as DatastoreTableProto,
)
from feast.protos.feast.core.DynamoDBTable_pb2 import (
DynamoDBTable as DynamoDBTableProto,
)
from feast.protos.feast.core.InfraObject_pb2 import Infra as InfraProto
from feast.protos.feast.core.InfraObject_pb2 import InfraObject as InfraObjectProto
from feast.protos.feast.core.SqliteTable_pb2 import SqliteTable as SqliteTableProto

DATASTORE_INFRA_OBJECT_CLASS_TYPE = "feast.infra.online_stores.datastore.DatastoreTable"
DYNAMODB_INFRA_OBJECT_CLASS_TYPE = "feast.infra.online_stores.dynamodb.DynamoDBTable"
SQLITE_INFRA_OBJECT_CLASS_TYPE = "feast.infra.online_store.sqlite.SqliteTable"
SQLITE_INFRA_OBJECT_CLASS_TYPE = "feast.infra.online_stores.sqlite.SqliteTable"


class InfraObject(ABC):
Expand Down Expand Up @@ -49,15 +57,38 @@ def from_infra_object_proto(infra_object_proto: InfraObjectProto) -> Any:
infra_object_proto: A protobuf representation of an InfraObject.

Raises:
ValueError: The type of InfraObject could not be identified.
FeastInvalidInfraObjectType: The type of InfraObject could not be identified.
"""
if infra_object_proto.infra_object_class_type:
cls = _get_infra_object_class_from_type(
infra_object_proto.infra_object_class_type
)
return cls.from_infra_object_proto(infra_object_proto)

raise ValueError("Could not identify the type of the InfraObject.")
raise FeastInvalidInfraObjectType()

@staticmethod
def from_proto(infra_object_proto: Any) -> Any:
adchia marked this conversation as resolved.
Show resolved Hide resolved
"""
Converts a protobuf representation of a subclass to an object of that subclass.

Args:
infra_object_proto: A protobuf representation of an InfraObject.

Raises:
FeastInvalidInfraObjectType: The type of InfraObject could not be identified.
"""
if isinstance(infra_object_proto, DatastoreTableProto):
adchia marked this conversation as resolved.
Show resolved Hide resolved
infra_object_class_type = DATASTORE_INFRA_OBJECT_CLASS_TYPE
elif isinstance(infra_object_proto, DynamoDBTableProto):
infra_object_class_type = DYNAMODB_INFRA_OBJECT_CLASS_TYPE
elif isinstance(infra_object_proto, SqliteTableProto):
infra_object_class_type = SQLITE_INFRA_OBJECT_CLASS_TYPE
else:
raise FeastInvalidInfraObjectType()

cls = _get_infra_object_class_from_type(infra_object_class_type)
return cls.from_proto(infra_object_proto)

@abstractmethod
def update(self):
Expand Down Expand Up @@ -94,7 +125,7 @@ def to_proto(self) -> InfraProto:
"""
infra_proto = InfraProto()
for infra_object in self.infra_objects:
infra_object_proto = infra_object.to_proto()
infra_object_proto = infra_object.to_infra_object_proto()
infra_proto.infra_objects.append(infra_object_proto)

return infra_proto
Expand Down
19 changes: 12 additions & 7 deletions sdk/python/feast/infra/local.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import uuid
from datetime import datetime
from pathlib import Path
from typing import List

from feast.feature_view import FeatureView
from feast.infra.infra_object import Infra, InfraObject
from feast.infra.passthrough_provider import PassthroughProvider
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
from feast.registry_store import RegistryStore
from feast.repo_config import RegistryConfig
from feast.repo_config import RegistryConfig, RepoConfig
from feast.usage import log_exceptions_and_usage


Expand All @@ -15,11 +16,15 @@ class LocalProvider(PassthroughProvider):
This class only exists for backwards compatibility.
"""

pass


def _table_id(project: str, table: FeatureView) -> str:
return f"{project}_{table.name}"
def plan_infra(
self, config: RepoConfig, desired_registry_proto: RegistryProto
) -> Infra:
infra_objects: List[InfraObject] = self.online_store.plan(
config, desired_registry_proto
)
infra = Infra()
infra.infra_objects += infra_objects
return infra


class LocalRegistryStore(RegistryStore):
Expand Down
13 changes: 13 additions & 0 deletions sdk/python/feast/infra/online_stores/datastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,19 @@ def from_infra_object_proto(infra_object_proto: InfraObjectProto) -> Any:

return datastore_table

@staticmethod
def from_proto(datastore_table_proto: DatastoreTableProto) -> Any:
datastore_table = DatastoreTable(
project=datastore_table_proto.project, name=datastore_table_proto.name,
)

if datastore_table_proto.HasField("project_id"):
felixwang9817 marked this conversation as resolved.
Show resolved Hide resolved
datastore_table.project_id = datastore_table_proto.project_id.value
if datastore_table_proto.HasField("namespace"):
datastore_table.namespace = datastore_table_proto.namespace.value

return datastore_table

def update(self):
client = _initialize_client(self.project_id, self.namespace)
key = client.key("Project", self.project, "Table", self.name)
Expand Down
6 changes: 6 additions & 0 deletions sdk/python/feast/infra/online_stores/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,12 @@ def from_infra_object_proto(infra_object_proto: InfraObjectProto) -> Any:
region=infra_object_proto.dynamodb_table.region,
)

@staticmethod
def from_proto(dynamodb_table_proto: DynamoDBTableProto) -> Any:
return DynamoDBTable(
name=dynamodb_table_proto.name, region=dynamodb_table_proto.region,
)

adchia marked this conversation as resolved.
Show resolved Hide resolved
def update(self):
dynamodb_client = _initialize_dynamodb_client(region=self.region)
dynamodb_resource = _initialize_dynamodb_resource(region=self.region)
Expand Down
14 changes: 14 additions & 0 deletions sdk/python/feast/infra/online_stores/online_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

from feast import Entity
from feast.feature_view import FeatureView
from feast.infra.infra_object import InfraObject
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import RepoConfig
Expand Down Expand Up @@ -92,6 +94,18 @@ def update(
):
...

def plan(
self, config: RepoConfig, desired_registry_proto: RegistryProto
) -> List[InfraObject]:
"""
Returns the set of InfraObjects required to support the desired registry.

Args:
config: The RepoConfig for the current FeatureStore.
desired_registry_proto: The desired registry, in proto form.
"""
return []

@abstractmethod
def teardown(
self,
Expand Down
20 changes: 20 additions & 0 deletions sdk/python/feast/infra/online_stores/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from feast.infra.key_encoding_utils import serialize_entity_key
from feast.infra.online_stores.online_store import OnlineStore
from feast.protos.feast.core.InfraObject_pb2 import InfraObject as InfraObjectProto
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
from feast.protos.feast.core.SqliteTable_pb2 import SqliteTable as SqliteTableProto
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
Expand Down Expand Up @@ -199,6 +200,21 @@ def update(
for table in tables_to_delete:
conn.execute(f"DROP TABLE IF EXISTS {_table_id(project, table)}")

@log_exceptions_and_usage(online_store="sqlite")
def plan(
self, config: RepoConfig, desired_registry_proto: RegistryProto
) -> List[InfraObject]:
project = config.project

infra_objects: List[InfraObject] = [
SqliteTable(
path=self._get_db_path(config),
name=_table_id(project, FeatureView.from_proto(view)),
)
for view in desired_registry_proto.feature_views
]
return infra_objects

def teardown(
self,
config: RepoConfig,
Expand Down Expand Up @@ -261,6 +277,10 @@ def from_infra_object_proto(infra_object_proto: InfraObjectProto) -> Any:
name=infra_object_proto.sqlite_table.name,
)

@staticmethod
def from_proto(sqlite_table_proto: SqliteTableProto) -> Any:
return SqliteTable(path=sqlite_table_proto.path, name=sqlite_table_proto.name,)

def update(self):
self.conn.execute(
f"CREATE TABLE IF NOT EXISTS {self.name} (entity_key BLOB, feature_name TEXT, value BLOB, event_ts timestamp, created_ts timestamp, PRIMARY KEY(entity_key, feature_name))"
Expand Down
Loading