Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CHORE] Adding more test fixtures for different I/O sources #1083

Merged
merged 31 commits into from
Jun 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
0858293
[CHORE] Init with mock minio instance
Jun 22, 2023
7e5b135
Split tests into separate files for minio and http
Jun 22, 2023
c4d567a
Add local file tests
Jun 22, 2023
c57693d
Add corner-case testing
Jun 22, 2023
dbe49c0
Add native downloader http tests
Jun 22, 2023
afe113d
Code cleanup fixtures
Jun 22, 2023
4b01bfe
Disable all tests to see if CI is happy
Jun 23, 2023
ed19c71
Move tests into io integration tests folder and add a github workflow…
Jun 23, 2023
f18466f
Add unpinned boto3 to deps
Jun 23, 2023
4d29fbd
Add docker-compose setup to spin up local test environments
Jun 23, 2023
9955696
Clean fixtures to use docker-compose services instead of spinning up …
Jun 23, 2023
758a7fa
Use nginx for simulating http error codes as well
Jun 24, 2023
eeef554
Perform a docker-compose up before running integration suite
Jun 24, 2023
9694a2b
Install boto3 after s3fs
Jun 24, 2023
ebe91e5
Add test for minio and a custom s3 config
Jun 24, 2023
8605d27
Pin urllib3
Jun 24, 2023
96ef726
Use daft IOConfig instead of custom S3Config dataclass
Jun 24, 2023
60724b8
Add test hitting public s3 bucket
Jun 24, 2023
960ee47
Try to use a venv?
Jun 24, 2023
4c86df8
Upgrade pip
Jun 24, 2023
5b393b3
Follow other steps for setting up virtual env
Jun 24, 2023
a037537
Use s3fs instead of boto3
Jun 24, 2023
1642639
pin urllib3
Jun 24, 2023
13eaa3f
Install wheel together with requirements.txt
Jun 25, 2023
7d5bf90
Fix old s3fs kwargs
Jun 26, 2023
430f789
Add chmod to nginx folder
Jun 26, 2023
60e9a1e
Add region to config
Jun 27, 2023
f3d295d
Rollback region name
Jun 27, 2023
1370504
Skip http error tests if running on ray runner
Jun 28, 2023
1dae606
Properly skip test
Jun 28, 2023
6ab0d71
Mark tests as integration tests which skips locally
Jun 28, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions .github/workflows/python-package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,75 @@ jobs:
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
SLACK_WEBHOOK_TYPE: INCOMING_WEBHOOK

# CI job: runs the IO integration test suite (tests/integration/io) against
# local docker-compose services (MinIO for S3 APIs, nginx for HTTP), once per
# Daft runner type (py and ray).
integration-test-io:
  runs-on: ubuntu-latest
  timeout-minutes: 30
  # Consumes the wheels produced by the integration-test-build job
  needs:
    - integration-test-build
  env:
    package-name: getdaft
  strategy:
    # Run all matrix combinations even if one fails
    fail-fast: false
    matrix:
      python-version: ['3.7']
      daft-runner: [py, ray]
  steps:
  - uses: actions/checkout@v3
    with:
      submodules: true
      fetch-depth: 0
  - uses: actions/setup-python@v4
    with:
      python-version: ${{ matrix.python-version }}
      architecture: x64
  - name: Download built wheels
    uses: actions/download-artifact@v3
    with:
      name: wheels
      path: dist
  - name: Setup Virtual Env
    run: |
      python -m venv venv
      echo "$GITHUB_WORKSPACE/venv/bin" >> $GITHUB_PATH
  # Installs the built wheel, then removes the source tree so tests import the
  # installed package rather than the repo checkout
  - name: Install Daft and dev dependencies
    run: |
      pip install --upgrade pip
      pip install -r requirements-dev.txt dist/${{ env.package-name }}-*x86_64*.whl --force-reinstall
      rm -rf daft
  # The nginx service bind-mounts this directory; test fixtures write static files into it
  - name: Prepare tmpdirs for IO services
    run: |
      mkdir -p /tmp/daft-integration-testing/nginx
      chmod +rw /tmp/daft-integration-testing/nginx
  - name: Spin up IO services
    uses: isbang/compose-action@v1.4.1
    with:
      compose-file: ./tests/integration/docker-compose/docker-compose.yml
      down-flags: --volumes
  # `-m 'integration'` overrides the addopts marker filter so integration tests run here
  - name: Run IO integration tests
    run: |
      pytest tests/integration/io -m 'integration' --durations=50
    env:
      DAFT_RUNNER: ${{ matrix.daft-runner }}
  # Only notifies Slack for failures on main, not on PR branches
  - name: Send Slack notification on failure
    uses: slackapi/slack-github-action@v1.24.0
    if: ${{ failure() && (github.ref == 'refs/heads/main') }}
    with:
      payload: |
        {
          "blocks": [
            {
              "type": "section",
              "text": {
                "type": "mrkdwn",
                "text": ":rotating_light: [CI] IO Integration Tests <${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|workflow> *FAILED on main* :rotating_light:"
              }
            }
          ]
        }
    env:
      SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
      SLACK_WEBHOOK_TYPE: INCOMING_WEBHOOK

