test(python): add read / write benchmarks (#933)
# Description

Adds continuous benchmarks for the Python reader / writer.
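For context, these tests use pytest-benchmark: the `benchmark` fixture calls the target repeatedly, times each run, and emits the statistics that `--benchmark-json` serializes for CI. A minimal sketch of the fixture pattern (an illustrative test, not code from this PR):

```python
import pytest


@pytest.mark.benchmark(group="example")
def test_sum(benchmark):
    # benchmark(fn, *args, **kwargs) runs fn repeatedly, records
    # min/max/mean/stddev, and returns one call's result for assertions.
    result = benchmark(sum, range(10_000))
    assert result == sum(range(10_000))
```

Running `pytest --benchmark-json output.json` then produces the JSON that the workflow below feeds to the benchmark action.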

wjones127 authored Jan 17, 2023
1 parent e3073eb commit 750f400
Showing 5 changed files with 120 additions and 10 deletions.
46 changes: 46 additions & 0 deletions .github/workflows/python_build.yml
@@ -120,6 +120,52 @@ jobs:
        run: |
          source venv/bin/activate
          make build-documentation

  benchmark:
    name: Python Benchmark
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2

      - name: Install stable toolchain
        uses: actions-rs/toolchain@v1
        with:
          toolchain: stable
          override: true
          components: rustfmt, clippy

      - uses: Swatinem/rust-cache@v2

      - uses: actions/setup-python@v4
        with:
          python-version: "3.10"

      - name: Build deltalake in release mode
        run: |
          pip install virtualenv
          virtualenv venv
          source venv/bin/activate
          MATURIN_EXTRA_ARGS=--release make develop

      # Download previous benchmark result from cache (if exists)
      - name: Download previous benchmark data
        uses: actions/cache@v2
        with:
          path: ./cache
          key: ${{ runner.os }}-benchmark

      - name: Run benchmark
        run: |
          source venv/bin/activate
          pytest tests/test_benchmark.py --benchmark-json output.json

      - name: Store benchmark result
        uses: benchmark-action/github-action-benchmark@v1
        with:
          tool: 'pytest'
          output-file-path: python/output.json
          external-data-json-path: ./cache/benchmark-data.json
          fail-on-alert: true

  test-pyspark:
    name: PySpark Integration Tests
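For context on the job above: benchmark-action/github-action-benchmark parses the pytest-benchmark JSON, compares each test against the history kept in the cached `./cache/benchmark-data.json` (the `external-data-json-path` mode stores results in that file instead of publishing a gh-pages chart), and with `fail-on-alert: true` fails the job when a benchmark regresses past the action's alert threshold, which defaults to 200% of the previous timing.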
4 changes: 4 additions & 0 deletions Cargo.toml
@@ -8,3 +8,7 @@ members = [
"dynamodb_lock",
]
exclude = ["proofs", "delta-inspect"]

[profile.release-with-debug]
inherits = "release"
debug = true
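The new profile appears intended for profiling the benchmarks: `release-with-debug` keeps the optimizations of `release` but retains debug symbols, and can be selected with e.g. `cargo build --profile release-with-debug` (custom profiles require Cargo 1.57 or later).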
3 changes: 2 additions & 1 deletion python/pyproject.toml
@@ -35,7 +35,8 @@ devel = [
"sphinx<=4.5",
"sphinx-rtd-theme",
"toml",
"wheel"
"wheel",
"pytest-benchmark"
]
pyspark = [
"pyspark",
17 changes: 8 additions & 9 deletions python/src/filesystem.rs
@@ -229,7 +229,7 @@ impl DeltaFileSystemHandler
        let file = self
            .rt
            .block_on(ObjectInputFile::try_new(
-                self.rt.clone(),
+                Arc::clone(&self.rt),
                self.inner.clone(),
                path,
            ))
@@ -247,7 +247,7 @@ impl DeltaFileSystemHandler {
        let file = self
            .rt
            .block_on(ObjectOutputStream::try_new(
-                self.rt.clone(),
+                Arc::clone(&self.rt),
                self.inner.clone(),
                path,
            ))
@@ -404,15 +404,14 @@ impl ObjectInputFile {
        };
        let nbytes = (range.end - range.start) as i64;
        self.pos += nbytes;
-        let obj = if nbytes > 0 {
+        let data = if nbytes > 0 {
            self.rt
                .block_on(self.store.get_range(&self.path, range))
                .map_err(PyDeltaTableError::from_object_store)?
-                .to_vec()
        } else {
-            Vec::new()
+            "".into()
        };
-        Python::with_gil(|py| Ok(PyBytes::new(py, &obj).into_py(py)))
+        Python::with_gil(|py| Ok(PyBytes::new(py, data.as_ref()).into_py(py)))
    }

    fn fileno(&self) -> PyResult<()> {
@@ -531,10 +530,10 @@ impl ObjectOutputStream {
        Err(PyNotImplementedError::new_err("'read' not implemented"))
    }

-    fn write(&mut self, data: Vec<u8>) -> PyResult<i64> {
+    fn write(&mut self, data: &PyBytes) -> PyResult<i64> {
        self.check_closed()?;
-        let len = data.len() as i64;
-        match self.rt.block_on(self.writer.write_all(&data)) {
+        let len = data.as_bytes().len() as i64;
+        match self.rt.block_on(self.writer.write_all(data.as_bytes())) {
            Ok(_) => Ok(len),
            Err(err) => {
                self.rt
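The filesystem changes read as performance fixes surfaced by the benchmarks: `write` now takes `&PyBytes` and accesses it via `as_bytes()`, borrowing the Python buffer instead of copying it into a `Vec<u8>` at the FFI boundary, while `read` passes the `Bytes` from `get_range` straight to `PyBytes::new` rather than round-tripping through `to_vec()`, saving an intermediate copy per call. The `Arc::clone(&self.rt)` spelling is behavior-neutral; it just makes the reference-count clone explicit.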
60 changes: 60 additions & 0 deletions python/tests/test_benchmark.py
@@ -0,0 +1,60 @@
import pyarrow as pa
import pyarrow.fs as pa_fs
import pytest
from numpy.random import standard_normal

from deltalake import DeltaTable, write_deltalake

# NOTE: make sure to run these in release mode with
# MATURIN_EXTRA_ARGS=--release make develop


@pytest.fixture()
def sample_table() -> pa.Table:
    max_size_bytes = 1024 * 1024 * 1024
    ncols = 20
    nrows = max_size_bytes // 20 // 64
    tab = pa.table({f"x{i}": standard_normal(nrows) for i in range(ncols)})
    # Add index column for sorting
    tab = tab.append_column("i", pa.array(range(nrows), type=pa.int64()))
    return tab


@pytest.mark.benchmark(group="write")
def test_benchmark_write(benchmark, sample_table, tmp_path):
    benchmark(write_deltalake, str(tmp_path), sample_table, mode="overwrite")

    dt = DeltaTable(str(tmp_path))
    assert dt.to_pyarrow_table().sort_by("i") == sample_table


# TODO: support wrapping PyArrow filesystems
# @pytest.mark.benchmark(
#     group="write"
# )
# def test_benchmark_write_pyarrow(benchmark, sample_table, tmp_path):
#     fs = pa_fs.SubTreeFileSystem(str(tmp_path), pa_fs.LocalFileSystem())

#     benchmark(write_deltalake, str(tmp_path), sample_table, mode="overwrite", filesystem=fs)

#     dt = DeltaTable(str(tmp_path))
#     assert dt.to_pyarrow_table(filesystem=fs).sort_by("i") == sample_table


@pytest.mark.benchmark(group="read")
def test_benchmark_read(benchmark, sample_table, tmp_path):
    write_deltalake(str(tmp_path), sample_table)
    dt = DeltaTable(str(tmp_path))

    result = benchmark(dt.to_pyarrow_table)
    assert result.sort_by("i") == sample_table


@pytest.mark.benchmark(group="read")
def test_benchmark_read_pyarrow(benchmark, sample_table, tmp_path):
    write_deltalake(str(tmp_path), sample_table)
    dt = DeltaTable(str(tmp_path))

    fs = pa_fs.SubTreeFileSystem(str(tmp_path), pa_fs.LocalFileSystem())
    result = benchmark(dt.to_pyarrow_table, filesystem=fs)
    assert result.sort_by("i") == sample_table

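To chase a regression locally, pytest-benchmark can also diff runs: `pytest tests/test_benchmark.py --benchmark-autosave` stores the results under `.benchmarks/`, and a later run with `--benchmark-compare` reports the deltas against the saved baseline.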