From 6638d8db6cbf41b6c3104b8e439f40b2f37fa7fc Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Tue, 14 Dec 2021 21:49:20 -0800 Subject: [PATCH 1/9] Print changes in the repo objects in the new style during feast apply Signed-off-by: Achal Shah --- sdk/python/feast/diff/FcoDiff.py | 2 + sdk/python/feast/feature_store.py | 15 ++++++ sdk/python/feast/registry.py | 75 ++++++++++++++++++++++------- sdk/python/feast/repo_operations.py | 55 +++++---------------- 4 files changed, 87 insertions(+), 60 deletions(-) diff --git a/sdk/python/feast/diff/FcoDiff.py b/sdk/python/feast/diff/FcoDiff.py index bb466c33e6..3f02559d08 100644 --- a/sdk/python/feast/diff/FcoDiff.py +++ b/sdk/python/feast/diff/FcoDiff.py @@ -27,6 +27,8 @@ class TransitionType(Enum): @dataclass class FcoDiff: + name: str + fco_type: str current_fco: Any new_fco: Any fco_property_diffs: List[PropertyDiff] diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 7ab7817d61..413475db94 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -52,6 +52,7 @@ from feast.infra.provider import Provider, RetrievalJob, get_provider from feast.on_demand_feature_view import OnDemandFeatureView from feast.online_response import OnlineResponse, _infer_online_entity_rows +from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto from feast.protos.feast.serving.ServingService_pb2 import ( GetOnlineFeaturesRequestV2, GetOnlineFeaturesResponse, @@ -440,6 +441,12 @@ def apply( if not objects_to_delete: objects_to_delete = [] + current_registry_proto = ( + self._registry.cached_registry_proto.__deepcopy__() + if self._registry.cached_registry_proto + else RegistryProto() + ) + # Separate all objects into entities, feature services, and different feature view types. entities_to_update = [ob for ob in objects if isinstance(ob, Entity)] views_to_update = [ob for ob in objects if isinstance(ob, FeatureView)] @@ -562,8 +569,16 @@ def apply( partial=partial, ) + new_registry_proto = ( + self._registry.cached_registry_proto + if self._registry.cached_registry_proto + else RegistryProto() + ) + self._registry.commit() + return Registry.diff_between(current_registry_proto, new_registry_proto) + @log_exceptions_and_usage def teardown(self): """Tears down all local and cloud resources for the feature store.""" diff --git a/sdk/python/feast/registry.py b/sdk/python/feast/registry.py index 3a54568d45..761d09fb02 100644 --- a/sdk/python/feast/registry.py +++ b/sdk/python/feast/registry.py @@ -137,25 +137,66 @@ def diff_between( ) -> RegistryDiff: diff = RegistryDiff() - # Handle Entities - ( - entities_to_keep, - entities_to_delete, - entities_to_add, - ) = tag_proto_objects_for_keep_delete_add( - current_registry.entities, new_registry.entities, - ) + attribute_to_object_type_str = { + "entities": "Entity", + "feature_views": "Feature View", + "feature_tables": "Feature Table", + "on_demand_feature_views": "On Demand Feature View", + "request_feature_views": "Request Feature View", + "feature_services": "Feature Service", + } + + for object_type in [ + "entities", + "feature_views", + "feature_tables", + "on_demand_feature_views", + "request_feature_views", + "feature_services", + ]: + ( + objects_to_keep, + objects_to_delete, + objects_to_add, + ) = tag_proto_objects_for_keep_delete_add( + getattr(current_registry, object_type), + getattr(new_registry, object_type), + ) - for e in entities_to_add: - diff.add_fco_diff(FcoDiff(None, e, [], TransitionType.CREATE)) - for e in entities_to_delete: - diff.add_fco_diff(FcoDiff(e, None, [], TransitionType.DELETE)) + for e in objects_to_add: + diff.add_fco_diff( + FcoDiff( + e.spec.name, + attribute_to_object_type_str[object_type], + None, + e, + [], + TransitionType.CREATE, + ) + ) + for e in objects_to_delete: + diff.add_fco_diff( + FcoDiff( + e.spec.name, + attribute_to_object_type_str[object_type], + e, + None, + [], + TransitionType.DELETE, + ) + ) + for e in objects_to_keep: + diff.add_fco_diff( + FcoDiff( + e.spec.name, + attribute_to_object_type_str[object_type], + e, + e, + [], + TransitionType.UNCHANGED, + ) + ) - # Handle Feature Views - # Handle On Demand Feature Views - # Handle Request Feature Views - # Handle Feature Services - logger.info(f"Diff: {diff}") return diff def _initialize_registry(self): diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index ef0953feb2..598f9f2afd 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -12,7 +12,7 @@ from click.exceptions import BadParameter from feast.base_feature_view import BaseFeatureView -from feast.diff.FcoDiff import tag_objects_for_keep_delete_add +from feast.diff.FcoDiff import TransitionType, tag_objects_for_keep_delete_add from feast.entity import Entity from feast.feature_service import FeatureService from feast.feature_store import FeatureStore @@ -249,51 +249,20 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation all_to_delete.extend(odfvs_to_delete) all_to_delete.extend(tables_to_delete) - store.apply(all_to_apply, objects_to_delete=all_to_delete, partial=False) + diff = store.apply(all_to_apply, objects_to_delete=all_to_delete, partial=False) - for entity in entities_to_delete: - click.echo( - f"Deleted entity {Style.BRIGHT + Fore.GREEN}{entity.name}{Style.RESET_ALL} from registry" - ) - for view in base_views_to_delete: - click.echo( - f"Deleted feature view {Style.BRIGHT + Fore.GREEN}{view.name}{Style.RESET_ALL} from registry" - ) - for odfv in odfvs_to_delete: - click.echo( - f"Deleted on demand feature view {Style.BRIGHT + Fore.GREEN}{odfv.name}{Style.RESET_ALL} from registry" - ) - for table in tables_to_delete: - click.echo( - f"Deleted feature table {Style.BRIGHT + Fore.GREEN}{table.name}{Style.RESET_ALL} from registry" - ) - for feature_service in services_to_delete: - click.echo( - f"Deleted feature service {Style.BRIGHT + Fore.GREEN}{feature_service.name}{Style.RESET_ALL} " - f"from registry" - ) + message_action_map = { + TransitionType.CREATE: ("Created", Fore.GREEN), + TransitionType.DELETE: ("Deleted", Fore.RED), + TransitionType.UNCHANGED: ("Unchanged", Fore.LIGHTBLUE_EX), + } - for entity in entities_to_keep: - if entity.name != DUMMY_ENTITY_NAME: - click.echo( - f"Registered entity {Style.BRIGHT + Fore.GREEN}{entity.name}{Style.RESET_ALL}" - ) - for view in base_views_to_keep: - click.echo( - f"Registered feature view {Style.BRIGHT + Fore.GREEN}{view.name}{Style.RESET_ALL}" - ) - for odfv in odfvs_to_keep: - click.echo( - f"Registered on demand feature view {Style.BRIGHT + Fore.GREEN}{odfv.name}{Style.RESET_ALL}" - ) - for feature_service in services_to_keep: - click.echo( - f"Registered feature service {Style.BRIGHT + Fore.GREEN}{feature_service.name}{Style.RESET_ALL}" - ) - # Create tables that should exist - for table in tables_to_keep: + for fco_diff in diff.fco_diffs: + if fco_diff.name == DUMMY_ENTITY_NAME: + continue + action, color = message_action_map[fco_diff.transition_type] click.echo( - f"Registered feature table {Style.BRIGHT + Fore.GREEN}{table.name}{Style.RESET_ALL}" + f"{action} {fco_diff.fco_type} {Style.BRIGHT + color}{fco_diff.name}{Style.RESET_ALL}" ) views_to_keep_in_infra = [ From 74c303ce9fda25b2e3e8b82a609c0e630b7a95d3 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Tue, 14 Dec 2021 22:01:35 -0800 Subject: [PATCH 2/9] change color for deleted infra Signed-off-by: Achal Shah --- sdk/python/feast/repo_operations.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index 598f9f2afd..d6bd23fcb4 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -281,7 +281,7 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation table.name for table in tables_to_delete ]: click.echo( - f"Removing infrastructure for {Style.BRIGHT + Fore.GREEN}{name}{Style.RESET_ALL}" + f"Removing infrastructure for {Style.BRIGHT + Fore.RED}{name}{Style.RESET_ALL}" ) # TODO: consider echoing also entities being deployed/removed From 9402c772740744fb3aa1c33f5bb1b6aac4fe3a08 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Wed, 15 Dec 2021 00:58:38 -0800 Subject: [PATCH 3/9] Add a feast plan command Signed-off-by: Achal Shah --- sdk/python/feast/cli.py | 23 ++- sdk/python/feast/feature_store.py | 237 +++++++++++++++++----------- sdk/python/feast/repo_operations.py | 66 +++++--- 3 files changed, 212 insertions(+), 114 deletions(-) diff --git a/sdk/python/feast/cli.py b/sdk/python/feast/cli.py index 6acd7ec55b..d1c980c461 100644 --- a/sdk/python/feast/cli.py +++ b/sdk/python/feast/cli.py @@ -35,7 +35,7 @@ generate_project_name, init_repo, registry_dump, - teardown, + teardown, plan, ) _logger = logging.getLogger(__name__) @@ -351,6 +351,27 @@ def on_demand_feature_view_list(ctx: click.Context): print(tabulate(table, headers=["NAME"], tablefmt="plain")) +@cli.command("plan", cls=NoOptionDefaultFormat) +@click.option( + "--skip-source-validation", + is_flag=True, + help="Don't validate the data sources by checking for that the tables exist.", +) +@click.pass_context +def plan_command(ctx: click.Context, skip_source_validation: bool): + """ + Create or update a feature store deployment + """ + repo = ctx.obj["CHDIR"] + cli_check_repo(repo) + repo_config = load_repo_config(repo) + try: + plan(repo_config, repo, skip_source_validation) + except FeastProviderLoginError as e: + print(str(e)) + + + @cli.command("apply", cls=NoOptionDefaultFormat) @click.option( "--skip-source-validation", diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 413475db94..446aaa8d15 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -16,7 +16,7 @@ import os import warnings from collections import Counter, OrderedDict, defaultdict -from datetime import datetime +from datetime import datetime, timedelta from pathlib import Path from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Union, cast @@ -26,6 +26,7 @@ from feast import feature_server, flags, flags_helper, utils from feast.base_feature_view import BaseFeatureView +from feast.diff.FcoDiff import RegistryDiff, TransitionType from feast.entity import Entity from feast.errors import ( EntityNotFoundException, @@ -360,79 +361,39 @@ def _get_features(self, features: Union[List[str], FeatureService],) -> List[str return _feature_refs @log_exceptions_and_usage - def apply( - self, - objects: Union[ - Entity, - FeatureView, - OnDemandFeatureView, - RequestFeatureView, - FeatureService, - FeatureTable, - List[ - Union[ - FeatureView, - OnDemandFeatureView, - RequestFeatureView, - Entity, - FeatureService, - FeatureTable, - ] - ], - ], - objects_to_delete: Optional[ - List[ - Union[ - FeatureView, - OnDemandFeatureView, - RequestFeatureView, - Entity, - FeatureService, - FeatureTable, - ] - ] - ] = None, - partial: bool = True, - ): - """Register objects to metadata store and update related infrastructure. - - The apply method registers one or more definitions (e.g., Entity, FeatureView) and registers or updates these - objects in the Feast registry. Once the apply method has updated the infrastructure (e.g., create tables in - an online store), it will commit the updated registry. All operations are idempotent, meaning they can safely - be rerun. - - Args: - objects: A single object, or a list of objects that should be registered with the Feature Store. - objects_to_delete: A list of objects to be deleted from the registry and removed from the - provider's infrastructure. This deletion will only be performed if partial is set to False. - partial: If True, apply will only handle the specified objects; if False, apply will also delete - all the objects in objects_to_delete, and tear down any associated cloud resources. - - Raises: - ValueError: The 'objects' parameter could not be parsed properly. - - Examples: - Register an Entity and a FeatureView. - - >>> from feast import FeatureStore, Entity, FeatureView, Feature, ValueType, FileSource, RepoConfig - >>> from datetime import timedelta - >>> fs = FeatureStore(repo_path="feature_repo") - >>> driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id") - >>> driver_hourly_stats = FileSource( - ... path="feature_repo/data/driver_stats.parquet", - ... event_timestamp_column="event_timestamp", - ... created_timestamp_column="created", - ... ) - >>> driver_hourly_stats_view = FeatureView( - ... name="driver_hourly_stats", - ... entities=["driver_id"], - ... ttl=timedelta(seconds=86400 * 1), - ... batch_source=driver_hourly_stats, - ... ) - >>> fs.apply([driver_hourly_stats_view, driver]) # register entity and feature view - """ - # TODO: Add locking - + def plan(self, + objects: Union[ + Entity, + FeatureView, + OnDemandFeatureView, + RequestFeatureView, + FeatureService, + FeatureTable, + List[ + Union[ + FeatureView, + OnDemandFeatureView, + RequestFeatureView, + Entity, + FeatureService, + FeatureTable, + ] + ], + ], + objects_to_delete: Optional[ + List[ + Union[ + FeatureView, + OnDemandFeatureView, + RequestFeatureView, + Entity, + FeatureService, + FeatureTable, + ] + ] + ] = None, + partial: bool = True, + ) -> Tuple[Registry, RegistryDiff]: if not isinstance(objects, Iterable): objects = [objects] @@ -446,6 +407,8 @@ def apply( if self._registry.cached_registry_proto else RegistryProto() ) + new_registry = copy.deepcopy(self._registry) + new_registry.cached_registry_proto_ttl = timedelta() # Separate all objects into entities, feature services, and different feature view types. entities_to_update = [ob for ob in objects if isinstance(ob, Entity)] @@ -458,7 +421,7 @@ def apply( tables_to_update = [ob for ob in objects if isinstance(ob, FeatureTable)] if len(entities_to_update) + len(views_to_update) + len( - request_views_to_update + request_views_to_update ) + len(odfvs_to_update) + len(services_to_update) + len( tables_to_update ) != len( @@ -468,8 +431,8 @@ def apply( # Validate all types of feature views. if ( - not flags_helper.enable_on_demand_feature_views(self.config) - and len(odfvs_to_update) > 0 + not flags_helper.enable_on_demand_feature_views(self.config) + and len(odfvs_to_update) > 0 ): raise ExperimentalFeatureNotEnabled(flags.FLAG_ON_DEMAND_TRANSFORM_NAME) @@ -502,15 +465,15 @@ def apply( for view in itertools.chain( views_to_update, odfvs_to_update, request_views_to_update ): - self._registry.apply_feature_view(view, project=self.project, commit=False) + new_registry.apply_feature_view(view, project=self.project, commit=False) for ent in entities_to_update: - self._registry.apply_entity(ent, project=self.project, commit=False) + new_registry.apply_entity(ent, project=self.project, commit=False) for feature_service in services_to_update: - self._registry.apply_feature_service( + new_registry.apply_feature_service( feature_service, project=self.project, commit=False ) for table in tables_to_update: - self._registry.apply_feature_table( + new_registry.apply_feature_table( table, project=self.project, commit=False ) @@ -536,30 +499,123 @@ def apply( ] for entity in entities_to_delete: - self._registry.delete_entity( + new_registry.delete_entity( entity.name, project=self.project, commit=False ) for view in views_to_delete: - self._registry.delete_feature_view( + new_registry.delete_feature_view( view.name, project=self.project, commit=False ) for request_view in request_views_to_delete: - self._registry.delete_feature_view( + new_registry.delete_feature_view( request_view.name, project=self.project, commit=False ) for odfv in odfvs_to_delete: - self._registry.delete_feature_view( + new_registry.delete_feature_view( odfv.name, project=self.project, commit=False ) for service in services_to_delete: - self._registry.delete_feature_service( + new_registry.delete_feature_service( service.name, project=self.project, commit=False ) for table in tables_to_delete: - self._registry.delete_feature_table( + new_registry.delete_feature_table( table.name, project=self.project, commit=False ) + new_registry_proto = ( + new_registry.cached_registry_proto + if new_registry.cached_registry_proto + else RegistryProto() + ) + + return new_registry, Registry.diff_between(current_registry_proto, new_registry_proto) + + @log_exceptions_and_usage + def apply( + self, + objects: Union[ + Entity, + FeatureView, + OnDemandFeatureView, + RequestFeatureView, + FeatureService, + FeatureTable, + List[ + Union[ + FeatureView, + OnDemandFeatureView, + RequestFeatureView, + Entity, + FeatureService, + FeatureTable, + ] + ], + ], + objects_to_delete: Optional[ + List[ + Union[ + FeatureView, + OnDemandFeatureView, + RequestFeatureView, + Entity, + FeatureService, + FeatureTable, + ] + ] + ] = None, + partial: bool = True, + ): + """Register objects to metadata store and update related infrastructure. + + The apply method registers one or more definitions (e.g., Entity, FeatureView) and registers or updates these + objects in the Feast registry. Once the apply method has updated the infrastructure (e.g., create tables in + an online store), it will commit the updated registry. All operations are idempotent, meaning they can safely + be rerun. + + Args: + objects: A single object, or a list of objects that should be registered with the Feature Store. + objects_to_delete: A list of objects to be deleted from the registry and removed from the + provider's infrastructure. This deletion will only be performed if partial is set to False. + partial: If True, apply will only handle the specified objects; if False, apply will also delete + all the objects in objects_to_delete, and tear down any associated cloud resources. + + Raises: + ValueError: The 'objects' parameter could not be parsed properly. + + Examples: + Register an Entity and a FeatureView. + + >>> from feast import FeatureStore, Entity, FeatureView, Feature, ValueType, FileSource, RepoConfig + >>> from datetime import timedelta + >>> fs = FeatureStore(repo_path="feature_repo") + >>> driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id") + >>> driver_hourly_stats = FileSource( + ... path="feature_repo/data/driver_stats.parquet", + ... event_timestamp_column="event_timestamp", + ... created_timestamp_column="created", + ... ) + >>> driver_hourly_stats_view = FeatureView( + ... name="driver_hourly_stats", + ... entities=["driver_id"], + ... ttl=timedelta(seconds=86400 * 1), + ... batch_source=driver_hourly_stats, + ... ) + >>> fs.apply([driver_hourly_stats_view, driver]) # register entity and feature view + """ + # TODO: Add locking + new_registry, diffs = self.plan(objects, objects_to_delete, partial) + new_registry.cached_registry_proto_ttl = self._registry.cached_registry_proto_ttl + self._registry = new_registry + + entities_to_update = [ob for ob in objects if isinstance(ob, Entity)] + views_to_update = [ob for ob in objects if isinstance(ob, FeatureView)] + tables_to_update = [ob for ob in objects if isinstance(ob, FeatureTable)] + + entities_to_delete = [ob for ob in objects_to_delete if isinstance(ob, Entity)] + views_to_delete = [ob for ob in objects_to_delete if isinstance(ob, FeatureView)] + tables_to_delete = [ob for ob in objects_to_delete if isinstance(ob, FeatureTable)] + self._get_provider().update_infra( project=self.project, tables_to_delete=views_to_delete + tables_to_delete if not partial else [], @@ -569,15 +625,8 @@ def apply( partial=partial, ) - new_registry_proto = ( - self._registry.cached_registry_proto - if self._registry.cached_registry_proto - else RegistryProto() - ) - self._registry.commit() - return Registry.diff_between(current_registry_proto, new_registry_proto) @log_exceptions_and_usage def teardown(self): diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index d6bd23fcb4..588aa630c4 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -129,10 +129,28 @@ def parse_repo(repo_root: Path) -> ParsedRepo: @log_exceptions_and_usage -def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation: bool): - from colorama import Fore, Style +def plan(repo_config: RepoConfig, repo_path: Path, skip_source_validation: bool): os.chdir(repo_path) + project, registry, repo, store = _prepare_registry_and_repo(repo_config, repo_path) + + if not skip_source_validation: + data_sources = [t.batch_source for t in repo.feature_views] + # Make sure the data source used by this feature view is supported by Feast + for data_source in data_sources: + data_source.validate(store.config) + + # For each object in the registry, determine whether it should be kept or deleted, + # and whether new objects need to be added. + all_to_apply, all_to_delete, tables_to_delete, views_to_delete, views_to_keep = extract_objects_for_apply_delete( + project, registry, repo) + + _, diff = store.plan(all_to_apply, objects_to_delete=all_to_delete, partial=False) + + log_cli_output(diff, repo, tables_to_delete, views_to_delete, views_to_keep) + + +def _prepare_registry_and_repo(repo_config, repo_path): store = FeatureStore(config=repo_config) project = store.project if not is_valid_name(project): @@ -145,14 +163,10 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation registry._initialize_registry() sys.dont_write_bytecode = True repo = parse_repo(repo_path) + return project, registry, repo, store - if not skip_source_validation: - data_sources = [t.batch_source for t in repo.feature_views] - # Make sure the data source used by this feature view is supported by Feast - for data_source in data_sources: - data_source.validate(store.config) - # For each object in the registry, determine whether it should be kept or deleted. +def extract_objects_for_apply_delete(project, registry, repo): ( entities_to_keep, entities_to_delete, @@ -162,7 +176,6 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation ) # TODO(achals): This code path should be refactored to handle added & kept entities separately. entities_to_keep = set(entities_to_keep).union(entities_to_add) - views = tag_objects_for_keep_delete_add( set(registry.list_feature_views(project=project)), repo.feature_views ) @@ -171,7 +184,6 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation cast(Set[FeatureView], views[1]), cast(Set[FeatureView], views[2]), ) - request_views = tag_objects_for_keep_delete_add( set(registry.list_request_feature_views(project=project)), repo.request_feature_views, @@ -184,7 +196,6 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation cast(Set[RequestFeatureView], request_views[1]), cast(Set[RequestFeatureView], request_views[2]), ) - base_views_to_keep: Set[Union[RequestFeatureView, FeatureView]] = { *views_to_keep, *views_to_add, @@ -195,7 +206,6 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation *views_to_delete, *request_views_to_delete, } - odfvs = tag_objects_for_keep_delete_add( set(registry.list_on_demand_feature_views(project=project)), repo.on_demand_feature_views, @@ -206,7 +216,6 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation cast(Set[OnDemandFeatureView], odfvs[2]), ) odfvs_to_keep = odfvs_to_keep.union(odfvs_to_add) - ( tables_to_keep, tables_to_delete, @@ -215,7 +224,6 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation set(registry.list_feature_tables(project=project)), repo.feature_tables ) tables_to_keep = tables_to_keep.union(tables_to_add) - ( services_to_keep, services_to_delete, @@ -224,9 +232,7 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation set(registry.list_feature_services(project=project)), repo.feature_services ) services_to_keep = services_to_keep.union(services_to_add) - sys.dont_write_bytecode = False - # Apply all changes to the registry and infrastructure. all_to_apply: List[ Union[ @@ -248,15 +254,39 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation all_to_delete.extend(services_to_delete) all_to_delete.extend(odfvs_to_delete) all_to_delete.extend(tables_to_delete) + return all_to_apply, all_to_delete, tables_to_delete, views_to_delete, views_to_keep + + + +@log_exceptions_and_usage +def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation: bool): + + os.chdir(repo_path) + project, registry, repo, store = _prepare_registry_and_repo(repo_config, repo_path) + + if not skip_source_validation: + data_sources = [t.batch_source for t in repo.feature_views] + # Make sure the data source used by this feature view is supported by Feast + for data_source in data_sources: + data_source.validate(store.config) + + # For each object in the registry, determine whether it should be kept or deleted. + all_to_apply, all_to_delete, tables_to_delete, views_to_delete, views_to_keep = extract_objects_for_apply_delete( + project, registry, repo) diff = store.apply(all_to_apply, objects_to_delete=all_to_delete, partial=False) + log_cli_output(diff, repo, tables_to_delete, views_to_delete, views_to_keep) + + +def log_cli_output(diff, repo, tables_to_delete, views_to_delete, views_to_keep): + from colorama import Fore, Style + message_action_map = { TransitionType.CREATE: ("Created", Fore.GREEN), TransitionType.DELETE: ("Deleted", Fore.RED), TransitionType.UNCHANGED: ("Unchanged", Fore.LIGHTBLUE_EX), } - for fco_diff in diff.fco_diffs: if fco_diff.name == DUMMY_ENTITY_NAME: continue @@ -264,7 +294,6 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation click.echo( f"{action} {fco_diff.fco_type} {Style.BRIGHT + color}{fco_diff.name}{Style.RESET_ALL}" ) - views_to_keep_in_infra = [ view for view in views_to_keep if isinstance(view, FeatureView) ] @@ -283,7 +312,6 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation click.echo( f"Removing infrastructure for {Style.BRIGHT + Fore.RED}{name}{Style.RESET_ALL}" ) - # TODO: consider echoing also entities being deployed/removed @log_exceptions_and_usage From 314b4af401ac92cf6999b40119774421ef4dd4ae Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Wed, 15 Dec 2021 01:08:16 -0800 Subject: [PATCH 4/9] Add a feast plan command Signed-off-by: Achal Shah --- sdk/python/feast/cli.py | 4 +- sdk/python/feast/feature_store.py | 161 +++++++++++------- sdk/python/feast/registry.py | 12 +- sdk/python/feast/repo_operations.py | 19 ++- ..._cli_apply_duplicated_featureview_names.py | 2 +- 5 files changed, 123 insertions(+), 75 deletions(-) diff --git a/sdk/python/feast/cli.py b/sdk/python/feast/cli.py index d1c980c461..1997b12e3b 100644 --- a/sdk/python/feast/cli.py +++ b/sdk/python/feast/cli.py @@ -34,8 +34,9 @@ cli_check_repo, generate_project_name, init_repo, + plan, registry_dump, - teardown, plan, + teardown, ) _logger = logging.getLogger(__name__) @@ -371,7 +372,6 @@ def plan_command(ctx: click.Context, skip_source_validation: bool): print(str(e)) - @cli.command("apply", cls=NoOptionDefaultFormat) @click.option( "--skip-source-validation", diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 446aaa8d15..6c702d02ba 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -26,7 +26,7 @@ from feast import feature_server, flags, flags_helper, utils from feast.base_feature_view import BaseFeatureView -from feast.diff.FcoDiff import RegistryDiff, TransitionType +from feast.diff.FcoDiff import RegistryDiff from feast.entity import Entity from feast.errors import ( EntityNotFoundException, @@ -361,39 +361,76 @@ def _get_features(self, features: Union[List[str], FeatureService],) -> List[str return _feature_refs @log_exceptions_and_usage - def plan(self, - objects: Union[ - Entity, - FeatureView, - OnDemandFeatureView, - RequestFeatureView, - FeatureService, - FeatureTable, - List[ - Union[ - FeatureView, - OnDemandFeatureView, - RequestFeatureView, - Entity, - FeatureService, - FeatureTable, - ] - ], - ], - objects_to_delete: Optional[ - List[ - Union[ - FeatureView, - OnDemandFeatureView, - RequestFeatureView, - Entity, - FeatureService, - FeatureTable, - ] - ] - ] = None, - partial: bool = True, + def plan( + self, + objects: Union[ + Entity, + FeatureView, + OnDemandFeatureView, + RequestFeatureView, + FeatureService, + FeatureTable, + List[ + Union[ + FeatureView, + OnDemandFeatureView, + RequestFeatureView, + Entity, + FeatureService, + FeatureTable, + ] + ], + ], + objects_to_delete: Optional[ + List[ + Union[ + FeatureView, + OnDemandFeatureView, + RequestFeatureView, + Entity, + FeatureService, + FeatureTable, + ] + ] + ] = None, + partial: bool = True, ) -> Tuple[Registry, RegistryDiff]: + """Dry-run registering objects to metadata store. + + The plan method dry-runs registering one or more definitions (e.g., Entity, FeatureView), and produces + a list of all the changes the that would be introduced in the feature repo. The changes computed by the plan + command are for informational purpose, and are not actually applied to the registry. + + Args: + objects: A single object, or a list of objects that are intended to be registered with the Feature Store. + objects_to_delete: A list of objects to be deleted from the registry. + partial: If True, apply will only handle the specified objects; if False, apply will also delete + all the objects in objects_to_delete. + + Raises: + ValueError: The 'objects' parameter could not be parsed properly. + + Examples: + Generate a plan adding an Entity and a FeatureView. + + >>> from feast import FeatureStore, Entity, FeatureView, Feature, ValueType, FileSource, RepoConfig + >>> from datetime import timedelta + >>> fs = FeatureStore(repo_path="feature_repo") + >>> driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id") + >>> driver_hourly_stats = FileSource( + ... path="feature_repo/data/driver_stats.parquet", + ... event_timestamp_column="event_timestamp", + ... created_timestamp_column="created", + ... ) + >>> driver_hourly_stats_view = FeatureView( + ... name="driver_hourly_stats", + ... entities=["driver_id"], + ... ttl=timedelta(seconds=86400 * 1), + ... batch_source=driver_hourly_stats, + ... ) + >>> fs.plan([driver_hourly_stats_view, driver]) # register entity and feature view + """ + if not isinstance(objects, Iterable): objects = [objects] @@ -421,7 +458,7 @@ def plan(self, tables_to_update = [ob for ob in objects if isinstance(ob, FeatureTable)] if len(entities_to_update) + len(views_to_update) + len( - request_views_to_update + request_views_to_update ) + len(odfvs_to_update) + len(services_to_update) + len( tables_to_update ) != len( @@ -431,8 +468,8 @@ def plan(self, # Validate all types of feature views. if ( - not flags_helper.enable_on_demand_feature_views(self.config) - and len(odfvs_to_update) > 0 + not flags_helper.enable_on_demand_feature_views(self.config) + and len(odfvs_to_update) > 0 ): raise ExperimentalFeatureNotEnabled(flags.FLAG_ON_DEMAND_TRANSFORM_NAME) @@ -473,9 +510,7 @@ def plan(self, feature_service, project=self.project, commit=False ) for table in tables_to_update: - new_registry.apply_feature_table( - table, project=self.project, commit=False - ) + new_registry.apply_feature_table(table, project=self.project, commit=False) if not partial: # Delete all registry objects that should not exist. @@ -529,28 +564,23 @@ def plan(self, else RegistryProto() ) - return new_registry, Registry.diff_between(current_registry_proto, new_registry_proto) + return ( + new_registry, + Registry.diff_between(current_registry_proto, new_registry_proto), + ) @log_exceptions_and_usage def apply( self, - objects: Union[ - Entity, - FeatureView, - OnDemandFeatureView, - RequestFeatureView, - FeatureService, - FeatureTable, - List[ - Union[ - FeatureView, - OnDemandFeatureView, - RequestFeatureView, - Entity, - FeatureService, - FeatureTable, - ] - ], + objects: List[ + Union[ + FeatureView, + OnDemandFeatureView, + RequestFeatureView, + Entity, + FeatureService, + FeatureTable, + ] ], objects_to_delete: Optional[ List[ @@ -604,8 +634,14 @@ def apply( >>> fs.apply([driver_hourly_stats_view, driver]) # register entity and feature view """ # TODO: Add locking + + if not objects_to_delete: + objects_to_delete = [] + new_registry, diffs = self.plan(objects, objects_to_delete, partial) - new_registry.cached_registry_proto_ttl = self._registry.cached_registry_proto_ttl + new_registry.cached_registry_proto_ttl = ( + self._registry.cached_registry_proto_ttl + ) self._registry = new_registry entities_to_update = [ob for ob in objects if isinstance(ob, Entity)] @@ -613,8 +649,12 @@ def apply( tables_to_update = [ob for ob in objects if isinstance(ob, FeatureTable)] entities_to_delete = [ob for ob in objects_to_delete if isinstance(ob, Entity)] - views_to_delete = [ob for ob in objects_to_delete if isinstance(ob, FeatureView)] - tables_to_delete = [ob for ob in objects_to_delete if isinstance(ob, FeatureTable)] + views_to_delete = [ + ob for ob in objects_to_delete if isinstance(ob, FeatureView) + ] + tables_to_delete = [ + ob for ob in objects_to_delete if isinstance(ob, FeatureTable) + ] self._get_provider().update_infra( project=self.project, @@ -627,7 +667,6 @@ def apply( self._registry.commit() - @log_exceptions_and_usage def teardown(self): """Tears down all local and cloud resources for the feature store.""" diff --git a/sdk/python/feast/registry.py b/sdk/python/feast/registry.py index 761d09fb02..176ab5ca91 100644 --- a/sdk/python/feast/registry.py +++ b/sdk/python/feast/registry.py @@ -138,12 +138,12 @@ def diff_between( diff = RegistryDiff() attribute_to_object_type_str = { - "entities": "Entity", - "feature_views": "Feature View", - "feature_tables": "Feature Table", - "on_demand_feature_views": "On Demand Feature View", - "request_feature_views": "Request Feature View", - "feature_services": "Feature Service", + "entities": "entity", + "feature_views": "feature view", + "feature_tables": "feature table", + "on_demand_feature_views": "on demand feature view", + "request_feature_views": "request feature view", + "feature_services": "feature service", } for object_type in [ diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index 588aa630c4..0ccdd5d232 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -142,8 +142,13 @@ def plan(repo_config: RepoConfig, repo_path: Path, skip_source_validation: bool) # For each object in the registry, determine whether it should be kept or deleted, # and whether new objects need to be added. - all_to_apply, all_to_delete, tables_to_delete, views_to_delete, views_to_keep = extract_objects_for_apply_delete( - project, registry, repo) + ( + all_to_apply, + all_to_delete, + tables_to_delete, + views_to_delete, + views_to_keep, + ) = extract_objects_for_apply_delete(project, registry, repo) _, diff = store.plan(all_to_apply, objects_to_delete=all_to_delete, partial=False) @@ -257,7 +262,6 @@ def extract_objects_for_apply_delete(project, registry, repo): return all_to_apply, all_to_delete, tables_to_delete, views_to_delete, views_to_keep - @log_exceptions_and_usage def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation: bool): @@ -271,8 +275,13 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation data_source.validate(store.config) # For each object in the registry, determine whether it should be kept or deleted. - all_to_apply, all_to_delete, tables_to_delete, views_to_delete, views_to_keep = extract_objects_for_apply_delete( - project, registry, repo) + ( + all_to_apply, + all_to_delete, + tables_to_delete, + views_to_delete, + views_to_keep, + ) = extract_objects_for_apply_delete(project, registry, repo) diff = store.apply(all_to_apply, objects_to_delete=all_to_delete, partial=False) diff --git a/sdk/python/tests/integration/registration/test_cli_apply_duplicated_featureview_names.py b/sdk/python/tests/integration/registration/test_cli_apply_duplicated_featureview_names.py index 2b5ba2dcb0..6987066e8d 100644 --- a/sdk/python/tests/integration/registration/test_cli_apply_duplicated_featureview_names.py +++ b/sdk/python/tests/integration/registration/test_cli_apply_duplicated_featureview_names.py @@ -85,7 +85,7 @@ def test_cli_apply_imported_featureview() -> None: rc, output = runner.run_with_output(["apply"], cwd=repo_path) assert rc == 0 - assert b"Registered feature service driver_locations_service" in output + assert b"Created feature service driver_locations_service" in output def test_cli_apply_imported_featureview_with_duplication() -> None: From 5880db87d55ee8f356371b87ba0020c3a9dfddae Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Wed, 15 Dec 2021 01:14:21 -0800 Subject: [PATCH 5/9] return from apply() Signed-off-by: Achal Shah --- sdk/python/feast/feature_store.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 6c702d02ba..7fdd2f3da2 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -667,6 +667,8 @@ def apply( self._registry.commit() + return diffs + @log_exceptions_and_usage def teardown(self): """Tears down all local and cloud resources for the feature store.""" From 24adf1d7247df349d7b345f152be4234db81f96d Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Wed, 15 Dec 2021 10:26:30 -0800 Subject: [PATCH 6/9] Fix errors in doctests Signed-off-by: Achal Shah --- sdk/python/feast/feature_store.py | 32 ++++++++++++++++++---------- sdk/python/tests/doctest/test_all.py | 4 +++- 2 files changed, 24 insertions(+), 12 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 7fdd2f3da2..be87c5b038 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -428,7 +428,7 @@ def plan( ... ttl=timedelta(seconds=86400 * 1), ... batch_source=driver_hourly_stats, ... ) - >>> fs.plan([driver_hourly_stats_view, driver]) # register entity and feature view + >>> registry, diff = fs.plan([driver_hourly_stats_view, driver]) # register entity and feature view """ if not isinstance(objects, Iterable): @@ -572,15 +572,23 @@ def plan( @log_exceptions_and_usage def apply( self, - objects: List[ - Union[ - FeatureView, - OnDemandFeatureView, - RequestFeatureView, - Entity, - FeatureService, - FeatureTable, - ] + objects: Union[ + Entity, + FeatureView, + OnDemandFeatureView, + RequestFeatureView, + FeatureService, + FeatureTable, + List[ + Union[ + FeatureView, + OnDemandFeatureView, + RequestFeatureView, + Entity, + FeatureService, + FeatureTable, + ] + ], ], objects_to_delete: Optional[ List[ @@ -631,9 +639,11 @@ def apply( ... ttl=timedelta(seconds=86400 * 1), ... batch_source=driver_hourly_stats, ... ) - >>> fs.apply([driver_hourly_stats_view, driver]) # register entity and feature view + >>> diff = fs.apply([driver_hourly_stats_view, driver]) # register entity and feature view """ # TODO: Add locking + if not isinstance(objects, Iterable): + objects = [objects] if not objects_to_delete: objects_to_delete = [] diff --git a/sdk/python/tests/doctest/test_all.py b/sdk/python/tests/doctest/test_all.py index c406303f00..7507805b03 100644 --- a/sdk/python/tests/doctest/test_all.py +++ b/sdk/python/tests/doctest/test_all.py @@ -59,6 +59,7 @@ def test_docstrings(): """ successful = True current_packages = [feast] + failed_cases = [] while current_packages: next_packages = [] @@ -94,6 +95,7 @@ def test_docstrings(): result = unittest.TextTestRunner(sys.stdout).run(test_suite) if not result.wasSuccessful(): successful = False + failed_cases.append(result.failures) except Exception: successful = False finally: @@ -103,4 +105,4 @@ def test_docstrings(): current_packages = next_packages if not successful: - raise Exception("Docstring tests failed.") + raise Exception(f"Docstring tests failed. Failed results: {failed_cases}") From 2bfe5a16eae544f03acc03d7ab4faa0b462a89f2 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Wed, 15 Dec 2021 14:09:29 -0800 Subject: [PATCH 7/9] Fix deepcopy and use a clone method instead Signed-off-by: Achal Shah --- sdk/python/feast/feature_store.py | 5 ++--- sdk/python/feast/registry.py | 12 ++++++++++++ 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index be87c5b038..00f7d27bb3 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -16,7 +16,7 @@ import os import warnings from collections import Counter, OrderedDict, defaultdict -from datetime import datetime, timedelta +from datetime import datetime from pathlib import Path from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Union, cast @@ -444,8 +444,7 @@ def plan( if self._registry.cached_registry_proto else RegistryProto() ) - new_registry = copy.deepcopy(self._registry) - new_registry.cached_registry_proto_ttl = timedelta() + new_registry = self._registry.clone() # Separate all objects into entities, feature services, and different feature view types. entities_to_update = [ob for ob in objects if isinstance(ob, Entity)] diff --git a/sdk/python/feast/registry.py b/sdk/python/feast/registry.py index 176ab5ca91..5a1b789cb0 100644 --- a/sdk/python/feast/registry.py +++ b/sdk/python/feast/registry.py @@ -130,6 +130,18 @@ def __init__( else 0 ) + def clone(self) -> "Registry": + new_registry = Registry(None, None) + new_registry.cached_registry_proto_ttl = timedelta(seconds=0) + new_registry.cached_registry_proto = ( + self.cached_registry_proto.__deepcopy__() + if self.cached_registry_proto + else RegistryProto() + ) + new_registry.cached_registry_proto_created = datetime.utcnow() + new_registry._registry_store = None + return new_registry + # TODO(achals): This method needs to be filled out and used in the feast plan/apply methods. @staticmethod def diff_between( From 40f6292116ededa7ff5285c4c79dcc19ec3e6eeb Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Wed, 15 Dec 2021 14:47:25 -0800 Subject: [PATCH 8/9] Fix registry clone Signed-off-by: Achal Shah --- sdk/python/feast/feature_store.py | 1 + sdk/python/feast/registry.py | 3 ++- sdk/python/feast/registry_store.py | 11 +++++++++++ sdk/python/tests/doctest/test_all.py | 3 ++- 4 files changed, 16 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 07f59f622c..7b648b9f1e 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -630,6 +630,7 @@ def apply( new_registry.cached_registry_proto_ttl = ( self._registry.cached_registry_proto_ttl ) + new_registry._registry_store = self._registry._registry_store self._registry = new_registry entities_to_update = [ob for ob in objects if isinstance(ob, Entity)] diff --git a/sdk/python/feast/registry.py b/sdk/python/feast/registry.py index 8f621aa40d..bfc1e9a336 100644 --- a/sdk/python/feast/registry.py +++ b/sdk/python/feast/registry.py @@ -42,6 +42,7 @@ from feast.feature_view import FeatureView from feast.on_demand_feature_view import OnDemandFeatureView from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto +from feast.registry_store import NoopRegistryStore from feast.repo_config import RegistryConfig from feast.request_feature_view import RequestFeatureView @@ -137,7 +138,7 @@ def clone(self) -> "Registry": else RegistryProto() ) new_registry.cached_registry_proto_created = datetime.utcnow() - new_registry._registry_store = None + new_registry._registry_store = NoopRegistryStore() return new_registry # TODO(achals): This method needs to be filled out and used in the feast plan/apply methods. diff --git a/sdk/python/feast/registry_store.py b/sdk/python/feast/registry_store.py index 22c8b0f5ed..c42a55cd9d 100644 --- a/sdk/python/feast/registry_store.py +++ b/sdk/python/feast/registry_store.py @@ -36,3 +36,14 @@ def teardown(self): Tear down the registry. """ pass + + +class NoopRegistryStore(RegistryStore): + def get_registry_proto(self) -> RegistryProto: + pass + + def update_registry_proto(self, registry_proto: RegistryProto): + pass + + def teardown(self): + pass diff --git a/sdk/python/tests/doctest/test_all.py b/sdk/python/tests/doctest/test_all.py index 7507805b03..bf3a09db1e 100644 --- a/sdk/python/tests/doctest/test_all.py +++ b/sdk/python/tests/doctest/test_all.py @@ -96,8 +96,9 @@ def test_docstrings(): if not result.wasSuccessful(): successful = False failed_cases.append(result.failures) - except Exception: + except Exception as e: successful = False + failed_cases.append((full_name, e)) finally: if teardown_function: teardown_function() From cf8ce45e43e9fa794d25e8c520339e13014b4285 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Thu, 16 Dec 2021 00:05:12 -0800 Subject: [PATCH 9/9] CR updates Signed-off-by: Achal Shah --- sdk/python/feast/feature_store.py | 223 ++++++++++++++-------------- sdk/python/feast/repo_operations.py | 38 ++--- 2 files changed, 127 insertions(+), 134 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 7b648b9f1e..b0b99061e9 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -18,7 +18,18 @@ from collections import Counter, OrderedDict, defaultdict from datetime import datetime from pathlib import Path -from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Union, cast +from typing import ( + Any, + Dict, + Iterable, + List, + NamedTuple, + Optional, + Set, + Tuple, + Union, + cast, +) import pandas as pd from colorama import Fore, Style @@ -68,6 +79,31 @@ warnings.simplefilter("once", DeprecationWarning) +class RepoContents(NamedTuple): + feature_views: Set[FeatureView] + on_demand_feature_views: Set[OnDemandFeatureView] + request_feature_views: Set[RequestFeatureView] + entities: Set[Entity] + feature_services: Set[FeatureService] + + def to_registry_proto(self) -> RegistryProto: + registry_proto = RegistryProto() + registry_proto.entities.extend([e.to_proto() for e in self.entities]) + registry_proto.feature_views.extend( + [fv.to_proto() for fv in self.feature_views] + ) + registry_proto.on_demand_feature_views.extend( + [fv.to_proto() for fv in self.on_demand_feature_views] + ) + registry_proto.request_feature_views.extend( + [fv.to_proto() for fv in self.request_feature_views] + ) + registry_proto.feature_services.extend( + [fs.to_proto() for fs in self.feature_services] + ) + return registry_proto + + class FeatureStore: """ A FeatureStore object is used to define, create, and retrieve features. @@ -360,7 +396,56 @@ def _get_features(self, features: Union[List[str], FeatureService],) -> List[str return _feature_refs @log_exceptions_and_usage - def plan( + def plan(self, desired_repo_objects: RepoContents) -> RegistryDiff: + """Dry-run registering objects to metadata store. + + The plan method dry-runs registering one or more definitions (e.g., Entity, FeatureView), and produces + a list of all the changes the that would be introduced in the feature repo. The changes computed by the plan + command are for informational purpose, and are not actually applied to the registry. + + Args: + objects: A single object, or a list of objects that are intended to be registered with the Feature Store. + objects_to_delete: A list of objects to be deleted from the registry. + partial: If True, apply will only handle the specified objects; if False, apply will also delete + all the objects in objects_to_delete. + + Raises: + ValueError: The 'objects' parameter could not be parsed properly. + + Examples: + Generate a plan adding an Entity and a FeatureView. + + >>> from feast import FeatureStore, Entity, FeatureView, Feature, ValueType, FileSource, RepoConfig + >>> from feast.feature_store import RepoContents + >>> from datetime import timedelta + >>> fs = FeatureStore(repo_path="feature_repo") + >>> driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id") + >>> driver_hourly_stats = FileSource( + ... path="feature_repo/data/driver_stats.parquet", + ... event_timestamp_column="event_timestamp", + ... created_timestamp_column="created", + ... ) + >>> driver_hourly_stats_view = FeatureView( + ... name="driver_hourly_stats", + ... entities=["driver_id"], + ... ttl=timedelta(seconds=86400 * 1), + ... batch_source=driver_hourly_stats, + ... ) + >>> diff = fs.plan(RepoContents({driver_hourly_stats_view}, set(), set(), {driver}, set())) # register entity and feature view + """ + + current_registry_proto = ( + self._registry.cached_registry_proto.__deepcopy__() + if self._registry.cached_registry_proto + else RegistryProto() + ) + + desired_registry_proto = desired_repo_objects.to_registry_proto() + diffs = Registry.diff_between(current_registry_proto, desired_registry_proto) + return diffs + + @log_exceptions_and_usage + def apply( self, objects: Union[ Entity, @@ -390,24 +475,26 @@ def plan( ] ] = None, partial: bool = True, - ) -> Tuple[Registry, RegistryDiff]: - """Dry-run registering objects to metadata store. + ) -> RegistryDiff: + """Register objects to metadata store and update related infrastructure. - The plan method dry-runs registering one or more definitions (e.g., Entity, FeatureView), and produces - a list of all the changes the that would be introduced in the feature repo. The changes computed by the plan - command are for informational purpose, and are not actually applied to the registry. + The apply method registers one or more definitions (e.g., Entity, FeatureView) and registers or updates these + objects in the Feast registry. Once the apply method has updated the infrastructure (e.g., create tables in + an online store), it will commit the updated registry. All operations are idempotent, meaning they can safely + be rerun. Args: - objects: A single object, or a list of objects that are intended to be registered with the Feature Store. - objects_to_delete: A list of objects to be deleted from the registry. + objects: A single object, or a list of objects that should be registered with the Feature Store. + objects_to_delete: A list of objects to be deleted from the registry and removed from the + provider's infrastructure. This deletion will only be performed if partial is set to False. partial: If True, apply will only handle the specified objects; if False, apply will also delete - all the objects in objects_to_delete. + all the objects in objects_to_delete, and tear down any associated cloud resources. Raises: ValueError: The 'objects' parameter could not be parsed properly. Examples: - Generate a plan adding an Entity and a FeatureView. + Register an Entity and a FeatureView. >>> from feast import FeatureStore, Entity, FeatureView, Feature, ValueType, FileSource, RepoConfig >>> from datetime import timedelta @@ -424,12 +511,11 @@ def plan( ... ttl=timedelta(seconds=86400 * 1), ... batch_source=driver_hourly_stats, ... ) - >>> registry, diff = fs.plan([driver_hourly_stats_view, driver]) # register entity and feature view + >>> diff = fs.apply([driver_hourly_stats_view, driver]) # register entity and feature view """ - + # TODO: Add locking if not isinstance(objects, Iterable): objects = [objects] - assert isinstance(objects, list) if not objects_to_delete: @@ -440,7 +526,6 @@ def plan( if self._registry.cached_registry_proto else RegistryProto() ) - new_registry = self._registry.clone() # Separate all objects into entities, feature services, and different feature view types. entities_to_update = [ob for ob in objects if isinstance(ob, Entity)] @@ -492,11 +577,11 @@ def plan( for view in itertools.chain( views_to_update, odfvs_to_update, request_views_to_update ): - new_registry.apply_feature_view(view, project=self.project, commit=False) + self._registry.apply_feature_view(view, project=self.project, commit=False) for ent in entities_to_update: - new_registry.apply_entity(ent, project=self.project, commit=False) + self._registry.apply_entity(ent, project=self.project, commit=False) for feature_service in services_to_update: - new_registry.apply_feature_service( + self._registry.apply_feature_service( feature_service, project=self.project, commit=False ) @@ -519,119 +604,33 @@ def plan( ] for entity in entities_to_delete: - new_registry.delete_entity( + self._registry.delete_entity( entity.name, project=self.project, commit=False ) for view in views_to_delete: - new_registry.delete_feature_view( + self._registry.delete_feature_view( view.name, project=self.project, commit=False ) for request_view in request_views_to_delete: - new_registry.delete_feature_view( + self._registry.delete_feature_view( request_view.name, project=self.project, commit=False ) for odfv in odfvs_to_delete: - new_registry.delete_feature_view( + self._registry.delete_feature_view( odfv.name, project=self.project, commit=False ) for service in services_to_delete: - new_registry.delete_feature_service( + self._registry.delete_feature_service( service.name, project=self.project, commit=False ) new_registry_proto = ( - new_registry.cached_registry_proto - if new_registry.cached_registry_proto + self._registry.cached_registry_proto + if self._registry.cached_registry_proto else RegistryProto() ) - return ( - new_registry, - Registry.diff_between(current_registry_proto, new_registry_proto), - ) - - @log_exceptions_and_usage - def apply( - self, - objects: Union[ - Entity, - FeatureView, - OnDemandFeatureView, - RequestFeatureView, - FeatureService, - List[ - Union[ - FeatureView, - OnDemandFeatureView, - RequestFeatureView, - Entity, - FeatureService, - ] - ], - ], - objects_to_delete: Optional[ - List[ - Union[ - FeatureView, - OnDemandFeatureView, - RequestFeatureView, - Entity, - FeatureService, - ] - ] - ] = None, - partial: bool = True, - ): - """Register objects to metadata store and update related infrastructure. - - The apply method registers one or more definitions (e.g., Entity, FeatureView) and registers or updates these - objects in the Feast registry. Once the apply method has updated the infrastructure (e.g., create tables in - an online store), it will commit the updated registry. All operations are idempotent, meaning they can safely - be rerun. - - Args: - objects: A single object, or a list of objects that should be registered with the Feature Store. - objects_to_delete: A list of objects to be deleted from the registry and removed from the - provider's infrastructure. This deletion will only be performed if partial is set to False. - partial: If True, apply will only handle the specified objects; if False, apply will also delete - all the objects in objects_to_delete, and tear down any associated cloud resources. - - Raises: - ValueError: The 'objects' parameter could not be parsed properly. - - Examples: - Register an Entity and a FeatureView. - - >>> from feast import FeatureStore, Entity, FeatureView, Feature, ValueType, FileSource, RepoConfig - >>> from datetime import timedelta - >>> fs = FeatureStore(repo_path="feature_repo") - >>> driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id") - >>> driver_hourly_stats = FileSource( - ... path="feature_repo/data/driver_stats.parquet", - ... event_timestamp_column="event_timestamp", - ... created_timestamp_column="created", - ... ) - >>> driver_hourly_stats_view = FeatureView( - ... name="driver_hourly_stats", - ... entities=["driver_id"], - ... ttl=timedelta(seconds=86400 * 1), - ... batch_source=driver_hourly_stats, - ... ) - >>> diff = fs.apply([driver_hourly_stats_view, driver]) # register entity and feature view - """ - # TODO: Add locking - if not isinstance(objects, Iterable): - objects = [objects] - - if not objects_to_delete: - objects_to_delete = [] - - new_registry, diffs = self.plan(objects, objects_to_delete, partial) - new_registry.cached_registry_proto_ttl = ( - self._registry.cached_registry_proto_ttl - ) - new_registry._registry_store = self._registry._registry_store - self._registry = new_registry + diffs = Registry.diff_between(current_registry_proto, new_registry_proto) entities_to_update = [ob for ob in objects if isinstance(ob, Entity)] views_to_update = [ob for ob in objects if isinstance(ob, FeatureView)] diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index a05032ba08..86de6d1958 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -6,7 +6,7 @@ import sys from importlib.abc import Loader from pathlib import Path -from typing import List, NamedTuple, Set, Union, cast +from typing import List, Set, Union, cast import click from click.exceptions import BadParameter @@ -15,7 +15,7 @@ from feast.diff.FcoDiff import TransitionType, tag_objects_for_keep_delete_add from feast.entity import Entity from feast.feature_service import FeatureService -from feast.feature_store import FeatureStore +from feast.feature_store import FeatureStore, RepoContents from feast.feature_view import DUMMY_ENTITY, DUMMY_ENTITY_NAME, FeatureView from feast.names import adjectives, animals from feast.on_demand_feature_view import OnDemandFeatureView @@ -33,14 +33,6 @@ def py_path_to_module(path: Path, repo_root: Path) -> str: ) -class ParsedRepo(NamedTuple): - feature_views: Set[FeatureView] - on_demand_feature_views: Set[OnDemandFeatureView] - request_feature_views: Set[RequestFeatureView] - entities: Set[Entity] - feature_services: Set[FeatureService] - - def read_feastignore(repo_root: Path) -> List[str]: """Read .feastignore in the repo root directory (if exists) and return the list of user-defined ignore paths""" feast_ignore = repo_root / ".feastignore" @@ -94,9 +86,9 @@ def get_repo_files(repo_root: Path) -> List[Path]: return sorted(repo_files) -def parse_repo(repo_root: Path) -> ParsedRepo: +def parse_repo(repo_root: Path) -> RepoContents: """ Collect feature table definitions from feature repo """ - res = ParsedRepo( + res = RepoContents( entities=set(), feature_views=set(), feature_services=set(), @@ -135,16 +127,18 @@ def plan(repo_config: RepoConfig, repo_path: Path, skip_source_validation: bool) for data_source in data_sources: data_source.validate(store.config) - # For each object in the registry, determine whether it should be kept or deleted, - # and whether new objects need to be added. - ( - all_to_apply, - all_to_delete, - views_to_delete, - views_to_keep, - ) = extract_objects_for_apply_delete(project, registry, repo) - - _, diff = store.plan(all_to_apply, objects_to_delete=all_to_delete, partial=False) + diff = store.plan(repo) + views_to_delete = [ + v + for v in diff.fco_diffs + if v.fco_type == "feature view" and v.transition_type == TransitionType.DELETE + ] + views_to_keep = [ + v + for v in diff.fco_diffs + if v.fco_type == "feature view" + and v.transition_type in {TransitionType.CREATE, TransitionType.UNCHANGED} + ] log_cli_output(diff, views_to_delete, views_to_keep)