fix historical test for azure #1262

Merged
merged 4 commits on Jan 8, 2021
Changes from all commits
3 changes: 1 addition & 2 deletions sdk/python/feast/pyspark/launchers/k8s/k8s.py
@@ -9,7 +9,6 @@
 import yaml
 from kubernetes.client.api import CustomObjectsApi

-from feast.constants import ConfigOptions as opt
 from feast.pyspark.abc import (
     BQ_SPARK_PACKAGE,
     BatchIngestionJob,
@@ -196,7 +195,7 @@ def _get_azure_credentials(self):
         account_key = self._azure_account_key
         if account_name is None or account_key is None:
             raise Exception(
-                f"Using Azure blob storage requires Azure blob account name and access key to be set in config"
+                "Using Azure blob storage requires Azure blob account name and access key to be set in config"
             )
         return {
             f"spark.hadoop.fs.azure.account.key.{account_name}.blob.core.windows.net": f"{account_key}"
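A rough sketch, not part of this PR, of how the credential mapping returned by _get_azure_credentials is meant to be consumed: the dict key is a Hadoop configuration property, so once it reaches the Spark session, wasbs:// paths on that storage account become readable. Account name, key, container, and path below are placeholders.

from pyspark.sql import SparkSession

account_name = "myaccount"    # hypothetical storage account
account_key = "<base64-key>"  # hypothetical access key

# Assumes the hadoop-azure connector is available on the Spark classpath.
spark = (
    SparkSession.builder.appName("feast-azure-sketch")
    .config(
        f"spark.hadoop.fs.azure.account.key.{account_name}.blob.core.windows.net",
        account_key,
    )
    .getOrCreate()
)

df = spark.read.parquet(
    f"wasbs://my-container@{account_name}.blob.core.windows.net/some/path"
)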
2 changes: 1 addition & 1 deletion sdk/python/feast/staging/storage_client.py
@@ -397,7 +397,7 @@ def upload_fileobj(
         )
         bucket, key = self._uri_to_bucket_key(remote_uri)
         container_client = self.blob_service_client.get_container_client(bucket)
-        container_client.upload_blob(name=key, data=fileobj)
+        container_client.upload_blob(name=key, data=fileobj, overwrite=True)
         return remote_uri


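For context, a minimal sketch (assuming azure-storage-blob v12; the connection string, container, and blob names are illustrative) of why overwrite=True matters here: without it, upload_blob raises ResourceExistsError when the blob already exists, which breaks re-runs that stage data to the same key.

from io import BytesIO

from azure.core.exceptions import ResourceExistsError
from azure.storage.blob import BlobServiceClient

client = BlobServiceClient.from_connection_string("<connection-string>")  # hypothetical
container = client.get_container_client("feast-staging")                  # hypothetical container

payload = BytesIO(b"staged bytes")
try:
    container.upload_blob(name="artifacts/data.parquet", data=payload)    # fails if the blob already exists
except ResourceExistsError:
    payload.seek(0)
    container.upload_blob(name="artifacts/data.parquet", data=payload, overwrite=True)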
3 changes: 2 additions & 1 deletion sdk/python/requirements-ci.txt
@@ -21,4 +21,5 @@ pytest-timeout==1.4.2
 pytest-ordering==0.6.*
 pytest-mock==1.10.4
 PyYAML==5.3.1
-great-expectations==0.13.2
+great-expectations==0.13.2
+adlfs==0.5.9
28 changes: 26 additions & 2 deletions tests/e2e/test_historical_features.py
@@ -11,13 +11,14 @@
 from pyarrow import parquet

 from feast import Client, Entity, Feature, FeatureTable, ValueType
+from feast.constants import ConfigOptions as opt
 from feast.data_source import BigQuerySource, FileSource
 from feast.pyspark.abc import SparkJobStatus

 np.random.seed(0)


-def read_parquet(uri):
+def read_parquet(uri, azure_account_name=None, azure_account_key=None):
     parsed_uri = urlparse(uri)
     if parsed_uri.scheme == "file":
         return pd.read_parquet(parsed_uri.path)
@@ -42,6 +43,16 @@ def read_parquet(uri):
         files = ["s3://" + path for path in fs.glob(s3uri + "/part-*")]
         ds = parquet.ParquetDataset(files, filesystem=fs)
         return ds.read().to_pandas()
+    elif parsed_uri.scheme == "wasbs":
+        import adlfs
+
+        fs = adlfs.AzureBlobFileSystem(
+            account_name=azure_account_name, account_key=azure_account_key
+        )
+        uripath = parsed_uri.username + parsed_uri.path
+        files = fs.glob(uripath + "/part-*")
+        ds = parquet.ParquetDataset(files, filesystem=fs)
+        return ds.read().to_pandas()
     else:
         raise ValueError(f"Unsupported URL scheme {uri}")

@@ -75,6 +86,13 @@ def generate_data():
     return transactions_df, customer_df


+def _get_azure_creds(feast_client: Client):
+    return (
+        feast_client._config.get(opt.AZURE_BLOB_ACCOUNT_NAME, None),
+        feast_client._config.get(opt.AZURE_BLOB_ACCOUNT_ACCESS_KEY, None),
+    )
+
+
 def test_historical_features(
     feast_client: Client,
     tfrecord_feast_client: Client,
@@ -108,7 +126,13 @@ def test_historical_features(

     job = feast_client.get_historical_features(feature_refs, customers_df)
     output_dir = job.get_output_file_uri()
-    joined_df = read_parquet(output_dir)
+
+    # will both be None if not using Azure blob storage
+    account_name, account_key = _get_azure_creds(feast_client)
+
+    joined_df = read_parquet(
+        output_dir, azure_account_name=account_name, azure_account_key=account_key
+    )

     expected_joined_df = pd.DataFrame(
         {
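A small sketch of the wasbs handling that read_parquet now relies on (the URI, account, and key below are hypothetical): for wasbs://<container>@<account>.blob.core.windows.net/<path>, urlparse reports the container as username, so container plus path is the filesystem path that adlfs expects to glob.

from urllib.parse import urlparse

import adlfs
from pyarrow import parquet

uri = "wasbs://feast-staging@myaccount.blob.core.windows.net/out/hist"  # hypothetical output URI
parsed = urlparse(uri)
assert parsed.scheme == "wasbs"
assert parsed.username == "feast-staging"   # the blob container
assert parsed.path == "/out/hist"

fs = adlfs.AzureBlobFileSystem(account_name="myaccount", account_key="<key>")
files = fs.glob(parsed.username + parsed.path + "/part-*")
ds = parquet.ParquetDataset(files, filesystem=fs)
df = ds.read().to_pandas()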