Skip to content

Commit

Permalink
[CHORE] Adding more test fixtures for different I/O sources (#1083)
Browse files Browse the repository at this point in the history
Adds test fixtures for testing I/O from local/http/S3 (using MinIO)

---------

Co-authored-by: Jay Chia <jaychia94@gmail.com@users.noreply.github.com>
  • Loading branch information
jaychia and Jay Chia authored Jun 28, 2023
1 parent e067317 commit e883d0e
Show file tree
Hide file tree
Showing 13 changed files with 482 additions and 1 deletion.
69 changes: 69 additions & 0 deletions .github/workflows/python-package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,75 @@ jobs:
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
SLACK_WEBHOOK_TYPE: INCOMING_WEBHOOK

  # Runs the I/O integration tests (local / HTTP-via-nginx / S3-via-MinIO)
  # against pre-built wheels produced by the `integration-test-build` job.
  integration-test-io:
    runs-on: ubuntu-latest
    timeout-minutes: 30
    needs:
      - integration-test-build
    env:
      # PyPI package name; used to locate the built wheel under dist/
      package-name: getdaft
    strategy:
      # Run every matrix combination to completion even if one fails.
      fail-fast: false
      matrix:
        python-version: ['3.7']
        # Exercise both the local Python runner and the Ray runner.
        daft-runner: [py, ray]
    steps:
      - uses: actions/checkout@v3
        with:
          submodules: true
          fetch-depth: 0
      - uses: actions/setup-python@v4
        with:
          python-version: ${{ matrix.python-version }}
          architecture: x64
      - name: Download built wheels
        uses: actions/download-artifact@v3
        with:
          name: wheels
          path: dist
      - name: Setup Virtual Env
        run: |
          python -m venv venv
          echo "$GITHUB_WORKSPACE/venv/bin" >> $GITHUB_PATH
      - name: Install Daft and dev dependencies
        run: |
          pip install --upgrade pip
          pip install -r requirements-dev.txt dist/${{ env.package-name }}-*x86_64*.whl --force-reinstall
          # Remove the checked-out source tree so tests import the installed wheel
          rm -rf daft
      - name: Prepare tmpdirs for IO services
        run: |
          # nginx serves static files out of this host directory (see docker-compose.yml)
          mkdir -p /tmp/daft-integration-testing/nginx
          chmod +rw /tmp/daft-integration-testing/nginx
      - name: Spin up IO services
        uses: isbang/compose-action@v1.4.1
        with:
          compose-file: ./tests/integration/docker-compose/docker-compose.yml
          down-flags: --volumes
      - name: Run IO integration tests
        run: |
          pytest tests/integration/io -m 'integration' --durations=50
        env:
          DAFT_RUNNER: ${{ matrix.daft-runner }}
      - name: Send Slack notification on failure
        uses: slackapi/slack-github-action@v1.24.0
        # Only alert for failures on the main branch.
        if: ${{ failure() && (github.ref == 'refs/heads/main') }}
        with:
          payload: |
            {
              "blocks": [
                {
                  "type": "section",
                  "text": {
                    "type": "mrkdwn",
                    "text": ":rotating_light: [CI] IO Integration Tests <${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|workflow> *FAILED on main* :rotating_light:"
                  }
                }
              ]
            }
        env:
          SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
          SLACK_WEBHOOK_TYPE: INCOMING_WEBHOOK

rust-tests:
runs-on: ubuntu-latest
timeout-minutes: 15
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ module = 'daft.*'
warn_return_any = false

[tool.pytest.ini_options]
# NOTE: pytest only honors the LAST `-m` option it sees, so repeating `-m`
# (e.g. `-m 'not hypothesis' -m 'not integration'`) silently drops the earlier
# expression. Combine markers into a single expression with `and` instead.
# A CLI-provided `-m` (as in CI: `pytest -m 'integration'`) still overrides this.
addopts = "--benchmark-skip -m 'not hypothesis and not integration'"
minversion = "6.0"
testpaths = [
    "tests"
Expand Down
4 changes: 4 additions & 0 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
ipdb
maturin
pre-commit
docker

# Tracing
orjson==3.9.1 # orjson recommended for viztracer
Expand Down Expand Up @@ -33,6 +34,9 @@ ray[data, default]==2.4.0
# AWS
s3fs==2023.1.0; python_version < '3.8'
s3fs==2023.6.0; python_version >= '3.8'
# on old versions of s3fs's pinned botocore, they neglected to pin urllib3<2 which leads to:
# "ImportError: cannot import name 'DEFAULT_CIPHERS' from 'urllib3.util.ssl_'"
urllib3<2; python_version < '3.8'

# Documentation
myst-nb>=0.16.0
Expand Down
28 changes: 28 additions & 0 deletions src/daft-io/src/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,17 @@ impl PyIOConfig {
},
}
}

    /// Human-readable representation; delegates to the underlying
    /// `IOConfig`'s `Display` implementation.
    pub fn __repr__(&self) -> PyResult<String> {
        Ok(format!("{}", self.config))
    }

    /// Python-accessible getter returning a copy of the S3 sub-config.
    #[getter]
    pub fn s3(&self) -> PyResult<PyS3Config> {
        Ok(PyS3Config {
            config: self.config.s3.clone(),
        })
    }
}

