diff --git a/sdk/python/feast/pyspark/launchers/k8s/k8s.py b/sdk/python/feast/pyspark/launchers/k8s/k8s.py
index b9ac6ab809..fc53ce5769 100644
--- a/sdk/python/feast/pyspark/launchers/k8s/k8s.py
+++ b/sdk/python/feast/pyspark/launchers/k8s/k8s.py
@@ -9,7 +9,6 @@
 import yaml
 from kubernetes.client.api import CustomObjectsApi
 
-from feast.constants import ConfigOptions as opt
 from feast.pyspark.abc import (
     BQ_SPARK_PACKAGE,
     BatchIngestionJob,
@@ -196,7 +195,7 @@ def _get_azure_credentials(self):
         account_key = self._azure_account_key
         if account_name is None or account_key is None:
             raise Exception(
-                f"Using Azure blob storage requires Azure blob account name and access key to be set in config"
+                "Using Azure blob storage requires Azure blob account name and access key to be set in config"
             )
         return {
             f"spark.hadoop.fs.azure.account.key.{account_name}.blob.core.windows.net": f"{account_key}"
diff --git a/sdk/python/feast/staging/storage_client.py b/sdk/python/feast/staging/storage_client.py
index 07027447d6..63d574460f 100644
--- a/sdk/python/feast/staging/storage_client.py
+++ b/sdk/python/feast/staging/storage_client.py
@@ -397,7 +397,7 @@ def upload_fileobj(
         )
         bucket, key = self._uri_to_bucket_key(remote_uri)
         container_client = self.blob_service_client.get_container_client(bucket)
-        container_client.upload_blob(name=key, data=fileobj)
+        container_client.upload_blob(name=key, data=fileobj, overwrite=True)
         return remote_uri
diff --git a/sdk/python/requirements-ci.txt b/sdk/python/requirements-ci.txt
index 1f6810a804..8897021099 100644
--- a/sdk/python/requirements-ci.txt
+++ b/sdk/python/requirements-ci.txt
@@ -21,4 +21,5 @@ pytest-timeout==1.4.2
 pytest-ordering==0.6.*
 pytest-mock==1.10.4
 PyYAML==5.3.1
-great-expectations==0.13.2
\ No newline at end of file
+great-expectations==0.13.2
+adlfs==0.5.9
diff --git a/tests/e2e/test_historical_features.py b/tests/e2e/test_historical_features.py
index fb6b969972..b2aa2b6819 100644
--- a/tests/e2e/test_historical_features.py
+++ b/tests/e2e/test_historical_features.py
@@ -11,13 +11,14 @@
 from pyarrow import parquet
 
 from feast import Client, Entity, Feature, FeatureTable, ValueType
+from feast.constants import ConfigOptions as opt
 from feast.data_source import BigQuerySource, FileSource
 from feast.pyspark.abc import SparkJobStatus
 
 np.random.seed(0)
 
 
-def read_parquet(uri):
+def read_parquet(uri, azure_account_name=None, azure_account_key=None):
     parsed_uri = urlparse(uri)
     if parsed_uri.scheme == "file":
         return pd.read_parquet(parsed_uri.path)
@@ -42,6 +43,16 @@ def read_parquet(uri):
         files = ["s3://" + path for path in fs.glob(s3uri + "/part-*")]
         ds = parquet.ParquetDataset(files, filesystem=fs)
         return ds.read().to_pandas()
+    elif parsed_uri.scheme == "wasbs":
+        import adlfs
+
+        fs = adlfs.AzureBlobFileSystem(
+            account_name=azure_account_name, account_key=azure_account_key
+        )
+        uripath = parsed_uri.username + parsed_uri.path
+        files = fs.glob(uripath + "/part-*")
+        ds = parquet.ParquetDataset(files, filesystem=fs)
+        return ds.read().to_pandas()
     else:
         raise ValueError(f"Unsupported URL scheme {uri}")
@@ -75,6 +86,13 @@ def generate_data():
     return transactions_df, customer_df
 
 
+def _get_azure_creds(feast_client: Client):
+    return (
+        feast_client._config.get(opt.AZURE_BLOB_ACCOUNT_NAME, None),
+        feast_client._config.get(opt.AZURE_BLOB_ACCOUNT_ACCESS_KEY, None),
+    )
+
+
 def test_historical_features(
     feast_client: Client,
     tfrecord_feast_client: Client,
@@ -108,7 +126,13 @@ def test_historical_features(
     job = feast_client.get_historical_features(feature_refs, customers_df)
 
     output_dir = job.get_output_file_uri()
-    joined_df = read_parquet(output_dir)
+
+    # will both be None if not using Azure blob storage
+    account_name, account_key = _get_azure_creds(feast_client)
+
+    joined_df = read_parquet(
+        output_dir, azure_account_name=account_name, azure_account_key=account_key
+    )
 
     expected_joined_df = pd.DataFrame(
         {
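A note on the two non-obvious pieces of this patch. The `overwrite=True` on `upload_blob` matters because azure-storage-blob raises `ResourceExistsError` when the target blob already exists, so repeated e2e runs staging to the same remote path would otherwise fail. And the `wasbs` branch of `read_parquet` builds its glob root from `parsed_uri.username` because `urlparse` places the blob container in the userinfo slot of the netloc. A minimal sketch of that decomposition follows; the container, account, and path names are made up for illustration and do not come from the patch:

# Illustrative only: how urlparse decomposes a wasbs URI.
# "mycontainer", "myaccount", and "/out/run-1" are hypothetical names.
from urllib.parse import urlparse

parsed = urlparse("wasbs://mycontainer@myaccount.blob.core.windows.net/out/run-1")
assert parsed.scheme == "wasbs"
assert parsed.username == "mycontainer"  # container rides in the userinfo slot
assert parsed.hostname == "myaccount.blob.core.windows.net"
assert parsed.path == "/out/run-1"

# Container-relative root that the adlfs filesystem globs under:
uripath = parsed.username + parsed.path  # -> "mycontainer/out/run-1"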