From e25d66f9626e5fee16c33cdc8bc5857391601031 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Sat, 27 Apr 2024 20:40:56 -0400 Subject: [PATCH 01/10] feat: Adding support for dictionary writes to online store Signed-off-by: Francisco Javier Arceo --- .../unit/online_store/test_online_writes.py | 141 ++++++++++++++++++ 1 file changed, 141 insertions(+) create mode 100644 sdk/python/tests/unit/online_store/test_online_writes.py diff --git a/sdk/python/tests/unit/online_store/test_online_writes.py b/sdk/python/tests/unit/online_store/test_online_writes.py new file mode 100644 index 0000000000..42cb8cf014 --- /dev/null +++ b/sdk/python/tests/unit/online_store/test_online_writes.py @@ -0,0 +1,141 @@ +# Copyright 2022 The Feast Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import os +import tempfile +import unittest +from datetime import datetime, timedelta +from typing import Any, Dict + +import pandas as pd +import pytest + +from feast import Entity, FeatureStore, FeatureView, FileSource, RepoConfig +from feast.driver_test_data import create_driver_hourly_stats_df +from feast.field import Field +from feast.infra.online_stores.sqlite import SqliteOnlineStoreConfig +from feast.on_demand_feature_view import on_demand_feature_view +from feast.types import Float32, Float64, Int64 + + +class TestOnlineWrites(unittest.TestCase): + def setUp(self): + with tempfile.TemporaryDirectory() as data_dir: + self.store = FeatureStore( + config=RepoConfig( + project="test_write_to_online_store", + registry=os.path.join(data_dir, "registry.db"), + provider="local", + entity_key_serialization_version=2, + online_store=SqliteOnlineStoreConfig( + path=os.path.join(data_dir, "online.db") + ), + ) + ) + + # Generate test data. + end_date = datetime.now().replace(microsecond=0, second=0, minute=0) + start_date = end_date - timedelta(days=15) + + driver_entities = [1001, 1002, 1003, 1004, 1005] + driver_df = create_driver_hourly_stats_df( + driver_entities, start_date, end_date + ) + driver_stats_path = os.path.join(data_dir, "driver_stats.parquet") + driver_df.to_parquet( + path=driver_stats_path, allow_truncated_timestamps=True + ) + + driver = Entity(name="driver", join_keys=["driver_id"]) + + driver_stats_source = FileSource( + name="driver_hourly_stats_source", + path=driver_stats_path, + timestamp_field="event_timestamp", + created_timestamp_column="created", + ) + + driver_stats_fv = FeatureView( + name="driver_hourly_stats", + entities=[driver], + ttl=timedelta(days=0), + schema=[ + Field(name="conv_rate", dtype=Float32), + Field(name="acc_rate", dtype=Float32), + Field(name="avg_daily_trips", dtype=Int64), + ], + online=True, + source=driver_stats_source, + ) + + @on_demand_feature_view( + sources=[driver_stats_fv[["conv_rate", "acc_rate"]]], + 
schema=[Field(name="conv_rate_plus_acc", dtype=Float64)], + mode="python", + ) + def test_view(inputs: Dict[str, Any]) -> Dict[str, Any]: + output: Dict[str, Any] = { + "conv_rate_plus_acc": [ + conv_rate + acc_rate + for conv_rate, acc_rate in zip( + inputs["conv_rate"], inputs["acc_rate"] + ) + ] + } + return output + + self.store.apply( + [ + driver, + driver_stats_source, + driver_stats_fv, + test_view, + ] + ) + self.store.write_to_online_store( + feature_view_name="driver_hourly_stats", df=driver_df + ) + # This will give the intuitive structure of the data as: + # {"driver_id": [..], "conv_rate": [..], "acc_rate": [..], "avg_daily_trips": [..]} + driver_dict = driver_df.to_dict(orient='list') + self.store.write_to_online_store( + feature_view_name="driver_hourly_stats", df=driver_dict, + ) + + def test_online_retrieval(self): + entity_rows = [ + { + "driver_id": 1001, + } + ] + + online_python_response = self.store.get_online_features( + entity_rows=entity_rows, + features=[ + "driver_hourly_stats:conv_rate", + "driver_hourly_stats:acc_rate", + "test_view:conv_rate_plus_acc", + ], + ).to_dict() + + assert len(online_python_response) == 4 + assert all( + key in online_python_response.keys() + for key in [ + "driver_id", + "acc_rate", + "conv_rate", + "conv_rate_plus_acc", + ] + ) From 5aafa4093b2977a62319cfa6891f8496ac2c5e58 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Sat, 27 Apr 2024 21:18:00 -0400 Subject: [PATCH 02/10] Simple approach Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/errors.py | 7 +++++++ sdk/python/feast/feature_store.py | 10 +++++++++- .../tests/unit/online_store/test_online_writes.py | 2 +- 3 files changed, 17 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/errors.py b/sdk/python/feast/errors.py index b7151ff0c8..92061191eb 100644 --- a/sdk/python/feast/errors.py +++ b/sdk/python/feast/errors.py @@ -420,3 +420,10 @@ def __init__(self, push_source_name: str): class 
ReadOnlyRegistryException(Exception): def __init__(self): super().__init__("Registry implementation is read-only.") + + +class DataFrameSerializationError(Exception): + def __init__(self, dict_obj: dict): + super().__init__( + f"Failed to serialize the provided dictionary into a pandas DataFrame: {dict_obj.keys()}" + ) \ No newline at end of file diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index e83a24b664..2208cdcd97 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -61,6 +61,7 @@ PushSourceNotFoundException, RequestDataNotFoundInEntityDfException, RequestDataNotFoundInEntityRowsException, + DataFrameSerializationError, ) from feast.feast_object import FeastObject from feast.feature_service import FeatureService @@ -1406,7 +1407,8 @@ def push( def write_to_online_store( self, feature_view_name: str, - df: pd.DataFrame, + df: Optional[pd.DataFrame] = None, + dict: Optional[Dict] = None, allow_registry_cache: bool = True, ): """ @@ -1415,6 +1417,7 @@ def write_to_online_store( Args: feature_view_name: The feature view to which the dataframe corresponds. df: The dataframe to be persisted. + dict: Optional the dictionary object to be written allow_registry_cache (optional): Whether to allow retrieving feature views from a cached registry. 
""" # TODO: restrict this to work with online StreamFeatureViews and validate the FeatureView type @@ -1426,6 +1429,11 @@ def write_to_online_store( feature_view = self.get_feature_view( feature_view_name, allow_registry_cache=allow_registry_cache ) + if df is None and dict is not None: + try: + df = pd.DataFrame(dict) + except Exception as _: + raise DataFrameSerializationError(dict) provider = self._get_provider() provider.ingest_df(feature_view, df) diff --git a/sdk/python/tests/unit/online_store/test_online_writes.py b/sdk/python/tests/unit/online_store/test_online_writes.py index 42cb8cf014..acee4d2c46 100644 --- a/sdk/python/tests/unit/online_store/test_online_writes.py +++ b/sdk/python/tests/unit/online_store/test_online_writes.py @@ -110,7 +110,7 @@ def test_view(inputs: Dict[str, Any]) -> Dict[str, Any]: # {"driver_id": [..], "conv_rate": [..], "acc_rate": [..], "avg_daily_trips": [..]} driver_dict = driver_df.to_dict(orient='list') self.store.write_to_online_store( - feature_view_name="driver_hourly_stats", df=driver_dict, + feature_view_name="driver_hourly_stats", dict=driver_dict, ) def test_online_retrieval(self): From 11650d851c80e7f65ab81ecf6da8ebc4905d563f Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Sat, 27 Apr 2024 21:20:02 -0400 Subject: [PATCH 03/10] lint Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/errors.py | 2 +- sdk/python/feast/feature_store.py | 2 +- sdk/python/tests/unit/online_store/test_online_writes.py | 8 +++----- 3 files changed, 5 insertions(+), 7 deletions(-) diff --git a/sdk/python/feast/errors.py b/sdk/python/feast/errors.py index 92061191eb..009e08838e 100644 --- a/sdk/python/feast/errors.py +++ b/sdk/python/feast/errors.py @@ -426,4 +426,4 @@ class DataFrameSerializationError(Exception): def __init__(self, dict_obj: dict): super().__init__( f"Failed to serialize the provided dictionary into a pandas DataFrame: {dict_obj.keys()}" - ) \ No newline at end of file + ) diff --git 
a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 2208cdcd97..38fb454d88 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -54,6 +54,7 @@ from feast.dqm.errors import ValidationFailed from feast.entity import Entity from feast.errors import ( + DataFrameSerializationError, DataSourceRepeatNamesException, EntityNotFoundException, FeatureNameCollisionError, @@ -61,7 +62,6 @@ PushSourceNotFoundException, RequestDataNotFoundInEntityDfException, RequestDataNotFoundInEntityRowsException, - DataFrameSerializationError, ) from feast.feast_object import FeastObject from feast.feature_service import FeatureService diff --git a/sdk/python/tests/unit/online_store/test_online_writes.py b/sdk/python/tests/unit/online_store/test_online_writes.py index acee4d2c46..c5c965e726 100644 --- a/sdk/python/tests/unit/online_store/test_online_writes.py +++ b/sdk/python/tests/unit/online_store/test_online_writes.py @@ -18,9 +18,6 @@ from datetime import datetime, timedelta from typing import Any, Dict -import pandas as pd -import pytest - from feast import Entity, FeatureStore, FeatureView, FileSource, RepoConfig from feast.driver_test_data import create_driver_hourly_stats_df from feast.field import Field @@ -108,9 +105,10 @@ def test_view(inputs: Dict[str, Any]) -> Dict[str, Any]: ) # This will give the intuitive structure of the data as: # {"driver_id": [..], "conv_rate": [..], "acc_rate": [..], "avg_daily_trips": [..]} - driver_dict = driver_df.to_dict(orient='list') + driver_dict = driver_df.to_dict(orient="list") self.store.write_to_online_store( - feature_view_name="driver_hourly_stats", dict=driver_dict, + feature_view_name="driver_hourly_stats", + dict=driver_dict, ) def test_online_retrieval(self): From 615ec3510a5f06d044b0580ca53fdbbee1e90237 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Sat, 27 Apr 2024 21:26:09 -0400 Subject: [PATCH 04/10] adding error if both are missing Signed-off-by: Francisco 
Javier Arceo --- sdk/python/feast/feature_store.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 38fb454d88..6901dbbd31 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -1429,6 +1429,8 @@ def write_to_online_store( feature_view = self.get_feature_view( feature_view_name, allow_registry_cache=allow_registry_cache ) + if df is not None and dict is not None: + raise ValueError("Both df and dict cannot be provided at the same time.") if df is None and dict is not None: try: df = pd.DataFrame(dict) From 605bc2644d830a0317cac6e8ef36efec3e260292 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Sun, 28 Apr 2024 06:11:38 -0400 Subject: [PATCH 05/10] rename dict to input_dict Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/errors.py | 4 ++-- sdk/python/feast/feature_store.py | 12 ++++++------ 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/sdk/python/feast/errors.py b/sdk/python/feast/errors.py index 009e08838e..52fefce9d9 100644 --- a/sdk/python/feast/errors.py +++ b/sdk/python/feast/errors.py @@ -423,7 +423,7 @@ def __init__(self): class DataFrameSerializationError(Exception): - def __init__(self, dict_obj: dict): + def __init__(self, input_dict: dict): super().__init__( - f"Failed to serialize the provided dictionary into a pandas DataFrame: {dict_obj.keys()}" + f"Failed to serialize the provided dictionary into a pandas DataFrame: {input_dict.keys()}" ) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 6901dbbd31..da798c8def 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -1408,7 +1408,7 @@ def write_to_online_store( self, feature_view_name: str, df: Optional[pd.DataFrame] = None, - dict: Optional[Dict] = None, + input_dict: Optional[Dict] = None, allow_registry_cache: bool = True, ): """ @@ -1417,7 +1417,7 @@ def 
write_to_online_store( Args: feature_view_name: The feature view to which the dataframe corresponds. df: The dataframe to be persisted. - dict: Optional the dictionary object to be written + input_dict: Optional the dictionary object to be written allow_registry_cache (optional): Whether to allow retrieving feature views from a cached registry. """ # TODO: restrict this to work with online StreamFeatureViews and validate the FeatureView type @@ -1429,13 +1429,13 @@ def write_to_online_store( feature_view = self.get_feature_view( feature_view_name, allow_registry_cache=allow_registry_cache ) - if df is not None and dict is not None: + if df is not None and input_dict is not None: raise ValueError("Both df and dict cannot be provided at the same time.") - if df is None and dict is not None: + if df is None and input_dict is not None: try: - df = pd.DataFrame(dict) + df = pd.DataFrame(input_dict) except Exception as _: - raise DataFrameSerializationError(dict) + raise DataFrameSerializationError(input_dict) provider = self._get_provider() provider.ingest_df(feature_view, df) From f363950d517f11734ead9a33247bf545e132641e Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Sun, 28 Apr 2024 06:24:34 -0400 Subject: [PATCH 06/10] updated input arg to test Signed-off-by: Francisco Javier Arceo --- sdk/python/tests/unit/online_store/test_online_writes.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/tests/unit/online_store/test_online_writes.py b/sdk/python/tests/unit/online_store/test_online_writes.py index c5c965e726..a18d26d74d 100644 --- a/sdk/python/tests/unit/online_store/test_online_writes.py +++ b/sdk/python/tests/unit/online_store/test_online_writes.py @@ -108,7 +108,7 @@ def test_view(inputs: Dict[str, Any]) -> Dict[str, Any]: driver_dict = driver_df.to_dict(orient="list") self.store.write_to_online_store( feature_view_name="driver_hourly_stats", - dict=driver_dict, + input_dict=driver_dict, ) def test_online_retrieval(self): 
From d1a596d4f79cf43826091c588a55836209d27e9b Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Tue, 30 Apr 2024 11:27:21 -0400 Subject: [PATCH 07/10] Renaming function argument Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/feature_store.py | 21 ++++++++++++------- .../unit/online_store/test_online_writes.py | 2 +- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index da798c8def..e919dfc0f0 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -1408,7 +1408,7 @@ def write_to_online_store( self, feature_view_name: str, df: Optional[pd.DataFrame] = None, - input_dict: Optional[Dict] = None, + inputs: Optional[Dict, pd.DataFrame] = None, allow_registry_cache: bool = True, ): """ @@ -1429,13 +1429,18 @@ def write_to_online_store( feature_view = self.get_feature_view( feature_view_name, allow_registry_cache=allow_registry_cache ) - if df is not None and input_dict is not None: - raise ValueError("Both df and dict cannot be provided at the same time.") - if df is None and input_dict is not None: - try: - df = pd.DataFrame(input_dict) - except Exception as _: - raise DataFrameSerializationError(input_dict) + if df is not None and inputs is not None: + raise ValueError("Both df and inputs cannot be provided at the same time.") + if df is None and inputs is not None: + if isinstance(inputs, dict): + try: + df = pd.DataFrame(inputs) + except Exception as exc: + raise DataFrameSerializationError(inputs) from exc + elif isinstance(inputs, pd.DataFrame): + df = inputs  # already a DataFrame: ingest it as-is instead of leaving df as None + else: + raise ValueError("inputs must be a dictionary or a pandas DataFrame.") provider = self._get_provider() provider.ingest_df(feature_view, df) diff --git a/sdk/python/tests/unit/online_store/test_online_writes.py b/sdk/python/tests/unit/online_store/test_online_writes.py index a18d26d74d..5fb1351969 100644 --- a/sdk/python/tests/unit/online_store/test_online_writes.py +++ 
b/sdk/python/tests/unit/online_store/test_online_writes.py @@ -108,7 +108,7 @@ def test_view(inputs: Dict[str, Any]) -> Dict[str, Any]: driver_dict = driver_df.to_dict(orient="list") self.store.write_to_online_store( feature_view_name="driver_hourly_stats", - input_dict=driver_dict, + inputs=driver_dict, ) def test_online_retrieval(self): From d55311dc663470b85f90df0e493acc1d3cf5231b Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Tue, 30 Apr 2024 11:35:53 -0400 Subject: [PATCH 08/10] updated docstring Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/feature_store.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index e919dfc0f0..a6c5866ace 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -1417,7 +1417,7 @@ def write_to_online_store( Args: feature_view_name: The feature view to which the dataframe corresponds. df: The dataframe to be persisted. - input_dict: Optional the dictionary object to be written + inputs: Optional the dictionary object to be written allow_registry_cache (optional): Whether to allow retrieving feature views from a cached registry. 
""" # TODO: restrict this to work with online StreamFeatureViews and validate the FeatureView type From 778437714515966d092a916115f188a542c7ae4f Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Tue, 30 Apr 2024 11:51:20 -0400 Subject: [PATCH 09/10] updated type signature Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/feature_store.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index a6c5866ace..c8c02b0a72 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -1408,7 +1408,7 @@ def write_to_online_store( self, feature_view_name: str, df: Optional[pd.DataFrame] = None, - inputs: Optional[Dict, pd.DataFrame] = None, + inputs: Optional[Union[Dict, pd.DataFrame]] = None, allow_registry_cache: bool = True, ): """ From 30429c27d0b8c7ec866f2bc189792f52984d25ac Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Tue, 30 Apr 2024 12:08:16 -0400 Subject: [PATCH 10/10] updated type to be more explicit Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/feature_store.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index c8c02b0a72..bc492e4208 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -1408,7 +1408,7 @@ def write_to_online_store( self, feature_view_name: str, df: Optional[pd.DataFrame] = None, - inputs: Optional[Union[Dict, pd.DataFrame]] = None, + inputs: Optional[Union[Dict[str, List[Any]], pd.DataFrame]] = None, allow_registry_cache: bool = True, ): """