Skip to content

Commit

Permalink
fix: Subtraction with overflow on negative slice offset in Parquet
Browse files · Browse the repository at this point in the history
  • Loading branch information
nameexhaustion committed Aug 4, 2024
1 parent 9aa91a7 commit 55fc85a
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 9 deletions.
36 changes: 28 additions & 8 deletions crates/polars-mem-engine/src/executors/scan/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl ParquetExec {
} else {
// Walk the files in reverse until we find the first file, and then translate the
// slice into a positive-offset equivalent.
let n_from_end = -slice.0 as usize;
let slice_start_as_n_from_end = -slice.0 as usize;
let mut cum_rows = 0;
let chunk_size = 8;
POOL.install(|| {
Expand All @@ -90,7 +90,7 @@ impl ParquetExec {
for (path_idx, rc) in path_indexes.iter().zip(row_counts) {
cum_rows += rc;

if cum_rows >= n_from_end {
if cum_rows >= slice_start_as_n_from_end {
first_file = *path_idx;
break;
}
Expand All @@ -104,8 +104,18 @@ impl ParquetExec {
PolarsResult::Ok(())
})?;

let start = cum_rows - n_from_end;
(start, start + slice.1)
let (start, len) = if slice_start_as_n_from_end > cum_rows {
// We need to trim the slice, e.g. SLICE[offset: -100, len: 75] on a file of 50
// rows should only give the first 25 rows.
let first_file_position = slice_start_as_n_from_end - cum_rows;
(0, slice.1.saturating_sub(first_file_position))
} else {
(cum_rows - slice_start_as_n_from_end, slice.1)
};

let end = start.saturating_add(len);

(start, end)
}
} else {
(0, usize::MAX)
Expand Down Expand Up @@ -256,7 +266,7 @@ impl ParquetExec {
} else {
// Walk the files in reverse until we find the first file, and then translate the
// slice into a positive-offset equivalent.
let n_from_end = -slice.0 as usize;
let slice_start_as_n_from_end = -slice.0 as usize;
let mut cum_rows = 0;

let paths = &self.paths;
Expand Down Expand Up @@ -290,14 +300,24 @@ impl ParquetExec {

cum_rows += num_rows;

if cum_rows >= n_from_end {
if cum_rows >= slice_start_as_n_from_end {
first_file_idx = path_idx;
break;
}
}

let start = cum_rows - n_from_end;
(start, start + slice.1)
let (start, len) = if slice_start_as_n_from_end > cum_rows {
// We need to trim the slice, e.g. SLICE[offset: -100, len: 75] on a file of 50
// rows should only give the first 25 rows.
let first_file_position = slice_start_as_n_from_end - cum_rows;
(0, slice.1.saturating_sub(first_file_position))
} else {
(cum_rows - slice_start_as_n_from_end, slice.1)
};

let end = start.saturating_add(len);

(start, end)
}
} else {
(0, usize::MAX)
Expand Down
15 changes: 14 additions & 1 deletion py-polars/tests/unit/io/test_lazy_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,9 @@ def test_predicate_push_down_categorical_17744(tmp_path: Path) -> None:


@pytest.mark.parametrize("streaming", [True, False])
def test_parquet_slice_pushdown_nonzero_offset(tmp_path: Path, streaming: bool) -> None:
def test_parquet_slice_pushdown_non_zero_offset(
tmp_path: Path, streaming: bool
) -> None:
paths = [tmp_path / "1", tmp_path / "2", tmp_path / "3"]
dfs = [pl.DataFrame({"x": i}) for i in range(len(paths))]

Expand Down Expand Up @@ -512,3 +514,14 @@ def trim_to_metadata(path: str | Path) -> None:
if not streaming:
assert_frame_equal(pl.scan_parquet(paths).slice(-2, 1).collect(), df)
assert_frame_equal(pl.scan_parquet(paths[:2]).tail(1).collect(), df)
assert_frame_equal(
pl.scan_parquet(paths[1:]).slice(-99, 1).collect(), df.clear()
)

path = tmp_path / "data"
df = pl.select(x=pl.int_range(0, 50))
df.write_parquet(path)
assert_frame_equal(pl.scan_parquet(path).slice(-100, 75).collect(), df.head(25))
assert_frame_equal(
pl.scan_parquet(path).slice(-1, (1 << 32) - 1).collect(), df.tail(1)
)

0 comments on commit 55fc85a

Please sign in to comment.