Skip to content

Commit

Permalink
direct data ingestion into Online store (#1939)
Browse files Browse the repository at this point in the history
* direct data ingestion into Online store

Signed-off-by: Vitaly Sergeyev <vsergeyev@better.com>

* feature flags and formatting

Signed-off-by: Vitaly Sergeyev <vsergeyev@better.com>

* add test for write_to_online_store

Signed-off-by: Vitaly Sergeyev <vsergeyev@better.com>

* formatting

Signed-off-by: Vitaly Sergeyev <vsergeyev@better.com>

* move test to test_universal_online

Signed-off-by: Vitaly Sergeyev <vsergeyev@better.com>

* formatting

Signed-off-by: Vitaly Sergeyev <vsergeyev@better.com>

* remove unused import

Signed-off-by: Vitaly Sergeyev <vsergeyev@better.com>

* test flags not needed

Signed-off-by: Vitaly Sergeyev <vsergeyev@better.com>
  • Loading branch information
Vitaly Sergeyev committed Oct 25, 2021
1 parent ce3ad03 commit 6728f2a
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 0 deletions.
20 changes: 20 additions & 0 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -809,6 +809,26 @@ def tqdm_builder(length):
feature_view, self.project, start_date, end_date
)

@log_exceptions_and_usage
def write_to_online_store(
    self, feature_view_name: str, df: pd.DataFrame,
):
    """
    Write the contents of a DataFrame straight into the online store.

    This is gated behind an experimental flag: unless direct ingestion is
    enabled for this store's config, ExperimentalFeatureNotEnabled is raised
    before anything is looked up or written.

    Args:
        feature_view_name: Name of the feature view to fetch from the registry.
        df: DataFrame that is handed, unchanged, to the provider's ``ingest_df``.

    Raises:
        ExperimentalFeatureNotEnabled: If the direct-ingestion flag is off.
    """
    direct_ingest_enabled = flags_helper.enable_direct_ingestion_to_online_store(
        self.config
    )
    if not direct_ingest_enabled:
        raise ExperimentalFeatureNotEnabled(
            flags.FLAG_DIRECT_INGEST_TO_ONLINE_STORE
        )

    # TODO: restrict this to work with online StreamFeatureViews and validate the FeatureView type
    view = self._registry.get_feature_view(feature_view_name, self.project)
    # Resolve every entity name listed on the view into its registry object.
    view_entities = [
        self._registry.get_entity(name, self.project) for name in view.entities
    ]
    self._get_provider().ingest_df(view, view_entities, df)

