test(python): Fix spurious test failures #13961

Merged 4 commits on Jan 24, 2024
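
For context, `pytest.mark.xdist_group` is a pytest-xdist marker: when the suite runs with `--dist loadgroup`, every test carrying the same group name is scheduled onto a single worker process and therefore never runs concurrently with the rest of its group. Grouping the streaming tests this way keeps them from running in parallel with one another, which is how this PR addresses the spurious failures in its title. A minimal sketch of the mechanism, as a hypothetical test module (only the marker and the CLI flags below are real pytest-xdist API):

import pytest

# Module-level mark: applies to every test collected from this file.
pytestmark = pytest.mark.xdist_group("streaming")


def test_streaming_thing_a() -> None:
    # With `pytest -n auto --dist loadgroup`, this runs on the same
    # xdist worker as every other test marked "streaming".
    assert True


@pytest.mark.xdist_group("streaming")  # equivalent per-test form
def test_streaming_thing_b() -> None:
    assert True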
3 changes: 3 additions & 0 deletions py-polars/tests/unit/io/test_hive.py
@@ -12,6 +12,7 @@
 @pytest.mark.skip(
     reason="Broken by pyarrow 15 release: https://github.com/pola-rs/polars/issues/13892"
 )
+@pytest.mark.xdist_group("streaming")
 @pytest.mark.write_disk()
 def test_hive_partitioned_predicate_pushdown(
     io_files_path: Path, tmp_path: Path, monkeypatch: Any, capfd: Any
@@ -91,6 +92,7 @@ def test_hive_partitioned_predicate_pushdown_skips_correct_number_of_files(
 @pytest.mark.skip(
     reason="Broken by pyarrow 15 release: https://github.com/pola-rs/polars/issues/13892"
 )
+@pytest.mark.xdist_group("streaming")
 @pytest.mark.write_disk()
 def test_hive_partitioned_slice_pushdown(io_files_path: Path, tmp_path: Path) -> None:
     df = pl.read_ipc(io_files_path / "*.ipc")
@@ -127,6 +129,7 @@ def test_hive_partitioned_slice_pushdown(io_files_path: Path, tmp_path: Path) -> None:
 @pytest.mark.skip(
     reason="Broken by pyarrow 15 release: https://github.com/pola-rs/polars/issues/13892"
 )
+@pytest.mark.xdist_group("streaming")
 @pytest.mark.write_disk()
 def test_hive_partitioned_projection_pushdown(
     io_files_path: Path, tmp_path: Path
16 changes: 0 additions & 16 deletions py-polars/tests/unit/io/test_lazy_json.py
@@ -140,19 +140,3 @@ def test_anonymous_scan_explain(io_files_path: Path) -> None:
     q = pl.scan_ndjson(source=file)
     assert "Anonymous" in q.explain()
     assert "Anonymous" in q.show_graph(raw_output=True)  # type: ignore[operator]
-
-
-def test_sink_ndjson_should_write_same_data(
-    io_files_path: Path, tmp_path: Path
-) -> None:
-    tmp_path.mkdir(exist_ok=True)
-    # Arrange
-    source_path = io_files_path / "foods1.csv"
-    target_path = tmp_path / "foods_test.ndjson"
-    expected = pl.read_csv(source_path)
-    lf = pl.scan_csv(source_path)
-    # Act
-    lf.sink_ndjson(target_path)
-    df = pl.read_ndjson(target_path)
-    # Assert
-    assert_frame_equal(df, expected)
42 changes: 1 addition & 41 deletions py-polars/tests/unit/io/test_lazy_parquet.py
@@ -201,46 +201,6 @@ def test_row_index_schema_parquet(parquet_file_path: Path) -> None:
     ).dtypes == [pl.UInt32, pl.String]
 
 
-@pytest.mark.write_disk()
-def test_parquet_eq_statistics(monkeypatch: Any, capfd: Any, tmp_path: Path) -> None:
-    tmp_path.mkdir(exist_ok=True)
-
-    monkeypatch.setenv("POLARS_VERBOSE", "1")
-
-    df = pl.DataFrame({"idx": pl.arange(100, 200, eager=True)}).with_columns(
-        (pl.col("idx") // 25).alias("part")
-    )
-    df = pl.concat(df.partition_by("part", as_dict=False), rechunk=False)
-    assert df.n_chunks("all") == [4, 4]
-
-    file_path = tmp_path / "stats.parquet"
-    df.write_parquet(file_path, statistics=True, use_pyarrow=False)
-
-    file_path = tmp_path / "stats.parquet"
-    df.write_parquet(file_path, statistics=True, use_pyarrow=False)
-
-    for streaming in [False, True]:
-        for pred in [
-            pl.col("idx") == 50,
-            pl.col("idx") == 150,
-            pl.col("idx") == 210,
-        ]:
-            result = (
-                pl.scan_parquet(file_path).filter(pred).collect(streaming=streaming)
-            )
-            assert_frame_equal(result, df.filter(pred))
-
-    captured = capfd.readouterr().err
-    assert (
-        "parquet file must be read, statistics not sufficient for predicate."
-        in captured
-    )
-    assert (
-        "parquet file can be skipped, the statistics were sufficient"
-        " to apply the predicate." in captured
-    )
-
-
 @pytest.mark.write_disk()
 def test_parquet_is_in_statistics(monkeypatch: Any, capfd: Any, tmp_path: Path) -> None:
     tmp_path.mkdir(exist_ok=True)
@@ -314,7 +274,7 @@ def test_parquet_statistics(monkeypatch: Any, capfd: Any, tmp_path: Path) -> None:
 
 
 @pytest.mark.write_disk()
-def test_streaming_categorical(tmp_path: Path) -> None:
+def test_categorical(tmp_path: Path) -> None:
     tmp_path.mkdir(exist_ok=True)
 
     df = pl.DataFrame(
16 changes: 0 additions & 16 deletions py-polars/tests/unit/operations/test_group_by.py
@@ -831,22 +831,6 @@ def test_group_by_rolling_deprecated() -> None:
     assert_frame_equal(result_lazy, expected, check_row_order=False)
 
 
-def test_group_by_multiple_keys_one_literal() -> None:
-    df = pl.DataFrame({"a": [1, 1, 2], "b": [4, 5, 6]})
-
-    expected = {"a": [1, 2], "literal": [1, 1], "b": [5, 6]}
-    for streaming in [True, False]:
-        assert (
-            df.lazy()
-            .group_by("a", pl.lit(1))
-            .agg(pl.col("b").max())
-            .sort(["a", "b"])
-            .collect(streaming=streaming)
-            .to_dict(as_series=False)
-            == expected
-        )
-
-
 def test_group_by_list_scalar_11749() -> None:
     df = pl.DataFrame(
         {
82 changes: 0 additions & 82 deletions py-polars/tests/unit/operations/test_join.py
@@ -738,88 +738,6 @@ def test_outer_join_bool() -> None:
     }
 
 
-@pytest.mark.parametrize("streaming", [False, True])
-def test_join_null_matches(streaming: bool) -> None:
-    # null values in joins should never find a match.
-    df_a = pl.LazyFrame(
-        {
-            "idx_a": [0, 1, 2],
-            "a": [None, 1, 2],
-        }
-    )
-
-    df_b = pl.LazyFrame(
-        {
-            "idx_b": [0, 1, 2, 3],
-            "a": [None, 2, 1, None],
-        }
-    )
-
-    expected = pl.DataFrame({"idx_a": [2, 1], "a": [2, 1], "idx_b": [1, 2]})
-    assert_frame_equal(
-        df_a.join(df_b, on="a", how="inner").collect(streaming=streaming), expected
-    )
-    expected = pl.DataFrame(
-        {"idx_a": [0, 1, 2], "a": [None, 1, 2], "idx_b": [None, 2, 1]}
-    )
-    assert_frame_equal(
-        df_a.join(df_b, on="a", how="left").collect(streaming=streaming), expected
-    )
-    expected = pl.DataFrame(
-        {
-            "idx_a": [None, 2, 1, None, 0],
-            "a": [None, 2, 1, None, None],
-            "idx_b": [0, 1, 2, 3, None],
-            "a_right": [None, 2, 1, None, None],
-        }
-    )
-    assert_frame_equal(df_a.join(df_b, on="a", how="outer").collect(), expected)
-
-
-@pytest.mark.parametrize("streaming", [False, True])
-def test_join_null_matches_multiple_keys(streaming: bool) -> None:
-    df_a = pl.LazyFrame(
-        {
-            "a": [None, 1, 2],
-            "idx": [0, 1, 2],
-        }
-    )
-
-    df_b = pl.LazyFrame(
-        {
-            "a": [None, 2, 1, None, 1],
-            "idx": [0, 1, 2, 3, 1],
-            "c": [10, 20, 30, 40, 50],
-        }
-    )
-
-    expected = pl.DataFrame({"a": [1], "idx": [1], "c": [50]})
-    assert_frame_equal(
-        df_a.join(df_b, on=["a", "idx"], how="inner").collect(streaming=streaming),
-        expected,
-    )
-    expected = pl.DataFrame(
-        {"a": [None, 1, 2], "idx": [0, 1, 2], "c": [None, 50, None]}
-    )
-    assert_frame_equal(
-        df_a.join(df_b, on=["a", "idx"], how="left").collect(streaming=streaming),
-        expected,
-    )
-
-    expected = pl.DataFrame(
-        {
-            "a": [None, None, None, None, None, 1, 2],
-            "idx": [None, None, None, None, 0, 1, 2],
-            "a_right": [None, 2, 1, None, None, 1, None],
-            "idx_right": [0, 1, 2, 3, None, 1, None],
-            "c": [10, 20, 30, 40, None, 50, None],
-        }
-    )
-    assert_frame_equal(
-        df_a.join(df_b, on=["a", "idx"], how="outer").sort("a").collect(), expected
-    )
-
-
 def test_outer_join_coalesce_different_names_13450() -> None:
     df1 = pl.DataFrame({"L1": ["a", "b", "c"], "L3": ["b", "c", "d"], "L2": [1, 2, 3]})
     df2 = pl.DataFrame({"L3": ["a", "c", "d"], "R2": [7, 8, 9]})
3 changes: 3 additions & 0 deletions py-polars/tests/unit/streaming/__init__.py
@@ -0,0 +1,3 @@
+import pytest
+
+pytestmark = pytest.mark.xdist_group("streaming")
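
Since this `__init__.py` makes the streaming test directory a package, the module-level `pytestmark` above is picked up for every test module inside it, which is why the per-file `pytestmark = pytest.mark.xdist_group("streaming")` lines are dropped from the individual streaming test files below. The mark only affects scheduling when the suite runs under pytest-xdist with group-aware distribution, e.g. with the standard flags:

pytest py-polars/tests/unit -n auto --dist loadgroup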
2 changes: 0 additions & 2 deletions py-polars/tests/unit/streaming/test_streaming.py
@@ -17,8 +17,6 @@
 if TYPE_CHECKING:
     from polars.type_aliases import JoinStrategy
 
-pytestmark = pytest.mark.xdist_group("streaming")
-
 
 def test_streaming_categoricals_5921() -> None:
     with pl.StringCache():
2 changes: 0 additions & 2 deletions py-polars/tests/unit/streaming/test_streaming_cse.py
@@ -7,8 +7,6 @@
 import polars as pl
 from polars.testing import assert_frame_equal
 
-pytestmark = pytest.mark.xdist_group("streaming")
-
 
 def test_cse_expr_selection_streaming(monkeypatch: Any, capfd: Any) -> None:
     monkeypatch.setenv("POLARS_VERBOSE", "1")
18 changes: 16 additions & 2 deletions py-polars/tests/unit/streaming/test_streaming_group_by.py
@@ -9,8 +9,6 @@
 import polars as pl
 from polars.testing import assert_frame_equal
 
-pytestmark = pytest.mark.xdist_group("streaming")
-
 
 @pytest.mark.slow()
 def test_streaming_group_by_sorted_fast_path_nulls_10273() -> None:
@@ -422,3 +420,19 @@ def test_streaming_group_by_literal(literal: Any) -> None:
         "a_count": [20],
         "a_sum": [190],
     }
+
+
+@pytest.mark.parametrize("streaming", [True, False])
+def test_group_by_multiple_keys_one_literal(streaming: bool) -> None:
+    df = pl.DataFrame({"a": [1, 1, 2], "b": [4, 5, 6]})
+
+    expected = {"a": [1, 2], "literal": [1, 1], "b": [5, 6]}
+    assert (
+        df.lazy()
+        .group_by("a", pl.lit(1))
+        .agg(pl.col("b").max())
+        .sort(["a", "b"])
+        .collect(streaming=streaming)
+        .to_dict(as_series=False)
+        == expected
+    )
67 changes: 61 additions & 6 deletions py-polars/tests/unit/streaming/test_streaming_io.py
@@ -1,7 +1,7 @@
 from __future__ import annotations
 
-import unittest
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Any
+from unittest.mock import patch
 
 import pytest
 
@@ -12,9 +12,6 @@
     from pathlib import Path
 
 
-pytestmark = pytest.mark.xdist_group("streaming")
-
-
 @pytest.mark.write_disk()
 def test_streaming_parquet_glob_5900(df: pl.DataFrame, tmp_path: Path) -> None:
     tmp_path.mkdir(exist_ok=True)
@@ -122,7 +119,7 @@ def test_sink_csv_with_options() -> None:
     passed into the rust-polars correctly.
     """
     df = pl.LazyFrame({"dummy": ["abc"]})
-    with unittest.mock.patch.object(df, "_ldf") as ldf:
+    with patch.object(df, "_ldf") as ldf:
         df.sink_csv(
             "path",
             include_bom=True,
@@ -198,3 +195,61 @@ def test_streaming_cross_join_schema(tmp_path: Path) -> None:
     a.join(b, how="cross").sink_parquet(file_path)
     read = pl.read_parquet(file_path, parallel="none")
     assert read.to_dict(as_series=False) == {"a": [1, 2], "b": ["b", "b"]}
+
+
+@pytest.mark.write_disk()
+def test_sink_ndjson_should_write_same_data(
+    io_files_path: Path, tmp_path: Path
+) -> None:
+    tmp_path.mkdir(exist_ok=True)
+
+    source_path = io_files_path / "foods1.csv"
+    target_path = tmp_path / "foods_test.ndjson"
+
+    expected = pl.read_csv(source_path)
+
+    lf = pl.scan_csv(source_path)
+    lf.sink_ndjson(target_path)
+    df = pl.read_ndjson(target_path)
+
+    assert_frame_equal(df, expected)
+
+
+@pytest.mark.write_disk()
+def test_parquet_eq_statistics(monkeypatch: Any, capfd: Any, tmp_path: Path) -> None:
+    tmp_path.mkdir(exist_ok=True)
+
+    monkeypatch.setenv("POLARS_VERBOSE", "1")
+
+    df = pl.DataFrame({"idx": pl.arange(100, 200, eager=True)}).with_columns(
+        (pl.col("idx") // 25).alias("part")
+    )
+    df = pl.concat(df.partition_by("part", as_dict=False), rechunk=False)
+    assert df.n_chunks("all") == [4, 4]
+
+    file_path = tmp_path / "stats.parquet"
+    df.write_parquet(file_path, statistics=True, use_pyarrow=False)
+
+    file_path = tmp_path / "stats.parquet"
+    df.write_parquet(file_path, statistics=True, use_pyarrow=False)
+
+    for streaming in [False, True]:
+        for pred in [
+            pl.col("idx") == 50,
+            pl.col("idx") == 150,
+            pl.col("idx") == 210,
+        ]:
+            result = (
+                pl.scan_parquet(file_path).filter(pred).collect(streaming=streaming)
+            )
+            assert_frame_equal(result, df.filter(pred))
+
+    captured = capfd.readouterr().err
+    assert (
+        "parquet file must be read, statistics not sufficient for predicate."
+        in captured
+    )
+    assert (
+        "parquet file can be skipped, the statistics were sufficient"
+        " to apply the predicate." in captured
+    )