diff --git a/sdk/python/feast/cli.py b/sdk/python/feast/cli.py index 6acd7ec55b..1997b12e3b 100644 --- a/sdk/python/feast/cli.py +++ b/sdk/python/feast/cli.py @@ -34,6 +34,7 @@ cli_check_repo, generate_project_name, init_repo, + plan, registry_dump, teardown, ) @@ -351,6 +352,26 @@ def on_demand_feature_view_list(ctx: click.Context): print(tabulate(table, headers=["NAME"], tablefmt="plain")) +@cli.command("plan", cls=NoOptionDefaultFormat) +@click.option( + "--skip-source-validation", + is_flag=True, + help="Don't validate the data sources by checking for that the tables exist.", +) +@click.pass_context +def plan_command(ctx: click.Context, skip_source_validation: bool): + """ + Dry-run a feature store deployment and print the changes that apply would make + """ + repo = ctx.obj["CHDIR"] + cli_check_repo(repo) + repo_config = load_repo_config(repo) + try: + plan(repo_config, repo, skip_source_validation) + except FeastProviderLoginError as e: + print(str(e)) + + @cli.command("apply", cls=NoOptionDefaultFormat) @click.option( "--skip-source-validation", diff --git a/sdk/python/feast/diff/FcoDiff.py b/sdk/python/feast/diff/FcoDiff.py index b19eb713c2..4e2047d38f 100644 --- a/sdk/python/feast/diff/FcoDiff.py +++ b/sdk/python/feast/diff/FcoDiff.py @@ -26,6 +26,8 @@ class TransitionType(Enum): @dataclass class FcoDiff: + name: str + fco_type: str current_fco: Any new_fco: Any fco_property_diffs: List[PropertyDiff] diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 98af761931..b0b99061e9 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -18,7 +18,18 @@ from collections import Counter, OrderedDict, defaultdict from datetime import datetime from pathlib import Path -from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Union, cast +from typing import ( + Any, + Dict, + Iterable, + List, + NamedTuple, + Optional, + Set, + Tuple, + Union, + cast, +) import pandas as pd from colorama import Fore, Style @@ -26,6 +37,7 @@
from feast import feature_server, flags, flags_helper, utils from feast.base_feature_view import BaseFeatureView +from feast.diff.FcoDiff import RegistryDiff from feast.entity import Entity from feast.errors import ( EntityNotFoundException, @@ -51,6 +63,7 @@ from feast.infra.provider import Provider, RetrievalJob, get_provider from feast.on_demand_feature_view import OnDemandFeatureView from feast.online_response import OnlineResponse, _infer_online_entity_rows +from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto from feast.protos.feast.serving.ServingService_pb2 import ( GetOnlineFeaturesRequestV2, GetOnlineFeaturesResponse, @@ -66,6 +79,31 @@ warnings.simplefilter("once", DeprecationWarning) +class RepoContents(NamedTuple): + feature_views: Set[FeatureView] + on_demand_feature_views: Set[OnDemandFeatureView] + request_feature_views: Set[RequestFeatureView] + entities: Set[Entity] + feature_services: Set[FeatureService] + + def to_registry_proto(self) -> RegistryProto: + registry_proto = RegistryProto() + registry_proto.entities.extend([e.to_proto() for e in self.entities]) + registry_proto.feature_views.extend( + [fv.to_proto() for fv in self.feature_views] + ) + registry_proto.on_demand_feature_views.extend( + [fv.to_proto() for fv in self.on_demand_feature_views] + ) + registry_proto.request_feature_views.extend( + [fv.to_proto() for fv in self.request_feature_views] + ) + registry_proto.feature_services.extend( + [fs.to_proto() for fs in self.feature_services] + ) + return registry_proto + + class FeatureStore: """ A FeatureStore object is used to define, create, and retrieve features. @@ -357,6 +395,55 @@ def _get_features(self, features: Union[List[str], FeatureService],) -> List[str _feature_refs = _features return _feature_refs + @log_exceptions_and_usage + def plan(self, desired_repo_objects: RepoContents) -> RegistryDiff: + """Dry-run registering objects to metadata store. 
+ + The plan method dry-runs registering one or more definitions (e.g., Entity, FeatureView), and produces + a list of all the changes that would be introduced in the feature repo. The changes computed by the plan + command are for informational purposes, and are not actually applied to the registry. + + Args: + desired_repo_objects: The objects (feature views, on demand feature views, request feature views, + entities, and feature services) that the feature repo is intended to contain; they are compared + against the currently cached registry. + + Returns: + A RegistryDiff with one entry per object, marking it as to be created, deleted, + or left unchanged. + + Examples: + Generate a plan adding an Entity and a FeatureView. + + >>> from feast import FeatureStore, Entity, FeatureView, Feature, ValueType, FileSource, RepoConfig + >>> from feast.feature_store import RepoContents + >>> from datetime import timedelta + >>> fs = FeatureStore(repo_path="feature_repo") + >>> driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id") + >>> driver_hourly_stats = FileSource( + ... path="feature_repo/data/driver_stats.parquet", + ... event_timestamp_column="event_timestamp", + ... created_timestamp_column="created", + ... ) + >>> driver_hourly_stats_view = FeatureView( + ... name="driver_hourly_stats", + ... entities=["driver_id"], + ... ttl=timedelta(seconds=86400 * 1), + ... batch_source=driver_hourly_stats, + ...
) + >>> diff = fs.plan(RepoContents({driver_hourly_stats_view}, set(), set(), {driver}, set())) # register entity and feature view + """ + + current_registry_proto = ( + self._registry.cached_registry_proto.__deepcopy__() + if self._registry.cached_registry_proto + else RegistryProto() + ) + + desired_registry_proto = desired_repo_objects.to_registry_proto() + diffs = Registry.diff_between(current_registry_proto, desired_registry_proto) + return diffs + @log_exceptions_and_usage def apply( self, @@ -388,7 +475,7 @@ def apply( ] ] = None, partial: bool = True, - ): + ) -> RegistryDiff: """Register objects to metadata store and update related infrastructure. The apply method registers one or more definitions (e.g., Entity, FeatureView) and registers or updates these @@ -424,18 +511,22 @@ def apply( ... ttl=timedelta(seconds=86400 * 1), ... batch_source=driver_hourly_stats, ... ) - >>> fs.apply([driver_hourly_stats_view, driver]) # register entity and feature view + >>> diff = fs.apply([driver_hourly_stats_view, driver]) # register entity and feature view """ # TODO: Add locking - if not isinstance(objects, Iterable): objects = [objects] - assert isinstance(objects, list) if not objects_to_delete: objects_to_delete = [] + current_registry_proto = ( + self._registry.cached_registry_proto.__deepcopy__() + if self._registry.cached_registry_proto + else RegistryProto() + ) + # Separate all objects into entities, feature services, and different feature view types. 
entities_to_update = [ob for ob in objects if isinstance(ob, Entity)] views_to_update = [ob for ob in objects if isinstance(ob, FeatureView)] @@ -533,6 +624,22 @@ def apply( service.name, project=self.project, commit=False ) + new_registry_proto = ( + self._registry.cached_registry_proto + if self._registry.cached_registry_proto + else RegistryProto() + ) + + diffs = Registry.diff_between(current_registry_proto, new_registry_proto) + + entities_to_update = [ob for ob in objects if isinstance(ob, Entity)] + views_to_update = [ob for ob in objects if isinstance(ob, FeatureView)] + + entities_to_delete = [ob for ob in objects_to_delete if isinstance(ob, Entity)] + views_to_delete = [ + ob for ob in objects_to_delete if isinstance(ob, FeatureView) + ] + self._get_provider().update_infra( project=self.project, tables_to_delete=views_to_delete if not partial else [], @@ -544,6 +651,8 @@ def apply( self._registry.commit() + return diffs + @log_exceptions_and_usage def teardown(self): """Tears down all local and cloud resources for the feature store.""" diff --git a/sdk/python/feast/registry.py b/sdk/python/feast/registry.py index 68d5b0bc11..bfc1e9a336 100644 --- a/sdk/python/feast/registry.py +++ b/sdk/python/feast/registry.py @@ -42,6 +42,7 @@ from feast.feature_view import FeatureView from feast.on_demand_feature_view import OnDemandFeatureView from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto +from feast.registry_store import NoopRegistryStore from feast.repo_config import RegistryConfig from feast.request_feature_view import RequestFeatureView @@ -128,6 +129,18 @@ def __init__( else 0 ) + def clone(self) -> "Registry": + new_registry = Registry(None, None) + new_registry.cached_registry_proto_ttl = timedelta(seconds=0) + new_registry.cached_registry_proto = ( + self.cached_registry_proto.__deepcopy__() + if self.cached_registry_proto + else RegistryProto() + ) + new_registry.cached_registry_proto_created = datetime.utcnow() + 
new_registry._registry_store = NoopRegistryStore() + return new_registry + # TODO(achals): This method needs to be filled out and used in the feast plan/apply methods. @staticmethod def diff_between( @@ -135,25 +148,66 @@ def diff_between( ) -> RegistryDiff: diff = RegistryDiff() - # Handle Entities - ( - entities_to_keep, - entities_to_delete, - entities_to_add, - ) = tag_proto_objects_for_keep_delete_add( - current_registry.entities, new_registry.entities, - ) + attribute_to_object_type_str = { + "entities": "entity", + "feature_views": "feature view", + "feature_tables": "feature table", + "on_demand_feature_views": "on demand feature view", + "request_feature_views": "request feature view", + "feature_services": "feature service", + } - for e in entities_to_add: - diff.add_fco_diff(FcoDiff(None, e, [], TransitionType.CREATE)) - for e in entities_to_delete: - diff.add_fco_diff(FcoDiff(e, None, [], TransitionType.DELETE)) + for object_type in [ + "entities", + "feature_views", + "feature_tables", + "on_demand_feature_views", + "request_feature_views", + "feature_services", + ]: + ( + objects_to_keep, + objects_to_delete, + objects_to_add, + ) = tag_proto_objects_for_keep_delete_add( + getattr(current_registry, object_type), + getattr(new_registry, object_type), + ) + + for e in objects_to_add: + diff.add_fco_diff( + FcoDiff( + e.spec.name, + attribute_to_object_type_str[object_type], + None, + e, + [], + TransitionType.CREATE, + ) + ) + for e in objects_to_delete: + diff.add_fco_diff( + FcoDiff( + e.spec.name, + attribute_to_object_type_str[object_type], + e, + None, + [], + TransitionType.DELETE, + ) + ) + for e in objects_to_keep: + diff.add_fco_diff( + FcoDiff( + e.spec.name, + attribute_to_object_type_str[object_type], + e, + e, + [], + TransitionType.UNCHANGED, + ) + ) - # Handle Feature Views - # Handle On Demand Feature Views - # Handle Request Feature Views - # Handle Feature Services - logger.info(f"Diff: {diff}") return diff def 
_initialize_registry(self): diff --git a/sdk/python/feast/registry_store.py b/sdk/python/feast/registry_store.py index 22c8b0f5ed..c42a55cd9d 100644 --- a/sdk/python/feast/registry_store.py +++ b/sdk/python/feast/registry_store.py @@ -36,3 +36,14 @@ def teardown(self): Tear down the registry. """ pass + + +class NoopRegistryStore(RegistryStore): + def get_registry_proto(self) -> RegistryProto: + pass + + def update_registry_proto(self, registry_proto: RegistryProto): + pass + + def teardown(self): + pass diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index c7620c07a0..86de6d1958 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -6,16 +6,16 @@ import sys from importlib.abc import Loader from pathlib import Path -from typing import List, NamedTuple, Set, Union, cast +from typing import List, Set, Union, cast import click from click.exceptions import BadParameter from feast.base_feature_view import BaseFeatureView -from feast.diff.FcoDiff import tag_objects_for_keep_delete_add +from feast.diff.FcoDiff import TransitionType, tag_objects_for_keep_delete_add from feast.entity import Entity from feast.feature_service import FeatureService -from feast.feature_store import FeatureStore +from feast.feature_store import FeatureStore, RepoContents from feast.feature_view import DUMMY_ENTITY, DUMMY_ENTITY_NAME, FeatureView from feast.names import adjectives, animals from feast.on_demand_feature_view import OnDemandFeatureView @@ -33,14 +33,6 @@ def py_path_to_module(path: Path, repo_root: Path) -> str: ) -class ParsedRepo(NamedTuple): - feature_views: Set[FeatureView] - on_demand_feature_views: Set[OnDemandFeatureView] - request_feature_views: Set[RequestFeatureView] - entities: Set[Entity] - feature_services: Set[FeatureService] - - def read_feastignore(repo_root: Path) -> List[str]: """Read .feastignore in the repo root directory (if exists) and return the list of user-defined ignore paths""" 
feast_ignore = repo_root / ".feastignore" @@ -94,9 +86,9 @@ def get_repo_files(repo_root: Path) -> List[Path]: return sorted(repo_files) -def parse_repo(repo_root: Path) -> ParsedRepo: +def parse_repo(repo_root: Path) -> RepoContents: """ Collect feature table definitions from feature repo """ - res = ParsedRepo( + res = RepoContents( entities=set(), feature_views=set(), feature_services=set(), @@ -124,10 +116,34 @@ def parse_repo(repo_root: Path) -> ParsedRepo: @log_exceptions_and_usage -def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation: bool): - from colorama import Fore, Style +def plan(repo_config: RepoConfig, repo_path: Path, skip_source_validation: bool): os.chdir(repo_path) + project, registry, repo, store = _prepare_registry_and_repo(repo_config, repo_path) + + if not skip_source_validation: + data_sources = [t.batch_source for t in repo.feature_views] + # Make sure the data source used by this feature view is supported by Feast + for data_source in data_sources: + data_source.validate(store.config) + + diff = store.plan(repo) + views_to_delete = [ + v + for v in diff.fco_diffs + if v.fco_type == "feature view" and v.transition_type == TransitionType.DELETE + ] + views_to_keep = [ + v + for v in diff.fco_diffs + if v.fco_type == "feature view" + and v.transition_type in {TransitionType.CREATE, TransitionType.UNCHANGED} + ] + + log_cli_output(diff, views_to_delete, views_to_keep) + + +def _prepare_registry_and_repo(repo_config, repo_path): store = FeatureStore(config=repo_config) project = store.project if not is_valid_name(project): @@ -140,14 +156,10 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation registry._initialize_registry() sys.dont_write_bytecode = True repo = parse_repo(repo_path) + return project, registry, repo, store - if not skip_source_validation: - data_sources = [t.batch_source for t in repo.feature_views] - # Make sure the data source used by this feature view is supported by 
Feast - for data_source in data_sources: - data_source.validate(store.config) - # For each object in the registry, determine whether it should be kept or deleted. +def extract_objects_for_apply_delete(project, registry, repo): ( entities_to_keep, entities_to_delete, @@ -157,7 +169,6 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation ) # TODO(achals): This code path should be refactored to handle added & kept entities separately. entities_to_keep = set(entities_to_keep).union(entities_to_add) - views = tag_objects_for_keep_delete_add( set(registry.list_feature_views(project=project)), repo.feature_views ) @@ -166,7 +177,6 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation cast(Set[FeatureView], views[1]), cast(Set[FeatureView], views[2]), ) - request_views = tag_objects_for_keep_delete_add( set(registry.list_request_feature_views(project=project)), repo.request_feature_views, @@ -179,7 +189,6 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation cast(Set[RequestFeatureView], request_views[1]), cast(Set[RequestFeatureView], request_views[2]), ) - base_views_to_keep: Set[Union[RequestFeatureView, FeatureView]] = { *views_to_keep, *views_to_add, @@ -190,7 +199,6 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation *views_to_delete, *request_views_to_delete, } - odfvs = tag_objects_for_keep_delete_add( set(registry.list_on_demand_feature_views(project=project)), repo.on_demand_feature_views, @@ -201,7 +209,6 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation cast(Set[OnDemandFeatureView], odfvs[2]), ) odfvs_to_keep = odfvs_to_keep.union(odfvs_to_add) - ( services_to_keep, services_to_delete, @@ -210,9 +217,7 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation set(registry.list_feature_services(project=project)), repo.feature_services ) services_to_keep = 
services_to_keep.union(services_to_add) - sys.dont_write_bytecode = False - # Apply all changes to the registry and infrastructure. all_to_apply: List[ Union[Entity, BaseFeatureView, FeatureService, OnDemandFeatureView] @@ -229,44 +234,49 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation all_to_delete.extend(services_to_delete) all_to_delete.extend(odfvs_to_delete) - store.apply(all_to_apply, objects_to_delete=all_to_delete, partial=False) + return all_to_apply, all_to_delete, views_to_delete, views_to_keep - for entity in entities_to_delete: - click.echo( - f"Deleted entity {Style.BRIGHT + Fore.GREEN}{entity.name}{Style.RESET_ALL} from registry" - ) - for view in base_views_to_delete: - click.echo( - f"Deleted feature view {Style.BRIGHT + Fore.GREEN}{view.name}{Style.RESET_ALL} from registry" - ) - for odfv in odfvs_to_delete: - click.echo( - f"Deleted on demand feature view {Style.BRIGHT + Fore.GREEN}{odfv.name}{Style.RESET_ALL} from registry" - ) - for feature_service in services_to_delete: - click.echo( - f"Deleted feature service {Style.BRIGHT + Fore.GREEN}{feature_service.name}{Style.RESET_ALL} " - f"from registry" - ) - for entity in entities_to_keep: - if entity.name != DUMMY_ENTITY_NAME: - click.echo( - f"Registered entity {Style.BRIGHT + Fore.GREEN}{entity.name}{Style.RESET_ALL}" - ) - for view in base_views_to_keep: - click.echo( - f"Registered feature view {Style.BRIGHT + Fore.GREEN}{view.name}{Style.RESET_ALL}" - ) - for odfv in odfvs_to_keep: - click.echo( - f"Registered on demand feature view {Style.BRIGHT + Fore.GREEN}{odfv.name}{Style.RESET_ALL}" - ) - for feature_service in services_to_keep: +@log_exceptions_and_usage +def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation: bool): + + os.chdir(repo_path) + project, registry, repo, store = _prepare_registry_and_repo(repo_config, repo_path) + + if not skip_source_validation: + data_sources = [t.batch_source for t in repo.feature_views] + # 
Make sure the data source used by this feature view is supported by Feast + for data_source in data_sources: + data_source.validate(store.config) + + # For each object in the registry, determine whether it should be kept or deleted. + ( + all_to_apply, + all_to_delete, + views_to_delete, + views_to_keep, + ) = extract_objects_for_apply_delete(project, registry, repo) + + diff = store.apply(all_to_apply, objects_to_delete=all_to_delete, partial=False) + + log_cli_output(diff, views_to_delete, views_to_keep) + + +def log_cli_output(diff, views_to_delete, views_to_keep): + from colorama import Fore, Style + + message_action_map = { + TransitionType.CREATE: ("Created", Fore.GREEN), + TransitionType.DELETE: ("Deleted", Fore.RED), + TransitionType.UNCHANGED: ("Unchanged", Fore.LIGHTBLUE_EX), + } + for fco_diff in diff.fco_diffs: + if fco_diff.name == DUMMY_ENTITY_NAME: + continue + action, color = message_action_map[fco_diff.transition_type] click.echo( - f"Registered feature service {Style.BRIGHT + Fore.GREEN}{feature_service.name}{Style.RESET_ALL}" + f"{action} {fco_diff.fco_type} {Style.BRIGHT + color}{fco_diff.name}{Style.RESET_ALL}" ) - views_to_keep_in_infra = [ view for view in views_to_keep if isinstance(view, FeatureView) ] @@ -279,9 +289,8 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation ] for name in [view.name for view in views_to_delete_from_infra]: click.echo( - f"Removing infrastructure for {Style.BRIGHT + Fore.GREEN}{name}{Style.RESET_ALL}" + f"Removing infrastructure for {Style.BRIGHT + Fore.RED}{name}{Style.RESET_ALL}" ) - # TODO: consider echoing also entities being deployed/removed @log_exceptions_and_usage diff --git a/sdk/python/tests/doctest/test_all.py b/sdk/python/tests/doctest/test_all.py index c406303f00..bf3a09db1e 100644 --- a/sdk/python/tests/doctest/test_all.py +++ b/sdk/python/tests/doctest/test_all.py @@ -59,6 +59,7 @@ def test_docstrings(): """ successful = True current_packages = [feast] + failed_cases 
= [] while current_packages: next_packages = [] @@ -94,8 +95,10 @@ def test_docstrings(): result = unittest.TextTestRunner(sys.stdout).run(test_suite) if not result.wasSuccessful(): successful = False - except Exception: + failed_cases.append(result.failures) + except Exception as e: successful = False + failed_cases.append((full_name, e)) finally: if teardown_function: teardown_function() @@ -103,4 +106,4 @@ def test_docstrings(): current_packages = next_packages if not successful: - raise Exception("Docstring tests failed.") + raise Exception(f"Docstring tests failed. Failed results: {failed_cases}") diff --git a/sdk/python/tests/integration/registration/test_cli_apply_duplicated_featureview_names.py b/sdk/python/tests/integration/registration/test_cli_apply_duplicated_featureview_names.py index 2b5ba2dcb0..6987066e8d 100644 --- a/sdk/python/tests/integration/registration/test_cli_apply_duplicated_featureview_names.py +++ b/sdk/python/tests/integration/registration/test_cli_apply_duplicated_featureview_names.py @@ -85,7 +85,7 @@ def test_cli_apply_imported_featureview() -> None: rc, output = runner.run_with_output(["apply"], cwd=repo_path) assert rc == 0 - assert b"Registered feature service driver_locations_service" in output + assert b"Created feature service driver_locations_service" in output def test_cli_apply_imported_featureview_with_duplication() -> None: