diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 9f2cd433df..d6ad1bb50d 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -98,6 +98,11 @@ def version(self) -> str: """Returns the version of the current Feast SDK/CLI.""" return get_version() + @property + def registry(self) -> Registry: + """Gets the registry of this feature store.""" + return self._registry + @property def project(self) -> str: """Gets the project of this feature store.""" @@ -224,6 +229,19 @@ def delete_feature_view(self, name: str): """ return self._registry.delete_feature_view(name, self.project) + @log_exceptions_and_usage + def delete_feature_service(self, name: str): + """ + Deletes a feature service. + + Args: + name: Name of feature service. + + Raises: + FeatureServiceNotFoundException: The feature view could not be found. + """ + return self._registry.delete_feature_service(name, self.project) + def _get_features( self, features: Optional[Union[List[str], FeatureService]], @@ -252,6 +270,7 @@ def apply( FeatureService, List[Union[FeatureView, Entity, FeatureService]], ], + commit: bool = True, ): """Register objects to metadata store and update related infrastructure. @@ -262,6 +281,7 @@ def apply( Args: objects: A single object, or a list of objects that should be registered with the Feature Store. + commit: whether to commit changes to the registry Raises: ValueError: The 'objects' parameter could not be parsed properly. @@ -319,7 +339,6 @@ def apply( self._registry.apply_entity(ent, project=self.project, commit=False) for feature_service in services_to_update: self._registry.apply_feature_service(feature_service, project=self.project) - self._registry.commit() self._get_provider().update_infra( project=self.project, @@ -330,6 +349,9 @@ def apply( partial=True, ) + if commit: + self._registry.commit() + @log_exceptions_and_usage def teardown(self): """Tears down all local and cloud resources for the feature store.""" diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index 87c9d5754d..ea0e79931b 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -6,7 +6,7 @@ from datetime import timedelta from importlib.abc import Loader from pathlib import Path -from typing import List, NamedTuple, Set, Union +from typing import List, NamedTuple, Set, Tuple, Union import click from click.exceptions import BadParameter @@ -15,10 +15,6 @@ from feast.feature_service import FeatureService from feast.feature_store import FeatureStore, _validate_feature_views from feast.feature_view import FeatureView -from feast.inference import ( - update_data_sources_with_inferred_event_timestamp_col, - update_entities_with_inferred_types_from_feature_views, -) from feast.infra.provider import get_provider from feast.names import adjectives, animals from feast.registry import Registry @@ -116,103 +112,61 @@ def parse_repo(repo_root: Path) -> ParsedRepo: return res -def apply_feature_services( - registry: Registry, - project: str, - repo: ParsedRepo, - existing_feature_services: List[FeatureService], -): - from colorama import Fore, Style - - # Determine which feature services should be deleted. - for feature_service in repo.feature_services: - if feature_service in existing_feature_services: - existing_feature_services.remove(feature_service) - - # The remaining features services in the list should be deleted. - for feature_service_to_delete in existing_feature_services: - registry.delete_feature_service( - feature_service_to_delete.name, project, commit=False - ) - click.echo( - f"Deleted feature service {Style.BRIGHT + Fore.GREEN}{feature_service_to_delete.name}{Style.RESET_ALL} " - f"from registry" - ) - - for feature_service in repo.feature_services: - registry.apply_feature_service(feature_service, project=project) - click.echo( - f"Registered feature service {Style.BRIGHT + Fore.GREEN}{feature_service.name}{Style.RESET_ALL}" - ) - - @log_exceptions_and_usage def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation: bool): from colorama import Fore, Style os.chdir(repo_path) - registry_config = repo_config.get_registry_config() - project = repo_config.project + store = FeatureStore(repo_path=str(repo_path)) + project = store.project if not is_valid_name(project): print( f"{project} is not valid. Project name should only have " f"alphanumerical values and underscores but not start with an underscore." ) sys.exit(1) - registry = Registry( - registry_path=registry_config.path, - repo_path=repo_path, - cache_ttl=timedelta(seconds=registry_config.cache_ttl_seconds), - ) + registry = store.registry registry._initialize_registry() sys.dont_write_bytecode = True repo = parse_repo(repo_path) _validate_feature_views(repo.feature_views) - data_sources = [t.batch_source for t in repo.feature_views] if not skip_source_validation: + data_sources = [t.batch_source for t in repo.feature_views] # Make sure the data source used by this feature view is supported by Feast for data_source in data_sources: - data_source.validate(repo_config) + data_source.validate(store.config) - # Make inferences - update_entities_with_inferred_types_from_feature_views( - repo.entities, repo.feature_views, repo_config + entities_to_keep, entities_to_delete = _tag_registry_entities_for_keep_delete( + project, registry, repo + ) + views_to_keep, views_to_delete = _tag_registry_views_for_keep_delete( + project, registry, repo + ) + tables_to_keep, tables_to_delete = _tag_registry_tables_for_keep_delete( + project, registry, repo + ) + (services_to_keep, services_to_delete,) = _tag_registry_services_for_keep_delete( + project, registry, repo ) - update_data_sources_with_inferred_event_timestamp_col(data_sources, repo_config) - for view in repo.feature_views: - view.infer_features_from_batch_source(repo_config) - - repo_table_names = set(t.name for t in repo.feature_tables) - - for t in repo.feature_views: - repo_table_names.add(t.name) - - tables_to_delete = [] - for registry_table in registry.list_feature_tables(project=project): - if registry_table.name not in repo_table_names: - tables_to_delete.append(registry_table) - - views_to_delete = [] - for registry_view in registry.list_feature_views(project=project): - if registry_view.name not in repo_table_names: - views_to_delete.append(registry_view) - - entities_to_delete: List[Entity] = [] - repo_entities_names = set([e.name for e in repo.entities]) - for registry_entity in registry.list_entities(project=project): - if registry_entity.name not in repo_entities_names: - entities_to_delete.append(registry_entity) - entities_to_keep: List[Entity] = repo.entities + sys.dont_write_bytecode = False - existing_feature_services = registry.list_feature_services(project) + # Delete views that should not exist + for registry_view in views_to_delete: + registry.delete_feature_view(registry_view.name, project=project, commit=False) + click.echo( + f"Deleted feature view {Style.BRIGHT + Fore.GREEN}{registry_view.name}{Style.RESET_ALL} from registry" + ) - sys.dont_write_bytecode = False - for entity in repo.entities: - registry.apply_entity(entity, project=project, commit=False) + # Delete feature services that should not exist + for feature_service_to_delete in services_to_delete: + registry.delete_feature_service( + feature_service_to_delete.name, project=project, commit=False + ) click.echo( - f"Registered entity {Style.BRIGHT + Fore.GREEN}{entity.name}{Style.RESET_ALL}" + f"Deleted feature service {Style.BRIGHT + Fore.GREEN}{feature_service_to_delete.name}{Style.RESET_ALL} " + f"from registry" ) # Delete tables that should not exist @@ -224,39 +178,34 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation f"Deleted feature table {Style.BRIGHT + Fore.GREEN}{registry_table.name}{Style.RESET_ALL} from registry" ) - # Create tables that should - for table in repo.feature_tables: - registry.apply_feature_table(table, project, commit=False) - click.echo( - f"Registered feature table {Style.BRIGHT + Fore.GREEN}{table.name}{Style.RESET_ALL}" - ) + # TODO: delete entities from the registry too - # Delete views that should not exist - for registry_view in views_to_delete: - registry.delete_feature_view(registry_view.name, project=project, commit=False) + # Add / update views + entities + services + all_to_apply: List[Union[Entity, FeatureView, FeatureService]] = [] + all_to_apply.extend(entities_to_keep) + all_to_apply.extend(views_to_keep) + all_to_apply.extend(services_to_keep) + store.apply(all_to_apply, commit=False) + for entity in entities_to_keep: click.echo( - f"Deleted feature view {Style.BRIGHT + Fore.GREEN}{registry_view.name}{Style.RESET_ALL} from registry" + f"Registered entity {Style.BRIGHT + Fore.GREEN}{entity.name}{Style.RESET_ALL}" ) - - # Create views that should exist - for view in repo.feature_views: - registry.apply_feature_view(view, project, commit=False) + for view in views_to_keep: click.echo( f"Registered feature view {Style.BRIGHT + Fore.GREEN}{view.name}{Style.RESET_ALL}" ) - - apply_feature_services(registry, project, repo, existing_feature_services) + for feature_service in services_to_keep: + click.echo( + f"Registered feature service {Style.BRIGHT + Fore.GREEN}{feature_service.name}{Style.RESET_ALL}" + ) + # Create tables that should exist + for table in tables_to_keep: + registry.apply_feature_table(table, project, commit=False) + click.echo( + f"Registered feature table {Style.BRIGHT + Fore.GREEN}{table.name}{Style.RESET_ALL}" + ) infra_provider = get_provider(repo_config, repo_path) - - all_to_delete: List[Union[FeatureTable, FeatureView]] = [] - all_to_delete.extend(tables_to_delete) - all_to_delete.extend(views_to_delete) - - all_to_keep: List[Union[FeatureTable, FeatureView]] = [] - all_to_keep.extend(repo.feature_tables) - all_to_keep.extend(repo.feature_views) - for name in [view.name for view in repo.feature_tables] + [ table.name for table in repo.feature_views ]: @@ -269,7 +218,14 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation click.echo( f"Removing infrastructure for {Style.BRIGHT + Fore.GREEN}{name}{Style.RESET_ALL}" ) + # TODO: consider echoing also entities being deployed/removed + all_to_delete: List[Union[FeatureTable, FeatureView]] = [] + all_to_delete.extend(tables_to_delete) + all_to_delete.extend(views_to_delete) + all_to_keep: List[Union[FeatureTable, FeatureView]] = [] + all_to_keep.extend(tables_to_keep) + all_to_keep.extend(views_to_delete) infra_provider.update_infra( project, tables_to_delete=all_to_delete, @@ -283,6 +239,54 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation registry.commit() +def _tag_registry_entities_for_keep_delete( + project: str, registry: Registry, repo: ParsedRepo +) -> Tuple[List[Entity], List[Entity]]: + entities_to_keep: List[Entity] = repo.entities + entities_to_delete: List[Entity] = [] + repo_entities_names = set([e.name for e in repo.entities]) + for registry_entity in registry.list_entities(project=project): + if registry_entity.name not in repo_entities_names: + entities_to_delete.append(registry_entity) + return entities_to_keep, entities_to_delete + + +def _tag_registry_views_for_keep_delete( + project: str, registry: Registry, repo: ParsedRepo +) -> Tuple[List[FeatureView], List[FeatureView]]: + views_to_keep: List[FeatureView] = repo.feature_views + views_to_delete: List[FeatureView] = [] + repo_feature_view_names = set(t.name for t in repo.feature_views) + for registry_view in registry.list_feature_views(project=project): + if registry_view.name not in repo_feature_view_names: + views_to_delete.append(registry_view) + return views_to_keep, views_to_delete + + +def _tag_registry_tables_for_keep_delete( + project: str, registry: Registry, repo: ParsedRepo +) -> Tuple[List[FeatureTable], List[FeatureTable]]: + tables_to_keep: List[FeatureTable] = repo.feature_tables + tables_to_delete: List[FeatureTable] = [] + repo_table_names = set(t.name for t in repo.feature_tables) + for registry_table in registry.list_feature_tables(project=project): + if registry_table.name not in repo_table_names: + tables_to_delete.append(registry_table) + return tables_to_keep, tables_to_delete + + +def _tag_registry_services_for_keep_delete( + project: str, registry: Registry, repo: ParsedRepo +) -> Tuple[List[FeatureService], List[FeatureService]]: + services_to_keep: List[FeatureService] = repo.feature_services + services_to_delete: List[FeatureService] = [] + repo_feature_service_names = set(t.name for t in repo.feature_services) + for registry_service in registry.list_feature_services(project=project): + if registry_service.name not in repo_feature_service_names: + services_to_delete.append(registry_service) + return services_to_keep, services_to_delete + + @log_exceptions_and_usage def teardown(repo_config: RepoConfig, repo_path: Path): # Cannot pass in both repo_path and repo_config to FeatureStore.