rust-tests:
runs-on: ubuntu-latest
timeout-minutes: 15
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ module = 'daft.*'
warn_return_any = false

[tool.pytest.ini_options]
addopts = "--benchmark-skip -m 'not hypothesis'"
addopts = "--benchmark-skip -m 'not hypothesis' -m 'not integration'"
minversion = "6.0"
testpaths = [
"tests"
Expand Down
4 changes: 4 additions & 0 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
ipdb
maturin
pre-commit
docker

# Tracing
orjson==3.9.1 # orjson recommended for viztracer
Expand Down Expand Up @@ -33,6 +34,9 @@ ray[data, default]==2.4.0
# AWS
s3fs==2023.1.0; python_version < '3.8'
s3fs==2023.6.0; python_version >= '3.8'
# on old versions of s3fs's pinned botocore, they neglected to pin urllib3<2 which leads to:
# "ImportError: cannot import name 'DEFAULT_CIPHERS' from 'urllib3.util.ssl_'"
urllib3<2; python_version < '3.8'

# Documentation
myst-nb>=0.16.0
Expand Down
28 changes: 28 additions & 0 deletions src/daft-io/src/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,17 @@ impl PyIOConfig {
},
}
}

/// Python `repr()`: delegates to the underlying config's `Display` implementation.
pub fn __repr__(&self) -> PyResult<String> {
    Ok(format!("{}", self.config))
}

/// Python property getter: exposes the S3 sub-config as a `PyS3Config`.
/// Returns a clone, so mutating the returned object does not affect this config.
#[getter]
pub fn s3(&self) -> PyResult<PyS3Config> {
    Ok(PyS3Config {
        config: self.config.s3.clone(),
    })
}
}

#[pymethods]
Expand All @@ -50,6 +58,26 @@ impl PyS3Config {
/// Python `repr()`: delegates to the underlying config's `Display` implementation.
pub fn __repr__(&self) -> PyResult<String> {
    Ok(format!("{}", self.config))
}

/// Python property getter: the configured AWS region, if any (cloned `Option<String>`).
#[getter]
pub fn region_name(&self) -> PyResult<Option<String>> {
    Ok(self.config.region_name.clone())
}

/// Python property getter: the custom S3-compatible endpoint URL, if any.
#[getter]
pub fn endpoint_url(&self) -> PyResult<Option<String>> {
    Ok(self.config.endpoint_url.clone())
}

/// Python property getter: the access key ID credential, if any.
#[getter]
pub fn key_id(&self) -> PyResult<Option<String>> {
    Ok(self.config.key_id.clone())
}

/// Python property getter: the secret access key credential, if any.
#[getter]
pub fn access_key(&self) -> PyResult<Option<String>> {
    Ok(self.config.access_key.clone())
}
}

pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> {
Expand Down
4 changes: 4 additions & 0 deletions tests/integration/docker-compose/Dockerfile.nginx
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Nginx image for integration tests: serves static test files and hardcoded
# HTTP error endpoints, per nginx-serve-static-files.conf.
FROM nginx:alpine

WORKDIR /app
# Replace the default nginx config wholesale with the test-server config
COPY ./nginx-serve-static-files.conf /etc/nginx/nginx.conf
28 changes: 28 additions & 0 deletions tests/integration/docker-compose/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
services:

  # MinIO is an S3-compatible API object store
  # Test fixtures should populate data in MinIO directly using S3 APIs such as boto3 or s3fs
  minio:
    image: quay.io/minio/minio
    volumes:
      # Anonymous volume for object storage; wiped on `down --volumes`
      - /data
    ports:
      # S3 API endpoint; matches the endpoint_url used by the test fixtures
      - 9000:9000
    environment:
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin
    command:
      - server
      - /data

  # Use nginx to serve static files
  # Test fixtures should dump data in the `/tmp/daft-integration-testing/nginx` folder
  nginx:
    image: nginx-serve-static-files
    build:
      context: .
      dockerfile: Dockerfile.nginx
    volumes:
      # Host dir must exist and be writable before compose up (CI does mkdir + chmod)
      - /tmp/daft-integration-testing/nginx:/app/static:rw
    ports:
      - 8080:8080
44 changes: 44 additions & 0 deletions tests/integration/docker-compose/nginx-serve-static-files.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Test-server nginx config: serves static files dumped by test fixtures under
# /app/static, and hardcodes a set of HTTP error endpoints so tests can
# exercise error-handling paths deterministically.
events {
    worker_connections 1024;
}

http {
    include mime.types;
    sendfile on;

    server {
        listen 8080;
        listen [::]:8080;

        # Docker's embedded DNS resolver
        resolver 127.0.0.11;
        autoindex off;

        server_name _;
        server_tokens off;

        # Static files are bind-mounted here by docker-compose
        root /app/static;
        gzip_static on;

        # Hardcoded error endpoints: requesting /<code>.html always returns
        # that HTTP status code, regardless of what files exist on disk.
        location /400.html {
            return 400 "<b>Sike it's a hardcoded 400!</b>";
        }
        location /401.html {
            return 401 "<b>Sike it's a hardcoded 401!</b>";
        }
        location /403.html {
            return 403 "<b>Sike it's a hardcoded 403!</b>";
        }
        location /404.html {
            return 404 "<b>Sike it's a hardcoded 404!</b>";
        }
        location /429.html {
            return 429 "<b>Sike it's a hardcoded 429!</b>";
        }
        location /500.html {
            return 500 "<b>Sike it's a hardcoded 500!</b>";
        }
        location /503.html {
            return 503 "<b>Sike it's a hardcoded 503!</b>";
        }
    }
}
Empty file.
115 changes: 115 additions & 0 deletions tests/integration/io/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
from __future__ import annotations

import io
import pathlib
from typing import Generator, TypeVar

import numpy as np
import pytest
import s3fs
from PIL import Image

import daft

T = TypeVar("T")

YieldFixture = Generator[T, None, None]


@pytest.fixture(scope="session")
def minio_io_config() -> daft.io.IOConfig:
    """IOConfig pointing at the local MinIO service started by docker-compose."""
    s3_config = daft.io.S3Config(
        endpoint_url="http://127.0.0.1:9000",
        key_id="minioadmin",
        access_key="minioadmin",
    )
    return daft.io.IOConfig(s3=s3_config)


@pytest.fixture(scope="session")
def aws_public_s3_config() -> daft.io.IOConfig:
    """IOConfig for hitting a public AWS S3 bucket.

    NOTE: no keys or endpoints specified for an AWS public s3 bucket
    """
    s3_config = daft.io.S3Config(region_name="us-west-2")
    return daft.io.IOConfig(s3=s3_config)


@pytest.fixture(scope="session")
def nginx_config() -> tuple[str, pathlib.Path]:
    """Returns the (nginx_server_url, static_files_tmpdir) as a tuple"""
    server_url = "http://127.0.0.1:8080"
    static_files_dir = pathlib.Path("/tmp/daft-integration-testing/nginx")
    return server_url, static_files_dir


@pytest.fixture(scope="session")
def image_data() -> bytes:
    """A small bit of fake image JPEG data.

    Returns the raw JPEG-encoded bytes of a 3x3 uint8 image.
    (Annotation fixed: this fixture *returns* bytes, it does not yield, so
    `YieldFixture[bytes]` was incorrect.)
    """
    bio = io.BytesIO()
    image = Image.fromarray(np.ones((3, 3)).astype(np.uint8))
    image.save(bio, format="JPEG")
    return bio.getvalue()


###
# NGINX-based fixtures
###


@pytest.fixture(scope="function")
def mock_http_image_urls(nginx_config, image_data) -> YieldFixture[list[str]]:
    """Uses the docker-compose Nginx server to serve HTTP image URLs

    This fixture yields:
        list[str]: URLs of files available on the HTTP server

    (Annotation fixed: the fixture yields a list of URL strings, so
    `YieldFixture[str]` was incorrect.)
    """
    server_url, static_assets_tmpdir = nginx_config

    # Add image files to the tmpdir so nginx serves them at /<i>.jpeg
    urls = []
    for i in range(10):
        image_filepath = static_assets_tmpdir / f"{i}.jpeg"
        image_filepath.write_bytes(image_data)
        urls.append(f"{server_url}/{image_filepath.relative_to(static_assets_tmpdir)}")

    try:
        yield urls
    # Remember to cleanup!
    finally:
        for child in static_assets_tmpdir.glob("*"):
            child.unlink()


###
# S3-based fixtures
###


@pytest.fixture(scope="function")
def minio_image_data_fixture(minio_io_config, image_data) -> YieldFixture[list[str]]:
    """Populates the minio service with some fake image data and yields the list of s3:// URLs"""
    fs = s3fs.S3FileSystem(
        key=minio_io_config.s3.key_id,
        password=minio_io_config.s3.access_key,
        client_kwargs={"endpoint_url": minio_io_config.s3.endpoint_url},
    )
    bucket = "image-bucket"
    # NOTE(review): assumes the bucket does not already exist in MinIO — confirm mkdir
    # behavior if a previous run's cleanup failed
    fs.mkdir(bucket)

    # Add some images into `s3://image-bucket`
    urls = []
    for i in range(10):
        key = f"{i}.jpeg"
        url = f"s3://{bucket}/{key}"
        fs.write_bytes(url, image_data)
        urls.append(url)

    try:
        yield urls
    # Remember to cleanup!
    finally:
        fs.rm(bucket, recursive=True)
48 changes: 48 additions & 0 deletions tests/integration/io/test_url_download_http.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from __future__ import annotations

import pytest
from aiohttp.client_exceptions import ClientResponseError

import daft


@pytest.mark.integration()
@pytest.mark.parametrize("use_native_downloader", [True, False])
def test_url_download_http(mock_http_image_urls, image_data, use_native_downloader):
    """Downloading URLs served by the local nginx yields the original image bytes."""
    data = {"urls": mock_http_image_urls}
    df = daft.from_pydict(data)
    downloaded = df["urls"].url.download(use_native_downloader=use_native_downloader)
    df = df.with_column("data", downloaded)
    expected = {**data, "data": [image_data] * len(mock_http_image_urls)}
    assert df.to_pydict() == expected


@pytest.mark.integration()
@pytest.mark.parametrize("status_code", [400, 401, 403, 404, 429, 500, 503])
@pytest.mark.parametrize("use_native_downloader", [True, False])
def test_url_download_http_error_codes(nginx_config, use_native_downloader, status_code):
    """Downloading a URL that returns an HTTP error status raises the expected exception.

    The local nginx service hardcodes `/<code>.html` endpoints to always return
    that status code (see nginx-serve-static-files.conf).
    """
    server_url, _ = nginx_config
    data = {"urls": [f"{server_url}/{status_code}.html"]}
    df = daft.from_pydict(data)
    # Expression is lazy: the download error only surfaces at .collect() time below
    df = df.with_column("data", df["urls"].url.download(on_error="raise", use_native_downloader=use_native_downloader))

    skip_fsspec_downloader = daft.context.get_context().runner_config.name == "ray"

    # 404 should always be corner-cased to return FileNotFoundError regardless of I/O implementation
    if status_code == 404:
        with pytest.raises(FileNotFoundError):
            df.collect()
    # When using fsspec, other error codes are bubbled up to the user as aiohttp.client_exceptions.ClientResponseError
    elif not use_native_downloader:
        # Ray runner has a pretty catastrophic failure when raising non-pickleable exceptions (ClientResponseError is not pickleable)
        # See Ray issue: https://github.com/ray-project/ray/issues/36893
        if skip_fsspec_downloader:
            pytest.skip()
        with pytest.raises(ClientResponseError) as e:
            df.collect()
        assert e.value.code == status_code
    # When using native downloader, we throw a ValueError
    else:
        with pytest.raises(ValueError) as e:
            df.collect()
        # NOTE: We may want to add better errors in the future to provide a better
        # user-facing I/O error with the error code
        assert f"Status({status_code})" in str(e.value)
Loading