From 750f4008eaf8a91afe11f18eb92910562edc3843 Mon Sep 17 00:00:00 2001
From: Will Jones
Date: Tue, 17 Jan 2023 10:00:45 -0800
Subject: [PATCH] test(python): add read / write benchmarks (#933)

# Description

Adds read / write benchmarks for the Python bindings, in preparation for
running continuous benchmarks on the Python reader / writer.

# Related Issue(s)

# Documentation

---
 .github/workflows/python_build.yml | 46 +++++++++++++++++++++++
 Cargo.toml                         |  4 ++
 python/pyproject.toml              |  3 +-
 python/src/filesystem.rs           | 17 ++++-----
 python/tests/test_benchmark.py     | 60 ++++++++++++++++++++++++++++++
 5 files changed, 120 insertions(+), 10 deletions(-)
 create mode 100644 python/tests/test_benchmark.py

diff --git a/.github/workflows/python_build.yml b/.github/workflows/python_build.yml
index 1da748b3a3..29cbbfdb71 100644
--- a/.github/workflows/python_build.yml
+++ b/.github/workflows/python_build.yml
@@ -120,6 +120,52 @@ jobs:
       run: |
         source venv/bin/activate
         make build-documentation
+
+  benchmark:
+    name: Python Benchmark
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v2
+
+      - name: Install latest stable
+        uses: actions-rs/toolchain@v1
+        with:
+          toolchain: stable
+          override: true
+          components: rustfmt, clippy
+
+      - uses: Swatinem/rust-cache@v2
+
+      - uses: actions/setup-python@v4
+        with:
+          python-version: "3.10"
+
+      - name: Build deltalake in release mode
+        run: |
+          pip install virtualenv
+          virtualenv venv
+          source venv/bin/activate
+          MATURIN_EXTRA_ARGS=--release make develop
+
+      # Download previous benchmark result from cache (if it exists)
+      - name: Download previous benchmark data
+        uses: actions/cache@v2
+        with:
+          path: ./cache
+          key: ${{ runner.os }}-benchmark
+
+      - name: Run benchmark
+        run: |
+          source venv/bin/activate
+          pytest tests/test_benchmark.py --benchmark-json output.json
+
+      - name: Store benchmark result
+        uses: benchmark-action/github-action-benchmark@v1
+        with:
+          tool: 'pytest'
+          output-file-path: python/output.json
+          external-data-json-path: ./cache/benchmark-data.json
+          fail-on-alert: true
 
   test-pyspark:
     name: PySpark Integration Tests
diff --git a/Cargo.toml b/Cargo.toml
index 8c9f633ce0..06b3cd40b8 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -8,3 +8,7 @@ members = [
   "dynamodb_lock",
 ]
 exclude = ["proofs", "delta-inspect"]
+
+[profile.release-with-debug]
+inherits = "release"
+debug = true
diff --git a/python/pyproject.toml b/python/pyproject.toml
index c66f70dc42..0bf2ef6b4f 100644
--- a/python/pyproject.toml
+++ b/python/pyproject.toml
@@ -35,7 +35,8 @@ devel = [
     "sphinx<=4.5",
     "sphinx-rtd-theme",
     "toml",
-    "wheel"
+    "wheel",
+    "pytest-benchmark"
 ]
 pyspark = [
     "pyspark",
diff --git a/python/src/filesystem.rs b/python/src/filesystem.rs
index f9eb817586..2cd30c1a59 100644
--- a/python/src/filesystem.rs
+++ b/python/src/filesystem.rs
@@ -229,7 +229,7 @@ impl DeltaFileSystemHandler {
         let file = self
             .rt
             .block_on(ObjectInputFile::try_new(
-                self.rt.clone(),
+                Arc::clone(&self.rt),
                 self.inner.clone(),
                 path,
             ))
@@ -247,7 +247,7 @@ impl DeltaFileSystemHandler {
         let file = self
             .rt
             .block_on(ObjectOutputStream::try_new(
-                self.rt.clone(),
+                Arc::clone(&self.rt),
                 self.inner.clone(),
                 path,
             ))
@@ -404,15 +404,14 @@ impl ObjectInputFile {
         };
         let nbytes = (range.end - range.start) as i64;
         self.pos += nbytes;
-        let obj = if nbytes > 0 {
+        let data = if nbytes > 0 {
             self.rt
                 .block_on(self.store.get_range(&self.path, range))
                 .map_err(PyDeltaTableError::from_object_store)?
-                .to_vec()
         } else {
-            Vec::new()
+            "".into()
         };
-        Python::with_gil(|py| Ok(PyBytes::new(py, &obj).into_py(py)))
+        Python::with_gil(|py| Ok(PyBytes::new(py, data.as_ref()).into_py(py)))
     }
 
     fn fileno(&self) -> PyResult<()> {
@@ -531,10 +530,10 @@ impl ObjectOutputStream {
         Err(PyNotImplementedError::new_err("'read' not implemented"))
     }
 
-    fn write(&mut self, data: Vec<u8>) -> PyResult<i64> {
+    fn write(&mut self, data: &PyBytes) -> PyResult<i64> {
         self.check_closed()?;
-        let len = data.len() as i64;
-        match self.rt.block_on(self.writer.write_all(&data)) {
+        let len = data.as_bytes().len() as i64;
+        match self.rt.block_on(self.writer.write_all(data.as_bytes())) {
             Ok(_) => Ok(len),
             Err(err) => {
                 self.rt
diff --git a/python/tests/test_benchmark.py b/python/tests/test_benchmark.py
new file mode 100644
index 0000000000..66dc02d9b6
--- /dev/null
+++ b/python/tests/test_benchmark.py
@@ -0,0 +1,60 @@
+import pyarrow as pa
+import pyarrow.fs as pa_fs
+import pytest
+from numpy.random import standard_normal
+
+from deltalake import DeltaTable, write_deltalake
+
+# NOTE: make sure to run these in release mode with
+# MATURIN_EXTRA_ARGS=--release make develop
+
+
+@pytest.fixture()
+def sample_table() -> pa.Table:
+    max_size_bytes = 1024 * 1024 * 1024
+    ncols = 20
+    nrows = max_size_bytes // ncols // 64
+    tab = pa.table({f"x{i}": standard_normal(nrows) for i in range(ncols)})
+    # Add index column for sorting
+    tab = tab.append_column("i", pa.array(range(nrows), type=pa.int64()))
+    return tab
+
+
+@pytest.mark.benchmark(group="write")
+def test_benchmark_write(benchmark, sample_table, tmp_path):
+    benchmark(write_deltalake, str(tmp_path), sample_table, mode="overwrite")
+
+    dt = DeltaTable(str(tmp_path))
+    assert dt.to_pyarrow_table().sort_by("i") == sample_table
+
+
+# TODO: support wrapping PyArrow filesystems
+# @pytest.mark.benchmark(
+#     group="write"
+# )
+# def test_benchmark_write_pyarrow(benchmark, sample_table, tmp_path):
+#     fs = pa_fs.SubTreeFileSystem(str(tmp_path), pa_fs.LocalFileSystem())
+
+#     benchmark(write_deltalake, str(tmp_path), sample_table, mode="overwrite", filesystem=fs)
+
+#     dt = DeltaTable(str(tmp_path))
+#     assert dt.to_pyarrow_table(filesystem=fs).sort_by("i") == sample_table
+
+
+@pytest.mark.benchmark(group="read")
+def test_benchmark_read(benchmark, sample_table, tmp_path):
+    write_deltalake(str(tmp_path), sample_table)
+    dt = DeltaTable(str(tmp_path))
+
+    result = benchmark(dt.to_pyarrow_table)
+    assert result.sort_by("i") == sample_table
+
+
+@pytest.mark.benchmark(group="read")
+def test_benchmark_read_pyarrow(benchmark, sample_table, tmp_path):
+    write_deltalake(str(tmp_path), sample_table)
+    dt = DeltaTable(str(tmp_path))
+
+    fs = pa_fs.SubTreeFileSystem(str(tmp_path), pa_fs.LocalFileSystem())
+    result = benchmark(dt.to_pyarrow_table, filesystem=fs)
+    assert result.sort_by("i") == sample_table
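
For a quick local sanity check outside of pytest-benchmark, the same round trip these tests measure can be reproduced with a minimal sketch. This is illustrative only, not part of the patch: the table size is deliberately much smaller than the fixture's, and it assumes `deltalake` was built in release mode as noted above.

```python
import tempfile
import time

import pyarrow as pa
from numpy.random import standard_normal

from deltalake import DeltaTable, write_deltalake

# Small illustrative table; the benchmark fixture above is far larger.
ncols = 20
nrows = 100_000
tab = pa.table({f"x{i}": standard_normal(nrows) for i in range(ncols)})

with tempfile.TemporaryDirectory() as path:
    # Time one overwrite write, as test_benchmark_write does repeatedly.
    start = time.perf_counter()
    write_deltalake(path, tab, mode="overwrite")
    print(f"write: {time.perf_counter() - start:.3f}s")

    # Time one full-table read, as test_benchmark_read does repeatedly.
    dt = DeltaTable(path)
    start = time.perf_counter()
    result = dt.to_pyarrow_table()
    print(f"read:  {time.perf_counter() - start:.3f}s")

    assert result.num_rows == nrows
```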