Clean up uploaded entities in Redshift offline store (#1730)
Signed-off-by: Tsotne Tabidze <tsotne@tecton.ai>
Tsotne Tabidze authored Jul 28, 2021
1 parent 8099ea7 commit 6713384
Showing 3 changed files with 128 additions and 101 deletions.
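The core of the change: instead of uploading the entity dataframe eagerly and leaving the temporary table behind, `get_historical_features` now wraps table creation, query generation, and cleanup in a `contextlib.contextmanager`, so the temporary entity table is dropped as soon as the caller has executed the query. A minimal, self-contained sketch of the pattern (the helpers below are hypothetical stand-ins for the Feast/AWS utilities used in the real code):

```python
import contextlib
from typing import Iterator

# Hypothetical stand-ins for the offline_utils / aws_utils helpers in the diff.
def upload_entity_df(table_name: str) -> None:
    print(f"-- staging entity df into {table_name}")

def drop_table(table_name: str) -> None:
    print(f"DROP TABLE {table_name}")

@contextlib.contextmanager
def query_generator() -> Iterator[str]:
    table_name = "feast_entity_df_1234"        # normally a randomly generated temp name
    upload_entity_df(table_name)               # create the temporary entity table
    yield f"SELECT ... FROM {table_name}"      # the caller runs the join query here
    drop_table(table_name)                     # clean up the uploaded entities afterwards

with query_generator() as query:
    print("executing:", query)
```

The retrieval job now receives the generator function itself rather than a query string, so upload and cleanup only happen when the job actually materializes results.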
177 changes: 102 additions & 75 deletions sdk/python/feast/infra/offline_stores/redshift.py
@@ -1,6 +1,7 @@
import contextlib
import uuid
from datetime import datetime
from typing import Dict, List, Optional, Union
from typing import Callable, ContextManager, Dict, Iterator, List, Optional, Union

import numpy as np
import pandas as pd
@@ -113,40 +114,53 @@ def get_historical_features(
)
s3_resource = aws_utils.get_s3_resource(config.offline_store.region)

table_name = offline_utils.get_temp_entity_table_name()
@contextlib.contextmanager
def query_generator() -> Iterator[str]:
table_name = offline_utils.get_temp_entity_table_name()

entity_schema = _upload_entity_df_and_get_entity_schema(
entity_df, redshift_client, config, s3_resource, table_name
)
entity_schema = _upload_entity_df_and_get_entity_schema(
entity_df, redshift_client, config, s3_resource, table_name
)

entity_df_event_timestamp_col = offline_utils.infer_event_timestamp_from_entity_df(
entity_schema
)
entity_df_event_timestamp_col = offline_utils.infer_event_timestamp_from_entity_df(
entity_schema
)

expected_join_keys = offline_utils.get_expected_join_keys(
project, feature_views, registry
)
expected_join_keys = offline_utils.get_expected_join_keys(
project, feature_views, registry
)

offline_utils.assert_expected_columns_in_entity_df(
entity_schema, expected_join_keys, entity_df_event_timestamp_col
)
offline_utils.assert_expected_columns_in_entity_df(
entity_schema, expected_join_keys, entity_df_event_timestamp_col
)

# Build a query context containing all information required to template the Redshift SQL query
query_context = offline_utils.get_feature_view_query_context(
feature_refs, feature_views, registry, project,
)
# Build a query context containing all information required to template the Redshift SQL query
query_context = offline_utils.get_feature_view_query_context(
feature_refs, feature_views, registry, project,
)

# Generate the Redshift SQL query from the query context
query = offline_utils.build_point_in_time_query(
query_context,
left_table_query_string=table_name,
entity_df_event_timestamp_col=entity_df_event_timestamp_col,
query_template=MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN,
full_feature_names=full_feature_names,
)
# Generate the Redshift SQL query from the query context
query = offline_utils.build_point_in_time_query(
query_context,
left_table_query_string=table_name,
entity_df_event_timestamp_col=entity_df_event_timestamp_col,
query_template=MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN,
full_feature_names=full_feature_names,
)

yield query

# Clean up the uploaded Redshift table
aws_utils.execute_redshift_statement(
redshift_client,
config.offline_store.cluster_id,
config.offline_store.database,
config.offline_store.user,
f"DROP TABLE {table_name}",
)

return RedshiftRetrievalJob(
query=query,
query=query_generator,
redshift_client=redshift_client,
s3_resource=s3_resource,
config=config,
@@ -161,7 +175,7 @@ def get_historical_features(
class RedshiftRetrievalJob(RetrievalJob):
def __init__(
self,
query: str,
query: Union[str, Callable[[], ContextManager[str]]],
redshift_client,
s3_resource,
config: RepoConfig,
@@ -170,14 +184,23 @@ def __init__(
"""Initialize RedshiftRetrievalJob object.
Args:
query: Redshift SQL query to execute.
query: Redshift SQL query to execute. Either a string, or a generator function that handles the artifact cleanup.
redshift_client: boto3 redshift-data client
s3_resource: boto3 s3 resource object
config: Feast repo config
drop_columns: Optionally a list of columns to drop before unloading to S3.
This is a convenient field, since "SELECT ... EXCEPT col" isn't supported in Redshift.
"""
self.query = query
if not isinstance(query, str):
self._query_generator = query
else:

@contextlib.contextmanager
def query_generator() -> Iterator[str]:
assert isinstance(query, str)
yield query

self._query_generator = query_generator
self._redshift_client = redshift_client
self._s3_resource = s3_resource
self._config = config
@@ -189,59 +212,63 @@ def __init__(
self._drop_columns = drop_columns

def to_df(self) -> pd.DataFrame:
return aws_utils.unload_redshift_query_to_df(
self._redshift_client,
self._config.offline_store.cluster_id,
self._config.offline_store.database,
self._config.offline_store.user,
self._s3_resource,
self._s3_path,
self._config.offline_store.iam_role,
self.query,
self._drop_columns,
)
with self._query_generator() as query:
return aws_utils.unload_redshift_query_to_df(
self._redshift_client,
self._config.offline_store.cluster_id,
self._config.offline_store.database,
self._config.offline_store.user,
self._s3_resource,
self._s3_path,
self._config.offline_store.iam_role,
query,
self._drop_columns,
)

def to_arrow(self) -> pa.Table:
return aws_utils.unload_redshift_query_to_pa(
self._redshift_client,
self._config.offline_store.cluster_id,
self._config.offline_store.database,
self._config.offline_store.user,
self._s3_resource,
self._s3_path,
self._config.offline_store.iam_role,
self.query,
self._drop_columns,
)
with self._query_generator() as query:
return aws_utils.unload_redshift_query_to_pa(
self._redshift_client,
self._config.offline_store.cluster_id,
self._config.offline_store.database,
self._config.offline_store.user,
self._s3_resource,
self._s3_path,
self._config.offline_store.iam_role,
query,
self._drop_columns,
)

def to_s3(self) -> str:
""" Export dataset to S3 in Parquet format and return path """
aws_utils.execute_redshift_query_and_unload_to_s3(
self._redshift_client,
self._config.offline_store.cluster_id,
self._config.offline_store.database,
self._config.offline_store.user,
self._s3_path,
self._config.offline_store.iam_role,
self.query,
self._drop_columns,
)
return self._s3_path
with self._query_generator() as query:
aws_utils.execute_redshift_query_and_unload_to_s3(
self._redshift_client,
self._config.offline_store.cluster_id,
self._config.offline_store.database,
self._config.offline_store.user,
self._s3_path,
self._config.offline_store.iam_role,
query,
self._drop_columns,
)
return self._s3_path

def to_redshift(self, table_name: str) -> None:
""" Save dataset as a new Redshift table """
query = f'CREATE TABLE "{table_name}" AS ({self.query});\n'
if self._drop_columns is not None:
for column in self._drop_columns:
query += f"ALTER TABLE {table_name} DROP COLUMN {column};\n"

aws_utils.execute_redshift_statement(
self._redshift_client,
self._config.offline_store.cluster_id,
self._config.offline_store.database,
self._config.offline_store.user,
query,
)
with self._query_generator() as query:
query = f'CREATE TABLE "{table_name}" AS ({query});\n'
if self._drop_columns is not None:
for column in self._drop_columns:
query += f"ALTER TABLE {table_name} DROP COLUMN {column};\n"

aws_utils.execute_redshift_statement(
self._redshift_client,
self._config.offline_store.cluster_id,
self._config.offline_store.database,
self._config.offline_store.user,
query,
)


def _upload_entity_df_and_get_entity_schema(
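`RedshiftRetrievalJob` stays backwards compatible by accepting either a raw SQL string or a zero-argument callable returning a context manager; a plain string is wrapped in a no-op context manager so that `to_df`, `to_arrow`, `to_s3`, and `to_redshift` can all use the same `with self._query_generator() as query:` pattern. A standalone sketch of that normalization (the wrapper function name is illustrative):

```python
import contextlib
from typing import Callable, ContextManager, Iterator, Union

QueryLike = Union[str, Callable[[], ContextManager[str]]]

def as_query_generator(query: QueryLike) -> Callable[[], ContextManager[str]]:
    """Wrap a plain SQL string in a trivial context manager; pass callables through."""
    if not isinstance(query, str):
        return query  # already a generator function that handles its own cleanup

    @contextlib.contextmanager
    def query_generator() -> Iterator[str]:
        yield query   # a plain string needs no cleanup

    return query_generator

gen = as_query_generator("SELECT 1")
with gen() as q:
    print(q)  # -> SELECT 1
```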
4 changes: 2 additions & 2 deletions sdk/python/feast/infra/utils/aws_utils.py
@@ -2,7 +2,7 @@
import os
import tempfile
import uuid
from typing import Generator, List, Optional, Tuple
from typing import Iterator, List, Optional, Tuple

import pandas as pd
import pyarrow as pa
@@ -222,7 +222,7 @@ def temporarily_upload_df_to_redshift(
iam_role: str,
table_name: str,
df: pd.DataFrame,
) -> Generator[None, None, None]:
) -> Iterator[None]:
"""Uploads a Pandas DataFrame to Redshift as a new table with cleanup logic.
This is essentially the same as upload_df_to_redshift (check out its docstring for full details),
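The `aws_utils.py` change is purely a typing cleanup: for a `@contextlib.contextmanager` function that yields nothing, `Iterator[None]` is the shorter, conventional annotation equivalent to `Generator[None, None, None]`. For example (illustrative only):

```python
import contextlib
from typing import Iterator

@contextlib.contextmanager
def temporarily_create_resource() -> Iterator[None]:  # same as Generator[None, None, None]
    print("create resource")
    yield                                             # nothing is handed back to the caller
    print("drop resource")

with temporarily_create_resource():
    print("use resource")
```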
@@ -864,18 +864,18 @@ def test_historical_features_from_redshift_sources(
f"order_is_success, {timestamp_column} FROM {table_name}"
)
# Rename the join key; this should now raise an error.
assertpy.assert_that(store.get_historical_features).raises(
errors.FeastEntityDFMissingColumnsError
).when_called_with(
entity_df=entity_df_query_with_invalid_join_key,
features=[
"driver_stats:conv_rate",
"driver_stats:avg_daily_trips",
"customer_profile:current_balance",
"customer_profile:avg_passenger_count",
"customer_profile:lifetime_trip_count",
],
)
assertpy.assert_that(
store.get_historical_features(
entity_df=entity_df_query_with_invalid_join_key,
features=[
"driver_stats:conv_rate",
"driver_stats:avg_daily_trips",
"customer_profile:current_balance",
"customer_profile:avg_passenger_count",
"customer_profile:lifetime_trip_count",
],
).to_df
).raises(errors.FeastEntityDFMissingColumnsError).when_called_with()

job_from_df = store.get_historical_features(
entity_df=orders_df,
@@ -893,18 +893,18 @@ def test_historical_features_from_redshift_sources(
orders_df_with_invalid_join_key = orders_df.rename(
{"customer_id": "customer"}, axis="columns"
)
assertpy.assert_that(store.get_historical_features).raises(
errors.FeastEntityDFMissingColumnsError
).when_called_with(
entity_df=orders_df_with_invalid_join_key,
features=[
"driver_stats:conv_rate",
"driver_stats:avg_daily_trips",
"customer_profile:current_balance",
"customer_profile:avg_passenger_count",
"customer_profile:lifetime_trip_count",
],
)
assertpy.assert_that(
store.get_historical_features(
entity_df=orders_df_with_invalid_join_key,
features=[
"driver_stats:conv_rate",
"driver_stats:avg_daily_trips",
"customer_profile:current_balance",
"customer_profile:avg_passenger_count",
"customer_profile:lifetime_trip_count",
],
).to_df
).raises(errors.FeastEntityDFMissingColumnsError).when_called_with()

start_time = datetime.utcnow()
actual_df_from_df_entities = job_from_df.to_df()
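The test updates follow from the new laziness: entity-schema validation now runs inside the query generator, so `get_historical_features` itself no longer raises on a bad entity dataframe, and the error only surfaces when the job is materialized. The assertions therefore target the returned job's `.to_df` method. A standalone sketch of the assertpy idiom used (the failing callable and error here are illustrative):

```python
import assertpy

class FakeJob:
    def to_df(self):
        raise KeyError("entity df is missing a join key")

job = FakeJob()
# Assert that calling job.to_df (with no arguments) raises the expected error.
assertpy.assert_that(job.to_df).raises(KeyError).when_called_with()
```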
