Skip to content

Commit

Permalink
fix: ensure first datapoint is always included in group_by_dynamic (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
MarcoGorelli authored Mar 28, 2024
1 parent c7cf650 commit 85a5e38
Show file tree
Hide file tree
Showing 7 changed files with 205 additions and 52 deletions.
10 changes: 9 additions & 1 deletion crates/polars-time/src/windows/bounds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,15 @@ impl Bounds {
pub(crate) fn is_future(&self, t: i64, closed: ClosedWindow) -> bool {
match closed {
ClosedWindow::Left | ClosedWindow::None => self.stop <= t,
ClosedWindow::Both | ClosedWindow::Right => t > self.stop,
ClosedWindow::Both | ClosedWindow::Right => self.stop < t,
}
}

#[inline]
pub(crate) fn is_past(&self, t: i64, closed: ClosedWindow) -> bool {
match closed {
ClosedWindow::Left | ClosedWindow::Both => self.start > t,
ClosedWindow::None | ClosedWindow::Right => self.start >= t,
}
}
}
3 changes: 2 additions & 1 deletion crates/polars-time/src/windows/group_by.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ pub fn group_by_windows(
window
.get_overlapping_bounds_iter(
boundary,
closed_window,
tu,
tz.parse::<Tz>().ok().as_ref(),
start_by,
Expand All @@ -198,7 +199,7 @@ pub fn group_by_windows(
_ => {
update_groups_and_bounds(
window
.get_overlapping_bounds_iter(boundary, tu, None, start_by)
.get_overlapping_bounds_iter(boundary, closed_window, tu, None, start_by)
.unwrap(),
start_offset,
time,
Expand Down
40 changes: 26 additions & 14 deletions crates/polars-time/src/windows/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,8 @@ fn test_groups_large_interval() {
false,
Default::default(),
);
assert_eq!(groups.len(), 2);
assert_eq!(groups[1], [2, 2]);
assert_eq!(groups.len(), 3);
assert_eq!(groups[1], [1, 1]);
}

#[test]
Expand All @@ -167,7 +167,9 @@ fn test_offset() {
Duration::parse("-2m"),
);

let b = w.get_earliest_bounds_ns(t, None).unwrap();
let b = w
.get_earliest_bounds_ns(t, ClosedWindow::Left, None)
.unwrap();
let start = NaiveDate::from_ymd_opt(2020, 1, 1)
.unwrap()
.and_hms_opt(23, 58, 0)
Expand Down Expand Up @@ -209,7 +211,9 @@ fn test_boundaries() {
);

// earliest bound is first datapoint: 2021-12-16 00:00:00
let b = w.get_earliest_bounds_ns(ts[0], None).unwrap();
let b = w
.get_earliest_bounds_ns(ts[0], ClosedWindow::Both, None)
.unwrap();
assert_eq!(b.start, start.and_utc().timestamp_nanos_opt().unwrap());

// test closed: "both" (includes both ends of the interval)
Expand Down Expand Up @@ -340,9 +344,10 @@ fn test_boundaries() {
false,
Default::default(),
);
assert_eq!(groups[0], [1, 2]); // 00:00:00 -> 00:30:00
assert_eq!(groups[1], [3, 2]); // 01:00:00 -> 01:30:00
assert_eq!(groups[2], [5, 2]); // 02:00:00 -> 02:30:00
assert_eq!(groups[0], [0, 1]); // (2021-12-15 23:30, 2021-12-16 00:00]
assert_eq!(groups[1], [1, 2]); // (2021-12-16 00:00, 2021-12-16 00:30]
assert_eq!(groups[2], [3, 2]); // (2021-12-16 00:30, 2021-12-16 01:00]
assert_eq!(groups[3], [5, 2]); // (2021-12-16 01:00, 2021-12-16 01:30]

// test closed: "none" (should not include left or right end of interval)
let (groups, _, _) = group_by_windows(
Expand Down Expand Up @@ -388,14 +393,18 @@ fn test_boundaries_2() {
// period 1h
// offset 30m
let offset = Duration::parse("30m");
let w = Window::new(Duration::parse("2h"), Duration::parse("1h"), offset);
let every = Duration::parse("2h");
let w = Window::new(every, Duration::parse("1h"), offset);

// earliest bound is first datapoint: 2021-12-16 00:00:00 + 30m offset: 2021-12-16 00:30:00
let b = w.get_earliest_bounds_ns(ts[0], None).unwrap();
// We then shift back by `every` (2h): 2021-12-15 22:30:00
let b = w
.get_earliest_bounds_ns(ts[0], ClosedWindow::Both, None)
.unwrap();

assert_eq!(
b.start,
start.and_utc().timestamp_nanos_opt().unwrap() + offset.duration_ns()
start.and_utc().timestamp_nanos_opt().unwrap() + offset.duration_ns() - every.duration_ns()
);

let (groups, lower, higher) = group_by_windows(
Expand Down Expand Up @@ -520,7 +529,9 @@ fn test_boundaries_ms() {
);

// earliest bound is first datapoint: 2021-12-16 00:00:00
let b = w.get_earliest_bounds_ms(ts[0], None).unwrap();
let b = w
.get_earliest_bounds_ms(ts[0], ClosedWindow::Both, None)
.unwrap();
assert_eq!(b.start, start.and_utc().timestamp_millis());

// test closed: "both" (includes both ends of the interval)
Expand Down Expand Up @@ -651,9 +662,10 @@ fn test_boundaries_ms() {
false,
Default::default(),
);
assert_eq!(groups[0], [1, 2]); // 00:00:00 -> 00:30:00
assert_eq!(groups[1], [3, 2]); // 01:00:00 -> 01:30:00
assert_eq!(groups[2], [5, 2]); // 02:00:00 -> 02:30:00
assert_eq!(groups[0], [0, 1]); // (2021-12-15 23:30, 2021-12-16 00:00]
assert_eq!(groups[1], [1, 2]); // (2021-12-16 00:00, 2021-12-16 00:30]
assert_eq!(groups[2], [3, 2]); // (2021-12-16 00:30, 2021-12-16 01:00]
assert_eq!(groups[3], [5, 2]); // (2021-12-16 01:00, 2021-12-16 01:30]

// test closed: "none" (should not include left or right end of interval)
let (groups, _, _) = group_by_windows(
Expand Down
143 changes: 115 additions & 28 deletions crates/polars-time/src/windows/window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,37 @@ use polars_core::prelude::*;

use crate::prelude::*;

/// Ensure that earliest datapoint (`t`) is in, or in front of, first window.
///
/// For example, if we have:
///
/// - first datapoint is `2020-01-01 01:00`
/// - `every` is `'1d'`
/// - `period` is `'2d'`
/// - `offset` is `'6h'`
///
/// then truncating the earliest datapoint by `every` and adding `offset` results
/// in the window `[2020-01-01 06:00, 2020-01-03 06:00)`. To give the earliest datapoint
/// a chance of being included, we then shift the window back by `every` to
/// `[2019-12-31 06:00, 2020-01-02 06:00)`.
pub(crate) fn ensure_t_in_or_in_front_of_window(
mut every: Duration,
t: i64,
offset_fn: fn(&Duration, i64, Option<&Tz>) -> PolarsResult<i64>,
period: Duration,
mut start: i64,
closed_window: ClosedWindow,
tz: Option<&Tz>,
) -> PolarsResult<Bounds> {
every.negative = !every.negative;
let mut stop = offset_fn(&period, start, tz)?;
while Bounds::new(start, stop).is_past(t, closed_window) {
start = offset_fn(&every, start, tz)?;
stop = offset_fn(&period, start, tz)?;
}
Ok(Bounds::new_checked(start, stop))
}

/// Represents a window in time
#[derive(Copy, Clone)]
pub struct Window {
Expand Down Expand Up @@ -82,24 +113,58 @@ impl Window {
/// returns the bounds for the earliest window bounds
/// that contains the given time t. For underlapping windows that
/// do not contain time t, the window directly after time t will be returned.
pub fn get_earliest_bounds_ns(&self, t: i64, tz: Option<&Tz>) -> PolarsResult<Bounds> {
pub fn get_earliest_bounds_ns(
&self,
t: i64,
closed_window: ClosedWindow,
tz: Option<&Tz>,
) -> PolarsResult<Bounds> {
let start = self.truncate_ns(t, tz)?;
let stop = self.period.add_ns(start, tz)?;

Ok(Bounds::new_checked(start, stop))
ensure_t_in_or_in_front_of_window(
self.every,
t,
Duration::add_ns,
self.period,
start,
closed_window,
tz,
)
}

pub fn get_earliest_bounds_us(&self, t: i64, tz: Option<&Tz>) -> PolarsResult<Bounds> {
pub fn get_earliest_bounds_us(
&self,
t: i64,
closed_window: ClosedWindow,
tz: Option<&Tz>,
) -> PolarsResult<Bounds> {
let start = self.truncate_us(t, tz)?;
let stop = self.period.add_us(start, tz)?;
Ok(Bounds::new_checked(start, stop))
ensure_t_in_or_in_front_of_window(
self.every,
t,
Duration::add_us,
self.period,
start,
closed_window,
tz,
)
}

pub fn get_earliest_bounds_ms(&self, t: i64, tz: Option<&Tz>) -> PolarsResult<Bounds> {
pub fn get_earliest_bounds_ms(
&self,
t: i64,
closed_window: ClosedWindow,
tz: Option<&Tz>,
) -> PolarsResult<Bounds> {
let start = self.truncate_ms(t, tz)?;
let stop = self.period.add_ms(start, tz)?;

Ok(Bounds::new_checked(start, stop))
ensure_t_in_or_in_front_of_window(
self.every,
t,
Duration::add_ms,
self.period,
start,
closed_window,
tz,
)
}

pub(crate) fn estimate_overlapping_bounds_ns(&self, boundary: Bounds) -> usize {
Expand All @@ -120,11 +185,12 @@ impl Window {
pub fn get_overlapping_bounds_iter<'a>(
&'a self,
boundary: Bounds,
closed_window: ClosedWindow,
tu: TimeUnit,
tz: Option<&'a Tz>,
start_by: StartBy,
) -> PolarsResult<BoundsIter> {
BoundsIter::new(*self, boundary, tu, tz, start_by)
BoundsIter::new(*self, closed_window, boundary, tu, tz, start_by)
}
}

Expand All @@ -140,6 +206,7 @@ pub struct BoundsIter<'a> {
impl<'a> BoundsIter<'a> {
fn new(
window: Window,
closed_window: ClosedWindow,
boundary: Bounds,
tu: TimeUnit,
tz: Option<&'a Tz>,
Expand All @@ -157,14 +224,20 @@ impl<'a> BoundsIter<'a> {
boundary
},
StartBy::WindowBound => match tu {
TimeUnit::Nanoseconds => window.get_earliest_bounds_ns(boundary.start, tz)?,
TimeUnit::Microseconds => window.get_earliest_bounds_us(boundary.start, tz)?,
TimeUnit::Milliseconds => window.get_earliest_bounds_ms(boundary.start, tz)?,
TimeUnit::Nanoseconds => {
window.get_earliest_bounds_ns(boundary.start, closed_window, tz)?
},
TimeUnit::Microseconds => {
window.get_earliest_bounds_us(boundary.start, closed_window, tz)?
},
TimeUnit::Milliseconds => {
window.get_earliest_bounds_ms(boundary.start, closed_window, tz)?
},
},
_ => {
{
#[allow(clippy::type_complexity)]
let (from, to, offset): (
let (from, to, offset_fn): (
fn(i64) -> NaiveDateTime,
fn(NaiveDateTime) -> i64,
fn(&Duration, i64, Option<&Tz>) -> PolarsResult<i64>,
Expand All @@ -186,26 +259,33 @@ impl<'a> BoundsIter<'a> {
),
};
// find beginning of the week.
let mut boundary = boundary;
let dt = from(boundary.start);
(boundary.start, boundary.stop) = match tz {
match tz {
#[cfg(feature = "timezones")]
Some(tz) => {
let dt = tz.from_utc_datetime(&dt);
let dt = dt.beginning_of_week();
let dt = dt.naive_utc();
let start = to(dt);
// adjust start of the week based on given day of the week
let start = offset(
let start = offset_fn(
&Duration::parse(&format!("{}d", start_by.weekday().unwrap())),
start,
Some(tz),
)?;
// apply the 'offset'
let start = offset(&window.offset, start, Some(tz))?;
let start = offset_fn(&window.offset, start, Some(tz))?;
// make sure the first datapoint has a chance to be included
// and compute the end of the window defined by the 'period'
let stop = offset(&window.period, start, Some(tz))?;
(start, stop)
ensure_t_in_or_in_front_of_window(
window.every,
boundary.start,
offset_fn,
window.period,
start,
closed_window,
Some(tz),
)?
},
_ => {
let tz = chrono::Utc;
Expand All @@ -214,20 +294,27 @@ impl<'a> BoundsIter<'a> {
let dt = dt.naive_utc();
let start = to(dt);
// adjust start of the week based on given day of the week
let start = offset(
let start = offset_fn(
&Duration::parse(&format!("{}d", start_by.weekday().unwrap())),
start,
None,
)
.unwrap();
// apply the 'offset'
let start = offset(&window.offset, start, None).unwrap();
let start = offset_fn(&window.offset, start, None).unwrap();
// make sure the first datapoint has a chance to be included
// and compute the end of the window defined by the 'period'
let stop = offset(&window.period, start, None).unwrap();
(start, stop)
ensure_t_in_or_in_front_of_window(
window.every,
boundary.start,
offset_fn,
window.period,
start,
closed_window,
None,
)?
},
};
boundary
}
}
},
};
Expand Down
12 changes: 9 additions & 3 deletions py-polars/polars/dataframe/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -5554,8 +5554,8 @@ def group_by_dynamic(
- [start + 2*every, start + 2*every + period)
- ...
where `start` is determined by `start_by`, `offset`, and `every` (see parameter
descriptions below).
where `start` is determined by `start_by`, `offset`, `every`, and the earliest
datapoint. See the `start_by` argument description for details.
.. warning::
The index column must be sorted in ascending order. If `by` is passed, then
Expand All @@ -5577,7 +5577,7 @@ def group_by_dynamic(
period
length of the window, if None it will equal 'every'
offset
offset of the window, only takes effect if `start_by` is `'window'`.
offset of the window, does not take effect if `start_by` is 'datapoint'.
Defaults to negative `every`.
truncate
truncate the time value to the window lower bound
Expand Down Expand Up @@ -5613,6 +5613,9 @@ def group_by_dynamic(
* 'tuesday': Start the window on the Tuesday before the first data point.
* ...
* 'sunday': Start the window on the Sunday before the first data point.
The resulting window is then shifted back until the earliest datapoint
is in or in front of it.
check_sorted
Check whether `index_column` is sorted (or, if `group_by` is given,
check whether it's sorted within each group).
Expand Down Expand Up @@ -10694,6 +10697,9 @@ def groupby_dynamic(
* 'tuesday': Start the window on the Tuesday before the first data point.
* ...
* 'sunday': Start the window on the Sunday before the first data point.
The resulting window is then shifted back until the earliest datapoint
is in or in front of it.
check_sorted
Check whether `index_column` is sorted (or, if `by` is given,
check whether it's sorted within each group).
Expand Down
Loading

0 comments on commit 85a5e38

Please sign in to comment.