diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py
index 2af349cd3e..50213e80dc 100644
--- a/sdk/python/feast/feature_store.py
+++ b/sdk/python/feast/feature_store.py
@@ -809,6 +809,35 @@ def tqdm_builder(length):
                 feature_view, self.project, start_date, end_date
             )
 
+    @log_exceptions_and_usage
+    def write_to_online_store(
+        self, feature_view_name: str, df: pd.DataFrame,
+    ) -> None:
+        """
+        Ingests a DataFrame directly into the online store (experimental).
+
+        Args:
+            feature_view_name: Name of the registered feature view to write to.
+            df: Rows to persist; handed unchanged to the provider's ingest_df.
+
+        Raises:
+            ExperimentalFeatureNotEnabled: If the direct_ingest_to_online_store
+                flag is not enabled in the repo config.
+        """
+        if not flags_helper.enable_direct_ingestion_to_online_store(self.config):
+            raise ExperimentalFeatureNotEnabled(
+                flags.FLAG_DIRECT_INGEST_TO_ONLINE_STORE
+            )
+
+        # TODO: restrict this to work with online StreamFeatureViews and validate the FeatureView type
+        feature_view = self._registry.get_feature_view(feature_view_name, self.project)
+        entities = [
+            self._registry.get_entity(entity_name, self.project)
+            for entity_name in feature_view.entities
+        ]
+        provider = self._get_provider()
+        provider.ingest_df(feature_view, entities, df)
+
     @log_exceptions_and_usage
     def get_online_features(
         self,
diff --git a/sdk/python/feast/flags.py b/sdk/python/feast/flags.py
index c9e7416863..5c6357ec26 100644
--- a/sdk/python/feast/flags.py
+++ b/sdk/python/feast/flags.py
@@ -2,6 +2,7 @@
 FLAG_ON_DEMAND_TRANSFORM_NAME = "on_demand_transforms"
 FLAG_PYTHON_FEATURE_SERVER_NAME = "python_feature_server"
 FLAG_AWS_LAMBDA_FEATURE_SERVER_NAME = "aws_lambda_feature_server"
+FLAG_DIRECT_INGEST_TO_ONLINE_STORE = "direct_ingest_to_online_store"
 ENV_FLAG_IS_TEST = "IS_TEST"
 
 FLAG_NAMES = {
@@ -9,4 +10,5 @@
     FLAG_ON_DEMAND_TRANSFORM_NAME,
     FLAG_PYTHON_FEATURE_SERVER_NAME,
     FLAG_AWS_LAMBDA_FEATURE_SERVER_NAME,
+    FLAG_DIRECT_INGEST_TO_ONLINE_STORE,
 }
diff --git a/sdk/python/feast/flags_helper.py b/sdk/python/feast/flags_helper.py
index 85770c39ec..89784d6ecc 100644
--- a/sdk/python/feast/flags_helper.py
+++ b/sdk/python/feast/flags_helper.py
@@ -41,3 +41,7 @@ def enable_python_feature_server(repo_config: RepoConfig) -> bool:
 
 def enable_aws_lambda_feature_server(repo_config: RepoConfig) -> bool:
     return feature_flag_enabled(repo_config, flags.FLAG_AWS_LAMBDA_FEATURE_SERVER_NAME)
+
+
+def enable_direct_ingestion_to_online_store(repo_config: RepoConfig) -> bool:
+    return feature_flag_enabled(repo_config, flags.FLAG_DIRECT_INGEST_TO_ONLINE_STORE)
diff --git a/sdk/python/feast/infra/passthrough_provider.py b/sdk/python/feast/infra/passthrough_provider.py
index 3e4c0a3485..45745cfed0 100644
--- a/sdk/python/feast/infra/passthrough_provider.py
+++ b/sdk/python/feast/infra/passthrough_provider.py
@@ -2,6 +2,7 @@
 from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union
 
 import pandas
+import pyarrow as pa
 from tqdm import tqdm
 
 from feast.entity import Entity
@@ -82,6 +83,31 @@ def online_read(
 
         return result
 
+    def ingest_df(
+        self, feature_view: FeatureView, entities: List[Entity], df: pandas.DataFrame,
+    ) -> None:
+        """
+        Converts the DataFrame to proto rows and writes them to the online store.
+
+        Args:
+            feature_view: Feature view the rows belong to; its batch source's
+                field mapping (when set) is applied to the table first.
+            entities: Entities of the feature view; their join keys are passed
+                to the arrow-to-proto conversion.
+            df: Data to ingest.
+        """
+        table = pa.Table.from_pandas(df)
+
+        if feature_view.batch_source.field_mapping is not None:
+            table = _run_field_mapping(table, feature_view.batch_source.field_mapping)
+
+        join_keys = [entity.join_key for entity in entities]
+        rows_to_write = _convert_arrow_to_proto(table, feature_view, join_keys)
+
+        self.online_write_batch(
+            self.repo_config, feature_view, rows_to_write, progress=None
+        )
+
     def materialize_single_feature_view(
         self,
         config: RepoConfig,
diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py
index 4debd59aa8..4b7c8069f2 100644
--- a/sdk/python/feast/infra/provider.py
+++ b/sdk/python/feast/infra/provider.py
@@ -98,6 +98,23 @@ def online_write_batch(
         """
         ...
 
+    def ingest_df(
+        self, feature_view: FeatureView, entities: List[Entity], df: pandas.DataFrame,
+    ) -> None:
+        """
+        Ingests a DataFrame directly into the online store.
+
+        Not abstract so existing providers stay instantiable; providers that
+        support direct ingestion must override this.
+
+        Raises:
+            NotImplementedError: If the provider does not override this method.
+        """
+        raise NotImplementedError(
+            f"{type(self).__name__} does not support ingesting a DataFrame "
+            "directly into the online store"
+        )
+
     @abc.abstractmethod
     def materialize_single_feature_view(
         self,
diff --git a/sdk/python/tests/integration/online_store/test_universal_online.py b/sdk/python/tests/integration/online_store/test_universal_online.py
index b9824ae163..259df20c57 100644
--- a/sdk/python/tests/integration/online_store/test_universal_online.py
+++ b/sdk/python/tests/integration/online_store/test_universal_online.py
@@ -1,3 +1,4 @@
+import datetime
 import itertools
 import unittest
 from datetime import timedelta
@@ -19,6 +20,54 @@
     driver,
     location,
 )
+from tests.integration.feature_repos.universal.feature_views import (
+    create_driver_hourly_stats_feature_view,
+)
+
+
+@pytest.mark.integration
+def test_write_to_online_store(environment, universal_data_sources):
+    """Rows written via write_to_online_store are served by get_online_features."""
+    fs = environment.feature_store
+    entities, datasets, data_sources = universal_data_sources
+    driver_hourly_stats = create_driver_hourly_stats_feature_view(
+        data_sources["driver"]
+    )
+    driver_entity = driver()
+
+    # Register Feature View and Entity
+    fs.apply([driver_hourly_stats, driver_entity])
+
+    # fake data to ingest into Online Store; one shared timestamp keeps
+    # event_timestamp and created identical
+    now = pd.Timestamp(datetime.datetime.utcnow()).round("ms")
+    data = {
+        "driver_id": [123],
+        "conv_rate": [0.85],
+        "acc_rate": [0.91],
+        "avg_daily_trips": [14],
+        "event_timestamp": [now],
+        "created": [now],
+    }
+    df_data = pd.DataFrame(data)
+
+    # directly ingest data into the Online Store
+    fs.write_to_online_store("driver_stats", df_data)
+
+    # assert the right data is in the Online Store
+    df = fs.get_online_features(
+        features=[
+            "driver_stats:avg_daily_trips",
+            "driver_stats:acc_rate",
+            "driver_stats:conv_rate",
+        ],
+        entity_rows=[{"driver": 123}],
+    ).to_df()
+    assert df["avg_daily_trips"].iloc[0] == 14
+    # float features may round-trip through a narrower float type, so compare
+    # with a tolerance instead of exact equality
+    assert df["acc_rate"].iloc[0] == pytest.approx(0.91)
+    assert df["conv_rate"].iloc[0] == pytest.approx(0.85)
 
 
 @pytest.mark.integration