Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(rust, python): groupby rolling with negative offset #9428

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 37 additions & 43 deletions polars/polars-time/src/windows/groupby.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,8 @@ pub fn groupby_windows(
(groups, lower_bound, upper_bound)
}

// this assumes that the starting point is alwa
pub(crate) fn groupby_values_iter_full_lookbehind(
// this assumes that the given time point is the right endpoint of the window
pub(crate) fn groupby_values_iter_lookbehind(
period: Duration,
offset: Duration,
time: &[i64],
Expand All @@ -233,7 +233,7 @@ pub(crate) fn groupby_values_iter_full_lookbehind(
tz: Option<Tz>,
start_offset: usize,
) -> impl Iterator<Item = PolarsResult<(IdxSize, IdxSize)>> + TrustedLen + '_ {
debug_assert!(offset.duration_ns() >= period.duration_ns());
debug_assert!(offset.duration_ns() == period.duration_ns());
debug_assert!(offset.negative);
let add = match tu {
TimeUnit::Nanoseconds => Duration::add_ns,
Expand Down Expand Up @@ -465,8 +465,7 @@ pub(crate) fn groupby_values_iter<'a>(
offset.negative = !period.negative;
if offset.duration_ns() > 0 {
// t is at the right endpoint of the window
let iter =
groupby_values_iter_full_lookbehind(period, offset, time, closed_window, tu, tz, 0);
let iter = groupby_values_iter_lookbehind(period, offset, time, closed_window, tu, tz, 0);
Box::new(iter)
} else if closed_window == ClosedWindow::Right || closed_window == ClosedWindow::None {
// only lookahead
Expand Down Expand Up @@ -514,49 +513,44 @@ pub fn groupby_values(

// we have a (partial) lookbehind window
if offset.negative {
if offset.duration_ns() >= period.duration_ns() {
Copy link
Collaborator Author

@MarcoGorelli MarcoGorelli Jun 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here, previously there were two paths:

  • offset >= period, offset < period * 2: groupby_values_iter_full_lookbehind
  • offset >= period, offset >= period * 2: groupby_values_iter_window_behind_t
  • offset < period: groupby_values_iter_partial_lookbehind

I don't get why there's the < period * 2 check. Looks like it comes from https://github.com/pola-rs/polars/pull/4010/files, but I don't see why

Anyway, groupby_values_iter_full_lookbehind assumes t is at the end of the window (i.e. period == offset), so changing the logic to

  • offset == period: groupby_values_iter_full_lookbehind
  • offset > period: groupby_values_iter_window_behind_t (slower, but this is quite unusual anyway?)
  • offset < period: groupby_values_iter_partial_lookbehind

fixes all the test cases

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I belive groupby_values_iter_full_lookbehind assumes that t is completely behind the window. So there are more cases where we have that besides period == offset.

I will have to dive into it which cases it were again. Do you have on top of mind which predicate would inlcude all cases where t is full lookbehind?

This is beneficial as in that case we can parallelize over t and then look from that point backwards in the slice to find the window.

Copy link
Collaborator Author

@MarcoGorelli MarcoGorelli Jun 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assumes that t is completely behind the window

If period == offset and closed =='right', then t is indeed included in the window (it's the right endpoint). For example the window could be (2020-01-01, 2020-01-02] and t could be 2020-01-02.

From testing, that function only works if offset== period. There's an explicit check for when closed=='right', i.e. when it's not a full lookbehind:

if matches!(closed_window, ClosedWindow::Right | ClosedWindow::Both) {
len += 1;
}

For offset > period, then it's incorrect for any value of closed: #9250

It may be possible to change it so it handles the case when offset > period. But for now, I'm suggesting to:

  • rename it, as when t is the right endpoint then if closed='right' then it's not a full lookbehind
  • only use it when offset == period (so at least the results are correct)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, let's first make it correct. We can try to find fast paths later if needed. 👍

Thanks!

// lookbehind
// window is within 2 periods length of t
// lookbehind
if offset.duration_ns() == period.duration_ns() {
// t is right at the end of the window
// ------t---
// [------]
if offset.duration_ns() < period.duration_ns() * 2 {
POOL.install(|| {
let vals = thread_offsets
.par_iter()
.copied()
.map(|(base_offset, len)| {
let upper_bound = base_offset + len;
let iter = groupby_values_iter_full_lookbehind(
period,
offset,
&time[..upper_bound],
closed_window,
tu,
tz,
base_offset,
);
iter.map(|result| result.map(|(offset, len)| [offset, len]))
.collect::<PolarsResult<Vec<_>>>()
})
.collect::<PolarsResult<Vec<_>>>()?;
Ok(flatten_par(&vals))
})
}
POOL.install(|| {
let vals = thread_offsets
.par_iter()
.copied()
.map(|(base_offset, len)| {
let upper_bound = base_offset + len;
let iter = groupby_values_iter_lookbehind(
period,
offset,
&time[..upper_bound],
closed_window,
tu,
tz,
base_offset,
);
iter.map(|result| result.map(|(offset, len)| [offset, len]))
.collect::<PolarsResult<Vec<_>>>()
})
.collect::<PolarsResult<Vec<_>>>()?;
Ok(flatten_par(&vals))
})
} else if ((offset.duration_ns() >= period.duration_ns())
&& matches!(closed_window, ClosedWindow::Left | ClosedWindow::None))
|| ((offset.duration_ns() > period.duration_ns())
&& matches!(closed_window, ClosedWindow::Right | ClosedWindow::Both))
{
// window is completely behind t and t itself is not a member
// ---------------t---
// [---]
else {
let iter = groupby_values_iter_window_behind_t(
period,
offset,
time,
closed_window,
tu,
tz,
);
iter.map(|result| result.map(|(offset, len)| [offset, len]))
.collect::<PolarsResult<_>>()
}
let iter =
groupby_values_iter_window_behind_t(period, offset, time, closed_window, tu, tz);
iter.map(|result| result.map(|(offset, len)| [offset, len]))
.collect::<PolarsResult<_>>()
}
// partial lookbehind
// this one is still single threaded
Expand Down
7 changes: 3 additions & 4 deletions polars/polars-time/src/windows/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -709,10 +709,9 @@ fn test_rolling_lookback() {
ClosedWindow::None,
] {
let offset = Duration::parse("-2h");
let g0 =
groupby_values_iter_full_lookbehind(period, offset, &dates, closed_window, tu, None, 0)
.collect::<PolarsResult<Vec<_>>>()
.unwrap();
let g0 = groupby_values_iter_lookbehind(period, offset, &dates, closed_window, tu, None, 0)
.collect::<PolarsResult<Vec<_>>>()
.unwrap();
let g1 =
groupby_values_iter_partial_lookbehind(period, offset, &dates, closed_window, tu, None)
.collect::<PolarsResult<Vec<_>>>()
Expand Down
51 changes: 49 additions & 2 deletions py-polars/polars/testing/parametric/primitives.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from dataclasses import dataclass
from math import isfinite
from textwrap import dedent
from typing import TYPE_CHECKING, Any, Collection, Sequence
from typing import TYPE_CHECKING, Any, Collection, Sequence, overload

from hypothesis.errors import InvalidArgument, NonInteractiveExampleWarning
from hypothesis.strategies import (
Expand Down Expand Up @@ -41,11 +41,18 @@
)

if TYPE_CHECKING:
import sys

from hypothesis.strategies import DrawFn, SearchStrategy

from polars import LazyFrame
from polars.type_aliases import OneOrMoreDataTypes, PolarsDataType

if sys.version_info >= (3, 8):
from typing import Literal
else:
from typing_extensions import Literal


_time_units = list(DTYPE_TEMPORAL_UNITS)

Expand Down Expand Up @@ -444,11 +451,51 @@ def draw_series(draw: DrawFn) -> Series:
_failed_frame_init_msgs_: set[str] = set()


@overload
def dataframes(
cols: int | column | Sequence[column] | None = None,
*,
lazy: Literal[False] = ...,
min_cols: int | None = 0,
max_cols: int | None = MAX_COLS,
size: int | None = None,
min_size: int | None = 0,
max_size: int | None = MAX_DATA_SIZE,
chunked: bool | None = None,
include_cols: Sequence[column] | column | None = None,
null_probability: float | dict[str, float] = 0.0,
allow_infinities: bool = True,
allowed_dtypes: Collection[PolarsDataType] | PolarsDataType | None = None,
excluded_dtypes: Collection[PolarsDataType] | PolarsDataType | None = None,
) -> SearchStrategy[DataFrame]:
...


@overload
def dataframes(
cols: int | column | Sequence[column] | None = None,
*,
lazy: Literal[True],
min_cols: int | None = 0,
max_cols: int | None = MAX_COLS,
size: int | None = None,
min_size: int | None = 0,
max_size: int | None = MAX_DATA_SIZE,
chunked: bool | None = None,
include_cols: Sequence[column] | column | None = None,
null_probability: float | dict[str, float] = 0.0,
allow_infinities: bool = True,
allowed_dtypes: Collection[PolarsDataType] | PolarsDataType | None = None,
excluded_dtypes: Collection[PolarsDataType] | PolarsDataType | None = None,
) -> SearchStrategy[LazyFrame]:
...


@defines_strategy()
def dataframes(
cols: int | column | Sequence[column] | None = None,
lazy: bool = False,
*,
lazy: bool = False,
min_cols: int | None = 0,
max_cols: int | None = MAX_COLS,
size: int | None = None,
Expand Down
2 changes: 2 additions & 0 deletions py-polars/polars/testing/parametric/strategies.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ def between(draw: DrawFn, type_: type, min_: Any, max_: Any) -> Any:
min_value=timedelta(microseconds=-(2**46)),
max_value=timedelta(microseconds=(2**46) - 1),
)
strategy_closed = sampled_from(["left", "right", "both", "none"])
strategy_time_unit = sampled_from(["ns", "us", "ms"])


@composite
Expand Down
76 changes: 76 additions & 0 deletions py-polars/tests/parametric/test_groupby_rolling.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
from __future__ import annotations

from datetime import timedelta
from typing import TYPE_CHECKING

import hypothesis.strategies as st
from hypothesis import given, reject

import polars as pl
from polars.testing import assert_frame_equal
from polars.testing.parametric.primitives import column, dataframes
from polars.testing.parametric.strategies import strategy_closed, strategy_time_unit
from polars.utils.convert import _timedelta_to_pl_duration

if TYPE_CHECKING:
from polars.type_aliases import ClosedInterval, TimeUnit


@given(
period=st.timedeltas(min_value=timedelta(microseconds=0)).map(
_timedelta_to_pl_duration
),
offset=st.timedeltas().map(_timedelta_to_pl_duration),
closed=strategy_closed,
data=st.data(),
time_unit=strategy_time_unit,
)
def test_groupby_rolling(
period: str,
offset: str,
closed: ClosedInterval,
data: st.DataObject,
time_unit: TimeUnit,
) -> None:
dataframe = data.draw(
dataframes(
[
column("ts", dtype=pl.Datetime(time_unit)),
column("value", dtype=pl.Int64),
],
)
)
df = dataframe.sort("ts").unique("ts")
try:
result = df.groupby_rolling(
"ts", period=period, offset=offset, closed=closed
).agg(pl.col("value"))
except pl.exceptions.PolarsPanicError as exc:
assert any( # noqa: PT017
msg in str(exc)
for msg in (
"attempt to multiply with overflow",
"attempt to add with overflow",
)
)
reject()

expected_dict: dict[str, list[object]] = {"ts": [], "value": []}
for ts, _ in df.iter_rows():
window = df.filter(
pl.col("ts").is_between(
pl.lit(ts, dtype=pl.Datetime(time_unit)).dt.offset_by(offset),
pl.lit(ts, dtype=pl.Datetime(time_unit))
.dt.offset_by(offset)
.dt.offset_by(period),
closed=closed,
)
)
value = window["value"].to_list()
expected_dict["ts"].append(ts)
expected_dict["value"].append(value)
expected = pl.DataFrame(expected_dict).select(
pl.col("ts").cast(pl.Datetime(time_unit)),
pl.col("value").cast(pl.List(pl.Int64)),
)
assert_frame_equal(result, expected)
90 changes: 90 additions & 0 deletions py-polars/tests/unit/operations/test_rolling.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,96 @@ def test_rolling_kernels_and_groupby_rolling(
assert_frame_equal(out1, out2)


@pytest.mark.parametrize(
("offset", "closed", "expected_values"),
[
pytest.param(
"-1d",
"left",
[[1], [1, 2], [2, 3], [3, 4]],
id="partial lookbehind, left",
),
pytest.param(
"-1d",
"right",
[[1, 2], [2, 3], [3, 4], [4]],
id="partial lookbehind, right",
),
pytest.param(
"-1d",
"both",
[[1, 2], [1, 2, 3], [2, 3, 4], [3, 4]],
id="partial lookbehind, both",
),
pytest.param(
"-1d",
"none",
[[1], [2], [3], [4]],
id="partial lookbehind, none",
),
pytest.param(
"-2d",
"left",
[[], [1], [1, 2], [2, 3]],
id="full lookbehind, left",
),
pytest.param(
"-3d",
"left",
[[], [], [1], [1, 2]],
id="full lookbehind, offset > period, left",
),
pytest.param(
"-3d",
"right",
[[], [1], [1, 2], [2, 3]],
id="full lookbehind, right",
),
pytest.param(
"-3d",
"both",
[[], [1], [1, 2], [1, 2, 3]],
id="full lookbehind, both",
),
pytest.param(
"-2d",
"none",
[[], [1], [2], [3]],
id="full lookbehind, none",
),
pytest.param(
"-3d",
"none",
[[], [], [1], [2]],
id="full lookbehind, offset > period, none",
),
],
)
def test_rolling_negative_offset(
offset: str, closed: ClosedInterval, expected_values: list[list[int]]
) -> None:
df = pl.DataFrame(
{
"ts": pl.date_range(
datetime(2021, 1, 1), datetime(2021, 1, 4), "1d", eager=True
),
"value": [1, 2, 3, 4],
}
)
result = df.groupby_rolling("ts", period="2d", offset=offset, closed=closed).agg(
pl.col("value")
)
expected = pl.DataFrame(
{
"ts": pl.date_range(
datetime(2021, 1, 1), datetime(2021, 1, 4), "1d", eager=True
),
"value": expected_values,
}
)
assert_frame_equal(result, expected)


def test_rolling_skew() -> None:
s = pl.Series([1, 2, 3, 3, 2, 10, 8])
assert s.rolling_skew(window_size=4, bias=True).to_list() == pytest.approx(
Expand Down