Skip to content

Commit

Permalink
feat: Key ttl setting for redis online store (feast-dev#2341)
Browse files Browse the repository at this point in the history
* key ttl setting for redis online store

Signed-off-by: Vitaly Sergeyev <vsergeyev@better.com>

* Optional typing

Signed-off-by: Vitaly Sergeyev <vsergeyev@better.com>
  • Loading branch information
Vitaly Sergeyev authored Feb 28, 2022
1 parent 4e47686 commit 236a108
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 3 deletions.
11 changes: 8 additions & 3 deletions sdk/python/feast/infra/online_stores/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ class RedisOnlineStoreConfig(FeastConfigBaseModel):
"""Connection string containing the host, port, and configuration parameters for Redis
format: host:port,parameter1,parameter2 eg. redis:6379,db=0 """

key_ttl_seconds: Optional[int] = None
"""(Optional) redis key bin ttl (in seconds) for expiring entities"""


class RedisOnlineStore(OnlineStore):
_client: Optional[Union[Redis, RedisCluster]] = None
Expand Down Expand Up @@ -227,9 +230,11 @@ def online_write_batch(
entity_hset[f_key] = val.SerializeToString()

pipe.hset(redis_key_bin, mapping=entity_hset)
# TODO: support expiring the entity / features in Redis
# otherwise entity features remain in redis until cleaned up in separate process
# client.expire redis_key_bin based a ttl setting

if online_store_config.key_ttl_seconds:
pipe.expire(
name=redis_key_bin, time=online_store_config.key_ttl_seconds
)
results = pipe.execute()
if progress:
progress(len(results))
Expand Down
61 changes: 61 additions & 0 deletions sdk/python/tests/integration/online_store/test_universal_online.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,67 @@
from tests.utils.data_source_utils import prep_file_source


@pytest.mark.integration
def test_entity_ttl_online_store(local_redis_environment, universal_data_sources):
if os.getenv("FEAST_IS_LOCAL_TEST", "False") == "True":
return
fs = local_redis_environment.feature_store
# setting ttl setting in online store to 1 second
fs.config.online_store.key_ttl_seconds = 1
entities, datasets, data_sources = universal_data_sources
driver_hourly_stats = create_driver_hourly_stats_feature_view(
data_sources["driver"]
)
driver_entity = driver()

# Register Feature View and Entity
fs.apply([driver_hourly_stats, driver_entity])

# fake data to ingest into Online Store
data = {
"driver_id": [1],
"conv_rate": [0.5],
"acc_rate": [0.6],
"avg_daily_trips": [4],
"event_timestamp": [pd.Timestamp(datetime.datetime.utcnow()).round("ms")],
"created": [pd.Timestamp(datetime.datetime.utcnow()).round("ms")],
}
df_ingest = pd.DataFrame(data)

# directly ingest data into the Online Store
fs.write_to_online_store("driver_stats", df_ingest)

# assert the right data is in the Online Store
df = fs.get_online_features(
features=[
"driver_stats:avg_daily_trips",
"driver_stats:acc_rate",
"driver_stats:conv_rate",
],
entity_rows=[{"driver": 1}],
).to_df()
assertpy.assert_that(df["avg_daily_trips"].iloc[0]).is_equal_to(4)
assertpy.assert_that(df["acc_rate"].iloc[0]).is_close_to(0.6, 1e-6)
assertpy.assert_that(df["conv_rate"].iloc[0]).is_close_to(0.5, 1e-6)

# simulate time passing for testing ttl
time.sleep(1)

# retrieve the same entity again
df = fs.get_online_features(
features=[
"driver_stats:avg_daily_trips",
"driver_stats:acc_rate",
"driver_stats:conv_rate",
],
entity_rows=[{"driver": 1}],
).to_df()
# assert that the entity features expired in the online store
assertpy.assert_that(df["avg_daily_trips"].iloc[0]).is_none()
assertpy.assert_that(df["acc_rate"].iloc[0]).is_none()
assertpy.assert_that(df["conv_rate"].iloc[0]).is_none()


# TODO: make this work with all universal (all online store types)
@pytest.mark.integration
def test_write_to_online_store_event_check(local_redis_environment):
Expand Down

0 comments on commit 236a108

Please sign in to comment.