Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updating CLI apply to use FeatureStore #1745

Merged
merged 18 commits into from
Aug 2, 2021
Merged
24 changes: 23 additions & 1 deletion sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ def version(self) -> str:
"""Returns the version of the current Feast SDK/CLI."""
return get_version()

@property
def registry(self) -> Registry:
"""Gets the registry of this feature store."""
return self._registry

@property
def project(self) -> str:
"""Gets the project of this feature store."""
Expand Down Expand Up @@ -224,6 +229,19 @@ def delete_feature_view(self, name: str):
"""
return self._registry.delete_feature_view(name, self.project)

@log_exceptions_and_usage
def delete_feature_service(self, name: str):
"""
Deletes a feature service.

Args:
name: Name of feature service.

Raises:
FeatureServiceNotFoundException: The feature view could not be found.
"""
return self._registry.delete_feature_service(name, self.project)

def _get_features(
self,
features: Optional[Union[List[str], FeatureService]],
Expand Down Expand Up @@ -252,6 +270,7 @@ def apply(
FeatureService,
List[Union[FeatureView, Entity, FeatureService]],
],
commit: bool = True,
):
"""Register objects to metadata store and update related infrastructure.

Expand All @@ -262,6 +281,7 @@ def apply(

Args:
objects: A single object, or a list of objects that should be registered with the Feature Store.
commit: whether to commit changes to the registry

Raises:
ValueError: The 'objects' parameter could not be parsed properly.
Expand Down Expand Up @@ -319,7 +339,6 @@ def apply(
self._registry.apply_entity(ent, project=self.project, commit=False)
for feature_service in services_to_update:
self._registry.apply_feature_service(feature_service, project=self.project)
self._registry.commit()

self._get_provider().update_infra(
project=self.project,
Expand All @@ -330,6 +349,9 @@ def apply(
partial=True,
)

if commit:
self._registry.commit()

@log_exceptions_and_usage
def teardown(self):
"""Tears down all local and cloud resources for the feature store."""
Expand Down
208 changes: 106 additions & 102 deletions sdk/python/feast/repo_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from datetime import timedelta
from importlib.abc import Loader
from pathlib import Path
from typing import List, NamedTuple, Set, Union
from typing import List, NamedTuple, Set, Tuple, Union

import click
from click.exceptions import BadParameter
Expand All @@ -15,10 +15,6 @@
from feast.feature_service import FeatureService
from feast.feature_store import FeatureStore, _validate_feature_views
from feast.feature_view import FeatureView
from feast.inference import (
update_data_sources_with_inferred_event_timestamp_col,
update_entities_with_inferred_types_from_feature_views,
)
from feast.infra.provider import get_provider
from feast.names import adjectives, animals
from feast.registry import Registry
Expand Down Expand Up @@ -116,103 +112,61 @@ def parse_repo(repo_root: Path) -> ParsedRepo:
return res


def apply_feature_services(
registry: Registry,
project: str,
repo: ParsedRepo,
existing_feature_services: List[FeatureService],
):
from colorama import Fore, Style

# Determine which feature services should be deleted.
for feature_service in repo.feature_services:
if feature_service in existing_feature_services:
existing_feature_services.remove(feature_service)

# The remaining features services in the list should be deleted.
for feature_service_to_delete in existing_feature_services:
registry.delete_feature_service(
feature_service_to_delete.name, project, commit=False
)
click.echo(
f"Deleted feature service {Style.BRIGHT + Fore.GREEN}{feature_service_to_delete.name}{Style.RESET_ALL} "
f"from registry"
)

for feature_service in repo.feature_services:
registry.apply_feature_service(feature_service, project=project)
click.echo(
f"Registered feature service {Style.BRIGHT + Fore.GREEN}{feature_service.name}{Style.RESET_ALL}"
)


@log_exceptions_and_usage
def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation: bool):
from colorama import Fore, Style

os.chdir(repo_path)
registry_config = repo_config.get_registry_config()
project = repo_config.project
store = FeatureStore(repo_path=str(repo_path))
project = store.project
if not is_valid_name(project):
print(
f"{project} is not valid. Project name should only have "
f"alphanumerical values and underscores but not start with an underscore."
)
sys.exit(1)
registry = Registry(
registry_path=registry_config.path,
repo_path=repo_path,
cache_ttl=timedelta(seconds=registry_config.cache_ttl_seconds),
)
registry = store.registry
registry._initialize_registry()
sys.dont_write_bytecode = True
repo = parse_repo(repo_path)
_validate_feature_views(repo.feature_views)
data_sources = [t.batch_source for t in repo.feature_views]

if not skip_source_validation:
data_sources = [t.batch_source for t in repo.feature_views]
# Make sure the data source used by this feature view is supported by Feast
for data_source in data_sources:
data_source.validate(repo_config)
data_source.validate(store.config)

# Make inferences
update_entities_with_inferred_types_from_feature_views(
repo.entities, repo.feature_views, repo_config
entities_to_keep, entities_to_delete = _tag_registry_entities_for_keep_delete(
project, registry, repo
)
views_to_keep, views_to_delete = _tag_registry_views_for_keep_delete(
project, registry, repo
)
tables_to_keep, tables_to_delete = _tag_registry_tables_for_keep_delete(
project, registry, repo
)
(services_to_keep, services_to_delete,) = _tag_registry_services_for_keep_delete(
project, registry, repo
)
update_data_sources_with_inferred_event_timestamp_col(data_sources, repo_config)
for view in repo.feature_views:
view.infer_features_from_batch_source(repo_config)

repo_table_names = set(t.name for t in repo.feature_tables)

for t in repo.feature_views:
repo_table_names.add(t.name)

tables_to_delete = []
for registry_table in registry.list_feature_tables(project=project):
if registry_table.name not in repo_table_names:
tables_to_delete.append(registry_table)

views_to_delete = []
for registry_view in registry.list_feature_views(project=project):
if registry_view.name not in repo_table_names:
views_to_delete.append(registry_view)

entities_to_delete: List[Entity] = []
repo_entities_names = set([e.name for e in repo.entities])
for registry_entity in registry.list_entities(project=project):
if registry_entity.name not in repo_entities_names:
entities_to_delete.append(registry_entity)

entities_to_keep: List[Entity] = repo.entities
sys.dont_write_bytecode = False

existing_feature_services = registry.list_feature_services(project)
# Delete views that should not exist
for registry_view in views_to_delete:
registry.delete_feature_view(registry_view.name, project=project, commit=False)
click.echo(
f"Deleted feature view {Style.BRIGHT + Fore.GREEN}{registry_view.name}{Style.RESET_ALL} from registry"
)

sys.dont_write_bytecode = False
for entity in repo.entities:
registry.apply_entity(entity, project=project, commit=False)
# Delete feature services that should not exist
for feature_service_to_delete in services_to_delete:
registry.delete_feature_service(
feature_service_to_delete.name, project=project, commit=False
)
click.echo(
f"Registered entity {Style.BRIGHT + Fore.GREEN}{entity.name}{Style.RESET_ALL}"
f"Deleted feature service {Style.BRIGHT + Fore.GREEN}{feature_service_to_delete.name}{Style.RESET_ALL} "
f"from registry"
)

# Delete tables that should not exist
Expand All @@ -224,39 +178,34 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation
f"Deleted feature table {Style.BRIGHT + Fore.GREEN}{registry_table.name}{Style.RESET_ALL} from registry"
)

# Create tables that should
for table in repo.feature_tables:
registry.apply_feature_table(table, project, commit=False)
click.echo(
f"Registered feature table {Style.BRIGHT + Fore.GREEN}{table.name}{Style.RESET_ALL}"
)
# TODO: delete entities from the registry too

# Delete views that should not exist
for registry_view in views_to_delete:
registry.delete_feature_view(registry_view.name, project=project, commit=False)
# Add / update views + entities + services
all_to_apply: List[Union[Entity, FeatureView, FeatureService]] = []
all_to_apply.extend(entities_to_keep)
all_to_apply.extend(views_to_keep)
all_to_apply.extend(services_to_keep)
store.apply(all_to_apply, commit=False)
for entity in entities_to_keep:
click.echo(
f"Deleted feature view {Style.BRIGHT + Fore.GREEN}{registry_view.name}{Style.RESET_ALL} from registry"
f"Registered entity {Style.BRIGHT + Fore.GREEN}{entity.name}{Style.RESET_ALL}"
)

# Create views that should exist
for view in repo.feature_views:
registry.apply_feature_view(view, project, commit=False)
for view in views_to_keep:
click.echo(
f"Registered feature view {Style.BRIGHT + Fore.GREEN}{view.name}{Style.RESET_ALL}"
)

apply_feature_services(registry, project, repo, existing_feature_services)
for feature_service in services_to_keep:
click.echo(
f"Registered feature service {Style.BRIGHT + Fore.GREEN}{feature_service.name}{Style.RESET_ALL}"
)
# Create tables that should exist
for table in tables_to_keep:
registry.apply_feature_table(table, project, commit=False)
click.echo(
f"Registered feature table {Style.BRIGHT + Fore.GREEN}{table.name}{Style.RESET_ALL}"
)

infra_provider = get_provider(repo_config, repo_path)

all_to_delete: List[Union[FeatureTable, FeatureView]] = []
all_to_delete.extend(tables_to_delete)
all_to_delete.extend(views_to_delete)

all_to_keep: List[Union[FeatureTable, FeatureView]] = []
all_to_keep.extend(repo.feature_tables)
all_to_keep.extend(repo.feature_views)

for name in [view.name for view in repo.feature_tables] + [
table.name for table in repo.feature_views
]:
Expand All @@ -269,7 +218,14 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation
click.echo(
f"Removing infrastructure for {Style.BRIGHT + Fore.GREEN}{name}{Style.RESET_ALL}"
)
# TODO: consider echoing also entities being deployed/removed

all_to_delete: List[Union[FeatureTable, FeatureView]] = []
all_to_delete.extend(tables_to_delete)
all_to_delete.extend(views_to_delete)
all_to_keep: List[Union[FeatureTable, FeatureView]] = []
all_to_keep.extend(tables_to_keep)
all_to_keep.extend(views_to_delete)
infra_provider.update_infra(
project,
tables_to_delete=all_to_delete,
Expand All @@ -283,6 +239,54 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation
registry.commit()


def _tag_registry_entities_for_keep_delete(
project: str, registry: Registry, repo: ParsedRepo
) -> Tuple[List[Entity], List[Entity]]:
entities_to_keep: List[Entity] = repo.entities
entities_to_delete: List[Entity] = []
repo_entities_names = set([e.name for e in repo.entities])
for registry_entity in registry.list_entities(project=project):
if registry_entity.name not in repo_entities_names:
entities_to_delete.append(registry_entity)
return entities_to_keep, entities_to_delete


def _tag_registry_views_for_keep_delete(
adchia marked this conversation as resolved.
Show resolved Hide resolved
project: str, registry: Registry, repo: ParsedRepo
) -> Tuple[List[FeatureView], List[FeatureView]]:
views_to_keep: List[FeatureView] = repo.feature_views
views_to_delete: List[FeatureView] = []
repo_feature_view_names = set(t.name for t in repo.feature_views)
for registry_view in registry.list_feature_views(project=project):
if registry_view.name not in repo_feature_view_names:
views_to_delete.append(registry_view)
return views_to_keep, views_to_delete


def _tag_registry_tables_for_keep_delete(
project: str, registry: Registry, repo: ParsedRepo
) -> Tuple[List[FeatureTable], List[FeatureTable]]:
tables_to_keep: List[FeatureTable] = repo.feature_tables
tables_to_delete: List[FeatureTable] = []
repo_table_names = set(t.name for t in repo.feature_tables)
for registry_table in registry.list_feature_tables(project=project):
if registry_table.name not in repo_table_names:
tables_to_delete.append(registry_table)
return tables_to_keep, tables_to_delete


def _tag_registry_services_for_keep_delete(
project: str, registry: Registry, repo: ParsedRepo
) -> Tuple[List[FeatureService], List[FeatureService]]:
services_to_keep: List[FeatureService] = repo.feature_services
services_to_delete: List[FeatureService] = []
repo_feature_service_names = set(t.name for t in repo.feature_services)
for registry_service in registry.list_feature_services(project=project):
if registry_service.name not in repo_feature_service_names:
services_to_delete.append(registry_service)
return services_to_keep, services_to_delete


@log_exceptions_and_usage
def teardown(repo_config: RepoConfig, repo_path: Path):
# Cannot pass in both repo_path and repo_config to FeatureStore.
Expand Down