Skip to content

Commit

Permalink
fix: Updating the batch field so that you can query create and event …
Browse files Browse the repository at this point in the history
…date. (#3411)

* fix: Assertion condition when value is 0 (#3401)

* fix: Add assertion condition when value is 0

Signed-off-by: zlatan.el <zlatan.el@kakaomobility.com>

* chore: Add comment about zero value validation

Signed-off-by: zlatan.el <zlatan.el@kakaomobility.com>

* chore: Modify the comment

Signed-off-by: zlatan.el <zlatan.el@kakaomobility.com>

* chore: Add the comment

Signed-off-by: zlatan.el <zlatan.el@kakaomobility.com>

Signed-off-by: zlatan.el <zlatan.el@kakaomobility.com>
Co-authored-by: zlatan.el <zlatan.el@kakaomobility.com>
Signed-off-by: franciscojavierarceo <francisco.arceo@affirm.com>

* updating the batch field so that if you want to return the created date of a model you can just add it in the get_online_features feature argument

Signed-off-by: franciscojavierarceo <francisco.arceo@affirm.com>

* linted

Signed-off-by: franciscojavierarceo <francisco.arceo@affirm.com>

* adding change to also support querying the event_timestamp

Signed-off-by: franciscojavierarceo <francisco.arceo@affirm.com>

Signed-off-by: zlatan.el <zlatan.el@kakaomobility.com>
Signed-off-by: franciscojavierarceo <francisco.arceo@affirm.com>
Co-authored-by: kysersozelee <kysersoze.lee@gmail.com>
Co-authored-by: zlatan.el <zlatan.el@kakaomobility.com>
  • Loading branch information
3 people authored Jan 5, 2023
1 parent 81c3483 commit 01ab462
Showing 1 changed file with 39 additions and 7 deletions.
46 changes: 39 additions & 7 deletions sdk/python/feast/infra/offline_stores/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ def evaluate_historical_retrieval():
)

entity_df_with_features = _drop_columns(
df_to_join, timestamp_field, created_timestamp_column
df_to_join, features, timestamp_field, created_timestamp_column
)

# Ensure that we delete dataframes to free up memory
Expand Down Expand Up @@ -599,6 +599,11 @@ def _normalize_timestamp(
created_timestamp_column_type = df_to_join_types[created_timestamp_column]

if not hasattr(timestamp_field_type, "tz") or timestamp_field_type.tz != pytz.UTC:
# if you are querying for the event timestamp field, we have to deduplicate
if len(df_to_join[timestamp_field].shape) > 1:
df_to_join, dups = _df_column_uniquify(df_to_join)
df_to_join = df_to_join.drop(columns=dups)

# Make sure all timestamp fields are tz-aware. We default tz-naive fields to UTC
df_to_join[timestamp_field] = df_to_join[timestamp_field].apply(
lambda x: x if x.tzinfo is not None else x.replace(tzinfo=pytz.utc),
Expand All @@ -609,6 +614,11 @@ def _normalize_timestamp(
not hasattr(created_timestamp_column_type, "tz")
or created_timestamp_column_type.tz != pytz.UTC
):
if len(df_to_join[created_timestamp_column].shape) > 1:
# if you are querying for the created timestamp field, we have to deduplicate
df_to_join, dups = _df_column_uniquify(df_to_join)
df_to_join = df_to_join.drop(columns=dups)

df_to_join[created_timestamp_column] = df_to_join[
created_timestamp_column
].apply(
Expand Down Expand Up @@ -701,14 +711,36 @@ def _drop_duplicates(

def _drop_columns(
    df_to_join: dd.DataFrame,
    features: List[str],
    timestamp_field: str,
    created_timestamp_column: str,
) -> dd.DataFrame:
    """Drop the timestamp bookkeeping columns unless they were requested.

    Args:
        df_to_join: The joined dataframe to clean up.
        features: Names of the requested feature columns. A timestamp column
            whose name appears here is kept in the output.
        timestamp_field: Name of the event timestamp column.
        created_timestamp_column: Name of the created timestamp column; may
            be empty/falsy, in which case it is ignored.

    Returns:
        ``df_to_join`` itself when nothing needs to be dropped, otherwise the
        persisted result of dropping the unrequested timestamp columns.
    """
    columns_to_drop: List[str] = []
    for column in (timestamp_field, created_timestamp_column):
        # Skip unset columns, columns the caller explicitly asked for as
        # features, and a column already scheduled for removal (the two
        # timestamp names could coincide).
        if column and column not in features and column not in columns_to_drop:
            columns_to_drop.append(column)

    if not columns_to_drop:
        return df_to_join

    # Drop everything in one pass and persist once, rather than once per
    # column.
    return df_to_join.drop(columns_to_drop, axis=1).persist()


def _df_column_uniquify(df: dd.DataFrame) -> Tuple[dd.DataFrame, List[str]]:
    """Rename repeated column labels so that every column of ``df`` is unique.

    The first occurrence of a label keeps its name. Each later occurrence is
    renamed to ``"<label>_<n>"`` using the smallest ``n >= 1`` that clashes
    neither with a name already assigned nor with any label originally
    present in ``df``. The second condition ensures a genuine column such as
    ``"a_1"`` is never displaced by a renamed duplicate of ``"a"`` and then
    wrongly reported as a duplicate itself.

    Args:
        df: Dataframe whose ``columns`` may contain repeated labels. Its
            ``columns`` attribute is reassigned in place.

    Returns:
        A tuple of the same ``df`` object (now with unique column names) and
        the list of generated names given to the repeated columns, in column
        order.
    """
    original_names = list(df.columns)
    # Labels that already exist in the frame; generated names must avoid them.
    reserved = set(original_names)
    # Names handed out so far (set for O(1) membership checks).
    assigned = set()
    new_columns = []
    duplicate_cols = []
    for item in original_names:
        counter = 0
        newitem = item
        # counter == 0 means we are still trying the label's own name, which
        # is naturally in ``reserved``; only generated names must avoid it.
        while newitem in assigned or (counter > 0 and newitem in reserved):
            counter += 1
            newitem = "{}_{}".format(item, counter)
        if counter > 0:
            duplicate_cols.append(newitem)
        assigned.add(newitem)
        new_columns.append(newitem)
    df.columns = new_columns
    return df, duplicate_cols

0 comments on commit 01ab462

Please sign in to comment.