From a0ba32c7f42114d9de5e48abe3ed7a038da6f654 Mon Sep 17 00:00:00 2001 From: MarcoGorelli <33491632+MarcoGorelli@users.noreply.github.com> Date: Mon, 19 Jun 2023 14:29:03 +0100 Subject: [PATCH] fix(rust, python) groupby rolling was producing wrong windows with non-default negative offsets --- polars/polars-time/src/windows/groupby.rs | 71 +++++++-------- .../polars/testing/parametric/strategies.py | 2 + .../tests/unit/operations/test_rolling.py | 90 +++++++++++++++++++ 3 files changed, 125 insertions(+), 38 deletions(-) diff --git a/polars/polars-time/src/windows/groupby.rs b/polars/polars-time/src/windows/groupby.rs index ab7bd2fbcd26..4ece84ac9e26 100644 --- a/polars/polars-time/src/windows/groupby.rs +++ b/polars/polars-time/src/windows/groupby.rs @@ -525,49 +525,44 @@ pub fn groupby_values( // we have a (partial) lookbehind window if offset.negative { - if offset.duration_ns() >= period.duration_ns() { - // lookbehind - // window is within 2 periods length of t + // lookbehind + if offset.duration_ns() == period.duration_ns() { + // t is right at the end of the window // ------t--- // [------] - if offset.duration_ns() < period.duration_ns() * 2 { - POOL.install(|| { - let vals = thread_offsets - .par_iter() - .copied() - .map(|(base_offset, len)| { - let upper_bound = base_offset + len; - let iter = groupby_values_iter_full_lookbehind( - period, - offset, - &time[..upper_bound], - closed_window, - tu, - tz, - base_offset, - ); - iter.map(|result| result.map(|(offset, len)| [offset, len])) - .collect::>>() - }) - .collect::>>()?; - Ok(flatten_par(&vals)) - }) - } + POOL.install(|| { + let vals = thread_offsets + .par_iter() + .copied() + .map(|(base_offset, len)| { + let upper_bound = base_offset + len; + let iter = groupby_values_iter_full_lookbehind( + period, + offset, + &time[..upper_bound], + closed_window, + tu, + tz, + base_offset, + ); + iter.map(|result| result.map(|(offset, len)| [offset, len])) + .collect::>>() + }) + .collect::>>()?; + Ok(flatten_par(&vals)) + }) + } else if ((offset.duration_ns() >= period.duration_ns()) + && matches!(closed_window, ClosedWindow::Left | ClosedWindow::None)) + || ((offset.duration_ns() > period.duration_ns()) + && matches!(closed_window, ClosedWindow::Right | ClosedWindow::Both)) + { // window is completely behind t and t itself is not a member // ---------------t--- // [---] - else { - let iter = groupby_values_iter_window_behind_t( - period, - offset, - time, - closed_window, - tu, - tz, - ); - iter.map(|result| result.map(|(offset, len)| [offset, len])) - .collect::>() - } + let iter = + groupby_values_iter_window_behind_t(period, offset, time, closed_window, tu, tz); + iter.map(|result| result.map(|(offset, len)| [offset, len])) + .collect::>() } // partial lookbehind // this one is still single threaded diff --git a/py-polars/polars/testing/parametric/strategies.py b/py-polars/polars/testing/parametric/strategies.py index a966b4792c22..39d6cb762af0 100644 --- a/py-polars/polars/testing/parametric/strategies.py +++ b/py-polars/polars/testing/parametric/strategies.py @@ -109,6 +109,8 @@ def between(draw: DrawFn, type_: type, min_: Any, max_: Any) -> Any: min_value=timedelta(microseconds=-(2**46)), max_value=timedelta(microseconds=(2**46) - 1), ) +strategy_closed = sampled_from(["left", "right", "both", "none"]) +strategy_time_unit = sampled_from(["ns", "us", "ms"]) @composite diff --git a/py-polars/tests/unit/operations/test_rolling.py b/py-polars/tests/unit/operations/test_rolling.py index 23cd0f8770f0..ccd9d46b45ba 100644 --- a/py-polars/tests/unit/operations/test_rolling.py +++ b/py-polars/tests/unit/operations/test_rolling.py @@ -70,6 +70,96 @@ def test_rolling_kernels_and_groupby_rolling( assert_frame_equal(out1, out2) +@pytest.mark.parametrize( + ("offset", "closed", "expected_values"), + [ + pytest.param( + "-1d", + "left", + [[1], [1, 2], [2, 3], [3, 4]], + id="partial lookbehind, left", + ), + pytest.param( + "-1d", + "right", + [[1, 2], [2, 3], [3, 4], [4]], + id="partial lookbehind, right", + ), + pytest.param( + "-1d", + "both", + [[1, 2], [1, 2, 3], [2, 3, 4], [3, 4]], + id="partial lookbehind, both", + ), + pytest.param( + "-1d", + "none", + [[1], [2], [3], [4]], + id="partial lookbehind, none", + ), + pytest.param( + "-2d", + "left", + [[], [1], [1, 2], [2, 3]], + id="full lookbehind, left", + ), + pytest.param( + "-3d", + "left", + [[], [], [1], [1, 2]], + id="full lookbehind, offset > period, left", + ), + pytest.param( + "-3d", + "right", + [[], [1], [1, 2], [2, 3]], + id="full lookbehind, right", + ), + pytest.param( + "-3d", + "both", + [[], [1], [1, 2], [1, 2, 3]], + id="full lookbehind, both", + ), + pytest.param( + "-2d", + "none", + [[], [1], [2], [3]], + id="full lookbehind, none", + ), + pytest.param( + "-3d", + "none", + [[], [], [1], [2]], + id="full lookbehind, offset > period, none", + ), + ], +) +def test_rolling_negative_offset( + offset: str, closed: ClosedInterval, expected_values: list[list[int]] +) -> None: + df = pl.DataFrame( + { + "ts": pl.date_range( + datetime(2021, 1, 1), datetime(2021, 1, 4), "1d", eager=True + ), + "value": [1, 2, 3, 4], + } + ) + result = df.groupby_rolling("ts", period="2d", offset=offset, closed=closed).agg( + pl.col("value") + ) + expected = pl.DataFrame( + { + "ts": pl.date_range( + datetime(2021, 1, 1), datetime(2021, 1, 4), "1d", eager=True + ), + "value": expected_values, + } + ) + assert_frame_equal(result, expected) + + def test_rolling_skew() -> None: s = pl.Series([1, 2, 3, 3, 2, 10, 8]) assert s.rolling_skew(window_size=4, bias=True).to_list() == pytest.approx(