diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py
index cb5b3a045a..3bf340acf9 100644
--- a/sdk/python/feast/infra/offline_stores/bigquery.py
+++ b/sdk/python/feast/infra/offline_stores/bigquery.py
@@ -57,6 +57,7 @@
     from google.cloud import bigquery
     from google.cloud.bigquery import Client, SchemaField, Table
     from google.cloud.bigquery._pandas_helpers import ARROW_SCALAR_IDS_TO_BQ
+    from google.cloud.storage import Client as StorageClient
 
 except ImportError as e:
     from feast.errors import FeastExtrasDependencyImportError
@@ -83,6 +84,9 @@ class BigQueryOfflineStoreConfig(FeastConfigBaseModel):
     For more information on BigQuery data locations see: https://cloud.google.com/bigquery/docs/locations
     """
 
+    gcs_staging_location: Optional[str] = None
+    """ (optional) GCS location used for offloading BigQuery results as parquet files."""
+
 
 class BigQueryOfflineStore(OfflineStore):
     @staticmethod
@@ -386,6 +390,14 @@ def query_generator() -> Iterator[str]:
             on_demand_feature_views if on_demand_feature_views else []
         )
         self._metadata = metadata
+        if self.config.offline_store.gcs_staging_location:
+            self._gcs_path = (
+                self.config.offline_store.gcs_staging_location
+                + f"/{self.config.project}/export/"
+                + str(uuid.uuid4())
+            )
+        else:
+            self._gcs_path = None
 
     @property
     def full_feature_names(self) -> bool:
@@ -478,6 +490,43 @@ def persist(self, storage: SavedDatasetStorage):
     def metadata(self) -> Optional[RetrievalMetadata]:
         return self._metadata
 
+    def supports_remote_storage_export(self) -> bool:
+        return self._gcs_path is not None
+
+    def to_remote_storage(self) -> List[str]:
+        if not self._gcs_path:
+            raise ValueError(
+                "gcs_staging_location needs to be specified for the big query "
+                "offline store when executing `to_remote_storage()`"
+            )
+
+        table = self.to_bigquery()
+
+        job_config = bigquery.job.ExtractJobConfig()
+        job_config.destination_format = "PARQUET"
+
+        extract_job = self.client.extract_table(
+            table,
+            destination_uris=[f"{self._gcs_path}/*.parquet"],
+            location=self.config.offline_store.location,
+            job_config=job_config,
+        )
+        extract_job.result()
+
+        bucket: str
+        prefix: str
+        storage_client = StorageClient(project=self.client.project)
+        bucket, prefix = self._gcs_path[len("gs://") :].split("/", 1)
+        prefix = prefix.rsplit("/", 1)[0]
+        if prefix.startswith("/"):
+            prefix = prefix[1:]
+
+        blobs = storage_client.list_blobs(bucket, prefix=prefix)
+        results = []
+        for b in blobs:
+            results.append(f"gs://{b.bucket.name}/{b.name}")
+        return results
+
 
 def block_until_done(
     client: Client,
diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py
index d60d468174..ae98f8d0c2 100644
--- a/sdk/python/feast/infra/offline_stores/file.py
+++ b/sdk/python/feast/infra/offline_stores/file.py
@@ -105,6 +105,9 @@ def persist(self, storage: SavedDatasetStorage):
     def metadata(self) -> Optional[RetrievalMetadata]:
         return self._metadata
 
+    def supports_remote_storage_export(self) -> bool:
+        return False
+
 
 class FileOfflineStore(OfflineStore):
     @staticmethod
diff --git a/sdk/python/feast/infra/offline_stores/redshift.py b/sdk/python/feast/infra/offline_stores/redshift.py
index 5f071a814f..1d7b79727e 100644
--- a/sdk/python/feast/infra/offline_stores/redshift.py
+++ b/sdk/python/feast/infra/offline_stores/redshift.py
@@ -490,6 +490,13 @@ def persist(self, storage: SavedDatasetStorage):
     def metadata(self) -> Optional[RetrievalMetadata]:
         return self._metadata
 
+    def supports_remote_storage_export(self) -> bool:
+        return True
+
+    def to_remote_storage(self) -> List[str]:
+        path = self.to_s3()
+        return aws_utils.list_s3_files(self._config.offline_store.region, path)
+
 
 def _upload_entity_df(
     entity_df: Union[pd.DataFrame, str],
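Note (not part of the patch above): the two RetrievalJob methods added here form a single contract — callers are expected to gate `to_remote_storage()` behind `supports_remote_storage_export()`. A minimal caller-side sketch, assuming an existing FeatureStore named `store` plus an `entity_df` and a list of feature references:

    job = store.get_historical_features(entity_df=entity_df, features=features)

    if job.supports_remote_storage_export():
        # URIs of the exported parquet files (gs:// for BigQuery, s3:// for Redshift).
        file_uris = job.to_remote_storage()
    else:
        # Fall back to materializing the result locally.
        df = job.to_df()
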
diff --git a/sdk/python/feast/infra/offline_stores/snowflake.py b/sdk/python/feast/infra/offline_stores/snowflake.py
index a5befc33e2..71394c4403 100644
--- a/sdk/python/feast/infra/offline_stores/snowflake.py
+++ b/sdk/python/feast/infra/offline_stores/snowflake.py
@@ -1,5 +1,6 @@
 import contextlib
 import os
+import uuid
 from datetime import datetime
 from pathlib import Path
 from typing import (
@@ -90,6 +91,12 @@ class SnowflakeOfflineStoreConfig(FeastConfigBaseModel):
     schema_: Optional[str] = Field(None, alias="schema")
     """ Snowflake schema name """
 
+    storage_integration_name: Optional[str] = None
+    """ Storage integration name in Snowflake """
+
+    blob_export_location: Optional[str] = None
+    """ Location (in S3, Google Cloud Storage, or Azure Blob Storage) where data is offloaded """
+
     class Config:
         allow_population_by_field_name = True
 
@@ -378,6 +385,11 @@ def query_generator() -> Iterator[str]:
             on_demand_feature_views if on_demand_feature_views else []
         )
         self._metadata = metadata
+        self.export_path: Optional[str]
+        if self.config.offline_store.blob_export_location:
+            self.export_path = f"{self.config.offline_store.blob_export_location}/{self.config.project}/{uuid.uuid4()}"
+        else:
+            self.export_path = None
 
     @property
     def full_feature_names(self) -> bool:
@@ -413,7 +425,7 @@ def _to_arrow_internal(self) -> pa.Table:
             pd.DataFrame(columns=[md.name for md in empty_result.description])
         )
 
-    def to_snowflake(self, table_name: str) -> None:
+    def to_snowflake(self, table_name: str, temporary=False) -> None:
         """Save dataset as a new Snowflake table"""
         if self.on_demand_feature_views is not None:
             transformed_df = self.to_df()
@@ -425,7 +437,7 @@ def to_snowflake(self, table_name: str) -> None:
             return None
 
         with self._query_generator() as query:
-            query = f'CREATE TABLE IF NOT EXISTS "{table_name}" AS ({query});\n'
+            query = f'CREATE {"TEMPORARY" if temporary else ""} TABLE IF NOT EXISTS "{table_name}" AS ({query});\n'
 
             execute_snowflake_statement(self.snowflake_conn, query)
 
@@ -453,6 +465,41 @@ def persist(self, storage: SavedDatasetStorage):
     def metadata(self) -> Optional[RetrievalMetadata]:
         return self._metadata
 
+    def supports_remote_storage_export(self) -> bool:
+        return (
+            self.config.offline_store.storage_integration_name
+            and self.config.offline_store.blob_export_location
+        )
+
+    def to_remote_storage(self) -> List[str]:
+        if not self.export_path:
+            raise ValueError(
+                "to_remote_storage() requires `blob_export_location` to be specified in config"
+            )
+        if not self.config.offline_store.storage_integration_name:
+            raise ValueError(
+                "to_remote_storage() requires `storage_integration_name` to be specified in config"
+            )
+
+        table = f"temporary_{uuid.uuid4().hex}"
+        self.to_snowflake(table)
+
+        copy_into_query = f"""copy into '{self.config.offline_store.blob_export_location}/{table}' from "{self.config.offline_store.database}"."{self.config.offline_store.schema_}"."{table}"\n
+            storage_integration = {self.config.offline_store.storage_integration_name}\n
+            file_format = (TYPE = PARQUET)\n
+            DETAILED_OUTPUT = TRUE\n
+            HEADER = TRUE;\n
+        """
+
+        cursor = execute_snowflake_statement(self.snowflake_conn, copy_into_query)
+        all_rows = (
+            cursor.fetchall()
+        )  # This may need pagination at some point in the future.
+        file_name_column_index = [
+            idx for idx, rm in enumerate(cursor.description) if rm.name == "FILE_NAME"
+        ][0]
+        return [f"{self.export_path}/{row[file_name_column_index]}" for row in all_rows]
+
 
 def _get_entity_schema(
     entity_df: Union[pd.DataFrame, str],
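Note (not part of the patch above): both new Snowflake settings must be present for the export path to work, and the storage integration has to already exist in Snowflake (created via CREATE STORAGE INTEGRATION and allowed to write to the target bucket). A rough configuration sketch with placeholder values — `MY_S3_INTEGRATION`, the bucket path, and the connection fields are all assumptions:

    from feast.infra.offline_stores.snowflake import SnowflakeOfflineStoreConfig

    # Placeholder values only; real deployments usually put this in feature_store.yaml.
    offline_config = SnowflakeOfflineStoreConfig(
        account="my_account",
        user="my_user",
        password="my_password",
        role="MY_ROLE",
        warehouse="COMPUTE_WH",
        database="FEAST",
        schema="OFFLINE",
        storage_integration_name="MY_S3_INTEGRATION",
        blob_export_location="s3://my-feast-exports/export",
    )
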
diff --git a/sdk/python/feast/infra/utils/aws_utils.py b/sdk/python/feast/infra/utils/aws_utils.py
index 7badda9846..51aecbf8a7 100644
--- a/sdk/python/feast/infra/utils/aws_utils.py
+++ b/sdk/python/feast/infra/utils/aws_utils.py
@@ -3,7 +3,7 @@
 import tempfile
 import uuid
 from pathlib import Path
-from typing import Any, Dict, Iterator, Optional, Tuple, Union
+from typing import Any, Dict, Iterator, List, Optional, Tuple, Union
 
 import pandas as pd
 import pyarrow
@@ -473,7 +473,7 @@ def execute_redshift_query_and_unload_to_s3(
     # Run the query, unload the results to S3
     unique_table_name = "_" + str(uuid.uuid4()).replace("-", "")
     query = f"CREATE TEMPORARY TABLE {unique_table_name} AS ({query});\n"
-    query += f"UNLOAD ('SELECT * FROM {unique_table_name}') TO '{s3_path}/' IAM_ROLE '{iam_role}' PARQUET"
+    query += f"UNLOAD ('SELECT * FROM {unique_table_name}') TO '{s3_path}/' IAM_ROLE '{iam_role}' FORMAT AS PARQUET"
     execute_redshift_statement(redshift_data_client, cluster_id, database, user, query)
 
 
@@ -632,3 +632,14 @@ def delete_api_gateway(api_gateway_client, api_gateway_id: str) -> Dict:
 def get_account_id() -> str:
     """Get AWS Account ID"""
     return boto3.client("sts").get_caller_identity().get("Account")
+
+
+def list_s3_files(aws_region: str, path: str) -> List[str]:
+    s3 = boto3.client("s3", config=Config(region_name=aws_region))
+    if path.startswith("s3://"):
+        path = path[len("s3://") :]
+    bucket, prefix = path.split("/", 1)
+    objects = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
+    contents = objects["Contents"]
+    files = [f"s3://{bucket}/{content['Key']}" for content in contents]
+    return files
diff --git a/sdk/python/tests/integration/feature_repos/universal/data_sources/bigquery.py b/sdk/python/tests/integration/feature_repos/universal/data_sources/bigquery.py
index 620f444159..83bc1ef308 100644
--- a/sdk/python/tests/integration/feature_repos/universal/data_sources/bigquery.py
+++ b/sdk/python/tests/integration/feature_repos/universal/data_sources/bigquery.py
@@ -51,7 +51,9 @@ def teardown(self):
         self.dataset = None
 
     def create_offline_store_config(self):
-        return BigQueryOfflineStoreConfig()
+        return BigQueryOfflineStoreConfig(
+            location="US", gcs_staging_location="gs://feast-export/"
+        )
 
     def create_data_source(
         self,
diff --git a/sdk/python/tests/integration/feature_repos/universal/data_sources/snowflake.py b/sdk/python/tests/integration/feature_repos/universal/data_sources/snowflake.py
index 23466bc00c..ae83ea8eb0 100644
--- a/sdk/python/tests/integration/feature_repos/universal/data_sources/snowflake.py
+++ b/sdk/python/tests/integration/feature_repos/universal/data_sources/snowflake.py
@@ -34,6 +34,8 @@ def __init__(self, project_name: str, *args, **kwargs):
             warehouse=os.environ["SNOWFLAKE_CI_WAREHOUSE"],
             database="FEAST",
             schema="OFFLINE",
+            storage_integration_name="FEAST_S3",
+            blob_export_location="s3://feast-snowflake-offload/export",
        )
 
     def create_data_source(
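Note (not part of the patch above): the `list_s3_files` helper added to aws_utils simply flattens an S3 listing into fully qualified URIs. A small usage sketch with a made-up bucket and region:

    from feast.infra.utils.aws_utils import list_s3_files

    # Returns e.g. ["s3://my-bucket/export/abc/0000_part_00.parquet", ...]
    files = list_s3_files("us-west-2", "s3://my-bucket/export/abc")

One caveat: boto3's `list_objects_v2` returns at most 1000 keys per call and the helper indexes `objects["Contents"]` directly, so an empty or very large export would need extra handling later, much like the pagination comment in the Snowflake store.
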
diff --git a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py
index 0b2965084d..abaf1622c0 100644
--- a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py
+++ b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py
@@ -370,6 +370,11 @@ def test_historical_features(environment, universal_data_sources, full_feature_names):
         full_feature_names=full_feature_names,
     )
 
+    if job_from_df.supports_remote_storage_export():
+        files = job_from_df.to_remote_storage()
+        print(files)
+        assert len(files) > 0  # This test should be way more detailed
+
     start_time = datetime.utcnow()
     actual_df_from_df_entities = job_from_df.to_df()
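Note (not part of the patch above): the new test block only asserts that some files came back. A slightly stronger follow-up check could read the export back and compare row counts, roughly as sketched below; it assumes fsspec backends (gcsfs / s3fs) are installed so pandas can read the remote URIs directly:

    import pandas as pd

    if job_from_df.supports_remote_storage_export():
        files = job_from_df.to_remote_storage()
        assert len(files) > 0

        # Assumes gcsfs / s3fs are available in the test environment.
        exported_df = pd.concat([pd.read_parquet(f) for f in files], ignore_index=True)
        assert len(exported_df) == len(job_from_df.to_df())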