From 9db7e96326ad04ab254df3f54d271810f2f834ef Mon Sep 17 00:00:00 2001 From: Lokesh Rangineni Date: Wed, 12 Jun 2024 15:46:39 -0400 Subject: [PATCH] Draft changes to add remote online store to feast. Signed-off-by: Lokesh Rangineni Adding the integration test and remote online creator class so that it will fit into existing integration testing framework. Signed-off-by: Lokesh Rangineni Fix after rebase Signed-off-by: Lokesh Rangineni Removing the RemoteOnlineStoreCreator and adding custom integration test case. Incorporating the code review comments. Signed-off-by: Lokesh Rangineni reformatting the code, removing unnecessary braces. Signed-off-by: Lokesh Rangineni Trying to fix the errors reported in make lint-python Signed-off-by: Lokesh Rangineni Ran the command make format-python and trying to see if it fixes the lint errors. Signed-off-by: Lokesh Rangineni increasing the server start timeout to see if it fixes the integration test cases. Signed-off-by: Lokesh Rangineni checking changes after make format-python Signed-off-by: Lokesh Rangineni trying to see if this fixes the PR integrationt test failure. Signed-off-by: Lokesh Rangineni Signed-off-by: Lokesh Rangineni checking in the changes for make format-python Signed-off-by: Lokesh Rangineni Upgrading python version to 3.11, adding support for 3.11 as well. Signed-off-by: Lokesh Rangineni chore: Bump macOS runners to macos-13 (#4152) bump macos runner to 13 Signed-off-by: tokoko Signed-off-by: Lokesh Rangineni chore: Use pixi to lock python dependencies in a single command (#4114) use pixi to lock python dependencies in a single command Signed-off-by: tokoko Signed-off-by: Lokesh Rangineni feat: List all feature views (#4256) * feature: Adding type to base feature view Signed-off-by: Francisco Javier Arceo * fixed linter Signed-off-by: Francisco Javier Arceo * fixed type and meta Signed-off-by: Francisco Javier Arceo * adding new listing Signed-off-by: Francisco Javier Arceo * updated Signed-off-by: Francisco Javier Arceo * cleaning up changes Signed-off-by: Francisco Javier Arceo * reverting FV proto Signed-off-by: Francisco Javier Arceo * doing simple way Signed-off-by: Francisco Javier Arceo * added a test Signed-off-by: Francisco Javier Arceo * updated to add warnings Signed-off-by: Francisco Javier Arceo --------- Signed-off-by: Francisco Javier Arceo feat: Adding vector search for sqlite (#4176) * feat: Adding vector search for sqlite Signed-off-by: Francisco Javier Arceo * adding the sqlite_vss dependency Signed-off-by: Francisco Javier Arceo * linter Signed-off-by: Francisco Javier Arceo * latest progress Signed-off-by: Francisco Javier Arceo * uploading latest progress Signed-off-by: Francisco Javier Arceo * updated function Signed-off-by: Francisco Javier Arceo * adding configuration Signed-off-by: Francisco Javier Arceo * adding current progress Signed-off-by: Francisco Javier Arceo * updating requirements files Signed-off-by: Francisco Javier Arceo * moving to sqlite-vec Signed-off-by: Francisco Javier Arceo * updating sqlite.py Signed-off-by: Francisco Javier Arceo * checking in progress Signed-off-by: Francisco Javier Arceo * updated test type Signed-off-by: Francisco Javier Arceo * got the initialization working, nice Signed-off-by: Francisco Javier Arceo * checking in progress from last night Signed-off-by: Francisco Javier Arceo * removing unnecessary stuff Signed-off-by: Francisco Javier Arceo * fixing merge conflicts Signed-off-by: Francisco Javier Arceo * removing files changed accidentally] Signed-off-by: Francisco Javier Arceo * uploading current progress...things run but need to update the virtual table insertion Signed-off-by: Francisco Javier Arceo * linted Signed-off-by: Francisco Javier Arceo * adding working notes Signed-off-by: Francisco Javier Arceo * found a bug, original feature_store.py was only grabbing first feature view, adjusted Signed-off-by: Francisco Javier Arceo * cant use a string have to verify it is a proper FeatureView object Signed-off-by: Francisco Javier Arceo * updated got it working, need to fix some other stuff still Signed-off-by: Francisco Javier Arceo * working Signed-off-by: Francisco Javier Arceo * linter Signed-off-by: Francisco Javier Arceo * fixing some type issues Signed-off-by: Francisco Javier Arceo * fixed typing and lint issues Signed-off-by: Francisco Javier Arceo * updated dependencies Signed-off-by: Francisco Javier Arceo * fix for pixi and updating requirements Signed-off-by: Francisco Javier Arceo * fixed type Signed-off-by: Francisco Javier Arceo * linter Signed-off-by: Francisco Javier Arceo * testing sqlite_vec import Signed-off-by: Francisco Javier Arceo * adding minimal example test Signed-off-by: Francisco Javier Arceo * lint Signed-off-by: Francisco Javier Arceo * testing raw sqlite Signed-off-by: Francisco Javier Arceo * Printing package version * printing version Signed-off-by: Francisco Javier Arceo * updated requirements * rebuilding requirments Signed-off-by: Francisco Javier Arceo * only going to run this on 3.10 for now Signed-off-by: Francisco Javier Arceo * updated docs for sqlite caveats Signed-off-by: Francisco Javier Arceo * adding reason Signed-off-by: Francisco Javier Arceo * skipping Signed-off-by: Francisco Javier Arceo * updated tests Signed-off-by: Francisco Javier Arceo * removing print Signed-off-by: Francisco Javier Arceo * added method call Signed-off-by: Francisco Javier Arceo * added prubt Signed-off-by: Francisco Javier Arceo * added print Signed-off-by: Francisco Javier Arceo * removing print Signed-off-by: Francisco Javier Arceo * adding check in sqlite Signed-off-by: Francisco Javier Arceo * missed an = Signed-off-by: Francisco Javier Arceo * still running on 3.11 Signed-off-by: Francisco Javier Arceo * typo Signed-off-by: Francisco Javier Arceo * fix Signed-off-by: Francisco Javier Arceo * fix Signed-off-by: Francisco Javier Arceo * updated setup and docs Signed-off-by: Francisco Javier Arceo * renamed things Signed-off-by: Francisco Javier Arceo --------- Signed-off-by: Francisco Javier Arceo squashing the last 15 commits to one. Merge branch 'master' into feature/adding-remote-onlinestore-rebase Adding documentation and incorporating code review comment. Signed-off-by: Lokesh Rangineni Adding documentation and incorporating code review comment. Signed-off-by: Lokesh Rangineni Merge remote-tracking branch 'fork/feature/adding-remote-onlinestore-rebase' into feature/adding-remote-onlinestore-rebase Signed-off-by: Lokesh Rangineni --- .github/workflows/pr_integration_tests.yml | 2 +- Makefile | 6 - docs/SUMMARY.md | 1 + docs/reference/online-stores/README.md | 4 + docs/reference/online-stores/remote.md | 21 ++ infra/scripts/pixi/pixi.toml | 8 +- .../feast/infra/online_stores/remote.py | 174 ++++++++++++++ sdk/python/feast/repo_config.py | 1 + sdk/python/tests/conftest.py | 18 +- .../feature_repos/repo_configuration.py | 6 +- .../online_store/test_remote_online_store.py | 222 ++++++++++++++++++ .../tests/unit/online_store/__init__.py | 0 setup.py | 1 - 13 files changed, 444 insertions(+), 20 deletions(-) create mode 100644 docs/reference/online-stores/remote.md create mode 100644 sdk/python/feast/infra/online_stores/remote.py create mode 100644 sdk/python/tests/integration/online_store/test_remote_online_store.py create mode 100644 sdk/python/tests/unit/online_store/__init__.py diff --git a/.github/workflows/pr_integration_tests.yml b/.github/workflows/pr_integration_tests.yml index bcf1be7d492..f4a9132d292 100644 --- a/.github/workflows/pr_integration_tests.yml +++ b/.github/workflows/pr_integration_tests.yml @@ -23,7 +23,7 @@ jobs: strategy: fail-fast: false matrix: - python-version: [ "3.9", "3.10", "3.11" ] + python-version: [ "3.11" ] os: [ ubuntu-latest ] env: OS: ${{ matrix.os }} diff --git a/Makefile b/Makefile index f00dd00d981..c7fdc861000 100644 --- a/Makefile +++ b/Makefile @@ -70,12 +70,6 @@ lock-python-dependencies-all: pixi run --environment py311 --manifest-path infra/scripts/pixi/pixi.toml "uv pip compile --system --no-strip-extras setup.py --output-file sdk/python/requirements/py3.11-requirements.txt" pixi run --environment py311 --manifest-path infra/scripts/pixi/pixi.toml "uv pip compile --system --no-strip-extras setup.py --extra ci --output-file sdk/python/requirements/py3.11-ci-requirements.txt" -lock-python-dependencies-all: - pixi run --environment py39 --manifest-path infra/scripts/pixi/pixi.toml "python -m piptools compile -U --output-file sdk/python/requirements/py3.9-requirements.txt" - pixi run --environment py39 --manifest-path infra/scripts/pixi/pixi.toml "python -m piptools compile -U --extra ci --output-file sdk/python/requirements/py3.9-ci-requirements.txt" - pixi run --environment py310 --manifest-path infra/scripts/pixi/pixi.toml "python -m piptools compile -U --output-file sdk/python/requirements/py3.10-requirements.txt" - pixi run --environment py310 --manifest-path infra/scripts/pixi/pixi.toml "python -m piptools compile -U --extra ci --output-file sdk/python/requirements/py3.10-ci-requirements.txt" - benchmark-python: IS_TEST=True python -m pytest --integration --benchmark --benchmark-autosave --benchmark-save-data sdk/python/tests diff --git a/docs/SUMMARY.md b/docs/SUMMARY.md index 2e205dee0a1..995c04e5f1e 100644 --- a/docs/SUMMARY.md +++ b/docs/SUMMARY.md @@ -95,6 +95,7 @@ * [Datastore](reference/online-stores/datastore.md) * [DynamoDB](reference/online-stores/dynamodb.md) * [Bigtable](reference/online-stores/bigtable.md) + * [Remote](reference/online-stores/remote.md) * [PostgreSQL (contrib)](reference/online-stores/postgres.md) * [Cassandra + Astra DB (contrib)](reference/online-stores/cassandra.md) * [MySQL (contrib)](reference/online-stores/mysql.md) diff --git a/docs/reference/online-stores/README.md b/docs/reference/online-stores/README.md index 686e820f4e7..b5f4eb8de89 100644 --- a/docs/reference/online-stores/README.md +++ b/docs/reference/online-stores/README.md @@ -61,3 +61,7 @@ Please see [Online Store](../../getting-started/architecture-and-components/onli {% content-ref url="scylladb.md" %} [scylladb.md](scylladb.md) {% endcontent-ref %} + +{% content-ref url="remote.md" %} +[remote.md](remote.md) +{% endcontent-ref %} diff --git a/docs/reference/online-stores/remote.md b/docs/reference/online-stores/remote.md new file mode 100644 index 00000000000..c560fa6f223 --- /dev/null +++ b/docs/reference/online-stores/remote.md @@ -0,0 +1,21 @@ +# Remote online store + +## Description + +This remote online store will let you interact with remote feature server. At this moment this only supports the read operation. You can use this online store and able retrieve online features `store.get_online_features` from remote feature server. + +## Examples + +The registry is pointing to registry of remote feature store. If it is not accessible then should be configured to use remote registry. + +{% code title="feature_store.yaml" %} +```yaml +project: my-local-project + registry: /remote/data/registry.db + provider: local + online_store: + path: http://localhost:6566 + type: remote + entity_key_serialization_version: 2 +``` +{% endcode %} \ No newline at end of file diff --git a/infra/scripts/pixi/pixi.toml b/infra/scripts/pixi/pixi.toml index 9a643365700..10179339f70 100644 --- a/infra/scripts/pixi/pixi.toml +++ b/infra/scripts/pixi/pixi.toml @@ -6,7 +6,7 @@ platforms = ["linux-64", "osx-arm64"] [tasks] [dependencies] -pip-tools = ">=7.4.1,<7.5" +uv = ">=0.1.39,<0.2" [feature.py39.dependencies] python = "~=3.9.0" @@ -14,6 +14,10 @@ python = "~=3.9.0" [feature.py310.dependencies] python = "~=3.10.0" +[feature.py311.dependencies] +python = "~=3.11.0" + [environments] py39 = ["py39"] -py310 = ["py310"] \ No newline at end of file +py310 = ["py310"] +py311 = ["py311"] diff --git a/sdk/python/feast/infra/online_stores/remote.py b/sdk/python/feast/infra/online_stores/remote.py new file mode 100644 index 00000000000..a2dd35626dc --- /dev/null +++ b/sdk/python/feast/infra/online_stores/remote.py @@ -0,0 +1,174 @@ +# Copyright 2021 The Feast Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import json +import logging +from datetime import datetime +from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple + +import requests +from pydantic import StrictStr + +from feast import Entity, FeatureView, RepoConfig +from feast.infra.online_stores.online_store import OnlineStore +from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto +from feast.protos.feast.types.Value_pb2 import Value as ValueProto +from feast.repo_config import FeastConfigBaseModel +from feast.type_map import python_values_to_proto_values +from feast.value_type import ValueType + +logger = logging.getLogger(__name__) + + +class RemoteOnlineStoreConfig(FeastConfigBaseModel): + """Remote Online store config for remote online store""" + + type: Literal["remote"] = "remote" + """Online store type selector""" + + path: StrictStr = "http://localhost:6566" + """ str: Path to metadata store. + If type is 'remote', then this is a URL for registry server """ + + +class RemoteOnlineStore(OnlineStore): + """ + remote online store implementation wrapper to communicate with feast online server. + """ + + def online_write_batch( + self, + config: RepoConfig, + table: FeatureView, + data: List[ + Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] + ], + progress: Optional[Callable[[int], Any]], + ) -> None: + pass + + def online_read( + self, + config: RepoConfig, + table: FeatureView, + entity_keys: List[EntityKeyProto], + requested_features: Optional[List[str]] = None, + ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: + assert isinstance(config.online_store, RemoteOnlineStoreConfig) + config.online_store.__class__ = RemoteOnlineStoreConfig + + req_body = self._construct_online_read_api_json_request( + entity_keys, table, requested_features + ) + response = requests.post( + f"{config.online_store.path}/get-online-features", data=req_body + ) + if response.status_code == 200: + logger.debug("Able to retrieve the online features from feature server.") + response_json = json.loads(response.text) + event_ts = self._get_event_ts(response_json) + # Iterating over results and converting the API results in column format to row format. + result_tuples: List[ + Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]] + ] = [] + for feature_value_index in range(len(entity_keys)): + feature_values_dict: Dict[str, ValueProto] = dict() + for index, feature_name in enumerate( + response_json["metadata"]["feature_names"] + ): + if ( + requested_features is not None + and feature_name in requested_features + ): + if ( + response_json["results"][index]["statuses"][ + feature_value_index + ] + == "PRESENT" + ): + message = python_values_to_proto_values( + [ + response_json["results"][index]["values"][ + feature_value_index + ] + ], + ValueType.UNKNOWN, + ) + feature_values_dict[feature_name] = message[0] + else: + feature_values_dict[feature_name] = ValueProto() + + result_tuples.append((event_ts, feature_values_dict)) + return result_tuples + else: + error_msg = f"Unable to retrieve the online store data using feature server API. Error_code={response.status_code}, error_message={response.reason}" + logger.error(error_msg) + raise RuntimeError(error_msg) + + def _construct_online_read_api_json_request( + self, + entity_keys: List[EntityKeyProto], + table: FeatureView, + requested_features: Optional[List[str]] = None, + ): + api_requested_features = [] + if requested_features is not None: + for requested_feature in requested_features: + api_requested_features.append(f"{table.name}:{requested_feature}") + + entity_values = [] + entity_key = "" + for row in entity_keys: + entity_key = row.join_keys[0] + entity_values.append( + getattr(row.entity_values[0], row.entity_values[0].WhichOneof("val")) + ) + + req_body = json.dumps( + { + "features": api_requested_features, + "entities": {entity_key: entity_values}, + } + ) + return req_body + + def _check_if_feature_requested(self, feature_name, requested_features): + for requested_feature in requested_features: + if feature_name in requested_feature: + return True + return False + + def _get_event_ts(self, response_json) -> datetime: + event_ts = "" + if len(response_json["results"]) > 1: + event_ts = response_json["results"][1]["event_timestamps"][0] + return datetime.fromisoformat(event_ts.replace("Z", "+00:00")) + + def update( + self, + config: RepoConfig, + tables_to_delete: Sequence[FeatureView], + tables_to_keep: Sequence[FeatureView], + entities_to_delete: Sequence[Entity], + entities_to_keep: Sequence[Entity], + partial: bool, + ): + pass + + def teardown( + self, + config: RepoConfig, + tables: Sequence[FeatureView], + entities: Sequence[Entity], + ): + pass diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index 6ef81794bf8..b6a767e0f44 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -64,6 +64,7 @@ "hazelcast": "feast.infra.online_stores.contrib.hazelcast_online_store.hazelcast_online_store.HazelcastOnlineStore", "ikv": "feast.infra.online_stores.contrib.ikv_online_store.ikv.IKVOnlineStore", "elasticsearch": "feast.infra.online_stores.contrib.elasticsearch.ElasticSearchOnlineStore", + "remote": "feast.infra.online_stores.remote.RemoteOnlineStore", } OFFLINE_STORE_CLASS_FOR_TYPE = { diff --git a/sdk/python/tests/conftest.py b/sdk/python/tests/conftest.py index 7c875fc9bde..03677dd1507 100644 --- a/sdk/python/tests/conftest.py +++ b/sdk/python/tests/conftest.py @@ -32,8 +32,8 @@ create_basic_driver_dataset, create_document_dataset, ) -from tests.integration.feature_repos.integration_test_repo_config import ( # noqa: E402 - IntegrationTestRepoConfig, +from tests.integration.feature_repos.integration_test_repo_config import ( + IntegrationTestRepoConfig, # noqa: E402 ) from tests.integration.feature_repos.repo_configuration import ( # noqa: E402 AVAILABLE_OFFLINE_STORES, @@ -45,8 +45,8 @@ construct_universal_feature_views, construct_universal_test_data, ) -from tests.integration.feature_repos.universal.data_sources.file import ( # noqa: E402 - FileDataSourceCreator, +from tests.integration.feature_repos.universal.data_sources.file import ( + FileDataSourceCreator, # noqa: E402 ) from tests.integration.feature_repos.universal.entities import ( # noqa: E402 customer, @@ -173,7 +173,15 @@ def simple_dataset_2() -> pd.DataFrame: def start_test_local_server(repo_path: str, port: int): fs = FeatureStore(repo_path) - fs.serve("localhost", port, no_access_log=True) + fs.serve( + host="localhost", + port=port, + no_access_log=True, + keep_alive_timeout=30, + type_="http", + workers=1, + registry_ttl_sec=30, + ) @pytest.fixture diff --git a/sdk/python/tests/integration/feature_repos/repo_configuration.py b/sdk/python/tests/integration/feature_repos/repo_configuration.py index 0545496f33a..4c463c3b0e7 100644 --- a/sdk/python/tests/integration/feature_repos/repo_configuration.py +++ b/sdk/python/tests/integration/feature_repos/repo_configuration.py @@ -134,9 +134,7 @@ AVAILABLE_ONLINE_STORES: Dict[ str, Tuple[Union[str, Dict[Any, Any]], Optional[Type[OnlineStoreCreator]]] -] = { - "sqlite": ({"type": "sqlite"}, None), -} +] = {"sqlite": ({"type": "sqlite"}, None)} # Only configure Cloud DWH if running full integration tests if os.getenv("FEAST_IS_LOCAL_TEST", "False") != "True": @@ -153,7 +151,6 @@ AVAILABLE_ONLINE_STORES["datastore"] = ("datastore", None) AVAILABLE_ONLINE_STORES["snowflake"] = (SNOWFLAKE_CONFIG, None) AVAILABLE_ONLINE_STORES["bigtable"] = (BIGTABLE_CONFIG, None) - # Uncomment to test using private Rockset account. Currently not enabled as # there is no dedicated Rockset instance for CI testing and there is no # containerized version of Rockset. @@ -487,7 +484,6 @@ def construct_test_environment( "arn:aws:iam::402087665549:role/lambda_execution_role", ), ) - else: feature_server = LocalFeatureServerConfig( feature_logging=FeatureLoggingConfig(enabled=True) diff --git a/sdk/python/tests/integration/online_store/test_remote_online_store.py b/sdk/python/tests/integration/online_store/test_remote_online_store.py new file mode 100644 index 00000000000..96ddec4c166 --- /dev/null +++ b/sdk/python/tests/integration/online_store/test_remote_online_store.py @@ -0,0 +1,222 @@ +import os +import tempfile +from datetime import datetime +from multiprocessing import Process +from textwrap import dedent + +import pytest + +from feast.feature_store import FeatureStore +from feast.wait import wait_retry_backoff +from tests.conftest import start_test_local_server +from tests.utils.cli_repo_creator import CliRunner +from tests.utils.http_server import check_port_open, free_port + + +@pytest.mark.integration +@pytest.mark.universal_online_stores +def test_remote_online_store_read(): + with tempfile.TemporaryDirectory() as remote_server_tmp_dir, tempfile.TemporaryDirectory() as remote_client_tmp_dir: + server_store, server_url, registry_path = ( + _create_server_store_spin_feature_server(temp_dir=remote_server_tmp_dir) + ) + assert None not in (server_store, server_url, registry_path) + client_store = _create_remote_client_feature_store( + temp_dir=remote_client_tmp_dir, + server_registry_path=str(registry_path), + feature_server_url=server_url, + ) + assert client_store is not None + _assert_non_existing_entity_feature_views_entity( + client_store=client_store, server_store=server_store + ) + _assert_existing_feature_views_entity( + client_store=client_store, server_store=server_store + ) + _assert_non_existing_feature_views( + client_store=client_store, server_store=server_store + ) + + +def _assert_non_existing_entity_feature_views_entity( + client_store: FeatureStore, server_store: FeatureStore +): + features = [ + "driver_hourly_stats:conv_rate", + "driver_hourly_stats:acc_rate", + "driver_hourly_stats:avg_daily_trips", + ] + + entity_rows = [{"driver_id": 1234}] + _assert_client_server_online_stores_are_matching( + client_store=client_store, + server_store=server_store, + features=features, + entity_rows=entity_rows, + ) + + +def _assert_non_existing_feature_views( + client_store: FeatureStore, server_store: FeatureStore +): + features = [ + "driver_hourly_stats1:conv_rate", + "driver_hourly_stats1:acc_rate", + "driver_hourly_stats:avg_daily_trips", + ] + + entity_rows = [{"driver_id": 1001}, {"driver_id": 1002}] + + with pytest.raises( + Exception, match="Feature view driver_hourly_stats1 does not exist" + ): + client_store.get_online_features( + features=features, entity_rows=entity_rows + ).to_dict() + + with pytest.raises( + Exception, match="Feature view driver_hourly_stats1 does not exist" + ): + server_store.get_online_features( + features=features, entity_rows=entity_rows + ).to_dict() + + +def _assert_existing_feature_views_entity( + client_store: FeatureStore, server_store: FeatureStore +): + features = [ + "driver_hourly_stats:conv_rate", + "driver_hourly_stats:acc_rate", + "driver_hourly_stats:avg_daily_trips", + ] + + entity_rows = [{"driver_id": 1001}, {"driver_id": 1002}] + _assert_client_server_online_stores_are_matching( + client_store=client_store, + server_store=server_store, + features=features, + entity_rows=entity_rows, + ) + + features = ["driver_hourly_stats:conv_rate"] + _assert_client_server_online_stores_are_matching( + client_store=client_store, + server_store=server_store, + features=features, + entity_rows=entity_rows, + ) + + +def _assert_client_server_online_stores_are_matching( + client_store: FeatureStore, server_store: FeatureStore, features, entity_rows +): + online_features_from_client = client_store.get_online_features( + features=features, entity_rows=entity_rows + ).to_dict() + + assert online_features_from_client is not None + + online_features_from_server = server_store.get_online_features( + features=features, entity_rows=entity_rows + ).to_dict() + + assert online_features_from_server is not None + assert online_features_from_client is not None + assert online_features_from_client == online_features_from_server + + +def _create_server_store_spin_feature_server(temp_dir): + feast_server_port = free_port() + store = _default_store(str(temp_dir), "REMOTE_ONLINE_SERVER_PROJECT") + server_url = next( + _start_feature_server( + repo_path=str(store.repo_path), server_port=feast_server_port + ) + ) + print(f"Server started successfully, {server_url}") + return store, server_url, os.path.join(store.repo_path, "data", "registry.db") + + +def _default_store(temp_dir, project_name): + runner = CliRunner() + result = runner.run(["init", project_name], cwd=temp_dir) + repo_path = os.path.join(temp_dir, project_name, "feature_repo") + assert result.returncode == 0 + + result = runner.run(["--chdir", repo_path, "apply"], cwd=temp_dir) + assert result.returncode == 0 + + fs = FeatureStore(repo_path=repo_path) + fs.materialize_incremental( + end_date=datetime.utcnow(), feature_views=["driver_hourly_stats"] + ) + return fs + + +def _create_remote_client_feature_store( + temp_dir, server_registry_path: str, feature_server_url: str +): + project_name = "REMOTE_ONLINE_CLIENT_PROJECT" + runner = CliRunner() + result = runner.run(["init", project_name], cwd=temp_dir) + assert result.returncode == 0 + repo_path = os.path.join(temp_dir, project_name, "feature_repo") + _overwrite_remote_client_feature_store_yaml( + repo_path=str(repo_path), + registry_path=server_registry_path, + feature_server_url=feature_server_url, + ) + + result = runner.run(["--chdir", repo_path, "apply"], cwd=temp_dir) + assert result.returncode == 0 + + return FeatureStore(repo_path=repo_path) + + +def _overwrite_remote_client_feature_store_yaml( + repo_path: str, registry_path: str, feature_server_url: str +): + repo_config = os.path.join(repo_path, "feature_store.yaml") + with open(repo_config, "w") as repo_config: + repo_config.write( + dedent( + f""" + project: REMOTE_ONLINE_CLIENT_PROJECT + registry: {registry_path} + provider: local + online_store: + path: {feature_server_url} + type: remote + entity_key_serialization_version: 2 + """ + ) + ) + + +def _start_feature_server(repo_path: str, server_port: int): + feast_server_process = Process( + target=start_test_local_server, args=(repo_path, server_port) + ) + feast_server_process.start() + _time_out_sec: int = 45 + # Wait for server to start + wait_retry_backoff( + lambda: (None, check_port_open("localhost", server_port)), + timeout_secs=_time_out_sec, + timeout_msg=f"Unable to start the feast server in {_time_out_sec} seconds for remote online store type, port={server_port}", + ) + + yield f"http://localhost:{server_port}" + + if feast_server_process.is_alive(): + feast_server_process.kill() + + # wait server to free the port + wait_retry_backoff( + lambda: ( + None, + not check_port_open("localhost", server_port), + ), + timeout_secs=30, + ) diff --git a/sdk/python/tests/unit/online_store/__init__.py b/sdk/python/tests/unit/online_store/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/setup.py b/setup.py index 18ee91acf20..9b3d0e55e62 100644 --- a/setup.py +++ b/setup.py @@ -66,7 +66,6 @@ "uvicorn[standard]>=0.14.0,<1", "gunicorn; platform_system != 'Windows'", "dask[dataframe]>=2024.4.2", - "bowler", # Needed for automatic repo upgrades ] GCP_REQUIRED = [