diff --git a/java/serving/src/test/resources/docker-compose/feast10/setup_it.py b/java/serving/src/test/resources/docker-compose/feast10/setup_it.py index 503b66f328..61aaa6fec8 100644 --- a/java/serving/src/test/resources/docker-compose/feast10/setup_it.py +++ b/java/serving/src/test/resources/docker-compose/feast10/setup_it.py @@ -64,7 +64,7 @@ def main(): print("Running setup_it.py") setup_data() - existing_repo_config = load_repo_config(Path(".")) + existing_repo_config = load_repo_config(Path("."), Path(".") / "feature_store.yaml") # Update to default online store since otherwise, relies on Dockerized Redis service fs = FeatureStore(config=existing_repo_config.copy(update={"online_store": {}})) diff --git a/sdk/python/feast/cli.py b/sdk/python/feast/cli.py index c6a301e958..2763d03d48 100644 --- a/sdk/python/feast/cli.py +++ b/sdk/python/feast/cli.py @@ -72,8 +72,17 @@ def format_options(self, ctx: click.Context, formatter: click.HelpFormatter): default="info", help="The logging level. One of DEBUG, INFO, WARNING, ERROR, and CRITICAL (case-insensitive).", ) +@click.option( + "--feature-store-yaml", + help="Override the directory where the CLI should look for the feature_store.yaml file.", +) @click.pass_context -def cli(ctx: click.Context, chdir: Optional[str], log_level: str): +def cli( + ctx: click.Context, + chdir: Optional[str], + log_level: str, + feature_store_yaml: Optional[str], +): """ Feast CLI @@ -83,6 +92,11 @@ def cli(ctx: click.Context, chdir: Optional[str], log_level: str): """ ctx.ensure_object(dict) ctx.obj["CHDIR"] = Path.cwd() if chdir is None else Path(chdir).absolute() + ctx.obj["FS_YAML_FILE"] = ( + Path(feature_store_yaml).absolute() + if feature_store_yaml + else ctx.obj["CHDIR"] / "feature_store.yaml" + ) try: level = getattr(logging, log_level.upper()) logging.basicConfig( @@ -143,8 +157,9 @@ def ui(ctx: click.Context, host: str, port: int, registry_ttl_sec: int): Shows the Feast UI over the current directory """ repo = ctx.obj["CHDIR"] - cli_check_repo(repo) - store = FeatureStore(repo_path=str(repo)) + fs_yaml_file = ctx.obj["FS_YAML_FILE"] + cli_check_repo(repo, fs_yaml_file) + store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file) # Pass in the registry_dump method to get around a circular dependency store.serve_ui( host=host, @@ -161,8 +176,9 @@ def endpoint(ctx: click.Context): Display feature server endpoints """ repo = ctx.obj["CHDIR"] - cli_check_repo(repo) - store = FeatureStore(repo_path=str(repo)) + fs_yaml_file = ctx.obj["FS_YAML_FILE"] + cli_check_repo(repo, fs_yaml_file) + store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file) endpoint = store.get_feature_server_endpoint() if endpoint is not None: _logger.info( @@ -188,8 +204,9 @@ def data_source_describe(ctx: click.Context, name: str): Describe a data source """ repo = ctx.obj["CHDIR"] - cli_check_repo(repo) - store = FeatureStore(repo_path=str(repo)) + fs_yaml_file = ctx.obj["FS_YAML_FILE"] + cli_check_repo(repo, fs_yaml_file) + store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file) try: data_source = store.get_data_source(name) @@ -216,8 +233,9 @@ def data_source_list(ctx: click.Context): List all data sources """ repo = ctx.obj["CHDIR"] - cli_check_repo(repo) - store = FeatureStore(repo_path=str(repo)) + fs_yaml_file = ctx.obj["FS_YAML_FILE"] + cli_check_repo(repo, fs_yaml_file) + store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file) table = [] for datasource in store.list_data_sources(): table.append([datasource.name, datasource.__class__]) @@ -248,8 +266,9 @@ def entity_describe(ctx: click.Context, name: str): Describe an entity """ repo = ctx.obj["CHDIR"] - cli_check_repo(repo) - store = FeatureStore(repo_path=str(repo)) + fs_yaml_file = ctx.obj["FS_YAML_FILE"] + cli_check_repo(repo, fs_yaml_file) + store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file) try: entity = store.get_entity(name) @@ -271,8 +290,9 @@ def entity_list(ctx: click.Context): List all entities """ repo = ctx.obj["CHDIR"] - cli_check_repo(repo) - store = FeatureStore(repo_path=str(repo)) + fs_yaml_file = ctx.obj["FS_YAML_FILE"] + cli_check_repo(repo, fs_yaml_file) + store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file) table = [] for entity in store.list_entities(): table.append([entity.name, entity.description, entity.value_type]) @@ -298,8 +318,9 @@ def feature_service_describe(ctx: click.Context, name: str): Describe a feature service """ repo = ctx.obj["CHDIR"] - cli_check_repo(repo) - store = FeatureStore(repo_path=str(repo)) + fs_yaml_file = ctx.obj["FS_YAML_FILE"] + cli_check_repo(repo, fs_yaml_file) + store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file) try: feature_service = store.get_feature_service(name) @@ -323,8 +344,9 @@ def feature_service_list(ctx: click.Context): List all feature services """ repo = ctx.obj["CHDIR"] - cli_check_repo(repo) - store = FeatureStore(repo_path=str(repo)) + fs_yaml_file = ctx.obj["FS_YAML_FILE"] + cli_check_repo(repo, fs_yaml_file) + store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file) feature_services = [] for feature_service in store.list_feature_services(): feature_names = [] @@ -355,8 +377,9 @@ def feature_view_describe(ctx: click.Context, name: str): Describe a feature view """ repo = ctx.obj["CHDIR"] - cli_check_repo(repo) - store = FeatureStore(repo_path=str(repo)) + fs_yaml_file = ctx.obj["FS_YAML_FILE"] + cli_check_repo(repo, fs_yaml_file) + store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file) try: feature_view = store.get_feature_view(name) @@ -378,8 +401,10 @@ def feature_view_list(ctx: click.Context): List all feature views """ repo = ctx.obj["CHDIR"] - cli_check_repo(repo) - store = FeatureStore(repo_path=str(repo)) + fs_yaml_file = ctx.obj["FS_YAML_FILE"] + + cli_check_repo(repo, fs_yaml_file) + store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file) table = [] for feature_view in [ *store.list_feature_views(), @@ -421,8 +446,9 @@ def on_demand_feature_view_describe(ctx: click.Context, name: str): [Experimental] Describe an on demand feature view """ repo = ctx.obj["CHDIR"] - cli_check_repo(repo) - store = FeatureStore(repo_path=str(repo)) + fs_yaml_file = ctx.obj["FS_YAML_FILE"] + cli_check_repo(repo, fs_yaml_file) + store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file) try: on_demand_feature_view = store.get_on_demand_feature_view(name) @@ -446,8 +472,9 @@ def on_demand_feature_view_list(ctx: click.Context): [Experimental] List all on demand feature views """ repo = ctx.obj["CHDIR"] - cli_check_repo(repo) - store = FeatureStore(repo_path=str(repo)) + fs_yaml_file = ctx.obj["FS_YAML_FILE"] + cli_check_repo(repo, fs_yaml_file) + store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file) table = [] for on_demand_feature_view in store.list_on_demand_feature_views(): table.append([on_demand_feature_view.name]) @@ -469,8 +496,9 @@ def plan_command(ctx: click.Context, skip_source_validation: bool): Create or update a feature store deployment """ repo = ctx.obj["CHDIR"] - cli_check_repo(repo) - repo_config = load_repo_config(repo) + fs_yaml_file = ctx.obj["FS_YAML_FILE"] + cli_check_repo(repo, fs_yaml_file) + repo_config = load_repo_config(repo, fs_yaml_file) try: plan(repo_config, repo, skip_source_validation) except FeastProviderLoginError as e: @@ -489,8 +517,10 @@ def apply_total_command(ctx: click.Context, skip_source_validation: bool): Create or update a feature store deployment """ repo = ctx.obj["CHDIR"] - cli_check_repo(repo) - repo_config = load_repo_config(repo) + fs_yaml_file = ctx.obj["FS_YAML_FILE"] + cli_check_repo(repo, fs_yaml_file) + + repo_config = load_repo_config(repo, fs_yaml_file) try: apply_total(repo_config, repo, skip_source_validation) except FeastProviderLoginError as e: @@ -504,8 +534,9 @@ def teardown_command(ctx: click.Context): Tear down deployed feature store infrastructure """ repo = ctx.obj["CHDIR"] - cli_check_repo(repo) - repo_config = load_repo_config(repo) + fs_yaml_file = ctx.obj["FS_YAML_FILE"] + cli_check_repo(repo, fs_yaml_file) + repo_config = load_repo_config(repo, fs_yaml_file) teardown(repo_config, repo) @@ -517,8 +548,9 @@ def registry_dump_command(ctx: click.Context): Print contents of the metadata registry """ repo = ctx.obj["CHDIR"] - cli_check_repo(repo) - repo_config = load_repo_config(repo) + fs_yaml_file = ctx.obj["FS_YAML_FILE"] + cli_check_repo(repo, fs_yaml_file) + repo_config = load_repo_config(repo, fs_yaml_file) click.echo(registry_dump(repo_config, repo_path=repo)) @@ -545,8 +577,9 @@ def materialize_command( START_TS and END_TS should be in ISO 8601 format, e.g. '2021-07-16T19:20:01' """ repo = ctx.obj["CHDIR"] - cli_check_repo(repo) - store = FeatureStore(repo_path=str(repo)) + fs_yaml_file = ctx.obj["FS_YAML_FILE"] + cli_check_repo(repo, fs_yaml_file) + store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file) store.materialize( feature_views=None if not views else views, start_date=utils.make_tzaware(parser.parse(start_ts)), @@ -573,8 +606,9 @@ def materialize_incremental_command(ctx: click.Context, end_ts: str, views: List END_TS should be in ISO 8601 format, e.g. '2021-07-16T19:20:01' """ repo = ctx.obj["CHDIR"] - cli_check_repo(repo) - store = FeatureStore(repo_path=str(repo)) + fs_yaml_file = ctx.obj["FS_YAML_FILE"] + cli_check_repo(repo, fs_yaml_file) + store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file) store.materialize_incremental( feature_views=None if not views else views, end_date=utils.make_tzaware(datetime.fromisoformat(end_ts)), @@ -663,8 +697,9 @@ def serve_command( ): """Start a feature server locally on a given port.""" repo = ctx.obj["CHDIR"] - cli_check_repo(repo) - store = FeatureStore(repo_path=str(repo)) + fs_yaml_file = ctx.obj["FS_YAML_FILE"] + cli_check_repo(repo, fs_yaml_file) + store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file) if go: # Turn on Go feature retrieval. @@ -685,8 +720,9 @@ def serve_command( def serve_transformations_command(ctx: click.Context, port: int): """[Experimental] Start a feature consumption server locally on a given port.""" repo = ctx.obj["CHDIR"] - cli_check_repo(repo) - store = FeatureStore(repo_path=str(repo)) + fs_yaml_file = ctx.obj["FS_YAML_FILE"] + cli_check_repo(repo, fs_yaml_file) + store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file) store.serve_transformations(port) @@ -724,8 +760,9 @@ def validate( START_TS and END_TS should be in ISO 8601 format, e.g. '2021-07-16T19:20:01' """ repo = ctx.obj["CHDIR"] - cli_check_repo(repo) - store = FeatureStore(repo_path=str(repo)) + fs_yaml_file = ctx.obj["FS_YAML_FILE"] + cli_check_repo(repo, fs_yaml_file) + store = FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file) feature_service = store.get_feature_service(name=feature_service) reference = store.get_validation_reference(reference) @@ -766,7 +803,8 @@ def repo_upgrade(ctx: click.Context, write: bool): Upgrade a feature repo in place. """ repo = ctx.obj["CHDIR"] - cli_check_repo(repo) + fs_yaml_file = ctx.obj["FS_YAML_FILE"] + cli_check_repo(repo, fs_yaml_file) try: RepoUpgrader(repo, write).upgrade() except FeastProviderLoginError as e: diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index ac682fb6cd..4af634864c 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -114,13 +114,14 @@ class FeatureStore: repo_path: Path _registry: BaseRegistry _provider: Provider - _go_server: "EmbeddedOnlineFeatureServer" + _go_server: Optional["EmbeddedOnlineFeatureServer"] @log_exceptions def __init__( self, repo_path: Optional[str] = None, config: Optional[RepoConfig] = None, + fs_yaml_file: Optional[Path] = None, ): """ Creates a FeatureStore object. @@ -128,16 +129,26 @@ def __init__( Raises: ValueError: If both or neither of repo_path and config are specified. """ - if repo_path is not None and config is not None: - raise ValueError("You cannot specify both repo_path and config.") - if config is not None: + if fs_yaml_file is not None and config is not None: + raise ValueError("You cannot specify both fs_yaml_dir and config.") + + if repo_path: + self.repo_path = Path(repo_path) + else: self.repo_path = Path(os.getcwd()) + + # If config is specified, or fs_yaml_file is specified, those take precedence over + # the default feature_store.yaml location under repo_path. + if config is not None: self.config = config - elif repo_path is not None: - self.repo_path = Path(repo_path) - self.config = load_repo_config(Path(repo_path)) + elif fs_yaml_file is not None: + self.config = load_repo_config(self.repo_path, fs_yaml_file) + elif repo_path: + self.config = load_repo_config( + self.repo_path, Path(repo_path) / "feature_store.yaml" + ) else: - raise ValueError("Please specify one of repo_path or config.") + raise ValueError("Please specify one of fs_yaml_dir or config.") registry_config = self.config.get_registry_config() if registry_config.registry_type == "sql": @@ -146,7 +157,8 @@ def __init__( r = Registry(registry_config, repo_path=self.repo_path) r._initialize_registry(self.config.project) self._registry = r - self._provider = get_provider(self.config, self.repo_path) + + self._provider = get_provider(self.config) self._go_server = None @log_exceptions @@ -1569,7 +1581,7 @@ def _get_online_features( } # If the embedded Go code is enabled, send request to it instead of going through regular Python logic. - if self.config.go_feature_retrieval: + if self.config.go_feature_retrieval and self._go_server: self._lazy_init_go_server() entity_native_values: Dict[str, List[Any]] @@ -2221,7 +2233,7 @@ def serve( ) -> None: """Start the feature consumption server locally on a given port.""" type_ = type_.lower() - if self.config.go_feature_serving: + if self.config.go_feature_serving and self._go_server: # Start go server instead of python if the flag is enabled self._lazy_init_go_server() enable_logging = ( diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py index e99a09a9e2..c5f9380677 100644 --- a/sdk/python/feast/infra/provider.py +++ b/sdk/python/feast/infra/provider.py @@ -297,7 +297,7 @@ def get_feature_server_endpoint(self) -> Optional[str]: return None -def get_provider(config: RepoConfig, repo_path: Path) -> Provider: +def get_provider(config: RepoConfig) -> Provider: if "." not in config.provider: if config.provider not in PROVIDERS_CLASS_FOR_TYPE: raise errors.FeastProviderNotImplementedError(config.provider) diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index 34df1a215f..703f14fde4 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -483,8 +483,8 @@ def get_feature_server_config_from_type(feature_server_type: str): return import_class(module_name, config_class_name, config_class_name) -def load_repo_config(repo_path: Path) -> RepoConfig: - config_path = repo_path / "feature_store.yaml" +def load_repo_config(repo_path: Path, fs_yaml_file: Path) -> RepoConfig: + config_path = fs_yaml_file with open(config_path) as f: raw_config = yaml.safe_load(os.path.expandvars(f.read())) diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index 91cab2e992..916250542d 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -325,13 +325,12 @@ def registry_dump(repo_config: RepoConfig, repo_path: Path) -> str: return json.dumps(registry_dict, indent=2, sort_keys=True) -def cli_check_repo(repo_path: Path): +def cli_check_repo(repo_path: Path, fs_yaml_file: Path): sys.path.append(str(repo_path)) - config_path = repo_path / "feature_store.yaml" - if not config_path.exists(): + if not fs_yaml_file.exists(): print( - f"Can't find feature_store.yaml at {repo_path}. Make sure you're running feast from an initialized " - f"feast repository. " + f"Can't find feature repo configuration file at {fs_yaml_file}. " + "Make sure you're running feast from an initialized feast repository." ) sys.exit(1) diff --git a/sdk/python/tests/unit/cli/test_cli.py b/sdk/python/tests/unit/cli/test_cli.py index 9b535ce8fb..f55e5ffc06 100644 --- a/sdk/python/tests/unit/cli/test_cli.py +++ b/sdk/python/tests/unit/cli/test_cli.py @@ -72,6 +72,19 @@ def test_3rd_party_registry_store() -> None: assertpy.assert_that(return_code).is_equal_to(0) +def test_3rd_party_registry_store_with_fs_yaml_override() -> None: + runner = CliRunner() + + fs_yaml_file = "test_fs.yaml" + with setup_third_party_registry_store_repo( + "foo.registry_store.FooRegistryStore", fs_yaml_file_name=fs_yaml_file + ) as repo_path: + return_code, output = runner.run_with_output( + ["--feature-store-yaml", fs_yaml_file, "apply"], cwd=repo_path + ) + assertpy.assert_that(return_code).is_equal_to(0) + + @contextmanager def setup_third_party_provider_repo(provider_name: str): with tempfile.TemporaryDirectory() as repo_dir_name: @@ -106,13 +119,15 @@ def setup_third_party_provider_repo(provider_name: str): @contextmanager -def setup_third_party_registry_store_repo(registry_store: str): +def setup_third_party_registry_store_repo( + registry_store: str, fs_yaml_file_name: str = "feature_store.yaml" +): with tempfile.TemporaryDirectory() as repo_dir_name: # Construct an example repo in a temporary dir repo_path = Path(repo_dir_name) - repo_config = repo_path / "feature_store.yaml" + repo_config = repo_path / fs_yaml_file_name repo_config.write_text( dedent( diff --git a/sdk/python/tests/unit/infra/scaffolding/test_repo_config.py b/sdk/python/tests/unit/infra/scaffolding/test_repo_config.py index 8cbe461b0f..22fd1e696f 100644 --- a/sdk/python/tests/unit/infra/scaffolding/test_repo_config.py +++ b/sdk/python/tests/unit/infra/scaffolding/test_repo_config.py @@ -21,7 +21,7 @@ def _test_config(config_text, expect_error: Optional[str]): error = None rc = None try: - rc = load_repo_config(repo_path) + rc = load_repo_config(repo_path, repo_config) except FeastConfigError as e: error = e