Skip to content

Commit

Permalink
test: Ensure we hit the spilled source path in ooc sort test (#15010)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored Mar 12, 2024
1 parent 3839e97 commit 1829468
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 2 deletions.
26 changes: 26 additions & 0 deletions crates/polars-pipe/src/executors/sinks/sort/ooc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,24 @@ impl PartitionSpiller {
/// Returns the number of entries in `self.partitions`.
pub(crate) fn len(&self) -> usize {
self.partitions.len()
}

/// Flushes every partition buffer and hands each resulting frame to
/// `io_thread`, tagged with the partition's index.
///
/// Used in testing only, which is why it is compiled solely when debug
/// assertions are enabled. Partitions whose `finish()` yields `None` are
/// skipped. Once all partitions have been processed a marker line is written
/// to stderr.
#[cfg(debug_assertions)]
fn spill_all(&self, io_thread: &IOThread) {
    // Hand each rayon job at least two partitions, or an even share of all
    // partitions per pool thread when that share is larger.
    let per_thread = self.partitions.len() / POOL.current_num_threads();
    let job_len = per_thread.max(2);

    POOL.install(|| {
        let indexed = self
            .partitions
            .par_iter()
            .with_min_len(job_len)
            .enumerate();

        indexed.for_each(|(idx, buffer)| {
            let Some(frame) = buffer.finish() else {
                return;
            };
            io_thread.dump_partition_local(idx as IdxSize, frame);
        })
    });

    eprintln!("PARTITIONED FORCE SPILLED")
}
}

#[allow(clippy::too_many_arguments)]
Expand Down Expand Up @@ -162,6 +180,14 @@ pub(super) fn sort_ooc(
eprintln!("partitioning sort took: {:?}", now.elapsed());
}

// Branch for testing so we hit different parts in the Source phase.
#[cfg(debug_assertions)]
{
if std::env::var("POLARS_SPILL_SORT_PARTITIONS").is_ok() {
partitions_spiller.spill_all(&io_thread)
}
}

let files = std::fs::read_dir(dir)?
.flat_map(|entry| {
entry
Expand Down
16 changes: 14 additions & 2 deletions py-polars/tests/unit/streaming/test_streaming_sort.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,16 @@ def test_ooc_sort(tmp_path: Path, monkeypatch: Any) -> None:


@pytest.mark.write_disk()
def test_streaming_sort(tmp_path: Path, monkeypatch: Any, capfd: Any) -> None:
@pytest.mark.parametrize("spill_source", [True, False])
def test_streaming_sort(
tmp_path: Path, monkeypatch: Any, capfd: Any, spill_source: bool
) -> None:
tmp_path.mkdir(exist_ok=True)
monkeypatch.setenv("POLARS_TEMP_DIR", str(tmp_path))
monkeypatch.setenv("POLARS_FORCE_OOC", "1")
monkeypatch.setenv("POLARS_VERBOSE", "1")
if spill_source:
monkeypatch.setenv("POLARS_SPILL_SORT_PARTITIONS", "1")
# this creates a lot of duplicate partitions and triggers: #7568
assert (
pl.Series(np.random.randint(0, 100, 100))
Expand All @@ -109,13 +114,20 @@ def test_streaming_sort(tmp_path: Path, monkeypatch: Any, capfd: Any) -> None:
)
(_, err) = capfd.readouterr()
assert "df -> sort" in err
if spill_source:
assert "PARTITIONED FORCE SPILLED" in err


@pytest.mark.write_disk()
def test_out_of_core_sort_9503(tmp_path: Path, monkeypatch: Any) -> None:
@pytest.mark.parametrize("spill_source", [True, False])
def test_out_of_core_sort_9503(
tmp_path: Path, monkeypatch: Any, spill_source: bool
) -> None:
tmp_path.mkdir(exist_ok=True)
monkeypatch.setenv("POLARS_TEMP_DIR", str(tmp_path))
monkeypatch.setenv("POLARS_FORCE_OOC", "1")
if spill_source:
monkeypatch.setenv("POLARS_SPILL_SORT_PARTITIONS", "1")
np.random.seed(0)

num_rows = 100_000
Expand Down

0 comments on commit 1829468

Please sign in to comment.