Move/mark more streaming tests
stinodego committed Jan 24, 2024
1 parent f6f46c0 commit 5ee547a
Showing 7 changed files with 144 additions and 139 deletions.
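
For context: the xdist_group marker added throughout this commit comes from pytest-xdist. When the suite is run with --dist loadgroup, all tests sharing a group name are scheduled on the same worker, so tests that toggle the global streaming engine cannot interfere with each other across parallel workers. A minimal sketch of the pattern (the test name and body below are illustrative, not part of this commit):

    import pytest

    @pytest.mark.xdist_group("streaming")
    def test_example_streaming_behavior() -> None:
        # All tests marked with the "streaming" group run on the same
        # pytest-xdist worker when the suite is invoked as:
        #   pytest -n auto --dist loadgroup
        assert True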
3 changes: 3 additions & 0 deletions py-polars/tests/unit/io/test_hive.py
@@ -12,6 +12,7 @@
@pytest.mark.skip(
reason="Broken by pyarrow 15 release: https://github.com/pola-rs/polars/issues/13892"
)
@pytest.mark.xdist_group("streaming")
@pytest.mark.write_disk()
def test_hive_partitioned_predicate_pushdown(
io_files_path: Path, tmp_path: Path, monkeypatch: Any, capfd: Any
@@ -91,6 +92,7 @@ def test_hive_partitioned_predicate_pushdown_skips_correct_number_of_files(
@pytest.mark.skip(
reason="Broken by pyarrow 15 release: https://github.com/pola-rs/polars/issues/13892"
)
@pytest.mark.xdist_group("streaming")
@pytest.mark.write_disk()
def test_hive_partitioned_slice_pushdown(io_files_path: Path, tmp_path: Path) -> None:
df = pl.read_ipc(io_files_path / "*.ipc")
@@ -127,6 +129,7 @@ def test_hive_partitioned_slice_pushdown(io_files_path: Path, tmp_path: Path) -> None:
@pytest.mark.skip(
reason="Broken by pyarrow 15 release: https://github.com/pola-rs/polars/issues/13892"
)
@pytest.mark.xdist_group("streaming")
@pytest.mark.write_disk()
def test_hive_partitioned_projection_pushdown(
io_files_path: Path, tmp_path: Path
41 changes: 1 addition & 40 deletions py-polars/tests/unit/io/test_lazy_parquet.py
@@ -201,45 +201,6 @@ def test_row_index_schema_parquet(parquet_file_path: Path) -> None:
).dtypes == [pl.UInt32, pl.String]


@pytest.mark.write_disk()
def test_parquet_eq_statistics(monkeypatch: Any, capfd: Any, tmp_path: Path) -> None:
tmp_path.mkdir(exist_ok=True)

monkeypatch.setenv("POLARS_VERBOSE", "1")

df = pl.DataFrame({"idx": pl.arange(100, 200, eager=True)}).with_columns(
(pl.col("idx") // 25).alias("part")
)
df = pl.concat(df.partition_by("part", as_dict=False), rechunk=False)
assert df.n_chunks("all") == [4, 4]

file_path = tmp_path / "stats.parquet"
df.write_parquet(file_path, statistics=True, use_pyarrow=False)

file_path = tmp_path / "stats.parquet"
df.write_parquet(file_path, statistics=True, use_pyarrow=False)

for streaming in [False, True]:
for pred in [
pl.col("idx") == 50,
pl.col("idx") == 150,
pl.col("idx") == 210,
]:
result = (
pl.scan_parquet(file_path).filter(pred).collect(streaming=streaming)
)
assert_frame_equal(result, df.filter(pred))

captured = capfd.readouterr().err
assert (
"parquet file must be read, statistics not sufficient for predicate."
in captured
)
assert (
"parquet file can be skipped, the statistics were sufficient"
" to apply the predicate." in captured
)


@pytest.mark.write_disk()
def test_parquet_is_in_statistics(monkeypatch: Any, capfd: Any, tmp_path: Path) -> None:
@@ -314,7 +275,7 @@ def test_parquet_statistics(monkeypatch: Any, capfd: Any, tmp_path: Path) -> None:


@pytest.mark.write_disk()
def test_streaming_categorical(tmp_path: Path) -> None:
def test_categorical(tmp_path: Path) -> None:
tmp_path.mkdir(exist_ok=True)

df = pl.DataFrame(
16 changes: 0 additions & 16 deletions py-polars/tests/unit/operations/test_group_by.py
@@ -831,22 +831,6 @@ def test_group_by_rolling_deprecated() -> None:
assert_frame_equal(result_lazy, expected, check_row_order=False)


def test_group_by_multiple_keys_one_literal() -> None:
df = pl.DataFrame({"a": [1, 1, 2], "b": [4, 5, 6]})

expected = {"a": [1, 2], "literal": [1, 1], "b": [5, 6]}
for streaming in [True, False]:
assert (
df.lazy()
.group_by("a", pl.lit(1))
.agg(pl.col("b").max())
.sort(["a", "b"])
.collect(streaming=streaming)
.to_dict(as_series=False)
== expected
)


def test_group_by_list_scalar_11749() -> None:
df = pl.DataFrame(
{
82 changes: 0 additions & 82 deletions py-polars/tests/unit/operations/test_join.py
@@ -738,88 +738,6 @@ def test_outer_join_bool() -> None:
}


@pytest.mark.parametrize("streaming", [False, True])
def test_join_null_matches(streaming: bool) -> None:
# null values in joins should never find a match.
df_a = pl.LazyFrame(
{
"idx_a": [0, 1, 2],
"a": [None, 1, 2],
}
)

df_b = pl.LazyFrame(
{
"idx_b": [0, 1, 2, 3],
"a": [None, 2, 1, None],
}
)

expected = pl.DataFrame({"idx_a": [2, 1], "a": [2, 1], "idx_b": [1, 2]})
assert_frame_equal(
df_a.join(df_b, on="a", how="inner").collect(streaming=streaming), expected
)
expected = pl.DataFrame(
{"idx_a": [0, 1, 2], "a": [None, 1, 2], "idx_b": [None, 2, 1]}
)
assert_frame_equal(
df_a.join(df_b, on="a", how="left").collect(streaming=streaming), expected
)
expected = pl.DataFrame(
{
"idx_a": [None, 2, 1, None, 0],
"a": [None, 2, 1, None, None],
"idx_b": [0, 1, 2, 3, None],
"a_right": [None, 2, 1, None, None],
}
)
assert_frame_equal(df_a.join(df_b, on="a", how="outer").collect(), expected)


@pytest.mark.parametrize("streaming", [False, True])
def test_join_null_matches_multiple_keys(streaming: bool) -> None:
df_a = pl.LazyFrame(
{
"a": [None, 1, 2],
"idx": [0, 1, 2],
}
)

df_b = pl.LazyFrame(
{
"a": [None, 2, 1, None, 1],
"idx": [0, 1, 2, 3, 1],
"c": [10, 20, 30, 40, 50],
}
)

expected = pl.DataFrame({"a": [1], "idx": [1], "c": [50]})
assert_frame_equal(
df_a.join(df_b, on=["a", "idx"], how="inner").collect(streaming=streaming),
expected,
)
expected = pl.DataFrame(
{"a": [None, 1, 2], "idx": [0, 1, 2], "c": [None, 50, None]}
)
assert_frame_equal(
df_a.join(df_b, on=["a", "idx"], how="left").collect(streaming=streaming),
expected,
)

expected = pl.DataFrame(
{
"a": [None, None, None, None, None, 1, 2],
"idx": [None, None, None, None, 0, 1, 2],
"a_right": [None, 2, 1, None, None, 1, None],
"idx_right": [0, 1, 2, 3, None, 1, None],
"c": [10, 20, 30, 40, None, 50, None],
}
)
assert_frame_equal(
df_a.join(df_b, on=["a", "idx"], how="outer").sort("a").collect(), expected
)


def test_outer_join_coalesce_different_names_13450() -> None:
df1 = pl.DataFrame({"L1": ["a", "b", "c"], "L3": ["b", "c", "d"], "L2": [1, 2, 3]})
df2 = pl.DataFrame({"L3": ["a", "c", "d"], "R2": [7, 8, 9]})
16 changes: 16 additions & 0 deletions py-polars/tests/unit/streaming/test_streaming_group_by.py
@@ -420,3 +420,19 @@ def test_streaming_group_by_literal(literal: Any) -> None:
"a_count": [20],
"a_sum": [190],
}


@pytest.mark.parametrize("streaming", [True, False])
def test_group_by_multiple_keys_one_literal(streaming: bool) -> None:
df = pl.DataFrame({"a": [1, 1, 2], "b": [4, 5, 6]})

expected = {"a": [1, 2], "literal": [1, 1], "b": [5, 6]}
assert (
df.lazy()
.group_by("a", pl.lit(1))
.agg(pl.col("b").max())
.sort(["a", "b"])
.collect(streaming=streaming)
.to_dict(as_series=False)
== expected
)
42 changes: 41 additions & 1 deletion py-polars/tests/unit/streaming/test_streaming_io.py
@@ -1,6 +1,6 @@
from __future__ import annotations

from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any
from unittest.mock import patch

import pytest
@@ -213,3 +213,43 @@ def test_sink_ndjson_should_write_same_data(
df = pl.read_ndjson(target_path)

assert_frame_equal(df, expected)


@pytest.mark.write_disk()
def test_parquet_eq_statistics(monkeypatch: Any, capfd: Any, tmp_path: Path) -> None:
tmp_path.mkdir(exist_ok=True)

monkeypatch.setenv("POLARS_VERBOSE", "1")

df = pl.DataFrame({"idx": pl.arange(100, 200, eager=True)}).with_columns(
(pl.col("idx") // 25).alias("part")
)
df = pl.concat(df.partition_by("part", as_dict=False), rechunk=False)
assert df.n_chunks("all") == [4, 4]

file_path = tmp_path / "stats.parquet"
df.write_parquet(file_path, statistics=True, use_pyarrow=False)

file_path = tmp_path / "stats.parquet"
df.write_parquet(file_path, statistics=True, use_pyarrow=False)

for streaming in [False, True]:
for pred in [
pl.col("idx") == 50,
pl.col("idx") == 150,
pl.col("idx") == 210,
]:
result = (
pl.scan_parquet(file_path).filter(pred).collect(streaming=streaming)
)
assert_frame_equal(result, df.filter(pred))

captured = capfd.readouterr().err
assert (
"parquet file must be read, statistics not sufficient for predicate."
in captured
)
assert (
"parquet file can be skipped, the statistics were sufficient"
" to apply the predicate." in captured
)
83 changes: 83 additions & 0 deletions py-polars/tests/unit/streaming/test_streaming_join.py
@@ -4,6 +4,7 @@

import numpy as np
import pandas as pd
import pytest

import polars as pl
from polars.testing import assert_frame_equal
@@ -105,3 +106,85 @@ def test_streaming_join_rechunk_12498() -> None:
"A": [0, 1, 0, 1],
"B": [0, 0, 1, 1],
}


@pytest.mark.parametrize("streaming", [False, True])
def test_join_null_matches(streaming: bool) -> None:
# null values in joins should never find a match.
df_a = pl.LazyFrame(
{
"idx_a": [0, 1, 2],
"a": [None, 1, 2],
}
)

df_b = pl.LazyFrame(
{
"idx_b": [0, 1, 2, 3],
"a": [None, 2, 1, None],
}
)

expected = pl.DataFrame({"idx_a": [2, 1], "a": [2, 1], "idx_b": [1, 2]})
assert_frame_equal(
df_a.join(df_b, on="a", how="inner").collect(streaming=streaming), expected
)
expected = pl.DataFrame(
{"idx_a": [0, 1, 2], "a": [None, 1, 2], "idx_b": [None, 2, 1]}
)
assert_frame_equal(
df_a.join(df_b, on="a", how="left").collect(streaming=streaming), expected
)
expected = pl.DataFrame(
{
"idx_a": [None, 2, 1, None, 0],
"a": [None, 2, 1, None, None],
"idx_b": [0, 1, 2, 3, None],
"a_right": [None, 2, 1, None, None],
}
)
assert_frame_equal(df_a.join(df_b, on="a", how="outer").collect(), expected)


@pytest.mark.parametrize("streaming", [False, True])
def test_join_null_matches_multiple_keys(streaming: bool) -> None:
df_a = pl.LazyFrame(
{
"a": [None, 1, 2],
"idx": [0, 1, 2],
}
)

df_b = pl.LazyFrame(
{
"a": [None, 2, 1, None, 1],
"idx": [0, 1, 2, 3, 1],
"c": [10, 20, 30, 40, 50],
}
)

expected = pl.DataFrame({"a": [1], "idx": [1], "c": [50]})
assert_frame_equal(
df_a.join(df_b, on=["a", "idx"], how="inner").collect(streaming=streaming),
expected,
)
expected = pl.DataFrame(
{"a": [None, 1, 2], "idx": [0, 1, 2], "c": [None, 50, None]}
)
assert_frame_equal(
df_a.join(df_b, on=["a", "idx"], how="left").collect(streaming=streaming),
expected,
)

expected = pl.DataFrame(
{
"a": [None, None, None, None, None, 1, 2],
"idx": [None, None, None, None, 0, 1, 2],
"a_right": [None, 2, 1, None, None, 1, None],
"idx_right": [0, 1, 2, 3, None, 1, None],
"c": [10, 20, 30, 40, None, 50, None],
}
)
assert_frame_equal(
df_a.join(df_b, on=["a", "idx"], how="outer").sort("a").collect(), expected
)
