Skip to content

Commit

Permalink
feat: Event timestamps response (feast-dev#2355)
Browse files Browse the repository at this point in the history
* ability to get event timestamps from online response

Signed-off-by: Vitaly Sergeyev <vsergeyev@better.com>

* fix event timestamp bugs

Signed-off-by: Vitaly Sergeyev <vsergeyev@better.com>

* python formatting

Signed-off-by: Vitaly Sergeyev <vsergeyev@better.com>

* optional param to retrieve event_timestamp in online_reponse

Signed-off-by: Vitaly Sergeyev <vsergeyev@better.com>

* formatting

Signed-off-by: Vitaly Sergeyev <vsergeyev@better.com>

* renaming param

Signed-off-by: Vitaly Sergeyev <vsergeyev@better.com>
Signed-off-by: Achal Shah <achals@gmail.com>
  • Loading branch information
Vitaly Sergeyev authored and achals committed Mar 8, 2022
1 parent f5597cf commit f6fc65a
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 6 deletions.
2 changes: 1 addition & 1 deletion sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -1531,10 +1531,10 @@ def _read_from_online_store(

# Each row is a set of features for a given entity key. We only need to convert
# the data to Protobuf once.
row_ts_proto = Timestamp()
null_value = Value()
read_row_protos = []
for read_row in read_rows:
row_ts_proto = Timestamp()
row_ts, feature_data = read_row
# TODO (Ly): reuse whatever timestamp if row_ts is None?
if row_ts is not None:
Expand Down
3 changes: 2 additions & 1 deletion sdk/python/feast/infra/online_stores/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
Union,
)

import pytz
from google.protobuf.timestamp_pb2 import Timestamp
from pydantic import StrictStr
from pydantic.typing import Literal
Expand Down Expand Up @@ -302,5 +303,5 @@ def _get_features_for_entity(
if not res:
return None, None
else:
timestamp = datetime.fromtimestamp(res_ts.seconds)
timestamp = datetime.fromtimestamp(res_ts.seconds, tz=pytz.utc)
return timestamp, res
24 changes: 20 additions & 4 deletions sdk/python/feast/online_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
from feast.protos.feast.serving.ServingService_pb2 import GetOnlineFeaturesResponse
from feast.type_map import feast_value_type_to_python_type

TIMESTAMP_POSTFIX: str = "__ts"


class OnlineResponse:
"""
Defines a online response in feast.
Defines an online response in feast.
"""

def __init__(self, online_response_proto: GetOnlineFeaturesResponse):
Expand All @@ -44,9 +46,12 @@ def __init__(self, online_response_proto: GetOnlineFeaturesResponse):
del result.event_timestamps[idx]
break

def to_dict(self) -> Dict[str, Any]:
def to_dict(self, include_event_timestamps: bool = False) -> Dict[str, Any]:
"""
Converts GetOnlineFeaturesResponse features into a dictionary form.
Args:
is_with_event_timestamps: bool Optionally include feature timestamps in the dictionary
"""
response: Dict[str, List[Any]] = {}

Expand All @@ -58,11 +63,22 @@ def to_dict(self) -> Dict[str, Any]:
else:
response[feature_ref].append(native_type_value)

if include_event_timestamps:
event_ts = result.event_timestamps[idx].seconds
timestamp_ref = feature_ref + TIMESTAMP_POSTFIX
if timestamp_ref not in response:
response[timestamp_ref] = [event_ts]
else:
response[timestamp_ref].append(event_ts)

return response

def to_df(self) -> pd.DataFrame:
def to_df(self, include_event_timestamps: bool = False) -> pd.DataFrame:
"""
Converts GetOnlineFeaturesResponse features into Panda dataframe form.
Args:
is_with_event_timestamps: bool Optionally include feature timestamps in the dataframe
"""

return pd.DataFrame(self.to_dict())
return pd.DataFrame(self.to_dict(include_event_timestamps))
65 changes: 65 additions & 0 deletions sdk/python/tests/integration/online_store/test_universal_online.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
FeatureNameCollisionError,
RequestDataNotFoundInEntityRowsException,
)
from feast.online_response import TIMESTAMP_POSTFIX
from feast.wait import wait_retry_backoff
from tests.integration.feature_repos.repo_configuration import (
Environment,
Expand Down Expand Up @@ -322,6 +323,70 @@ def get_online_features_dict(
return dict1


@pytest.mark.integration
@pytest.mark.universal
@pytest.mark.parametrize("full_feature_names", [True, False], ids=lambda v: str(v))
def test_online_retrieval_with_event_timestamps(
environment, universal_data_sources, full_feature_names
):
fs = environment.feature_store
entities, datasets, data_sources = universal_data_sources
feature_views = construct_universal_feature_views(data_sources)

fs.apply([driver(), feature_views.driver, feature_views.global_fv])

# fake data to ingest into Online Store
data = {
"driver_id": [1, 2],
"conv_rate": [0.5, 0.3],
"acc_rate": [0.6, 0.4],
"avg_daily_trips": [4, 5],
"event_timestamp": [
pd.to_datetime(1646263500, utc=True, unit="s"),
pd.to_datetime(1646263600, utc=True, unit="s"),
],
"created": [
pd.to_datetime(1646263500, unit="s"),
pd.to_datetime(1646263600, unit="s"),
],
}
df_ingest = pd.DataFrame(data)

# directly ingest data into the Online Store
fs.write_to_online_store("driver_stats", df_ingest)

response = fs.get_online_features(
features=[
"driver_stats:avg_daily_trips",
"driver_stats:acc_rate",
"driver_stats:conv_rate",
],
entity_rows=[{"driver": 1}, {"driver": 2}],
)
df = response.to_df(True)
assertpy.assert_that(len(df)).is_equal_to(2)
assertpy.assert_that(df["driver_id"].iloc[0]).is_equal_to(1)
assertpy.assert_that(df["driver_id"].iloc[1]).is_equal_to(2)
assertpy.assert_that(df["avg_daily_trips" + TIMESTAMP_POSTFIX].iloc[0]).is_equal_to(
1646263500
)
assertpy.assert_that(df["avg_daily_trips" + TIMESTAMP_POSTFIX].iloc[1]).is_equal_to(
1646263600
)
assertpy.assert_that(df["acc_rate" + TIMESTAMP_POSTFIX].iloc[0]).is_equal_to(
1646263500
)
assertpy.assert_that(df["acc_rate" + TIMESTAMP_POSTFIX].iloc[1]).is_equal_to(
1646263600
)
assertpy.assert_that(df["conv_rate" + TIMESTAMP_POSTFIX].iloc[0]).is_equal_to(
1646263500
)
assertpy.assert_that(df["conv_rate" + TIMESTAMP_POSTFIX].iloc[1]).is_equal_to(
1646263600
)


@pytest.mark.integration
@pytest.mark.universal
@pytest.mark.parametrize("full_feature_names", [True, False], ids=lambda v: str(v))
Expand Down

0 comments on commit f6fc65a

Please sign in to comment.