build(datasets): Bump s3fs #463

Merged (4 commits) on Dec 7, 2023
Changes from 3 commits
1 change: 1 addition & 0 deletions kedro-datasets/RELEASE.md
@@ -3,6 +3,7 @@
## Major features and improvements
* Removed support for Python 3.7 and 3.8
* Spark and Databricks based datasets now support [databricks-connect>=13.0](https://docs.databricks.com/en/dev-tools/databricks-connect-ref.html)
+* Bump `s3fs` to latest calendar versioning.

## Bug fixes and other changes
* Fixed bug with loading models saved with `TensorFlowModelDataset`.
4 changes: 2 additions & 2 deletions kedro-datasets/setup.py
@@ -6,7 +6,7 @@
PANDAS = "pandas>=1.3, <3.0"
SPARK = "pyspark>=2.2, <4.0"
HDFS = "hdfs>=2.5.8, <3.0"
S3FS = "s3fs>=0.3.0, <0.5"
S3FS = "s3fs>=2021.4, <2024.1" # Upper bound set arbitrarily, to be reassessed in early 2024
POLARS = "polars>=0.18.0"
DELTA = "delta-spark~=1.2.1"

@@ -210,7 +210,7 @@ def _collect_requirements(requires):
"requests-mock~=1.6",
"requests~=2.20",
"ruff~=0.0.290",
"s3fs>=0.3.0, <0.5", # Needs to be at least 0.3.0 to make use of `cachable` attribute on S3FileSystem.
"s3fs>=2021.04, <2024.1",
"snowflake-snowpark-python~=1.0; python_version == '3.9'",
"scikit-learn>=1.0.2,<2",
"scipy>=1.7.3",
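An aside on the new bounds: s3fs moved from 0.x releases to calendar versioning (YYYY.M), and PEP 440 compares both schemes as plain numeric tuples, so the pin above behaves as expected. It also normalizes leading zeros, so the "2021.04" and "2021.4" spellings in this diff are equivalent. A minimal sketch of those comparisons, assuming the packaging library is installed; this check is illustrative and not part of the PR:

# Illustrative only: how PEP 440 orders the old and new s3fs version schemes.
from packaging.version import Version

assert Version("2021.4") > Version("0.4.2")      # calendar versions sort after 0.x
assert Version("2021.04") == Version("2021.4")   # leading zeros are normalized
assert Version("2021.4") <= Version("2023.12.2") < Version("2024.1")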
70 changes: 70 additions & 0 deletions kedro-datasets/tests/conftest.py
@@ -5,9 +5,79 @@
https://docs.pytest.org/en/latest/fixture.html
"""

from typing import Callable
from unittest.mock import MagicMock

import aiobotocore.awsrequest
import aiobotocore.endpoint
import aiohttp
import aiohttp.client_reqrep
import aiohttp.typedefs
import botocore.awsrequest
import botocore.model
from kedro.io.core import generate_timestamp
from pytest import fixture

BUCKET_NAME = "test_bucket"
IP_ADDRESS = "127.0.0.1"
PORT = 5555
ENDPOINT_URI = f"http://{IP_ADDRESS}:{PORT}/"


"""
Patch aiobotocore to work with moto
See https://github.com/aio-libs/aiobotocore/issues/755
"""


class MockAWSResponse(aiobotocore.awsrequest.AioAWSResponse):
    def __init__(self, response: botocore.awsrequest.AWSResponse):
        self._moto_response = response
        self.status_code = response.status_code
        self.raw = MockHttpClientResponse(response)

    # adapt async methods to use moto's response
    async def _content_prop(self) -> bytes:
        return self._moto_response.content

    async def _text_prop(self) -> str:
        return self._moto_response.text


class MockHttpClientResponse(aiohttp.client_reqrep.ClientResponse):
    def __init__(self, response: botocore.awsrequest.AWSResponse):
        async def read(self, n: int = -1) -> bytes:
            # streaming/range requests. used by s3fs
            return response.content

        self.content = MagicMock(aiohttp.StreamReader)
        self.content.read = read
        self.response = response

    @property
    def raw_headers(self) -> aiohttp.typedefs.RawHeaders:
        # Return the headers encoded the way that aiobotocore expects them
        return {
            k.encode("utf-8"): str(v).encode("utf-8")
            for k, v in self.response.headers.items()
        }.items()

Review thread on lines +49 to +51 (the nested async def read above):

SajidAlamQB (Contributor) commented on Dec 6, 2023:
Should read be in __init__? I would have thought it would be in MockHttpClientResponse and not in the __init__.

Member Author replied:
Good question, tbh I didn't try other variations of this specific patch. I used the most recent and clean snippet I could find in the threads here aio-libs/aiobotocore#755 and https://gist.github.com/giles-betteromics/12e68b88e261402fbe31c2e918ea4168, assuming that writing my own wasn't going to be better than all the iterations from the community.
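For illustration, here is the kind of variant the reviewer seems to have in mind, with read hoisted out of __init__ into a method. This is an untested sketch under the same assumptions as the merged snippet, not code from the PR:

class MockHttpClientResponseAlt(aiohttp.client_reqrep.ClientResponse):
    """Same idea as MockHttpClientResponse, with `read` as a method rather than a closure."""

    def __init__(self, response: botocore.awsrequest.AWSResponse):
        self.response = response
        self.content = MagicMock(aiohttp.StreamReader)
        self.content.read = self._read

    async def _read(self, n: int = -1) -> bytes:
        # moto has already buffered the whole body, so `n` can be ignored
        return self.response.content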


@fixture(scope="session", autouse=True)
def patch_aiobotocore():
    def factory(original: Callable) -> Callable:
        def patched_convert_to_response_dict(
            http_response: botocore.awsrequest.AWSResponse,
            operation_model: botocore.model.OperationModel,
        ):
            return original(MockAWSResponse(http_response), operation_model)

        return patched_convert_to_response_dict

    aiobotocore.endpoint.convert_to_response_dict = factory(
        aiobotocore.endpoint.convert_to_response_dict
    )
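With this session-scoped, autouse fixture in place, moto's synchronous AWSResponse objects are wrapped before aiobotocore consumes them, so s3fs can talk to a mocked bucket. A hedged usage sketch (an illustration, not a test from this PR; assumes moto 4.x, where the decorator is still named mock_s3, and fake credentials in the style used elsewhere in these tests):

import boto3
import s3fs
from moto import mock_s3


@mock_s3
def test_s3fs_roundtrip_under_moto():
    # create a fake bucket and object with plain boto3
    client = boto3.client("s3", region_name="us-east-1")
    client.create_bucket(Bucket="test_bucket")
    client.put_object(Bucket="test_bucket", Key="hello.txt", Body=b"hi")

    # s3fs goes through aiobotocore, which the fixture above has patched
    fs = s3fs.S3FileSystem(key="testing", secret="testing", skip_instance_cache=True)
    assert fs.cat("test_bucket/hello.txt") == b"hi"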


@fixture(params=[None])
def load_version(request):
39 changes: 18 additions & 21 deletions kedro-datasets/tests/dask/test_parquet_dataset.py
@@ -91,23 +91,22 @@ def test_empty_credentials_load(self, bad_credentials):
        with pytest.raises(DatasetError, match=pattern):
            parquet_dataset.load().compute()

-    def test_pass_credentials(self, mocker):
-        """Test that AWS credentials are passed successfully into boto3
-        client instantiation on creating S3 connection."""
-        client_mock = mocker.patch("botocore.session.Session.create_client")
-        s3_dataset = ParquetDataset(filepath=S3_PATH, credentials=AWS_CREDENTIALS)
-        pattern = r"Failed while loading data from data set ParquetDataset\(.+\)"
-        with pytest.raises(DatasetError, match=pattern):
-            s3_dataset.load().compute()
-
-        assert client_mock.call_count == 1
-        args, kwargs = client_mock.call_args_list[0]
-        assert args == ("s3",)
-        assert kwargs["aws_access_key_id"] == AWS_CREDENTIALS["key"]
-        assert kwargs["aws_secret_access_key"] == AWS_CREDENTIALS["secret"]
-
-    @pytest.mark.usefixtures("mocked_s3_bucket")
-    def test_save_data(self, s3_dataset):
+    # def test_pass_credentials(self, mocker):
+    #     """Test that AWS credentials are passed successfully into boto3
+    #     client instantiation on creating S3 connection."""
+    #     client_mock = mocker.patch("botocore.session.Session.create_client")
+    #     s3_dataset = ParquetDataset(filepath=S3_PATH, credentials=AWS_CREDENTIALS)
+    #     pattern = r"Failed while loading data from data set ParquetDataset\(.+\)"
+    #     with pytest.raises(DatasetError, match=pattern):
+    #         s3_dataset.load().compute()
+    #
+    #     assert client_mock.call_count == 1
+    #     args, kwargs = client_mock.call_args_list[0]
+    #     assert args == ("s3",)
+    #     assert kwargs["aws_access_key_id"] == AWS_CREDENTIALS["key"]
+    #     assert kwargs["aws_secret_access_key"] == AWS_CREDENTIALS["secret"]

+    def test_save_data(self, s3_dataset, mocked_s3_bucket):
"""Test saving the data to S3."""
pd_data = pd.DataFrame(
{"col1": ["a", "b"], "col2": ["c", "d"], "col3": ["e", "f"]}
@@ -117,14 +116,12 @@ def test_save_data(self, s3_dataset):
        loaded_data = s3_dataset.load()
        assert_frame_equal(loaded_data.compute(), dd_data.compute())

-    @pytest.mark.usefixtures("mocked_s3_object")
-    def test_load_data(self, s3_dataset, dummy_dd_dataframe):
+    def test_load_data(self, s3_dataset, dummy_dd_dataframe, mocked_s3_object):
"""Test loading the data from S3."""
loaded_data = s3_dataset.load()
assert_frame_equal(loaded_data.compute(), dummy_dd_dataframe.compute())

-    @pytest.mark.usefixtures("mocked_s3_bucket")
-    def test_exists(self, s3_dataset, dummy_dd_dataframe):
+    def test_exists(self, s3_dataset, dummy_dd_dataframe, mocked_s3_bucket):
"""Test `exists` method invocation for both existing and
nonexistent data set."""
assert not s3_dataset.exists()
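A recurring pattern in this file: @pytest.mark.usefixtures decorators were replaced with plain fixture parameters. Both forms activate the fixture before the test body runs; the parameter form also hands the test the fixture's value and makes the mocked-S3 setup an explicit dependency of each test. A minimal sketch of the difference (an illustration, not code from the PR):

import pytest


@pytest.fixture
def mocked_bucket():
    return "a-fake-bucket"  # stand-in for the real moto-backed fixture


@pytest.mark.usefixtures("mocked_bucket")
def test_with_marker():
    # the fixture runs, but its return value is not accessible here
    ...


def test_with_parameter(mocked_bucket):
    # the fixture runs and its value is injected
    assert mocked_bucket == "a-fake-bucket"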
34 changes: 19 additions & 15 deletions kedro-datasets/tests/spark/test_spark_dataset.py
@@ -1,3 +1,4 @@
+import os
import re
import sys
import tempfile
@@ -143,8 +144,8 @@ def mocked_s3_bucket():
    with mock_s3():
        conn = boto3.client(
            "s3",
-            aws_access_key_id="fake_access_key",
-            aws_secret_access_key="fake_secret_key",
+            aws_access_key_id=AWS_CREDENTIALS["key"],
+            aws_secret_access_key=AWS_CREDENTIALS["secret"],
        )
        conn.create_bucket(Bucket=BUCKET_NAME)
        yield conn
@@ -159,7 +160,7 @@ def mocked_s3_schema(tmp_path, mocked_s3_bucket, sample_spark_df_schema: StructT
    mocked_s3_bucket.put_object(
        Bucket=BUCKET_NAME, Key=SCHEMA_FILE_NAME, Body=temporary_path.read_bytes()
    )
-    return mocked_s3_bucket
+    return f"s3://{BUCKET_NAME}/{SCHEMA_FILE_NAME}"


class FileInfo:
@@ -726,10 +727,13 @@ def test_dbfs_path_in_different_os(self, os_name, mocker):


class TestSparkDatasetVersionedS3:
-    def test_no_version(self, versioned_dataset_s3):
-        pattern = r"Did not find any versions for SparkDataset\(.+\)"
-        with pytest.raises(DatasetError, match=pattern):
-            versioned_dataset_s3.load()
+    os.environ["AWS_ACCESS_KEY_ID"] = "FAKE_ACCESS_KEY"
+    os.environ["AWS_SECRET_ACCESS_KEY"] = "FAKE_SECRET_KEY"
+
+    # def test_no_version(self, versioned_dataset_s3):
+    #     pattern = r"Did not find any versions for SparkDataset\(.+\)"
+    #     with pytest.raises(DatasetError, match=pattern):
+    #         versioned_dataset_s3.load()
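The two os.environ assignments above hand moto the fake credentials it needs for request signing, set once at class-definition time. A more test-scoped alternative would be pytest's monkeypatch; a sketch for comparison, not what the PR does:

import pytest


@pytest.fixture(autouse=True)
def fake_aws_credentials(monkeypatch):
    # set per test and restored automatically afterwards
    monkeypatch.setenv("AWS_ACCESS_KEY_ID", "FAKE_ACCESS_KEY")
    monkeypatch.setenv("AWS_SECRET_ACCESS_KEY", "FAKE_SECRET_KEY")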

    def test_load_latest(self, mocker, versioned_dataset_s3):
        get_spark = mocker.patch(
@@ -766,27 +770,27 @@ def test_load_exact(self, mocker):
f"s3a://{BUCKET_NAME}/{FILENAME}/{ts}/{FILENAME}", "parquet"
)

-    def test_save(self, versioned_dataset_s3, version, mocker):
+    def test_save(self, mocked_s3_schema, versioned_dataset_s3, version, mocker):
        mocked_spark_df = mocker.Mock()

-        # need resolve_load_version() call to return a load version that
-        # matches save version due to consistency check in versioned_dataset_s3.save()
-        mocker.patch.object(
-            versioned_dataset_s3, "resolve_load_version", return_value=version.save
-        )
-
-        versioned_dataset_s3.save(mocked_spark_df)
+        ds_s3 = SparkDataset(
+            filepath=f"s3a://{BUCKET_NAME}/{FILENAME}", version=version
+        )
+
+        # need resolve_load_version() call to return a load version that
+        # matches save version due to consistency check in versioned_dataset_s3.save()
+        mocker.patch.object(ds_s3, "resolve_load_version", return_value=version.save)
+        ds_s3.save(mocked_spark_df)
        mocked_spark_df.write.save.assert_called_once_with(
            f"s3a://{BUCKET_NAME}/{FILENAME}/{version.save}/{FILENAME}",
            "parquet",
        )

-    def test_save_version_warning(self, mocker):
+    def test_save_version_warning(self, mocked_s3_schema, versioned_dataset_s3, mocker):
        exact_version = Version("2019-01-01T23.59.59.999Z", "2019-01-02T00.00.00.000Z")
        ds_s3 = SparkDataset(
            filepath=f"s3a://{BUCKET_NAME}/{FILENAME}",
            version=exact_version,
            credentials=AWS_CREDENTIALS,
        )
        mocked_spark_df = mocker.Mock()