#[pymethods]
Expand All @@ -52,6 +60,26 @@ impl PyS3Config {
    /// Human-readable representation; delegates to the underlying S3
    /// config's `Display` implementation.
    pub fn __repr__(&self) -> PyResult<String> {
        Ok(format!("{}", self.config))
    }

    /// AWS region name, if one was configured.
    #[getter]
    pub fn region_name(&self) -> PyResult<Option<String>> {
        Ok(self.config.region_name.clone())
    }

    /// Custom endpoint URL, if configured (used e.g. for S3-compatible
    /// stores such as MinIO in the integration tests).
    #[getter]
    pub fn endpoint_url(&self) -> PyResult<Option<String>> {
        Ok(self.config.endpoint_url.clone())
    }

    /// Access key ID, if one was configured.
    #[getter]
    pub fn key_id(&self) -> PyResult<Option<String>> {
        Ok(self.config.key_id.clone())
    }

    /// Secret access key, if one was configured.
    #[getter]
    pub fn access_key(&self) -> PyResult<Option<String>> {
        Ok(self.config.access_key.clone())
    }
}

pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> {
Expand Down
4 changes: 4 additions & 0 deletions tests/integration/docker-compose/Dockerfile.nginx
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Minimal nginx image that serves static files for the I/O integration tests.
# The custom config (nginx-serve-static-files.conf, alongside this Dockerfile)
# replaces the stock nginx.conf.
FROM nginx:alpine

WORKDIR /app
COPY ./nginx-serve-static-files.conf /etc/nginx/nginx.conf
28 changes: 28 additions & 0 deletions tests/integration/docker-compose/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
services:

  # MinIO is an S3-compatible API object store
  # Test fixtures should populate data in MinIO directly using S3 APIs such as boto3 or s3fs
  minio:
    image: quay.io/minio/minio
    volumes:
      # Anonymous volume backing MinIO's object storage directory
      - /data
    ports:
      # Quoted: unquoted HOST:CONTAINER mappings can be parsed as YAML 1.1
      # sexagesimal integers by some parsers; Compose docs recommend strings.
      - "9000:9000"
    environment:
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin
    command:
      - server
      - /data

  # Use nginx to serve static files
  # Test fixtures should dump data in the `/tmp/daft-integration-testing/nginx` folder
  nginx:
    image: nginx-serve-static-files
    build:
      context: .
      dockerfile: Dockerfile.nginx
    volumes:
      - /tmp/daft-integration-testing/nginx:/app/static:rw
    ports:
      - "8080:8080"
44 changes: 44 additions & 0 deletions tests/integration/docker-compose/nginx-serve-static-files.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# nginx config for the I/O integration tests: serves static fixture files
# over HTTP on port 8080, plus a set of hardcoded endpoints that always
# return a specific error status so tests can exercise error handling.

events {
    worker_connections 1024;
}

http {
    include mime.types;
    sendfile on;

    server {
        listen 8080;
        listen [::]:8080;

        # 127.0.0.11 is Docker's embedded DNS resolver (container network)
        resolver 127.0.0.11;
        autoindex off;

        server_name _;
        server_tokens off;

        # Static files are bind-mounted here by docker-compose
        root /app/static;
        gzip_static on;

        # Hardcoded error endpoints: requesting /<code>.html always returns
        # that HTTP status, regardless of what files exist on disk.
        location /400.html {
            return 400 "<b>Sike it's a hardcoded 400!</b>";
        }
        location /401.html {
            return 401 "<b>Sike it's a hardcoded 401!</b>";
        }
        location /403.html {
            return 403 "<b>Sike it's a hardcoded 403!</b>";
        }
        location /404.html {
            return 404 "<b>Sike it's a hardcoded 404!</b>";
        }
        location /429.html {
            return 429 "<b>Sike it's a hardcoded 429!</b>";
        }
        location /500.html {
            return 500 "<b>Sike it's a hardcoded 500!</b>";
        }
        location /503.html {
            return 503 "<b>Sike it's a hardcoded 503!</b>";
        }
    }
}
Empty file.
115 changes: 115 additions & 0 deletions tests/integration/io/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
from __future__ import annotations

import io
import pathlib
from typing import Generator, TypeVar

import numpy as np
import pytest
import s3fs
from PIL import Image

import daft

T = TypeVar("T")

YieldFixture = Generator[T, None, None]


@pytest.fixture(scope="session")
def minio_io_config() -> daft.io.IOConfig:
    """IOConfig pointing at the local MinIO service started by docker-compose.

    Uses MinIO's default root credentials and the endpoint exposed on the host.
    """
    minio_s3 = daft.io.S3Config(
        endpoint_url="http://127.0.0.1:9000",
        key_id="minioadmin",
        access_key="minioadmin",
    )
    return daft.io.IOConfig(s3=minio_s3)


@pytest.fixture(scope="session")
def aws_public_s3_config() -> daft.io.IOConfig:
    """IOConfig for accessing a public AWS S3 bucket in us-west-2.

    NOTE: no keys or endpoints specified for an AWS public s3 bucket.
    """
    public_s3 = daft.io.S3Config(region_name="us-west-2")
    return daft.io.IOConfig(s3=public_s3)


@pytest.fixture(scope="session")
def nginx_config() -> tuple[str, pathlib.Path]:
    """Returns the (nginx_server_url, static_files_tmpdir) as a tuple"""
    # Must match the port mapping and bind-mount in docker-compose.yml
    server_url = "http://127.0.0.1:8080"
    static_files_dir = pathlib.Path("/tmp/daft-integration-testing/nginx")
    return server_url, static_files_dir


@pytest.fixture(scope="session")
def image_data() -> bytes:
    """A small bit of fake image JPEG data.

    NOTE: this fixture `return`s rather than yields, so the correct
    annotation is `bytes`, not `YieldFixture[bytes]`.
    """
    bio = io.BytesIO()
    # 3x3 all-ones uint8 image, encoded as JPEG
    image = Image.fromarray(np.ones((3, 3)).astype(np.uint8))
    image.save(bio, format="JPEG")
    return bio.getvalue()


###
# NGINX-based fixtures
###


@pytest.fixture(scope="function")
def mock_http_image_urls(nginx_config, image_data) -> YieldFixture[str]:
    """Uses the docker-compose Nginx server to serve HTTP image URLs
    This fixture yields:
        list[str]: URLs of files available on the HTTP server
    """
    server_url, static_assets_tmpdir = nginx_config

    # Write the JPEG payload under ten distinct filenames in the served dir
    urls = []
    for idx in range(10):
        filename = f"{idx}.jpeg"
        (static_assets_tmpdir / filename).write_bytes(image_data)
        urls.append(f"{server_url}/{filename}")

    try:
        yield urls
    finally:
        # Remember to cleanup!
        for leftover in static_assets_tmpdir.glob("*"):
            leftover.unlink()


###
# S3-based fixtures
###


@pytest.fixture(scope="function")
def minio_image_data_fixture(minio_io_config, image_data) -> YieldFixture[list[str]]:
    """Populates MinIO with fake JPEG objects and yields their `s3://` URLs.

    Connects via s3fs using the endpoint/credentials from `minio_io_config`,
    writes 10 JPEG objects into `s3://image-bucket`, yields the list of URLs,
    and recursively deletes the bucket on teardown.
    """
    fs = s3fs.S3FileSystem(
        key=minio_io_config.s3.key_id,
        password=minio_io_config.s3.access_key,
        client_kwargs={"endpoint_url": minio_io_config.s3.endpoint_url},
    )
    bucket = "image-bucket"
    fs.mkdir(bucket)

    # Add some images into `s3://image-bucket`
    urls = []
    for i in range(10):
        key = f"{i}.jpeg"
        url = f"s3://{bucket}/{key}"
        fs.write_bytes(url, image_data)
        urls.append(url)

    try:
        yield urls
    # Remember to cleanup!
    finally:
        fs.rm(bucket, recursive=True)
48 changes: 48 additions & 0 deletions tests/integration/io/test_url_download_http.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from __future__ import annotations

import pytest
from aiohttp.client_exceptions import ClientResponseError

import daft


@pytest.mark.integration()
@pytest.mark.parametrize("use_native_downloader", [True, False])
def test_url_download_http(mock_http_image_urls, image_data, use_native_downloader):
    """Downloading each served URL yields the original JPEG bytes."""
    df = daft.from_pydict({"urls": mock_http_image_urls})
    df = df.with_column("data", df["urls"].url.download(use_native_downloader=use_native_downloader))
    expected = {
        "urls": mock_http_image_urls,
        "data": [image_data] * len(mock_http_image_urls),
    }
    assert df.to_pydict() == expected


@pytest.mark.integration()
@pytest.mark.parametrize("status_code", [400, 401, 403, 404, 429, 500, 503])
@pytest.mark.parametrize("use_native_downloader", [True, False])
def test_url_download_http_error_codes(nginx_config, use_native_downloader, status_code):
    """Downloading a URL that returns an HTTP error surfaces the right exception.

    The expected exception type depends on the I/O implementation:
    404 -> FileNotFoundError always; other codes -> ClientResponseError
    (fsspec) or ValueError (native downloader).
    """
    server_url, _ = nginx_config
    # nginx serves /<code>.html endpoints that always return that status code
    data = {"urls": [f"{server_url}/{status_code}.html"]}
    df = daft.from_pydict(data)
    df = df.with_column("data", df["urls"].url.download(on_error="raise", use_native_downloader=use_native_downloader))

    skip_fsspec_downloader = daft.context.get_context().runner_config.name == "ray"

    # 404 should always be corner-cased to return FileNotFoundError regardless of I/O implementation
    if status_code == 404:
        with pytest.raises(FileNotFoundError):
            df.collect()
    # When using fsspec, other error codes are bubbled up to the user as aiohttp.client_exceptions.ClientResponseError
    elif not use_native_downloader:
        # Ray runner has a pretty catastrophic failure when raising non-pickleable exceptions (ClientResponseError is not pickleable)
        # See Ray issue: https://github.com/ray-project/ray/issues/36893
        if skip_fsspec_downloader:
            pytest.skip()
        with pytest.raises(ClientResponseError) as e:
            df.collect()
        assert e.value.code == status_code
    # When using native downloader, we throw a ValueError
    else:
        with pytest.raises(ValueError) as e:
            df.collect()
        # NOTE: We may want to add better errors in the future to provide a better
        # user-facing I/O error with the error code
        assert f"Status({status_code})" in str(e.value)
Loading

0 comments on commit e883d0e

Please sign in to comment.