Skip to content

Commit

Permalink
fix: Update Feast object metadata in the registry (#4257)
Browse files Browse the repository at this point in the history
  • Loading branch information
msistla96 authored Jun 19, 2024
1 parent 89bc551 commit 8028ae0
Show file tree
Hide file tree
Showing 11 changed files with 570 additions and 9 deletions.
10 changes: 10 additions & 0 deletions sdk/python/feast/feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,16 @@ def with_join_key_map(self, join_key_map: Dict[str, str]):

return cp

def update_materialization_intervals(
self, existing_materialization_intervals: List[Tuple[datetime, datetime]]
):
if (
len(existing_materialization_intervals) > 0
and len(self.materialization_intervals) == 0
):
for interval in existing_materialization_intervals:
self.materialization_intervals.append((interval[0], interval[1]))

def to_proto(self) -> FeatureViewProto:
"""
Converts a feature view object to its protobuf representation.
Expand Down
28 changes: 28 additions & 0 deletions sdk/python/feast/infra/registry/base_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,19 @@
from feast.infra.infra_object import Infra
from feast.on_demand_feature_view import OnDemandFeatureView
from feast.project_metadata import ProjectMetadata
from feast.protos.feast.core.Entity_pb2 import Entity as EntityProto
from feast.protos.feast.core.FeatureService_pb2 import (
FeatureService as FeatureServiceProto,
)
from feast.protos.feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto
from feast.protos.feast.core.OnDemandFeatureView_pb2 import (
OnDemandFeatureView as OnDemandFeatureViewProto,
)
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
from feast.protos.feast.core.SavedDataset_pb2 import SavedDataset as SavedDatasetProto
from feast.protos.feast.core.StreamFeatureView_pb2 import (
StreamFeatureView as StreamFeatureViewProto,
)
from feast.saved_dataset import SavedDataset, ValidationReference
from feast.stream_feature_view import StreamFeatureView
from feast.transformation.pandas_transformation import PandasTransformation
Expand Down Expand Up @@ -705,3 +717,19 @@ def to_dict(self, project: str) -> Dict[str, List[Any]]:
self._message_to_sorted_dict(infra_object.to_proto())
)
return registry_dict

@staticmethod
def deserialize_registry_values(serialized_proto, feast_obj_type) -> Any:
if feast_obj_type == Entity:
return EntityProto.FromString(serialized_proto)
if feast_obj_type == SavedDataset:
return SavedDatasetProto.FromString(serialized_proto)
if feast_obj_type == FeatureView:
return FeatureViewProto.FromString(serialized_proto)
if feast_obj_type == StreamFeatureView:
return StreamFeatureViewProto.FromString(serialized_proto)
if feast_obj_type == OnDemandFeatureView:
return OnDemandFeatureViewProto.FromString(serialized_proto)
if feast_obj_type == FeatureService:
return FeatureServiceProto.FromString(serialized_proto)
return None
34 changes: 33 additions & 1 deletion sdk/python/feast/infra/registry/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,9 +265,13 @@ def apply_entity(self, entity: Entity, project: str, commit: bool = True):
existing_entity_proto.spec.name == entity_proto.spec.name
and existing_entity_proto.spec.project == project
):
entity.created_timestamp = (
existing_entity_proto.meta.created_timestamp.ToDatetime()
)
entity_proto = entity.to_proto()
entity_proto.spec.project = project
del self.cached_registry_proto.entities[idx]
break

self.cached_registry_proto.entities.append(entity_proto)
if commit:
self.commit()
Expand Down Expand Up @@ -346,6 +350,11 @@ def apply_feature_service(
== feature_service_proto.spec.name
and existing_feature_service_proto.spec.project == project
):
feature_service.created_timestamp = (
existing_feature_service_proto.meta.created_timestamp.ToDatetime()
)
feature_service_proto = feature_service.to_proto()
feature_service_proto.spec.project = project
del registry.feature_services[idx]
registry.feature_services.append(feature_service_proto)
if commit:
Expand Down Expand Up @@ -421,6 +430,18 @@ def apply_feature_view(
):
return
else:
existing_feature_view = type(feature_view).from_proto(
existing_feature_view_proto
)
feature_view.created_timestamp = (
existing_feature_view.created_timestamp
)
if isinstance(feature_view, (FeatureView, StreamFeatureView)):
feature_view.update_materialization_intervals(
existing_feature_view.materialization_intervals
)
feature_view_proto = feature_view.to_proto()
feature_view_proto.spec.project = project
del existing_feature_views_of_same_type[idx]
break

Expand Down Expand Up @@ -660,6 +681,17 @@ def apply_saved_dataset(
existing_saved_dataset_proto.spec.name == saved_dataset_proto.spec.name
and existing_saved_dataset_proto.spec.project == project
):
saved_dataset.created_timestamp = (
existing_saved_dataset_proto.meta.created_timestamp.ToDatetime()
)
saved_dataset.min_event_timestamp = (
existing_saved_dataset_proto.meta.min_event_timestamp.ToDatetime()
)
saved_dataset.max_event_timestamp = (
existing_saved_dataset_proto.meta.max_event_timestamp.ToDatetime()
)
saved_dataset_proto = saved_dataset.to_proto()
saved_dataset_proto.spec.project = project
del self.cached_registry_proto.saved_datasets[idx]
break

Expand Down
1 change: 1 addition & 0 deletions sdk/python/feast/infra/registry/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ def apply_materialization(
start_date_timestamp.FromDatetime(start_date)
end_date_timestamp.FromDatetime(end_date)

# TODO: for this to work for stream feature views, ApplyMaterializationRequest needs to be updated
request = RegistryServer_pb2.ApplyMaterializationRequest(
feature_view=feature_view.to_proto(),
project=project,
Expand Down
18 changes: 18 additions & 0 deletions sdk/python/feast/infra/registry/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -713,6 +713,24 @@ def _apply_object(
obj.last_updated_timestamp = update_datetime

if row:
if proto_field_name in [
"entity_proto",
"saved_dataset_proto",
"feature_view_proto",
"feature_service_proto",
]:
deserialized_proto = self.deserialize_registry_values(
row._mapping[proto_field_name], type(obj)
)
obj.created_timestamp = (
deserialized_proto.meta.created_timestamp.ToDatetime()
)
if isinstance(obj, (FeatureView, StreamFeatureView)):
obj.update_materialization_intervals(
type(obj)
.from_proto(deserialized_proto)
.materialization_intervals
)
values = {
proto_field_name: obj.to_proto().SerializeToString(),
"last_updated_timestamp": update_time,
Expand Down
5 changes: 3 additions & 2 deletions sdk/python/feast/registry_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import grpc
from google.protobuf.empty_pb2 import Empty
from pytz import utc

from feast import FeatureStore
from feast.data_source import DataSource
Expand Down Expand Up @@ -313,10 +314,10 @@ def ApplyMaterialization(
feature_view=FeatureView.from_proto(request.feature_view),
project=request.project,
start_date=datetime.fromtimestamp(
request.start_date.seconds + request.start_date.nanos / 1e9
request.start_date.seconds + request.start_date.nanos / 1e9, tz=utc
),
end_date=datetime.fromtimestamp(
request.end_date.seconds + request.end_date.nanos / 1e9
request.end_date.seconds + request.end_date.nanos / 1e9, tz=utc
),
commit=request.commit,
)
Expand Down
Loading

0 comments on commit 8028ae0

Please sign in to comment.