Skip to content

Commit

Permalink
fix: Fix SQL Registry cache miss (#3482)
Browse files Browse the repository at this point in the history
Signed-off-by: Danny Chiao <danny@tecton.ai>
  • Loading branch information
adchia authored Feb 9, 2023
1 parent fd91cda commit 3249b97
Show file tree
Hide file tree
Showing 8 changed files with 160 additions and 48 deletions.
8 changes: 5 additions & 3 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,9 @@ def __init__(

registry_config = self.config.get_registry_config()
if registry_config.registry_type == "sql":
self._registry = SqlRegistry(registry_config, None)
self._registry = SqlRegistry(registry_config, self.config.project, None)
else:
r = Registry(registry_config, repo_path=self.repo_path)
r = Registry(self.config.project, registry_config, repo_path=self.repo_path)
r._initialize_registry(self.config.project)
self._registry = r

Expand Down Expand Up @@ -210,7 +210,9 @@ def refresh_registry(self):
downloaded synchronously, which may increase latencies if the triggering method is get_online_features().
"""
registry_config = self.config.get_registry_config()
registry = Registry(registry_config, repo_path=self.repo_path)
registry = Registry(
self.config.project, registry_config, repo_path=self.repo_path
)
registry.refresh(self.config.project)

self._registry = registry
Expand Down
24 changes: 23 additions & 1 deletion sdk/python/feast/infra/registry/proto_registry_utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from typing import List
import uuid
from typing import List, Optional

from feast import usage
from feast.data_source import DataSource
from feast.entity import Entity
from feast.errors import (
Expand All @@ -15,12 +17,32 @@
from feast.feature_view import FeatureView
from feast.on_demand_feature_view import OnDemandFeatureView
from feast.project_metadata import ProjectMetadata
from feast.protos.feast.core.Registry_pb2 import ProjectMetadata as ProjectMetadataProto
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
from feast.request_feature_view import RequestFeatureView
from feast.saved_dataset import SavedDataset, ValidationReference
from feast.stream_feature_view import StreamFeatureView


def init_project_metadata(cached_registry_proto: RegistryProto, project: str):
    """Attach freshly generated project metadata to a registry proto.

    A new random UUID (``uuid.uuid4``) is rendered as a string, handed to
    ``usage.set_current_project_uuid``, and then stored in a ``ProjectMetadata``
    entry that is appended (in proto form) to
    ``cached_registry_proto.project_metadata``.

    Args:
        cached_registry_proto: Registry proto that is mutated in place.
        project: Name of the project the new metadata entry is created for.
    """
    project_uuid = str(uuid.uuid4())
    # The usage module is told about the UUID before the proto is touched,
    # matching the order in which the two side effects have always happened.
    usage.set_current_project_uuid(project_uuid)
    metadata_proto = ProjectMetadata(
        project_name=project, project_uuid=project_uuid
    ).to_proto()
    cached_registry_proto.project_metadata.append(metadata_proto)


def get_project_metadata(
    registry_proto: Optional[RegistryProto], project: str
) -> Optional[ProjectMetadataProto]:
    """Look up the metadata entry for ``project`` in a registry proto.

    Args:
        registry_proto: Registry proto to search; may be ``None``.
        project: Project name compared against each entry's ``project`` field.

    Returns:
        The first entry of ``registry_proto.project_metadata`` whose
        ``project`` equals ``project``, or ``None`` when ``registry_proto`` is
        falsy or no entry matches.
    """
    if registry_proto:
        # next() stops at the first match, exactly like an early return in a loop.
        return next(
            (
                entry
                for entry in registry_proto.project_metadata
                if entry.project == project
            ),
            None,
        )
    return None


def get_feature_service(
registry_proto: RegistryProto, name: str, project: str
) -> FeatureService:
Expand Down
61 changes: 29 additions & 32 deletions sdk/python/feast/infra/registry/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import uuid
from datetime import datetime, timedelta
from enum import Enum
from pathlib import Path
Expand Down Expand Up @@ -44,7 +43,6 @@
from feast.infra.registry.registry_store import NoopRegistryStore
from feast.on_demand_feature_view import OnDemandFeatureView
from feast.project_metadata import ProjectMetadata
from feast.protos.feast.core.Registry_pb2 import ProjectMetadata as ProjectMetadataProto
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
from feast.repo_config import RegistryConfig
from feast.repo_contents import RepoContents
Expand Down Expand Up @@ -143,25 +141,6 @@ def get_registry_store_class_from_scheme(registry_path: str):
return get_registry_store_class_from_type(registry_store_type)


def _get_project_metadata(
    registry_proto: Optional[RegistryProto], project: str
) -> Optional[ProjectMetadataProto]:
    """Return the first project-metadata entry whose ``project`` matches.

    Args:
        registry_proto: Registry proto to search; may be ``None``.
        project: Project name to match against each entry's ``project`` field.

    Returns:
        The matching entry from ``registry_proto.project_metadata``, or
        ``None`` when ``registry_proto`` is falsy or nothing matches.
    """
    # A falsy proto is treated as having no metadata entries at all.
    candidates = registry_proto.project_metadata if registry_proto else ()
    return next(filter(lambda entry: entry.project == project, candidates), None)


def _init_project_metadata(cached_registry_proto: RegistryProto, project: str):
    """Create metadata with a new UUID for ``project`` and add it to the proto.

    The generated UUID string is first passed to
    ``usage.set_current_project_uuid`` and then wrapped in a
    ``ProjectMetadata`` whose proto form is appended to
    ``cached_registry_proto.project_metadata``.

    Args:
        cached_registry_proto: Registry proto that is mutated in place.
        project: Name of the project the metadata entry belongs to.
    """
    generated_uuid = str(uuid.uuid4())
    usage.set_current_project_uuid(generated_uuid)
    new_entry = ProjectMetadata(project_name=project, project_uuid=generated_uuid)
    cached_registry_proto.project_metadata.append(new_entry.to_proto())


class Registry(BaseRegistry):
def apply_user_metadata(
self,
Expand All @@ -184,19 +163,25 @@ def get_user_metadata(
cached_registry_proto_ttl: timedelta

def __new__(
cls, registry_config: Optional[RegistryConfig], repo_path: Optional[Path]
cls,
project: str,
registry_config: Optional[RegistryConfig],
repo_path: Optional[Path],
):
# We override __new__ so that we can inspect registry_config and create a SqlRegistry without callers
# needing to make any changes.
if registry_config and registry_config.registry_type == "sql":
from feast.infra.registry.sql import SqlRegistry

return SqlRegistry(registry_config, repo_path)
return SqlRegistry(registry_config, project, repo_path)
else:
return super(Registry, cls).__new__(cls)

def __init__(
self, registry_config: Optional[RegistryConfig], repo_path: Optional[Path]
self,
project: str,
registry_config: Optional[RegistryConfig],
repo_path: Optional[Path],
):
"""
Create the Registry object.
Expand Down Expand Up @@ -225,7 +210,7 @@ def __init__(
)

def clone(self) -> "Registry":
new_registry = Registry(None, None)
new_registry = Registry("project", None, None)
new_registry.cached_registry_proto_ttl = timedelta(seconds=0)
new_registry.cached_registry_proto = (
self.cached_registry_proto.__deepcopy__()
Expand All @@ -243,7 +228,7 @@ def _initialize_registry(self, project: str):
except FileNotFoundError:
registry_proto = RegistryProto()
registry_proto.registry_schema_version = REGISTRY_SCHEMA_VERSION
_init_project_metadata(registry_proto, project)
proto_registry_utils.init_project_metadata(registry_proto, project)
self._registry_store.update_registry_proto(registry_proto)

def update_infra(self, infra: Infra, project: str, commit: bool = True):
Expand Down Expand Up @@ -791,7 +776,12 @@ def _prepare_registry_for_changes(self, project: str):
"""Prepares the Registry for changes by refreshing the cache if necessary."""
try:
self._get_registry_proto(project=project, allow_cache=True)
if _get_project_metadata(self.cached_registry_proto, project) is None:
if (
proto_registry_utils.get_project_metadata(
self.cached_registry_proto, project
)
is None
):
# Project metadata not initialized yet. Try pulling without cache
self._get_registry_proto(project=project, allow_cache=False)
except FileNotFoundError:
Expand All @@ -802,8 +792,15 @@ def _prepare_registry_for_changes(self, project: str):

# Initialize project metadata if needed
assert self.cached_registry_proto
if _get_project_metadata(self.cached_registry_proto, project) is None:
_init_project_metadata(self.cached_registry_proto, project)
if (
proto_registry_utils.get_project_metadata(
self.cached_registry_proto, project
)
is None
):
proto_registry_utils.init_project_metadata(
self.cached_registry_proto, project
)
self.commit()

return self.cached_registry_proto
Expand Down Expand Up @@ -836,7 +833,7 @@ def _get_registry_proto(
)

if project:
old_project_metadata = _get_project_metadata(
old_project_metadata = proto_registry_utils.get_project_metadata(
registry_proto=self.cached_registry_proto, project=project
)

Expand All @@ -854,13 +851,13 @@ def _get_registry_proto(
if not project:
return registry_proto

project_metadata = _get_project_metadata(
project_metadata = proto_registry_utils.get_project_metadata(
registry_proto=registry_proto, project=project
)
if project_metadata:
usage.set_current_project_uuid(project_metadata.project_uuid)
else:
_init_project_metadata(registry_proto, project)
proto_registry_utils.init_project_metadata(registry_proto, project)
self.commit()

return registry_proto
Expand Down
25 changes: 23 additions & 2 deletions sdk/python/feast/infra/registry/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,19 +180,24 @@ class FeastMetadataKeys(Enum):

class SqlRegistry(BaseRegistry):
def __init__(
self, registry_config: Optional[RegistryConfig], repo_path: Optional[Path]
self,
registry_config: Optional[RegistryConfig],
project: str,
repo_path: Optional[Path],
):
assert registry_config is not None, "SqlRegistry needs a valid registry_config"
self.engine: Engine = create_engine(registry_config.path, echo=False)
metadata.create_all(self.engine)
self.cached_registry_proto = self.proto()
proto_registry_utils.init_project_metadata(self.cached_registry_proto, project)
self.cached_registry_proto_created = datetime.utcnow()
self._refresh_lock = Lock()
self.cached_registry_proto_ttl = timedelta(
seconds=registry_config.cache_ttl_seconds
if registry_config.cache_ttl_seconds is not None
else 0
)
self.project = project

def teardown(self):
for t in {
Expand All @@ -210,6 +215,16 @@ def teardown(self):
conn.execute(stmt)

def refresh(self, project: Optional[str] = None):
    """Rebuild the cached registry proto and reset its creation timestamp.

    When ``project`` is given, the currently cached proto is first checked for
    that project's metadata: if present, its UUID is passed to
    ``usage.set_current_project_uuid``; otherwise new metadata is initialised
    on the cached proto via ``proto_registry_utils.init_project_metadata``.

    Args:
        project: Optional project name whose metadata is looked up before the
            cache is rebuilt.
    """
    if project:
        known_metadata = proto_registry_utils.get_project_metadata(
            registry_proto=self.cached_registry_proto, project=project
        )
        if not known_metadata:
            # NOTE(review): this writes to the cache object that is replaced
            # just below; confirm that self.proto() carries project metadata,
            # otherwise only the usage-UUID side effect survives the refresh.
            proto_registry_utils.init_project_metadata(
                self.cached_registry_proto, project
            )
        else:
            usage.set_current_project_uuid(known_metadata.project_uuid)

    rebuilt_proto = self.proto()
    self.cached_registry_proto = rebuilt_proto
    self.cached_registry_proto_created = datetime.utcnow()

Expand Down Expand Up @@ -816,7 +831,13 @@ def proto(self) -> RegistryProto:
]:
objs: List[Any] = lister(project) # type: ignore
if objs:
registry_proto_field.extend([obj.to_proto() for obj in objs])
obj_protos = [obj.to_proto() for obj in objs]
for obj_proto in obj_protos:
if "spec" in obj_proto.DESCRIPTOR.fields_by_name:
obj_proto.spec.project = project
else:
obj_proto.project = project
registry_proto_field.extend(obj_protos)

# This is suuuper jank. Because of https://github.com/feast-dev/feast/issues/2783,
# the registry proto only has a single infra field, which we're currently setting as the "last" project.
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/feast/repo_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ def registry_dump(repo_config: RepoConfig, repo_path: Path) -> str:
"""For debugging only: output contents of the metadata registry"""
registry_config = repo_config.get_registry_config()
project = repo_config.project
registry = Registry(registry_config=registry_config, repo_path=repo_path)
registry = Registry(project, registry_config=registry_config, repo_path=repo_path)
registry_dict = registry.to_dict(project=project)
return json.dumps(registry_dict, indent=2, sort_keys=True)

Expand Down
4 changes: 2 additions & 2 deletions sdk/python/tests/integration/registration/test_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def gcs_registry() -> Registry:
registry_config = RegistryConfig(
path=f"gs://{bucket_name}/registry.db", cache_ttl_seconds=600
)
return Registry(registry_config, None)
return Registry("project", registry_config, None)


@pytest.fixture
Expand All @@ -57,7 +57,7 @@ def s3_registry() -> Registry:
path=f"{aws_registry_path}/{int(time.time() * 1000)}/registry.db",
cache_ttl_seconds=600,
)
return Registry(registry_config, None)
return Registry("project", registry_config, None)


@pytest.mark.integration
Expand Down
8 changes: 4 additions & 4 deletions sdk/python/tests/unit/infra/test_local_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
def local_registry() -> Registry:
fd, registry_path = mkstemp()
registry_config = RegistryConfig(path=registry_path, cache_ttl_seconds=600)
return Registry(registry_config, None)
return Registry("project", registry_config, None)


@pytest.mark.parametrize(
Expand Down Expand Up @@ -443,7 +443,7 @@ def test_apply_data_source(test_registry: Registry):
def test_commit():
fd, registry_path = mkstemp()
registry_config = RegistryConfig(path=registry_path, cache_ttl_seconds=600)
test_registry = Registry(registry_config, None)
test_registry = Registry("project", registry_config, None)

entity = Entity(
name="driver_car_id",
Expand Down Expand Up @@ -484,7 +484,7 @@ def test_commit():
validate_project_uuid(project_uuid, test_registry)

# Create new registry that points to the same store
registry_with_same_store = Registry(registry_config, None)
registry_with_same_store = Registry("project", registry_config, None)

# Retrieving the entity should fail since the store is empty
entities = registry_with_same_store.list_entities(project)
Expand All @@ -495,7 +495,7 @@ def test_commit():
test_registry.commit()

# Reconstruct the new registry in order to read the newly written store
registry_with_same_store = Registry(registry_config, None)
registry_with_same_store = Registry("project", registry_config, None)

# Retrieving the entity should now succeed
entities = registry_with_same_store.list_entities(project)
Expand Down
Loading

0 comments on commit 3249b97

Please sign in to comment.