chore: Implement to_remote_storage for supported offline stores #2918

Merged · 14 commits · Jul 7, 2022
49 changes: 49 additions & 0 deletions sdk/python/feast/infra/offline_stores/bigquery.py
@@ -57,6 +57,7 @@
from google.cloud import bigquery
from google.cloud.bigquery import Client, SchemaField, Table
from google.cloud.bigquery._pandas_helpers import ARROW_SCALAR_IDS_TO_BQ
from google.cloud.storage import Client as StorageClient

except ImportError as e:
from feast.errors import FeastExtrasDependencyImportError
@@ -83,6 +84,9 @@ class BigQueryOfflineStoreConfig(FeastConfigBaseModel):
For more information on BigQuery data locations see: https://cloud.google.com/bigquery/docs/locations
"""

gcs_staging_location: Optional[str] = None
""" (optional) GCS location used for offloading BigQuery results as parquet files."""


class BigQueryOfflineStore(OfflineStore):
@staticmethod
@@ -386,6 +390,14 @@ def query_generator() -> Iterator[str]:
on_demand_feature_views if on_demand_feature_views else []
)
self._metadata = metadata
if self.config.offline_store.gcs_staging_location:
self._gcs_path = (
self.config.offline_store.gcs_staging_location
+ f"/{self.config.project}/export/"
+ str(uuid.uuid4())
)
else:
self._gcs_path = None

@property
def full_feature_names(self) -> bool:
@@ -478,6 +490,43 @@ def persist(self, storage: SavedDatasetStorage):
def metadata(self) -> Optional[RetrievalMetadata]:
return self._metadata

def supports_remote_storage_export(self) -> bool:
return self._gcs_path is not None

def to_remote_storage(self) -> List[str]:
if not self._gcs_path:
raise ValueError(
"gcs_staging_location needs to be specified for the big query "
"offline store when executing `to_remote_storage()`"
)

table = self.to_bigquery()

job_config = bigquery.job.ExtractJobConfig()
job_config.destination_format = "PARQUET"

extract_job = self.client.extract_table(
table,
destination_uris=[f"{self._gcs_path}/*.parquet"],
location=self.config.offline_store.location,
job_config=job_config,
)
extract_job.result()

bucket: str
prefix: str
storage_client = StorageClient(project=self.client.project)
bucket, prefix = self._gcs_path[len("gs://") :].split("/", 1)
prefix = prefix.rsplit("/", 1)[0]
if prefix.startswith("/"):
prefix = prefix[1:]

blobs = storage_client.list_blobs(bucket, prefix=prefix)
results = []
for b in blobs:
results.append(f"gs://{b.bucket.name}/{b.name}")
return results


def block_until_done(
client: Client,
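For context, a minimal caller-side sketch of the new API, assuming an already configured FeatureStore instance `store` and an entity DataFrame `entity_df` (both illustrative, not part of this diff):

job = store.get_historical_features(
    entity_df=entity_df,
    features=["driver_hourly_stats:conv_rate"],  # illustrative feature reference
)
if job.supports_remote_storage_export():
    # URIs of the parquet files exported to the configured staging location.
    uris = job.to_remote_storage()
else:
    df = job.to_df()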
3 changes: 3 additions & 0 deletions sdk/python/feast/infra/offline_stores/file.py
@@ -105,6 +105,9 @@ def persist(self, storage: SavedDatasetStorage):
def metadata(self) -> Optional[RetrievalMetadata]:
return self._metadata

def supports_remote_storage_export(self) -> bool:
return False


class FileOfflineStore(OfflineStore):
@staticmethod
7 changes: 7 additions & 0 deletions sdk/python/feast/infra/offline_stores/redshift.py
@@ -490,6 +490,13 @@ def persist(self, storage: SavedDatasetStorage):
def metadata(self) -> Optional[RetrievalMetadata]:
return self._metadata

def supports_remote_storage_export(self) -> bool:
return True

def to_remote_storage(self) -> List[str]:
path = self.to_s3()
return aws_utils.list_s3_files(self._config.offline_store.region, path)


def _upload_entity_df(
entity_df: Union[pd.DataFrame, str],
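Redshift can report unconditional support because `to_s3()` already relies on the store's mandatory S3 staging settings. A minimal configuration sketch for reference, with placeholder values (see RedshiftOfflineStoreConfig for the full set of fields):

config = RedshiftOfflineStoreConfig(
    cluster_id="feast-cluster",
    region="us-west-2",
    user="admin",
    database="feast",
    s3_staging_location="s3://my-bucket/feast-staging",  # staging area used by to_s3()
    iam_role="arn:aws:iam::123456789012:role/feast-redshift-s3",
)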
51 changes: 49 additions & 2 deletions sdk/python/feast/infra/offline_stores/snowflake.py
@@ -1,5 +1,6 @@
import contextlib
import os
import uuid
from datetime import datetime
from pathlib import Path
from typing import (
@@ -90,6 +91,12 @@ class SnowflakeOfflineStoreConfig(FeastConfigBaseModel):
schema_: Optional[str] = Field(None, alias="schema")
""" Snowflake schema name """

storage_integration_name: Optional[str] = None
""" Storage integration name in snowflake """

blob_export_location: Optional[str] = None
""" Location (in S3, Google storage or Azure storage) where data is offloaded """

class Config:
allow_population_by_field_name = True

@@ -378,6 +385,11 @@ def query_generator() -> Iterator[str]:
on_demand_feature_views if on_demand_feature_views else []
)
self._metadata = metadata
self.export_path: Optional[str]
if self.config.offline_store.blob_export_location:
self.export_path = f"{self.config.offline_store.blob_export_location}/{self.config.project}/{uuid.uuid4()}"
else:
self.export_path = None

@property
def full_feature_names(self) -> bool:
@@ -413,7 +425,7 @@ def _to_arrow_internal(self) -> pa.Table:
pd.DataFrame(columns=[md.name for md in empty_result.description])
)

def to_snowflake(self, table_name: str) -> None:
def to_snowflake(self, table_name: str, temporary: bool = False) -> None:
"""Save dataset as a new Snowflake table"""
if self.on_demand_feature_views is not None:
transformed_df = self.to_df()
@@ -425,7 +437,7 @@ def to_snowflake(self, table_name: str) -> None:
return None

with self._query_generator() as query:
query = f'CREATE TABLE IF NOT EXISTS "{table_name}" AS ({query});\n'
query = f'CREATE {"TEMPORARY" if temporary else ""} TABLE IF NOT EXISTS "{table_name}" AS ({query});\n'

execute_snowflake_statement(self.snowflake_conn, query)

@@ -453,6 +465,41 @@ def persist(self, storage: SavedDatasetStorage):
def metadata(self) -> Optional[RetrievalMetadata]:
return self._metadata

def supports_remote_storage_export(self) -> bool:
return bool(
self.config.offline_store.storage_integration_name
and self.config.offline_store.blob_export_location
)

def to_remote_storage(self) -> List[str]:
if not self.export_path:
raise ValueError(
"to_remote_storage() requires `blob_export_location` to be specified in config"
)
if not self.config.offline_store.storage_integration_name:
raise ValueError(
"to_remote_storage() requires `storage_integration_name` to be specified in config"
)

table = f"temporary_{uuid.uuid4().hex}"
self.to_snowflake(table)

copy_into_query = f"""copy into '{self.config.offline_store.blob_export_location}/{table}' from "{self.config.offline_store.database}"."{self.config.offline_store.schema_}"."{table}"\n
storage_integration = {self.config.offline_store.storage_integration_name}\n
file_format = (TYPE = PARQUET)\n
DETAILED_OUTPUT = TRUE\n
HEADER = TRUE;\n
"""

cursor = execute_snowflake_statement(self.snowflake_conn, copy_into_query)
all_rows = cursor.fetchall()  # This may need pagination at some point in the future.
file_name_column_index = [
idx for idx, rm in enumerate(cursor.description) if rm.name == "FILE_NAME"
][0]
return [f"{self.export_path}/{row[file_name_column_index]}" for row in all_rows]


def _get_entity_schema(
entity_df: Union[pd.DataFrame, str],
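The `fetchall()` above is already flagged as a possible pagination concern; a sketch of a batched alternative using the standard DB-API `fetchmany()` (batch size chosen arbitrarily) could look like:

all_rows = []
while True:
    batch = cursor.fetchmany(1000)  # fetch result rows in chunks instead of all at once
    if not batch:
        break
    all_rows.extend(batch)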
15 changes: 13 additions & 2 deletions sdk/python/feast/infra/utils/aws_utils.py
@@ -3,7 +3,7 @@
import tempfile
import uuid
from pathlib import Path
from typing import Any, Dict, Iterator, Optional, Tuple, Union
from typing import Any, Dict, Iterator, List, Optional, Tuple, Union

import pandas as pd
import pyarrow
@@ -473,7 +473,7 @@ def execute_redshift_query_and_unload_to_s3(
# Run the query, unload the results to S3
unique_table_name = "_" + str(uuid.uuid4()).replace("-", "")
query = f"CREATE TEMPORARY TABLE {unique_table_name} AS ({query});\n"
query += f"UNLOAD ('SELECT * FROM {unique_table_name}') TO '{s3_path}/' IAM_ROLE '{iam_role}' PARQUET"
query += f"UNLOAD ('SELECT * FROM {unique_table_name}') TO '{s3_path}/' IAM_ROLE '{iam_role}' FORMAT AS PARQUET"
execute_redshift_statement(redshift_data_client, cluster_id, database, user, query)


@@ -632,3 +632,14 @@ def delete_api_gateway(api_gateway_client, api_gateway_id: str) -> Dict:
def get_account_id() -> str:
"""Get AWS Account ID"""
return boto3.client("sts").get_caller_identity().get("Account")


def list_s3_files(aws_region: str, path: str) -> List[str]:
s3 = boto3.client("s3", config=Config(region_name=aws_region))
if path.startswith("s3://"):
path = path[len("s3://") :]
bucket, prefix = path.split("/", 1)
objects = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
contents = objects["Contents"]
files = [f"s3://{bucket}/{content['Key']}" for content in contents]
return files
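Note that a single `list_objects_v2` call returns at most 1000 keys; a paginated sketch with the same signature, using boto3's built-in paginator and the module's existing imports, might look like:

def list_s3_files_paginated(aws_region: str, path: str) -> List[str]:
    s3 = boto3.client("s3", config=Config(region_name=aws_region))
    if path.startswith("s3://"):
        path = path[len("s3://") :]
    bucket, prefix = path.split("/", 1)
    files: List[str] = []
    # Walk every page of results instead of only the first batch of keys.
    for page in s3.get_paginator("list_objects_v2").paginate(Bucket=bucket, Prefix=prefix):
        for content in page.get("Contents", []):
            files.append(f"s3://{bucket}/{content['Key']}")
    return files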
@@ -51,7 +51,9 @@ def teardown(self):
self.dataset = None

def create_offline_store_config(self):
return BigQueryOfflineStoreConfig()
return BigQueryOfflineStoreConfig(
location="US", gcs_staging_location="gs://feast-export/"
)

def create_data_source(
self,
@@ -34,6 +34,8 @@ def __init__(self, project_name: str, *args, **kwargs):
warehouse=os.environ["SNOWFLAKE_CI_WAREHOUSE"],
database="FEAST",
schema="OFFLINE",
storage_integration_name="FEAST_S3",
blob_export_location="s3://feast-snowflake-offload/export",
)

def create_data_source(
@@ -370,6 +370,11 @@ def test_historical_features(environment, universal_data_sources, full_feature_names):
full_feature_names=full_feature_names,
)

if job_from_df.supports_remote_storage_export():
files = job_from_df.to_remote_storage()
print(files)
assert len(files) > 0 # This test should be way more detailed

start_time = datetime.utcnow()
actual_df_from_df_entities = job_from_df.to_df()

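As the inline comment notes, the `to_remote_storage()` assertion above is intentionally shallow. A sketch of a slightly tighter check, assuming the test environment has the matching fsspec filesystem (gcsfs or s3fs) installed so pandas can read the remote paths directly, might be:

remote_df = pd.concat(pd.read_parquet(f) for f in files)
assert len(remote_df) == len(job_from_df.to_df())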