Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test(python): add read / write benchmarks #933

Merged
merged 10 commits into from
Jan 17, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions .github/workflows/python_build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,52 @@ jobs:
run: |
source venv/bin/activate
make build-documentation

# Runs the Python read/write benchmarks and compares results against the
# cached baseline, failing the build on a performance regression.
benchmark:
  name: Python Benchmark
  runs-on: ubuntu-latest
  steps:
    - uses: actions/checkout@v2

    # Step name previously said "nightly" but the toolchain installed is stable.
    - name: Install latest stable
      uses: actions-rs/toolchain@v1
      with:
        toolchain: stable
        override: true
        components: rustfmt, clippy

    - uses: Swatinem/rust-cache@v1

    - uses: actions/setup-python@v3
      with:
        python-version: "3.10"

    # Benchmarks must run against a release build; a debug build would be
    # 10-100x slower and produce meaningless timings.
    - name: Build deltalake in release mode
      run: |
        pip install virtualenv
        virtualenv venv
        source venv/bin/activate
        MATURIN_EXTRA_ARGS=--release make develop

    # Download previous benchmark result from cache (if exists)
    - name: Download previous benchmark data
      uses: actions/cache@v1
      with:
        path: ./cache
        key: ${{ runner.os }}-benchmark

    - name: Run benchmark
      run: |
        source venv/bin/activate
        pytest tests/test_benchmark.py --benchmark-json output.json

    # Compares output.json with the cached baseline and alerts on regressions.
    - name: Store benchmark result
      uses: benchmark-action/github-action-benchmark@v1
      with:
        tool: 'pytest'
        output-file-path: output.json
        external-data-json-path: ./cache/benchmark-data.json
        fail-on-alert: true

test-pyspark:
name: PySpark Integration Tests
Expand Down
7 changes: 7 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,10 @@ members = [
"dynamodb_lock",
]
exclude = ["proofs", "delta-inspect"]

[profile.dev]
split-debuginfo = "unpacked"

[profile.release-with-debug]
inherits = "release"
debug = true
3 changes: 2 additions & 1 deletion python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ devel = [
"sphinx<=4.5",
"sphinx-rtd-theme",
"toml",
"wheel"
"wheel",
"pytest-benchmark"
]
pyspark = [
"pyspark",
Expand Down
17 changes: 8 additions & 9 deletions python/src/filesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ impl DeltaFileSystemHandler {
let file = self
.rt
.block_on(ObjectInputFile::try_new(
self.rt.clone(),
Arc::clone(&self.rt),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just out of curiosity, I would have thought the statements are equivalent, since rt is inside an Arc ... ?

self.inner.clone(),
path,
))
Expand All @@ -247,7 +247,7 @@ impl DeltaFileSystemHandler {
let file = self
.rt
.block_on(ObjectOutputStream::try_new(
self.rt.clone(),
Arc::clone(&self.rt),
self.inner.clone(),
path,
))
Expand Down Expand Up @@ -404,15 +404,14 @@ impl ObjectInputFile {
};
let nbytes = (range.end - range.start) as i64;
self.pos += nbytes;
let obj = if nbytes > 0 {
let data = if nbytes > 0 {
self.rt
.block_on(self.store.get_range(&self.path, range))
.map_err(PyDeltaTableError::from_object_store)?
.to_vec()
} else {
Vec::new()
"".into()
};
Python::with_gil(|py| Ok(PyBytes::new(py, &obj).into_py(py)))
Python::with_gil(|py| Ok(PyBytes::new(py, data.as_ref()).into_py(py)))
}

fn fileno(&self) -> PyResult<()> {
Expand Down Expand Up @@ -531,10 +530,10 @@ impl ObjectOutputStream {
Err(PyNotImplementedError::new_err("'read' not implemented"))
}

fn write(&mut self, data: Vec<u8>) -> PyResult<i64> {
fn write(&mut self, data: &PyBytes) -> PyResult<i64> {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using Vec<u8> meant PyO3 was cloning all the input data. By using PyBytes we avoid the copy.

self.check_closed()?;
let len = data.len() as i64;
match self.rt.block_on(self.writer.write_all(&data)) {
let len = data.as_bytes().len() as i64;
match self.rt.block_on(self.writer.write_all(data.as_bytes())) {
Ok(_) => Ok(len),
Err(err) => {
self.rt
Expand Down
60 changes: 60 additions & 0 deletions python/tests/test_benchmark.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import pyarrow as pa
import pyarrow.fs as pa_fs
import pytest
from numpy.random import standard_normal

from deltalake import DeltaTable, write_deltalake

# NOTE: make sure to run these in release mode with
# MATURIN_EXTRA_ARGS=--release make develop


@pytest.fixture()
def sample_table() -> pa.Table:
    """Build an in-memory table of float64 columns shared by all benchmarks.

    Returns a table with `ncols` random float64 columns plus one int64
    index column "i" used to restore row order after round-trips.
    """
    max_size_bytes = 1024 * 1024 * 1024
    ncols = 20
    # NOTE(review): dividing by 64 looks like a bits-vs-bytes mix-up — a
    # float64 element is 8 bytes, so the table is ~1/8 of max_size_bytes
    # (~128 MiB, not 1 GiB). Confirm whether this sizing is intentional.
    nrows = max_size_bytes // ncols // 64
    tab = pa.table({f"x{i}": standard_normal(nrows) for i in range(ncols)})
    # Add index column for sorting
    tab = tab.append_column("i", pa.array(range(nrows), type=pa.int64()))
    return tab


@pytest.mark.benchmark(group="write")
def test_benchmark_write(benchmark, sample_table, tmp_path):
    """Benchmark writing the sample table, then verify the round-trip."""
    table_uri = str(tmp_path)
    benchmark(write_deltalake, table_uri, sample_table, mode="overwrite")

    # Re-open the table and confirm the written data matches the input.
    round_tripped = DeltaTable(table_uri).to_pyarrow_table()
    assert round_tripped.sort_by("i") == sample_table


# TODO: support wrapping PyArrow filesystems
# @pytest.mark.benchmark(
# group="write"
# )
# def test_benchmark_write_pyarrow(benchmark, sample_table, tmp_path):
# fs = pa_fs.SubTreeFileSystem(str(tmp_path), pa_fs.LocalFileSystem())

# benchmark(write_deltalake, str(tmp_path), sample_table, mode="overwrite", filesystem=fs)

# dt = DeltaTable(str(tmp_path))
# assert dt.to_pyarrow_table(filesystem=fs).sort_by("i") == sample_table


@pytest.mark.benchmark(group="read")
def test_benchmark_read(benchmark, sample_table, tmp_path):
    """Benchmark reading a Delta table back into a pyarrow Table."""
    table_uri = str(tmp_path)
    write_deltalake(table_uri, sample_table)

    # Only the read path is timed; the write above is setup.
    result = benchmark(DeltaTable(table_uri).to_pyarrow_table)
    assert result.sort_by("i") == sample_table


@pytest.mark.benchmark(group="read")
def test_benchmark_read_pyarrow(benchmark, sample_table, tmp_path):
    """Benchmark reading via an explicitly supplied pyarrow filesystem."""
    table_uri = str(tmp_path)
    write_deltalake(table_uri, sample_table)

    # Route reads through a SubTreeFileSystem rooted at the table path
    # instead of letting deltalake resolve the filesystem itself.
    subtree_fs = pa_fs.SubTreeFileSystem(table_uri, pa_fs.LocalFileSystem())
    result = benchmark(DeltaTable(table_uri).to_pyarrow_table, filesystem=subtree_fs)
    assert result.sort_by("i") == sample_table