Implement materialization for RedshiftOfflineStore & RedshiftRetrievalJob #1680

Merged
merged 5 commits into from Jul 15, 2021
Binary file added dump.rdb
Binary file not shown.
74 changes: 25 additions & 49 deletions sdk/python/feast/data_source.py
@@ -17,15 +17,10 @@
from typing import Callable, Dict, Iterable, Optional, Tuple

from pyarrow.parquet import ParquetFile
from tenacity import retry, retry_unless_exception_type, wait_exponential

from feast import type_map
from feast.data_format import FileFormat, StreamFormat
from feast.errors import (
DataSourceNotFoundException,
RedshiftCredentialsError,
RedshiftQueryError,
)
from feast.errors import DataSourceNotFoundException, RedshiftCredentialsError
from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto
from feast.repo_config import RepoConfig
from feast.value_type import ValueType
@@ -1062,7 +1057,7 @@ def validate(self, config: RepoConfig):
def get_table_query_string(self) -> str:
"""Returns a string that can directly be used to reference this table in SQL"""
if self.table:
return f"`{self.table}`"
return f'"{self.table}"'
else:
return f"({self.query})"

@@ -1073,62 +1068,43 @@ def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
def get_table_column_names_and_types(
self, config: RepoConfig
) -> Iterable[Tuple[str, str]]:
import boto3
from botocore.config import Config
from botocore.exceptions import ClientError

from feast.infra.offline_stores.redshift import RedshiftOfflineStoreConfig
from feast.infra.utils import aws_utils

assert isinstance(config.offline_store, RedshiftOfflineStoreConfig)

client = boto3.client(
"redshift-data", config=Config(region_name=config.offline_store.region)
)
client = aws_utils.get_redshift_data_client(config.offline_store.region)

try:
if self.table is not None:
if self.table is not None:
try:
table = client.describe_table(
ClusterIdentifier=config.offline_store.cluster_id,
Database=config.offline_store.database,
DbUser=config.offline_store.user,
Table=self.table,
)
# The API returns valid JSON with empty column list when the table doesn't exist
if len(table["ColumnList"]) == 0:
raise DataSourceNotFoundException(self.table)
except ClientError as e:
if e.response["Error"]["Code"] == "ValidationException":
raise RedshiftCredentialsError() from e
raise

columns = table["ColumnList"]
else:
statement = client.execute_statement(
ClusterIdentifier=config.offline_store.cluster_id,
Database=config.offline_store.database,
DbUser=config.offline_store.user,
Sql=f"SELECT * FROM ({self.query}) LIMIT 1",
)
# The API returns valid JSON with empty column list when the table doesn't exist
if len(table["ColumnList"]) == 0:
raise DataSourceNotFoundException(self.table)

# Need to retry client.describe_statement(...) until the task is finished. We don't want to bombard
# Redshift with queries, and neither do we want to wait for a long time on the initial call.
# The solution is exponential backoff. The backoff starts with 0.1 seconds and doubles exponentially
# until reaching 30 seconds, at which point the backoff is fixed.
@retry(
wait=wait_exponential(multiplier=0.1, max=30),
retry=retry_unless_exception_type(RedshiftQueryError),
)
def wait_for_statement():
desc = client.describe_statement(Id=statement["Id"])
if desc["Status"] in ("SUBMITTED", "STARTED", "PICKED"):
raise Exception # Retry
if desc["Status"] != "FINISHED":
raise RedshiftQueryError(desc) # Don't retry. Raise exception.

wait_for_statement()

result = client.get_statement_result(Id=statement["Id"])

columns = result["ColumnMetadata"]
except ClientError as e:
if e.response["Error"]["Code"] == "ValidationException":
raise RedshiftCredentialsError() from e
raise
columns = table["ColumnList"]
else:
statement_id = aws_utils.execute_redshift_statement(
client,
config.offline_store.cluster_id,
config.offline_store.database,
config.offline_store.user,
f"SELECT * FROM ({self.query}) LIMIT 1",
)
columns = aws_utils.get_redshift_statement_result(client, statement_id)[
"ColumnMetadata"
]

return [(column["name"], column["typeName"].upper()) for column in columns]
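The aws_utils helpers called above (get_redshift_data_client, execute_redshift_statement, get_redshift_statement_result) live in feast.infra.utils and are not shown in this diff. Below is a minimal sketch of what they plausibly do, assuming they wrap boto3's redshift-data API and poll describe_statement with the same exponential backoff that this change removes from data_source.py; the bodies are illustrative, not the actual implementation.

# Hypothetical sketch of the statement helpers in feast.infra.utils.aws_utils (not part of this diff).
import boto3
from botocore.config import Config
from tenacity import retry, retry_if_exception_type, wait_exponential

from feast.errors import RedshiftQueryError


class RedshiftStatementNotFinished(Exception):
    """Internal signal that the statement is still running and should be polled again."""


def get_redshift_data_client(aws_region: str):
    """Return a boto3 redshift-data client for the given region."""
    return boto3.client("redshift-data", config=Config(region_name=aws_region))


def execute_redshift_statement(client, cluster_id: str, database: str, user: str, query: str) -> str:
    """Submit a SQL statement, wait for it to finish, and return its statement Id."""
    statement = client.execute_statement(
        ClusterIdentifier=cluster_id, Database=database, DbUser=user, Sql=query
    )

    # Exponential backoff starting at 0.1s and capped at 30s, mirroring the
    # inline retry loop removed from data_source.py above.
    @retry(
        wait=wait_exponential(multiplier=0.1, max=30),
        retry=retry_if_exception_type(RedshiftStatementNotFinished),
    )
    def wait_for_statement():
        desc = client.describe_statement(Id=statement["Id"])
        if desc["Status"] in ("SUBMITTED", "STARTED", "PICKED"):
            raise RedshiftStatementNotFinished  # still running, retry
        if desc["Status"] != "FINISHED":
            raise RedshiftQueryError(desc)  # failed or aborted, don't retry

    wait_for_statement()
    return statement["Id"]


def get_redshift_statement_result(client, statement_id: str) -> dict:
    """Fetch the result of a finished statement (includes ColumnMetadata and Records)."""
    return client.get_statement_result(Id=statement_id)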
118 changes: 115 additions & 3 deletions sdk/python/feast/infra/offline_stores/redshift.py
@@ -1,13 +1,16 @@
import uuid
from datetime import datetime
from typing import List, Optional, Union

import pandas as pd
import pyarrow as pa
from pydantic import StrictStr
from pydantic.typing import Literal

from feast.data_source import DataSource
from feast.data_source import DataSource, RedshiftSource
from feast.feature_view import FeatureView
from feast.infra.offline_stores.offline_store import OfflineStore, RetrievalJob
from feast.infra.utils import aws_utils
from feast.registry import Registry
from feast.repo_config import FeastConfigBaseModel, RepoConfig

@@ -30,9 +33,12 @@ class RedshiftOfflineStoreConfig(FeastConfigBaseModel):
database: StrictStr
""" Redshift database name """

s3_path: StrictStr
s3_staging_location: StrictStr
""" S3 path for importing & exporting data to Redshift """

iam_role: StrictStr
""" IAM Role for Redshift, granting it access to S3 """


class RedshiftOfflineStore(OfflineStore):
@staticmethod
@@ -46,7 +52,45 @@ def pull_latest_from_table_or_query(
start_date: datetime,
end_date: datetime,
) -> RetrievalJob:
pass
assert isinstance(data_source, RedshiftSource)
assert isinstance(config.offline_store, RedshiftOfflineStoreConfig)

from_expression = data_source.get_table_query_string()

partition_by_join_key_string = ", ".join(join_key_columns)
if partition_by_join_key_string != "":
partition_by_join_key_string = (
"PARTITION BY " + partition_by_join_key_string
)
timestamp_columns = [event_timestamp_column]
if created_timestamp_column:
timestamp_columns.append(created_timestamp_column)
timestamp_desc_string = " DESC, ".join(timestamp_columns) + " DESC"
field_string = ", ".join(
join_key_columns + feature_name_columns + timestamp_columns
)

redshift_client = aws_utils.get_redshift_data_client(
config.offline_store.region
)
s3_resource = aws_utils.get_s3_resource(config.offline_store.region)

query = f"""
SELECT {field_string}
FROM (
SELECT {field_string},
ROW_NUMBER() OVER({partition_by_join_key_string} ORDER BY {timestamp_desc_string}) AS _feast_row
FROM {from_expression}
WHERE {event_timestamp_column} BETWEEN TIMESTAMP '{start_date}' AND TIMESTAMP '{end_date}'
)
WHERE _feast_row = 1
"""
return RedshiftRetrievalJob(
query=query,
redshift_client=redshift_client,
s3_resource=s3_resource,
config=config,
)
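To make the query template concrete: for a hypothetical source with join key driver_id, feature column conv_rate, and event timestamp column event_timestamp, the generated SQL would look roughly like the string below. The ROW_NUMBER window keeps only the most recent row per entity key within the materialization window.

# Illustrative rendering of the query template above; the table name and dates are invented.
example_query = """
SELECT driver_id, conv_rate, event_timestamp
FROM (
    SELECT driver_id, conv_rate, event_timestamp,
        ROW_NUMBER() OVER(PARTITION BY driver_id ORDER BY event_timestamp DESC) AS _feast_row
    FROM "driver_hourly_stats"
    WHERE event_timestamp BETWEEN TIMESTAMP '2021-07-01 00:00:00' AND TIMESTAMP '2021-07-14 00:00:00'
)
WHERE _feast_row = 1
"""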

@staticmethod
def get_historical_features(
@@ -59,3 +103,71 @@ def get_historical_features(
full_feature_names: bool = False,
) -> RetrievalJob:
pass


class RedshiftRetrievalJob(RetrievalJob):
def __init__(self, query: str, redshift_client, s3_resource, config: RepoConfig):
"""Initialize RedshiftRetrievalJob object.

Args:
query: Redshift SQL query to execute.
redshift_client: boto3 redshift-data client
s3_resource: boto3 s3 resource object
config: Feast repo config
"""
self.query = query
self._redshift_client = redshift_client
self._s3_resource = s3_resource
self._config = config
self._s3_path = (
self._config.offline_store.s3_staging_location
+ "/unload/"
+ str(uuid.uuid4())
)

def to_df(self) -> pd.DataFrame:
return aws_utils.unload_redshift_query_to_df(
self._redshift_client,
self._config.offline_store.cluster_id,
self._config.offline_store.database,
self._config.offline_store.user,
self._s3_resource,
self._s3_path,
self._config.offline_store.iam_role,
self.query,
)

def to_arrow(self) -> pa.Table:
return aws_utils.unload_redshift_query_to_pa(
self._redshift_client,
self._config.offline_store.cluster_id,
self._config.offline_store.database,
self._config.offline_store.user,
self._s3_resource,
self._s3_path,
self._config.offline_store.iam_role,
self.query,
)

def to_s3(self) -> str:
""" Export dataset to S3 in Parquet format and return path """
aws_utils.execute_redshift_query_and_unload_to_s3(
self._redshift_client,
self._config.offline_store.cluster_id,
self._config.offline_store.database,
self._config.offline_store.user,
self._s3_path,
self._config.offline_store.iam_role,
self.query,
)
return self._s3_path

def to_redshift(self, table_name: str) -> None:
""" Save dataset as a new Redshift table """
aws_utils.execute_redshift_statement(
self._redshift_client,
self._config.offline_store.cluster_id,
self._config.offline_store.database,
self._config.offline_store.user,
f'CREATE TABLE "{table_name}" AS ({self.query})',
)
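The unload helpers used by to_df, to_arrow, and to_s3 are also defined in feast.infra.utils.aws_utils and are not part of this diff. A rough sketch of what they might do, assuming they issue a Redshift UNLOAD ... FORMAT AS PARQUET into the staging path and read the Parquet parts back with pyarrow; the S3 handling details are assumptions, and execute_redshift_statement refers to the sketch shown earlier.

# Hypothetical sketch of the unload helpers in feast.infra.utils.aws_utils (not part of this diff).
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq


def execute_redshift_query_and_unload_to_s3(
    redshift_data_client, cluster_id, database, user, s3_path, iam_role, query
) -> None:
    """Run the query and UNLOAD its result to s3_path as Parquet files."""
    escaped_query = query.replace("'", "''")  # escape quotes for the UNLOAD string literal
    unload_statement = (
        f"UNLOAD ('{escaped_query}') TO '{s3_path}/' "
        f"IAM_ROLE '{iam_role}' FORMAT AS PARQUET"
    )
    execute_redshift_statement(redshift_data_client, cluster_id, database, user, unload_statement)


def unload_redshift_query_to_pa(
    redshift_data_client, cluster_id, database, user, s3_resource, s3_path, iam_role, query
) -> pa.Table:
    """UNLOAD the query to S3, then download the Parquet parts and concatenate them."""
    execute_redshift_query_and_unload_to_s3(
        redshift_data_client, cluster_id, database, user, s3_path, iam_role, query
    )
    bucket, _, prefix = s3_path[len("s3://"):].partition("/")
    tables = []
    for obj in s3_resource.Bucket(bucket).objects.filter(Prefix=prefix):
        body = obj.get()["Body"].read()
        tables.append(pq.read_table(pa.BufferReader(body)))
    return pa.concat_tables(tables)


def unload_redshift_query_to_df(
    redshift_data_client, cluster_id, database, user, s3_resource, s3_path, iam_role, query
) -> pd.DataFrame:
    """Convenience wrapper that returns a pandas DataFrame."""
    return unload_redshift_query_to_pa(
        redshift_data_client, cluster_id, database, user, s3_resource, s3_path, iam_role, query
    ).to_pandas()

A RedshiftRetrievalJob returned by pull_latest_from_table_or_query can then be consumed with job.to_df() for in-memory materialization, or with job.to_s3() / job.to_redshift(table_name) to keep the data inside AWS.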
Empty file.