@log_exceptions_and_usage
def get_online_features(
self,
Expand Down
2 changes: 2 additions & 0 deletions sdk/python/feast/flags.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@
# String identifiers for the individual feature flags.
# NOTE(review): presumably these are the keys looked up in the repo config's
# flags section — confirm against feature_flag_enabled().
FLAG_ON_DEMAND_TRANSFORM_NAME = "on_demand_transforms"
FLAG_PYTHON_FEATURE_SERVER_NAME = "python_feature_server"
FLAG_AWS_LAMBDA_FEATURE_SERVER_NAME = "aws_lambda_feature_server"
FLAG_DIRECT_INGEST_TO_ONLINE_STORE = "direct_ingest_to_online_store"
# NOTE(review): looks like the name of an environment variable marking test
# runs — confirm where it is read.
ENV_FLAG_IS_TEST = "IS_TEST"

# Set of every flag identifier declared above (FLAG_ALPHA_FEATURES_NAME is
# defined earlier in this module). A new flag constant must also be added here.
FLAG_NAMES = {
FLAG_ALPHA_FEATURES_NAME,
FLAG_ON_DEMAND_TRANSFORM_NAME,
FLAG_PYTHON_FEATURE_SERVER_NAME,
FLAG_AWS_LAMBDA_FEATURE_SERVER_NAME,
FLAG_DIRECT_INGEST_TO_ONLINE_STORE,
}
4 changes: 4 additions & 0 deletions sdk/python/feast/flags_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,7 @@ def enable_python_feature_server(repo_config: RepoConfig) -> bool:

def enable_aws_lambda_feature_server(repo_config: RepoConfig) -> bool:
    """Report whether the AWS Lambda feature server flag is enabled for ``repo_config``."""
    flag_name = flags.FLAG_AWS_LAMBDA_FEATURE_SERVER_NAME
    return feature_flag_enabled(repo_config, flag_name)


def enable_direct_ingestion_to_online_store(repo_config: RepoConfig) -> bool:
    """Report whether the direct-ingest-to-online-store flag is enabled for ``repo_config``."""
    flag_name = flags.FLAG_DIRECT_INGEST_TO_ONLINE_STORE
    return feature_flag_enabled(repo_config, flag_name)
16 changes: 16 additions & 0 deletions sdk/python/feast/infra/passthrough_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union

import pandas
import pyarrow as pa
from tqdm import tqdm

from feast.entity import Entity
Expand Down Expand Up @@ -82,6 +83,21 @@ def online_read(

return result

def ingest_df(
    self, feature_view: FeatureView, entities: List[Entity], df: pandas.DataFrame,
):
    """
    Convert a DataFrame into online-store rows and write them in one batch.

    Args:
        feature_view: Feature view the rows belong to; its batch source's
            ``field_mapping`` (when not None) is applied to the columns first.
        entities: Entities whose ``join_key`` values identify the key columns.
        df: Data to ingest; converted to a pyarrow Table before processing.
    """
    arrow_table = pa.Table.from_pandas(df)

    # Rename columns according to the batch source's mapping, if one is set.
    mapping = feature_view.batch_source.field_mapping
    if mapping is not None:
        arrow_table = _run_field_mapping(arrow_table, mapping)

    keys = [e.join_key for e in entities]
    proto_rows = _convert_arrow_to_proto(arrow_table, feature_view, keys)

    # No progress callback is supplied for direct ingestion.
    self.online_write_batch(
        self.repo_config, feature_view, proto_rows, progress=None
    )

def materialize_single_feature_view(
self,
config: RepoConfig,
Expand Down
8 changes: 8 additions & 0 deletions sdk/python/feast/infra/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,14 @@ def online_write_batch(
"""
...

def ingest_df(
    self, feature_view: FeatureView, entities: List[Entity], df: pandas.DataFrame,
):
    """
    Ingest a DataFrame directly into the online store.

    This default implementation does nothing and returns None. It is not
    marked abstract, so a provider that does not override it silently
    ignores the data it is given.

    Args:
        feature_view: Feature view the data belongs to.
        entities: Entities associated with the feature view.
        df: Data to ingest.
    """
    return None

@abc.abstractmethod
def materialize_single_feature_view(
self,
Expand Down
44 changes: 44 additions & 0 deletions sdk/python/tests/integration/online_store/test_universal_online.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import datetime
import itertools
import unittest
from datetime import timedelta
Expand All @@ -19,6 +20,49 @@
driver,
location,
)
from tests.integration.feature_repos.universal.feature_views import (
create_driver_hourly_stats_feature_view,
)


@pytest.mark.integration
def test_write_to_online_store(environment, universal_data_sources):
    """
    Check that a row written with ``write_to_online_store`` can be read back.

    Registers the driver hourly stats feature view and the driver entity,
    ingests a single hand-built row, then reads the same features through
    ``get_online_features`` and compares the values.
    """
    fs = environment.feature_store
    entities, datasets, data_sources = universal_data_sources
    driver_hourly_stats = create_driver_hourly_stats_feature_view(
        data_sources["driver"]
    )
    driver_entity = driver()

    # Register Feature View and Entity
    fs.apply([driver_hourly_stats, driver_entity])

    # fake data to ingest into Online Store; both timestamp columns share one
    # millisecond-rounded "now" so the row is internally consistent
    now = pd.Timestamp(datetime.datetime.utcnow()).round("ms")
    data = {
        "driver_id": [123],
        "conv_rate": [0.85],
        "acc_rate": [0.91],
        "avg_daily_trips": [14],
        "event_timestamp": [now],
        "created": [now],
    }
    df_data = pd.DataFrame(data)

    # directly ingest data into the Online Store
    fs.write_to_online_store("driver_stats", df_data)

    # assert the right data is in the Online Store
    df = fs.get_online_features(
        features=[
            "driver_stats:avg_daily_trips",
            "driver_stats:acc_rate",
            "driver_stats:conv_rate",
        ],
        entity_rows=[{"driver": 123}],
    ).to_df()
    assert df["avg_daily_trips"].iloc[0] == 14
    # Float features may come back at reduced precision after the round trip
    # through the online store, so compare with a tolerance instead of ==.
    assert df["acc_rate"].iloc[0] == pytest.approx(0.91, rel=1e-6)
    assert df["conv_rate"].iloc[0] == pytest.approx(0.85, rel=1e-6)


@pytest.mark.integration
Expand Down

0 comments on commit 6728f2a

Please sign in to comment.