Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Add columns for user metadata in the tables #2760

Merged
merged 7 commits into from
Jun 7, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions sdk/python/feast/diff/registry_diff.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from feast.protos.feast.core.ValidationProfile_pb2 import (
ValidationReference as ValidationReferenceProto,
)
from feast.registry import FEAST_OBJECT_TYPES, FeastObjectType, Registry
from feast.registry import FEAST_OBJECT_TYPES, BaseRegistry, FeastObjectType
from feast.repo_contents import RepoContents


Expand Down Expand Up @@ -161,7 +161,7 @@ def diff_registry_objects(


def extract_objects_for_keep_delete_update_add(
registry: Registry, current_project: str, desired_repo_contents: RepoContents,
registry: BaseRegistry, current_project: str, desired_repo_contents: RepoContents,
) -> Tuple[
Dict[FeastObjectType, Set[FeastObject]],
Dict[FeastObjectType, Set[FeastObject]],
Expand Down Expand Up @@ -208,7 +208,7 @@ def extract_objects_for_keep_delete_update_add(


def diff_between(
registry: Registry, current_project: str, desired_repo_contents: RepoContents,
registry: BaseRegistry, current_project: str, desired_repo_contents: RepoContents,
) -> RegistryDiff:
"""
Returns the difference between the current and desired repo states.
Expand Down Expand Up @@ -267,7 +267,10 @@ def diff_between(


def apply_diff_to_registry(
registry: Registry, registry_diff: RegistryDiff, project: str, commit: bool = True
registry: BaseRegistry,
registry_diff: RegistryDiff,
project: str,
commit: bool = True,
):
"""
Applies the given diff to the given Feast project in the registry.
Expand Down
8 changes: 4 additions & 4 deletions sdk/python/feast/feature_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
)

if TYPE_CHECKING:
from feast import FeatureService
from feast.registry import Registry
from feast.feature_service import FeatureService
from feast.registry import BaseRegistry


REQUEST_ID_FIELD = "__request_id"
Expand All @@ -33,7 +33,7 @@ class LoggingSource:
"""

@abc.abstractmethod
def get_schema(self, registry: "Registry") -> pa.Schema:
def get_schema(self, registry: "BaseRegistry") -> pa.Schema:
""" Generate schema for logs destination. """
raise NotImplementedError

Expand All @@ -48,7 +48,7 @@ def __init__(self, feature_service: "FeatureService", project: str):
self._feature_service = feature_service
self._project = project

def get_schema(self, registry: "Registry") -> pa.Schema:
def get_schema(self, registry: "BaseRegistry") -> pa.Schema:
fields: Dict[str, pa.DataType] = {}

for projection in self._feature_service.feature_view_projections:
Expand Down
19 changes: 7 additions & 12 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,13 @@
from feast.infra.registry_stores.sql import SqlRegistry
from feast.on_demand_feature_view import OnDemandFeatureView
from feast.online_response import OnlineResponse
from feast.protos.feast.core.InfraObject_pb2 import Infra as InfraProto
from feast.protos.feast.serving.ServingService_pb2 import (
FieldStatus,
GetOnlineFeaturesResponse,
)
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import RepeatedValue, Value
from feast.registry import Registry
from feast.registry import BaseRegistry, Registry
from feast.repo_config import RepoConfig, load_repo_config
from feast.repo_contents import RepoContents
from feast.request_feature_view import RequestFeatureView
Expand Down Expand Up @@ -113,7 +112,7 @@ class FeatureStore:

config: RepoConfig
repo_path: Path
_registry: Registry
_registry: BaseRegistry
_provider: Provider
_go_server: "EmbeddedOnlineFeatureServer"

Expand Down Expand Up @@ -142,8 +141,9 @@ def __init__(
if registry_config.registry_type == "sql":
self._registry = SqlRegistry(registry_config, None)
else:
self._registry = Registry(registry_config, repo_path=self.repo_path)
self._registry._initialize_registry()
r = Registry(registry_config, repo_path=self.repo_path)
felixwang9817 marked this conversation as resolved.
Show resolved Hide resolved
r._initialize_registry()
self._registry = r
self._provider = get_provider(self.config, self.repo_path)
self._go_server = None

Expand All @@ -153,7 +153,7 @@ def version(self) -> str:
return get_version()

@property
def registry(self) -> Registry:
def registry(self) -> BaseRegistry:
"""Gets the registry of this feature store."""
return self._registry

Expand Down Expand Up @@ -644,12 +644,7 @@ def _plan(
# Compute the desired difference between the current infra, as stored in the registry,
# and the desired infra.
self._registry.refresh()
current_infra_proto = (
self._registry.cached_registry_proto.infra.__deepcopy__()
if hasattr(self._registry, "cached_registry_proto")
and self._registry.cached_registry_proto
else InfraProto()
)
current_infra_proto = self._registry.proto().infra.__deepcopy__()
desired_registry_proto = desired_repo_contents.to_registry_proto()
new_infra = self._provider.plan_infra(self.config, desired_registry_proto)
new_infra_proto = new_infra.to_proto()
Expand Down
6 changes: 3 additions & 3 deletions sdk/python/feast/infra/offline_stores/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
RetrievalMetadata,
)
from feast.on_demand_feature_view import OnDemandFeatureView
from feast.registry import Registry
from feast.registry import BaseRegistry
from feast.repo_config import FeastConfigBaseModel, RepoConfig

from ...saved_dataset import SavedDatasetStorage
Expand Down Expand Up @@ -169,7 +169,7 @@ def get_historical_features(
feature_views: List[FeatureView],
feature_refs: List[str],
entity_df: Union[pd.DataFrame, str],
registry: Registry,
registry: BaseRegistry,
project: str,
full_feature_names: bool = False,
) -> RetrievalJob:
Expand Down Expand Up @@ -262,7 +262,7 @@ def write_logged_features(
data: Union[pyarrow.Table, Path],
source: LoggingSource,
logging_config: LoggingConfig,
registry: Registry,
registry: BaseRegistry,
):
destination = logging_config.destination
assert isinstance(destination, BigQueryLoggingDestination)
Expand Down
6 changes: 3 additions & 3 deletions sdk/python/feast/infra/offline_stores/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
_get_requested_feature_views_to_features_dict,
_run_dask_field_mapping,
)
from feast.registry import Registry
from feast.registry import BaseRegistry
from feast.repo_config import FeastConfigBaseModel, RepoConfig
from feast.saved_dataset import SavedDatasetStorage
from feast.usage import log_exceptions_and_usage
Expand Down Expand Up @@ -113,7 +113,7 @@ def get_historical_features(
feature_views: List[FeatureView],
feature_refs: List[str],
entity_df: Union[pd.DataFrame, str],
registry: Registry,
registry: BaseRegistry,
project: str,
full_feature_names: bool = False,
) -> RetrievalJob:
Expand Down Expand Up @@ -380,7 +380,7 @@ def write_logged_features(
data: Union[pyarrow.Table, Path],
source: LoggingSource,
logging_config: LoggingConfig,
registry: Registry,
registry: BaseRegistry,
):
destination = logging_config.destination
assert isinstance(destination, FileLoggingDestination)
Expand Down
6 changes: 3 additions & 3 deletions sdk/python/feast/infra/offline_stores/offline_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from feast.feature_logging import LoggingConfig, LoggingSource
from feast.feature_view import FeatureView
from feast.on_demand_feature_view import OnDemandFeatureView
from feast.registry import Registry
from feast.registry import BaseRegistry
from feast.repo_config import RepoConfig
from feast.saved_dataset import SavedDatasetStorage

Expand Down Expand Up @@ -211,7 +211,7 @@ def get_historical_features(
feature_views: List[FeatureView],
feature_refs: List[str],
entity_df: Union[pd.DataFrame, str],
registry: Registry,
registry: BaseRegistry,
project: str,
full_feature_names: bool = False,
) -> RetrievalJob:
Expand Down Expand Up @@ -252,7 +252,7 @@ def write_logged_features(
data: Union[pyarrow.Table, Path],
source: LoggingSource,
logging_config: LoggingConfig,
registry: Registry,
registry: BaseRegistry,
):
"""
Write logged features to a specified destination (taken from logging_config) in the offline store.
Expand Down
8 changes: 4 additions & 4 deletions sdk/python/feast/infra/offline_stores/offline_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from jinja2 import BaseLoader, Environment
from pandas import Timestamp

import feast
from feast.errors import (
EntityTimestampInferenceException,
FeastEntityDFMissingColumnsError,
Expand All @@ -17,7 +16,7 @@
from feast.importer import import_class
from feast.infra.offline_stores.offline_store import OfflineStore
from feast.infra.provider import _get_requested_feature_views_to_features_dict
from feast.registry import Registry
from feast.registry import BaseRegistry
from feast.utils import to_naive_utc

DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL = "event_timestamp"
Expand Down Expand Up @@ -55,8 +54,9 @@ def assert_expected_columns_in_entity_df(
raise FeastEntityDFMissingColumnsError(expected_columns, missing_keys)


# TODO: Remove project and registry from the interface and call sites.
def get_expected_join_keys(
project: str, feature_views: List["feast.FeatureView"], registry: Registry
project: str, feature_views: List[FeatureView], registry: BaseRegistry
) -> Set[str]:
join_keys = set()
for feature_view in feature_views:
Expand Down Expand Up @@ -95,7 +95,7 @@ class FeatureViewQueryContext:
def get_feature_view_query_context(
feature_refs: List[str],
feature_views: List[FeatureView],
registry: Registry,
registry: BaseRegistry,
project: str,
entity_df_timestamp_range: Tuple[datetime, datetime],
) -> List[FeatureViewQueryContext]:
Expand Down
6 changes: 3 additions & 3 deletions sdk/python/feast/infra/offline_stores/redshift.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
SavedDatasetRedshiftStorage,
)
from feast.infra.utils import aws_utils
from feast.registry import Registry
from feast.registry import BaseRegistry
from feast.repo_config import FeastConfigBaseModel, RepoConfig
from feast.saved_dataset import SavedDatasetStorage
from feast.usage import log_exceptions_and_usage
Expand Down Expand Up @@ -176,7 +176,7 @@ def get_historical_features(
feature_views: List[FeatureView],
feature_refs: List[str],
entity_df: Union[pd.DataFrame, str],
registry: Registry,
registry: BaseRegistry,
project: str,
full_feature_names: bool = False,
) -> RetrievalJob:
Expand Down Expand Up @@ -269,7 +269,7 @@ def write_logged_features(
data: Union[pyarrow.Table, Path],
source: LoggingSource,
logging_config: LoggingConfig,
registry: Registry,
registry: BaseRegistry,
):
destination = logging_config.destination
assert isinstance(destination, RedshiftLoggingDestination)
Expand Down
6 changes: 3 additions & 3 deletions sdk/python/feast/infra/offline_stores/snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
write_pandas,
write_parquet,
)
from feast.registry import Registry
from feast.registry import BaseRegistry
from feast.repo_config import FeastConfigBaseModel, RepoConfig
from feast.saved_dataset import SavedDatasetStorage
from feast.usage import log_exceptions_and_usage
Expand Down Expand Up @@ -206,7 +206,7 @@ def get_historical_features(
feature_views: List[FeatureView],
feature_refs: List[str],
entity_df: Union[pd.DataFrame, str],
registry: Registry,
registry: BaseRegistry,
project: str,
full_feature_names: bool = False,
) -> RetrievalJob:
Expand Down Expand Up @@ -284,7 +284,7 @@ def write_logged_features(
data: Union[pyarrow.Table, Path],
source: LoggingSource,
logging_config: LoggingConfig,
registry: Registry,
registry: BaseRegistry,
):
assert isinstance(logging_config.destination, SnowflakeLoggingDestination)

Expand Down
10 changes: 5 additions & 5 deletions sdk/python/feast/infra/passthrough_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
)
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.registry import Registry
from feast.registry import BaseRegistry
from feast.repo_config import RepoConfig
from feast.saved_dataset import SavedDataset
from feast.usage import RatioSampler, log_exceptions_and_usage, set_usage_attribute
Expand Down Expand Up @@ -138,7 +138,7 @@ def materialize_single_feature_view(
feature_view: FeatureView,
start_date: datetime,
end_date: datetime,
registry: Registry,
registry: BaseRegistry,
project: str,
tqdm_builder: Callable[[int], tqdm],
) -> None:
Expand Down Expand Up @@ -194,7 +194,7 @@ def get_historical_features(
feature_views: List[FeatureView],
feature_refs: List[str],
entity_df: Union[pandas.DataFrame, str],
registry: Registry,
registry: BaseRegistry,
project: str,
full_feature_names: bool,
) -> RetrievalJob:
Expand Down Expand Up @@ -240,7 +240,7 @@ def write_feature_service_logs(
feature_service: FeatureService,
logs: Union[pyarrow.Table, str],
config: RepoConfig,
registry: Registry,
registry: BaseRegistry,
):
assert (
feature_service.logging_config is not None
Expand All @@ -260,7 +260,7 @@ def retrieve_feature_service_logs(
start_date: datetime,
end_date: datetime,
config: RepoConfig,
registry: Registry,
registry: BaseRegistry,
) -> RetrievalJob:
assert (
feature_service.logging_config is not None
Expand Down
10 changes: 5 additions & 5 deletions sdk/python/feast/infra/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.registry import Registry
from feast.registry import BaseRegistry
from feast.repo_config import RepoConfig
from feast.saved_dataset import SavedDataset
from feast.type_map import python_values_to_proto_values
Expand Down Expand Up @@ -133,7 +133,7 @@ def materialize_single_feature_view(
feature_view: FeatureView,
start_date: datetime,
end_date: datetime,
registry: Registry,
registry: BaseRegistry,
project: str,
tqdm_builder: Callable[[int], tqdm],
) -> None:
Expand All @@ -146,7 +146,7 @@ def get_historical_features(
feature_views: List[FeatureView],
feature_refs: List[str],
entity_df: Union[pandas.DataFrame, str],
registry: Registry,
registry: BaseRegistry,
project: str,
full_feature_names: bool,
) -> RetrievalJob:
Expand Down Expand Up @@ -192,7 +192,7 @@ def write_feature_service_logs(
feature_service: FeatureService,
logs: Union[pyarrow.Table, Path],
config: RepoConfig,
registry: Registry,
registry: BaseRegistry,
):
"""
Write features and entities logged by a feature server to an offline store.
Expand All @@ -211,7 +211,7 @@ def retrieve_feature_service_logs(
start_date: datetime,
end_date: datetime,
config: RepoConfig,
registry: Registry,
registry: BaseRegistry,
) -> RetrievalJob:
"""
Read logged features from an offline store for a given time window [from, to).
Expand Down
Loading